You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/11/06 01:35:16 UTC
[1/2] TEZ-12. Support for counters. (hitesh)
Updated Branches:
refs/heads/master 83a657bb5 -> 6fddbd01b
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index fa2533b..65df726 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -20,9 +20,12 @@ package org.apache.tez.mapreduce.examples;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.StringTokenizer;
import org.apache.commons.cli.ParseException;
@@ -65,6 +68,7 @@ import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
@@ -202,7 +206,7 @@ public class OrderedWordCount {
List<Vertex> vertices = new ArrayList<Vertex>();
byte[] mapPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
- byte[] mapInputPayload =
+ byte[] mapInputPayload =
MRHelpers.createMRInputPayload(mapPayload, null);
int numMaps = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1;
Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
@@ -229,7 +233,7 @@ public class OrderedWordCount {
MRHelpers.addMRInput(mapVertex, mapInputPayload, initializerClazz);
vertices.add(mapVertex);
- Vertex ivertex = new Vertex("ivertex1", new ProcessorDescriptor(
+ Vertex ivertex = new Vertex("intermediate_reducer", new ProcessorDescriptor(
ReduceProcessor.class.getName()).
setUserPayload(MRHelpers.createUserPayloadFromConf(iReduceStageConf)),
2,
@@ -277,14 +281,14 @@ public class OrderedWordCount {
System.err.println("Usage (In Session Mode):"
+ " orderedwordcount <in1> <out1> ... <inN> <outN> [-generateSplitsInClient true/<false>]");
}
-
-
+
+
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-
+
boolean generateSplitsInClient = false;
-
+
SplitsInClientOptionParser splitCmdLineParser = new SplitsInClientOptionParser();
try {
generateSplitsInClient = splitCmdLineParser.parse(otherArgs, false);
@@ -349,6 +353,11 @@ public class OrderedWordCount {
}
DAGStatus dagStatus = null;
+ DAGClient dagClient = null;
+ String[] vNames = { "initialmap", "intermediate_reducer",
+ "finalreduce" };
+
+ Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
try {
for (int dagIndex = 1; dagIndex <= inputPaths.size(); ++dagIndex) {
if (dagIndex != 1
@@ -378,7 +387,6 @@ public class OrderedWordCount {
DAG dag = createDAG(fs, conf, null, stagingDir,
dagIndex, inputPath, outputPath, generateSplitsInClient);
- DAGClient dagClient;
if (useTezSession) {
LOG.info("Waiting for TezSession to get into ready state");
waitForTezSessionReady(tezSession);
@@ -391,7 +399,7 @@ public class OrderedWordCount {
}
while (true) {
- dagStatus = dagClient.getDAGStatus();
+ dagStatus = dagClient.getDAGStatus(statusGetOpts);
if(dagStatus.getState() == DAGStatus.State.RUNNING ||
dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
dagStatus.getState() == DAGStatus.State.FAILED ||
@@ -406,21 +414,23 @@ public class OrderedWordCount {
}
}
+
while (dagStatus.getState() == DAGStatus.State.RUNNING) {
try {
- ExampleDriver.printMRRDAGStatus(dagStatus);
+ ExampleDriver.printDAGStatus(dagClient, vNames);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// continue;
}
- dagStatus = dagClient.getDAGStatus();
+ dagStatus = dagClient.getDAGStatus(statusGetOpts);
} catch (TezException e) {
LOG.fatal("Failed to get application progress. Exiting");
System.exit(-1);
}
}
- ExampleDriver.printMRRDAGStatus(dagStatus);
+ ExampleDriver.printDAGStatus(dagClient, vNames,
+ true, true);
LOG.info("DAG " + dagIndex + " completed. "
+ "FinalState=" + dagStatus.getState());
}
@@ -432,7 +442,7 @@ public class OrderedWordCount {
}
if (!useTezSession) {
- ExampleDriver.printMRRDAGStatus(dagStatus);
+ ExampleDriver.printDAGStatus(dagClient, vNames);
LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
System.exit(dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
index d5b61b1..e010ec3 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -634,7 +634,7 @@ public class YARNRunner implements ClientProtocol {
if(dagClient == null) {
dagClient = tezClient.getDAGClient(TypeConverter.toYarn(jobID).getAppId());
}
- dagStatus = dagClient.getDAGStatus();
+ dagStatus = dagClient.getDAGStatus(null);
return new DAGJobStatus(dagClient.getApplicationReport(), dagStatus, jobFile);
} catch (TezException e) {
throw new IOException(e);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index c2b6451..c444cec 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -496,7 +496,7 @@ public class TestMRRJobsDAGApi {
Assert.assertEquals(TezSessionStatus.RUNNING,
tezSession.getSessionStatus());
}
- DAGStatus dagStatus = dagClient.getDAGStatus();
+ DAGStatus dagStatus = dagClient.getDAGStatus(null);
while (!dagStatus.isCompleted()) {
LOG.info("Waiting for job to complete. Sleeping for 500ms."
+ " Current state: " + dagStatus.getState());
@@ -510,7 +510,7 @@ public class TestMRRJobsDAGApi {
dagClient.tryKillDAG();
}
}
- dagStatus = dagClient.getDAGStatus();
+ dagStatus = dagClient.getDAGStatus(null);
}
if (dagViaRPC && !reuseSession) {
tezSession.stop();
[2/2] git commit: TEZ-12. Support for counters. (hitesh)
Posted by hi...@apache.org.
TEZ-12. Support for counters. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/6fddbd01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/6fddbd01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/6fddbd01
Branch: refs/heads/master
Commit: 6fddbd01b5050d2bd58b4edbd0464798e1b10a1f
Parents: 83a657b
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue Nov 5 16:34:57 2013 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Nov 5 16:34:57 2013 -0800
----------------------------------------------------------------------
.../apache/tez/dag/api/DagTypeConverters.java | 100 ++++++++++++++++-
.../apache/tez/dag/api/client/DAGClient.java | 15 ++-
.../apache/tez/dag/api/client/DAGStatus.java | 27 ++++-
.../tez/dag/api/client/StatusGetOpts.java | 28 +++++
.../apache/tez/dag/api/client/VertexStatus.java | 43 ++++++--
.../dag/api/client/rpc/DAGClientRPCImpl.java | 57 +++++++---
tez-api/src/main/proto/DAGApiRecords.proto | 42 +++++--
.../src/main/proto/DAGClientAMProtocol.proto | 2 +
.../tez/dag/api/client/DAGStatusBuilder.java | 21 ++--
.../tez/dag/api/client/ProgressBuilder.java | 15 ++-
.../tez/dag/api/client/VertexStatusBuilder.java | 20 +++-
...DAGClientAMProtocolBlockingPBServerImpl.java | 8 +-
.../org/apache/tez/dag/app/DAGAppMaster.java | 26 +++--
.../java/org/apache/tez/dag/app/dag/DAG.java | 7 +-
.../java/org/apache/tez/dag/app/dag/Vertex.java | 11 +-
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 20 ++--
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 80 +++++++-------
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 109 ++++++++++---------
.../tez/mapreduce/examples/ExampleDriver.java | 41 ++++++-
.../mapreduce/examples/FilterLinesByWord.java | 27 ++---
.../examples/GroupByOrderByMRRTest.java | 11 +-
.../tez/mapreduce/examples/MRRSleepJob.java | 2 +-
.../mapreduce/examples/OrderedWordCount.java | 34 ++++--
.../apache/tez/mapreduce/client/YARNRunner.java | 2 +-
.../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 4 +-
25 files changed, 529 insertions(+), 223 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 803c943..098a201 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -20,8 +20,11 @@ package org.apache.tez.dag.api;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -32,11 +35,16 @@ import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezSessionStatusProto;
+import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType;
@@ -50,6 +58,9 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceVisibility;
import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
+import org.apache.tez.dag.api.records.DAGProtos.TezCounterGroupProto;
+import org.apache.tez.dag.api.records.DAGProtos.TezCounterProto;
+import org.apache.tez.dag.api.records.DAGProtos.TezCountersProto;
import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
import com.google.protobuf.ByteString;
@@ -166,9 +177,8 @@ public class DagTypeConverters {
public static String convertToDAGPlan(URL resource) {
// see above notes on HDFS URL handling
// The result has the form scheme://host:port immediately followed by the
// URL's file component.
- String out = resource.getScheme() + "://" + resource.getHost() + ":" + resource.getPort()
- + resource.getFile();
- return out;
+ return resource.getScheme() + "://" + resource.getHost()
+ + ":" + resource.getPort() + resource.getFile();
}
public static Map<String, LocalResource> createLocalResourceMapFromDAGPlan(
@@ -380,4 +390,88 @@ public class DagTypeConverters {
plr.hasPattern() ? plr.getPattern() : null);
}
+  /**
+   * Rebuilds a {@link TezCounters} object from its protobuf form.
+   *
+   * @param proto serialized counters; every group and counter listed in it
+   *              is recreated with the same name, display name and value
+   * @return a new TezCounters instance populated from {@code proto}
+   */
+  public static TezCounters convertTezCountersFromProto(TezCountersProto proto) {
+    TezCounters result = new TezCounters();
+    for (TezCounterGroupProto groupProto : proto.getCounterGroupsList()) {
+      String groupName = groupProto.getName();
+      String groupDisplayName = groupProto.getDisplayName();
+      CounterGroup group = result.addGroup(groupName, groupDisplayName);
+      for (TezCounterProto counterProto : groupProto.getCountersList()) {
+        // findCounter hands back the group's counter for this name; only its
+        // value still has to be copied over.
+        group.findCounter(counterProto.getName(), counterProto.getDisplayName())
+            .setValue(counterProto.getValue());
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Serializes a {@link TezCounters} object to its protobuf form.
+   * Groups, and the counters inside each group, are written in the order
+   * their iterators return them.
+   *
+   * @param counters the counters to serialize
+   * @return a TezCountersProto holding one group message per counter group
+   */
+  public static TezCountersProto convertTezCountersToProto(
+      TezCounters counters) {
+    TezCountersProto.Builder builder = TezCountersProto.newBuilder();
+    Iterator<CounterGroup> groupIterator = counters.iterator();
+    while (groupIterator.hasNext()) {
+      CounterGroup counterGroup = groupIterator.next();
+      TezCounterGroupProto.Builder groupBuilder =
+          TezCounterGroupProto.newBuilder()
+              .setName(counterGroup.getName())
+              .setDisplayName(counterGroup.getDisplayName());
+      Iterator<TezCounter> counterIterator = counterGroup.iterator();
+      while (counterIterator.hasNext()) {
+        TezCounter counter = counterIterator.next();
+        // Appending preserves iteration order, so no explicit insertion
+        // index has to be tracked.
+        groupBuilder.addCounters(TezCounterProto.newBuilder()
+            .setName(counter.getName())
+            .setDisplayName(counter.getDisplayName())
+            .setValue(counter.getValue())
+            .build());
+      }
+      builder.addCounterGroups(groupBuilder.build());
+    }
+    return builder.build();
+  }
+
+  /**
+   * Maps a single client-side status option to its protobuf enum value.
+   *
+   * @param statusGetOpts the option to convert
+   * @return the matching StatusGetOptsProto value
+   * @throws TezUncheckedException if the option has no proto counterpart
+   */
+  public static DAGProtos.StatusGetOptsProto convertStatusGetOptsToProto(
+      StatusGetOpts statusGetOpts) {
+    switch (statusGetOpts) {
+      case GET_COUNTERS:
+        return DAGProtos.StatusGetOptsProto.GET_COUNTERS;
+      default:
+        throw new TezUncheckedException("Could not convert StatusGetOpts to"
+            + " proto");
+    }
+  }
+
+  /**
+   * Maps a single protobuf status option to its client-side enum value.
+   *
+   * @param proto the proto enum value to convert
+   * @return the matching StatusGetOpts value
+   * @throws TezUncheckedException if the proto value has no counterpart
+   */
+  public static StatusGetOpts convertStatusGetOptsFromProto(
+      DAGProtos.StatusGetOptsProto proto) {
+    switch (proto) {
+      case GET_COUNTERS:
+        return StatusGetOpts.GET_COUNTERS;
+      default:
+        throw new TezUncheckedException("Could not convert to StatusGetOpts from"
+            + " proto");
+    }
+  }
+
+  /**
+   * Converts a set of client-side status options to a list of proto values,
+   * one entry per option, in the set's iteration order.
+   *
+   * @param statusGetOpts the options to convert
+   * @return a new list holding the converted options
+   */
+  public static List<DAGProtos.StatusGetOptsProto> convertStatusGetOptsToProto(
+      Set<StatusGetOpts> statusGetOpts) {
+    int expectedSize = statusGetOpts.size();
+    List<DAGProtos.StatusGetOptsProto> converted =
+        new ArrayList<DAGProtos.StatusGetOptsProto>(expectedSize);
+    Iterator<StatusGetOpts> it = statusGetOpts.iterator();
+    while (it.hasNext()) {
+      converted.add(convertStatusGetOptsToProto(it.next()));
+    }
+    return converted;
+  }
+
+  /**
+   * Converts a list of proto status options to a set of client-side options;
+   * repeated entries in the list collapse into one set member.
+   *
+   * @param protoList the proto values to convert
+   * @return a new set holding the converted options
+   */
+  public static Set<StatusGetOpts> convertStatusGetOptsFromProto(
+      List<DAGProtos.StatusGetOptsProto> protoList) {
+    Set<StatusGetOpts> converted = new TreeSet<StatusGetOpts>();
+    Iterator<DAGProtos.StatusGetOptsProto> it = protoList.iterator();
+    while (it.hasNext()) {
+      converted.add(convertStatusGetOptsFromProto(it.next()));
+    }
+    return converted;
+  }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
index 9062e8e..bbb225c 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.api.client;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -49,19 +50,25 @@ public interface DAGClient extends Closeable {
/**
* Get the status of the specified DAG
+ * @param statusOptions Optionally, retrieve additional information based on
+ * specified options
*/
- public DAGStatus getDAGStatus() throws IOException, TezException;
+ public DAGStatus getDAGStatus(Set<StatusGetOpts> statusOptions)
+ throws IOException, TezException;
/**
* Get the status of a Vertex of a DAG
+ * @param statusOptions Optionally, retrieve additional information based on
+ * specified options
*/
- public VertexStatus getVertexStatus(String vertexName)
- throws IOException, TezException;
+ public VertexStatus getVertexStatus(String vertexName,
+ Set<StatusGetOpts> statusOptions)
+ throws IOException, TezException;
/**
* Kill a running DAG
*
*/
- public void tryKillDAG() throws TezException, IOException;
+ public void tryKillDAG() throws IOException, TezException;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
index 8b7277f..b2a4d21 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
@@ -21,8 +21,11 @@ package org.apache.tez.dag.api.client;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProtoOrBuilder;
import org.apache.tez.dag.api.records.DAGProtos.StringProgressPairProto;
import org.apache.tez.dag.api.TezUncheckedException;
@@ -40,11 +43,13 @@ public class DAGStatus {
KILLED,
FAILED,
ERROR,
- };
+ }
DAGStatusProtoOrBuilder proxy = null;
Progress progress = null;
Map<String, Progress> vertexProgress = null;
+ TezCounters dagCounters = null;
+ AtomicBoolean countersInitialized = new AtomicBoolean(false);
public DAGStatus(DAGStatusProtoOrBuilder proxy) {
this.proxy = proxy;
@@ -123,13 +128,27 @@ public class DAGStatus {
return vertexProgress;
}
+  /**
+   * Returns the DAG counters carried by the underlying status proto.
+   * The proto is decoded on first access and the result is cached.
+   *
+   * @return the decoded counters, or null if the proto has no counters
+   */
+  public TezCounters getDAGCounters() {
+    if (!countersInitialized.get()) {
+      if (proxy.hasDagCounters()) {
+        dagCounters = DagTypeConverters.convertTezCountersFromProto(
+            proxy.getDagCounters());
+      }
+      // Remember the lookup even when nothing was found.
+      countersInitialized.set(true);
+    }
+    return dagCounters;
+  }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("status=" + getState()
- + ", progress=" + getDAGProgress()
- + ", diagnostics="
- + StringUtils.join(LINE_SEPARATOR, getDiagnostics()));
+ + ", progress=" + getDAGProgress()
+ + ", diagnostics="
+ + StringUtils.join(LINE_SEPARATOR, getDiagnostics())
+ // Consult the proto instead of the lazily filled dagCounters field, so
+ // counters are printed even if getDAGCounters() was never called. The
+ // accessor is only invoked when counters are present; string
+ // concatenation renders the null branch as "null".
+ + ", counters="
+ + (proxy.hasDagCounters() ? getDAGCounters() : null));
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-api/src/main/java/org/apache/tez/dag/api/client/StatusGetOpts.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/StatusGetOpts.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/StatusGetOpts.java
new file mode 100644
index 0000000..922ab24
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/StatusGetOpts.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.client;
+
+/**
+ * Status Get Options used when making calls like getDAGStatus and
+ * getVertexStatus in DAGClient. Each option asks for additional
+ * information to be returned together with the status.
+ */
+public enum StatusGetOpts {
+ /** Retrieve Counters with Status */
+ GET_COUNTERS
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
index ce5dbe0..5ea190f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
@@ -19,12 +19,15 @@
package org.apache.tez.dag.api.client;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.records.DAGProtos.VertexStatusProtoOrBuilder;
import org.apache.tez.dag.api.TezUncheckedException;
public class VertexStatus {
-
+
public enum State {
INITED,
RUNNING,
@@ -33,11 +36,13 @@ public class VertexStatus {
FAILED,
ERROR,
TERMINATING,
- };
-
+ }
+
VertexStatusProtoOrBuilder proxy = null;
Progress progress = null;
-
+ TezCounters vertexCounters = null;
+ private AtomicBoolean countersInitialized = new AtomicBoolean(false);
+
public VertexStatus(VertexStatusProtoOrBuilder proxy) {
this.proxy = proxy;
}
@@ -59,9 +64,9 @@ public class VertexStatus {
case VERTEX_TERMINATING:
return VertexStatus.State.TERMINATING;
default:
- throw new TezUncheckedException("Unsupported value for VertexStatus.State : " +
- proxy.getState());
- }
+ throw new TezUncheckedException(
+ "Unsupported value for VertexStatus.State : " + proxy.getState());
+ }
}
public List<String> getDiagnostics() {
@@ -72,7 +77,29 @@ public class VertexStatus {
if(progress == null && proxy.hasProgress()) {
progress = new Progress(proxy.getProgress());
}
- return progress;
+ return progress;
+ }
+
+  /**
+   * Returns the vertex counters carried by the underlying status proto.
+   * The proto is decoded on first access and the result is cached.
+   *
+   * @return the decoded counters, or null if the proto has no counters
+   */
+  public TezCounters getVertexCounters() {
+    if (!countersInitialized.get()) {
+      if (proxy.hasVertexCounters()) {
+        vertexCounters = DagTypeConverters.convertTezCountersFromProto(
+            proxy.getVertexCounters());
+      }
+      // Remember the lookup even when nothing was found.
+      countersInitialized.set(true);
+    }
+    return vertexCounters;
+  }
+
+  /**
+   * Renders the state, progress and counters of this vertex status.
+   * Counters are taken from the proto rather than from the lazily filled
+   * vertexCounters field, so they appear even if getVertexCounters() was
+   * never called; "null" is printed when the proto has no counters.
+   */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("status=" + getState()
+        + ", progress=" + getProgress()
+        + ", counters="
+        + (proxy.hasVertexCounters() ? getVertexCounters() : null));
+    return sb.toString();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index dae5625..06cebca 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.api.client.rpc;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,11 +34,13 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequestProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusRequestProto;
@@ -74,10 +77,11 @@ public class DAGClientRPCImpl implements DAGClient {
}
@Override
- public DAGStatus getDAGStatus() throws IOException, TezException {
+ public DAGStatus getDAGStatus(Set<StatusGetOpts> statusOptions)
+ throws IOException, TezException {
if(createAMProxyIfNeeded()) {
try {
- return getDAGStatusViaAM();
+ return getDAGStatusViaAM(statusOptions);
} catch (TezException e) {
resetProxy(e); // create proxy again
}
@@ -88,11 +92,12 @@ public class DAGClientRPCImpl implements DAGClient {
}
@Override
- public VertexStatus getVertexStatus(String vertexName)
- throws IOException, TezException {
+ public VertexStatus getVertexStatus(String vertexName,
+ Set<StatusGetOpts> statusOptions)
+ throws IOException, TezException {
if(createAMProxyIfNeeded()) {
try {
- return getVertexStatusViaAM(vertexName);
+ return getVertexStatusViaAM(vertexName, statusOptions);
} catch (TezException e) {
resetProxy(e); // create proxy again
}
@@ -102,6 +107,8 @@ public class DAGClientRPCImpl implements DAGClient {
return null;
}
+
+
@Override
public void tryKillDAG() throws TezException, IOException {
if(LOG.isDebugEnabled()) {
@@ -141,23 +148,30 @@ public class DAGClientRPCImpl implements DAGClient {
proxy = null;
}
- DAGStatus getDAGStatusViaAM() throws IOException, TezException {
+ DAGStatus getDAGStatusViaAM(Set<StatusGetOpts> statusOptions)
+ throws IOException, TezException {
if(LOG.isDebugEnabled()) {
LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId);
}
- GetDAGStatusRequestProto requestProto =
- GetDAGStatusRequestProto.newBuilder().setDagId(dagId).build();
+ GetDAGStatusRequestProto.Builder requestProtoBuilder =
+ GetDAGStatusRequestProto.newBuilder()
+ .setDagId(dagId);
+
+ if (statusOptions != null) { // a null set means no options are sent
+ requestProtoBuilder.addAllStatusOptions(
+ DagTypeConverters.convertStatusGetOptsToProto(statusOptions));
+ }
+
try {
return new DAGStatus(
- proxy.getDAGStatus(null, requestProto).getDagStatus());
+ proxy.getDAGStatus(null,
+ requestProtoBuilder.build()).getDagStatus());
} catch (ServiceException e) {
// TEZ-151 retrieve wrapped TezException
// Until then the ServiceException itself becomes the cause.
throw new TezException(e);
}
}
-
-
DAGStatus getDAGStatusViaRM() throws TezException, IOException {
if(LOG.isDebugEnabled()) {
LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId);
@@ -175,7 +189,7 @@ public class DAGClientRPCImpl implements DAGClient {
DAGStatusProto.Builder builder = DAGStatusProto.newBuilder();
DAGStatus dagStatus = new DAGStatus(builder);
- DAGStatusStateProto dagState = null;
+ DAGStatusStateProto dagState;
switch (appReport.getYarnApplicationState()) {
case NEW:
case NEW_SAVING:
@@ -224,18 +238,27 @@ public class DAGClientRPCImpl implements DAGClient {
return dagStatus;
}
- VertexStatus getVertexStatusViaAM(String vertexName) throws TezException {
+ VertexStatus getVertexStatusViaAM(String vertexName,
+ Set<StatusGetOpts> statusOptions)
+ throws TezException {
if (LOG.isDebugEnabled()) {
LOG.debug("GetVertexStatus via AM for app: " + appId + " dag: " + dagId
+ " vertex: " + vertexName);
}
- GetVertexStatusRequestProto requestProto =
- GetVertexStatusRequestProto.newBuilder().
- setDagId(dagId).setVertexName(vertexName).build();
+ GetVertexStatusRequestProto.Builder requestProtoBuilder =
+ GetVertexStatusRequestProto.newBuilder()
+ .setDagId(dagId)
+ .setVertexName(vertexName);
+
+ if (statusOptions != null) {
+ requestProtoBuilder.addAllStatusOptions(
+ DagTypeConverters.convertStatusGetOptsToProto(statusOptions));
+ }
try {
return new VertexStatus(
- proxy.getVertexStatus(null, requestProto).getVertexStatus());
+ proxy.getVertexStatus(null,
+ requestProtoBuilder.build()).getVertexStatus());
} catch (ServiceException e) {
// TEZ-151 retrieve wrapped TezException
throw new TezException(e);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index b948e60..9ce51a1 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -22,12 +22,12 @@ option java_generate_equals_and_hash = true;
// DAG plan messages
-// Many of these types have a dual in the Tez-api. To reduce confusion, these types have prefix or suffix
+// Many of these types have a dual in the Tez-api. To reduce confusion, these types have prefix or suffix
// of "Plan" to indicate they are to be used in the dag-plan.
-// The big types use a suffix: JobPlan, VertexPlan, EdgePlan
+// The big types use a suffix: JobPlan, VertexPlan, EdgePlan
// --> these get more direct use in the runtime and the naming is natural.
// The enums and utility types use prefix: PlanVertexType, PlanEdgeConnectionPattern, etc
-// --> there is not great naming choice for these that avoids ambiguity, but this one seems acceptable.
+// --> there is not great naming choice for these that avoids ambiguity, but this one seems acceptable.
enum PlanVertexType {
INPUT = 0;
@@ -94,7 +94,7 @@ message PlanTaskConfiguration {
required string javaOpts = 4;
required string taskModule = 5;
repeated PlanLocalResource localResource = 6;
- repeated PlanKeyValuePair environmentSetting = 8;
+ repeated PlanKeyValuePair environmentSetting = 7;
}
message TezEntityDescriptorProto {
@@ -113,11 +113,11 @@ message VertexPlan {
required PlanVertexType type = 2;
optional TezEntityDescriptorProto processor_descriptor = 3;
required PlanTaskConfiguration taskConfig = 4;
- repeated PlanTaskLocationHint taskLocationHint = 7;
- repeated string inEdgeId = 8;
- repeated string outEdgeId = 9;
- repeated RootInputLeafOutputProto inputs = 10;
- repeated RootInputLeafOutputProto outputs = 11;
+ repeated PlanTaskLocationHint taskLocationHint = 5;
+ repeated string inEdgeId = 6;
+ repeated string outEdgeId = 7;
+ repeated RootInputLeafOutputProto inputs = 8;
+ repeated RootInputLeafOutputProto outputs = 9;
}
message EdgePlan {
@@ -165,6 +165,7 @@ message VertexStatusProto {
optional VertexStatusStateProto state = 1;
repeated string diagnostics = 2;
optional ProgressProto progress = 3;
+ optional TezCountersProto vertexCounters = 4;
}
enum DAGStatusStateProto {
@@ -187,9 +188,30 @@ message DAGStatusProto {
optional DAGStatusStateProto state = 1;
repeated string diagnostics = 2;
optional ProgressProto DAGProgress = 3;
- repeated StringProgressPairProto vertexProgress = 4;
+ repeated StringProgressPairProto vertexProgress = 4;
+ optional TezCountersProto dagCounters = 5;
}
message PlanLocalResourcesProto {
repeated PlanLocalResource localResources = 1;
}
+
+message TezCounterProto { // one counter
+ optional string name = 1;
+ optional string display_name = 2;
+ optional int64 value = 3; // the counter's value
+}
+
+message TezCounterGroupProto { // a named group of counters
+ optional string name = 1;
+ optional string display_name = 2;
+ repeated TezCounterProto counters = 3; // the counters in this group
+}
+
+message TezCountersProto { // a full set of counters, organized by group
+ repeated TezCounterGroupProto counter_groups = 1;
+}
+
+enum StatusGetOptsProto { // wire form of the client-side StatusGetOpts enum
+ GET_COUNTERS = 0; // include counters in the returned status
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-api/src/main/proto/DAGClientAMProtocol.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGClientAMProtocol.proto b/tez-api/src/main/proto/DAGClientAMProtocol.proto
index 1236190..0f29364 100644
--- a/tez-api/src/main/proto/DAGClientAMProtocol.proto
+++ b/tez-api/src/main/proto/DAGClientAMProtocol.proto
@@ -34,6 +34,7 @@ message GetAllDAGsResponseProto {
message GetDAGStatusRequestProto {
optional string dagId = 1;
+ repeated StatusGetOptsProto statusOptions = 3;
}
message GetDAGStatusResponseProto {
@@ -43,6 +44,7 @@ message GetDAGStatusResponseProto {
message GetVertexStatusRequestProto {
optional string dagId = 1;
optional string vertexName = 2;
+ repeated StatusGetOptsProto statusOptions = 3;
}
message GetVertexStatusResponseProto {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java
index 2b0f543..62b1399 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java
@@ -20,6 +20,8 @@ package org.apache.tez.dag.api.client;
import java.util.List;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.records.DAGProtos.DAGStatusStateProto;
import org.apache.tez.dag.api.records.DAGProtos.StringProgressPairProto;
import org.apache.tez.dag.api.TezUncheckedException;
@@ -33,32 +35,37 @@ public class DAGStatusBuilder extends DAGStatus {
public DAGStatusBuilder() {
super(DAGStatusProto.newBuilder());
}
-
+
public void setState(DAGState state) {
getBuilder().setState(getProtoState(state));
}
-
+
public void setDiagnostics(List<String> diagnostics) {
Builder builder = getBuilder();
builder.clearDiagnostics();
builder.addAllDiagnostics(diagnostics);
}
-
+
public void setDAGProgress(ProgressBuilder progress) {
getBuilder().setDAGProgress(progress.getProto());
}
-
+
+ public void setDAGCounters(TezCounters counters) {
+ getBuilder().setDagCounters(
+ DagTypeConverters.convertTezCountersToProto(counters));
+ }
+
public void addVertexProgress(String name, ProgressBuilder progress) {
StringProgressPairProto.Builder builder = StringProgressPairProto.newBuilder();
builder.setKey(name);
builder.setProgress(progress.getProto());
getBuilder().addVertexProgress(builder.build());
}
-
+
public DAGStatusProto getProto() {
return getBuilder().build();
}
-
+
private DAGStatusStateProto getProtoState(DAGState state) {
switch(state) {
case NEW:
@@ -80,7 +87,7 @@ public class DAGStatusBuilder extends DAGStatus {
throw new TezUncheckedException("Unsupported value for DAGState : " + state);
}
}
-
+
private DAGStatusProto.Builder getBuilder() {
return (Builder) this.proxy;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/api/client/ProgressBuilder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/ProgressBuilder.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/ProgressBuilder.java
index 6cedb3f..99fcfa0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/ProgressBuilder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/ProgressBuilder.java
@@ -20,38 +20,37 @@ package org.apache.tez.dag.api.client;
import org.apache.tez.dag.api.records.DAGProtos.ProgressProto;
import org.apache.tez.dag.api.records.DAGProtos.ProgressProto.Builder;
-import org.apache.tez.dag.api.client.Progress;
public class ProgressBuilder extends Progress {
public ProgressBuilder() {
super(ProgressProto.newBuilder());
}
-
+
public ProgressProto getProto() {
return getBuilder().build();
}
-
+
public void setTotalTaskCount(int count) {
getBuilder().setTotalTaskCount(count);
}
-
+
public void setSucceededTaskCount(int count) {
getBuilder().setSucceededTaskCount(count);
}
-
+
public void setRunningTaskCount(int count) {
getBuilder().setRunningTaskCount(count);
}
-
+
public void setFailedTaskCount(int count) {
getBuilder().setFailedTaskCount(count);
}
-
+
public void setKilledTaskCount(int count) {
getBuilder().setKilledTaskCount(count);
}
-
+
private ProgressProto.Builder getBuilder() {
return (Builder) this.proxy;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
index 66de71f..47bbb2c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
@@ -20,6 +20,8 @@ package org.apache.tez.dag.api.client;
import java.util.List;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.records.DAGProtos.VertexStatusProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexStatusProto.Builder;
import org.apache.tez.dag.api.records.DAGProtos.VertexStatusStateProto;
@@ -32,28 +34,34 @@ public class VertexStatusBuilder extends VertexStatus {
public VertexStatusBuilder() {
super(VertexStatusProto.newBuilder());
}
-
+
public void setState(VertexState state) {
getBuilder().setState(getProtoState(state));
}
-
+
public void setDiagnostics(List<String> diagnostics) {
Builder builder = getBuilder();
builder.clearDiagnostics();
builder.addAllDiagnostics(diagnostics);
}
-
+
public void setProgress(ProgressBuilder progress) {
getBuilder().setProgress(progress.getProto());
}
-
+
+ public void setVertexCounters(TezCounters counters) {
+ getBuilder().setVertexCounters(
+ DagTypeConverters.convertTezCountersToProto(counters));
+ }
+
public VertexStatusProto getProto() {
return getBuilder().build();
}
-
+
private VertexStatusStateProto getProtoState(VertexState state) {
switch(state) {
case NEW:
+ case INITIALIZING:
case INITED:
return VertexStatusStateProto.VERTEX_INITED;
case RUNNING:
@@ -72,7 +80,7 @@ public class VertexStatusBuilder extends VertexStatus {
throw new TezUncheckedException("Unsupported value for VertexState : " + state);
}
}
-
+
private VertexStatusProto.Builder getBuilder() {
return (Builder) this.proxy;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
index eb1ff48..959fbbc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
@@ -74,7 +74,9 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements
try {
String dagId = request.getDagId();
DAGStatus status;
- status = real.getDAGStatus(dagId);
+ status = real.getDAGStatus(dagId,
+ DagTypeConverters.convertStatusGetOptsFromProto(
+ request.getStatusOptionsList()));
assert status instanceof DAGStatusBuilder;
DAGStatusBuilder builder = (DAGStatusBuilder) status;
return GetDAGStatusResponseProto.newBuilder().
@@ -90,7 +92,9 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements
try {
String dagId = request.getDagId();
String vertexName = request.getVertexName();
- VertexStatus status = real.getVertexStatus(dagId, vertexName);
+ VertexStatus status = real.getVertexStatus(dagId, vertexName,
+ DagTypeConverters.convertStatusGetOptsFromProto(
+ request.getStatusOptionsList()));
assert status instanceof VertexStatusBuilder;
VertexStatusBuilder builder = (VertexStatusBuilder) status;
return GetVertexStatusResponseProto.newBuilder().
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 6d2ec19..10d05ff 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -32,6 +32,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
@@ -85,6 +86,7 @@ import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.client.DAGClientServer;
import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
@@ -146,7 +148,7 @@ import org.apache.tez.runtime.library.common.security.TokenCache;
@SuppressWarnings("rawtypes")
public class DAGAppMaster extends AbstractService {
-
+
private static final Log LOG = LogFactory.getLog(DAGAppMaster.class);
/**
@@ -474,7 +476,7 @@ public class DAGAppMaster extends AbstractService {
protected DAG createDAG(DAGPlan dagPB) {
TezDAGID dagId = new TezDAGID(appAttemptID.getApplicationId(),
dagCounter.incrementAndGet());
-
+
// Prepare the TaskAttemptListener server for authentication of Containers
// TaskAttemptListener gets the information via jobTokenSecretManager.
String dagIdString = dagId.toString();
@@ -500,7 +502,7 @@ public class DAGAppMaster extends AbstractService {
return newDag;
} // end createDag()
-
+
protected void addIfService(Object object, boolean addDispatcher) {
if (object instanceof Service) {
Service service = (Service) object;
@@ -712,13 +714,17 @@ public class DAGAppMaster extends AbstractService {
return Collections.singletonList(currentDAG.getID().toString());
}
- public DAGStatus getDAGStatus(String dagIdStr) throws TezException {
- return getDAG(dagIdStr).getDAGStatus();
+ public DAGStatus getDAGStatus(String dagIdStr,
+ Set<StatusGetOpts> statusOptions)
+ throws TezException {
+ return getDAG(dagIdStr).getDAGStatus(statusOptions);
}
- public VertexStatus getVertexStatus(String dagIdStr, String vertexName)
+ public VertexStatus getVertexStatus(String dagIdStr, String vertexName,
+ Set<StatusGetOpts> statusOptions)
throws TezException{
- VertexStatus status = getDAG(dagIdStr).getVertexStatus(vertexName);
+ VertexStatus status = getDAG(dagIdStr)
+ .getVertexStatus(vertexName, statusOptions);
if(status == null) {
throw new TezException("Unknown vertexName: " + vertexName);
}
@@ -1414,11 +1420,11 @@ public class DAGAppMaster extends AbstractService {
appMaster.currentUser = UserGroupInformation.getCurrentUser();
Credentials credentials =
UserGroupInformation.getCurrentUser().getCredentials();
-
+
UserGroupInformation appMasterUgi = UserGroupInformation
.createRemoteUser(jobUserName);
appMasterUgi.addCredentials(credentials);
-
+
// Now remove the AM->RM token so tasks don't have it
Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
while (iter.hasNext()) {
@@ -1427,7 +1433,7 @@ public class DAGAppMaster extends AbstractService {
iter.remove();
}
}
-
+
appMaster.tokens = credentials;
appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index ce1ee89..4e12603 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -20,11 +20,13 @@ package org.apache.tez.dag.app.dag;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.client.DAGStatusBuilder;
import org.apache.tez.dag.api.client.VertexStatusBuilder;
@@ -66,8 +68,9 @@ public interface DAG {
Configuration getConf();
DAGPlan getJobPlan();
- DAGStatusBuilder getDAGStatus();
- VertexStatusBuilder getVertexStatus(String vertexName);
+ DAGStatusBuilder getDAGStatus(Set<StatusGetOpts> statusOptions);
+ VertexStatusBuilder getVertexStatus(String vertexName,
+ Set<StatusGetOpts> statusOptions);
boolean isComplete();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index caab317..737091a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.app.dag;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.counters.TezCounters;
@@ -28,6 +29,7 @@ import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.api.client.ProgressBuilder;
@@ -68,7 +70,8 @@ public interface Vertex extends Comparable<Vertex> {
int getRunningTasks();
float getProgress();
ProgressBuilder getVertexProgress();
- VertexStatusBuilder getVertexStatus();
+ VertexStatusBuilder getVertexStatus(Set<StatusGetOpts> statusOptions);
+
void setParallelism(int parallelism, Map<Vertex, EdgeManager> sourceEdgeManagers);
void setVertexLocationHint(VertexLocationHint vertexLocationHint);
@@ -79,13 +82,13 @@ public interface Vertex extends Comparable<Vertex> {
Map<Vertex, Edge> getInputVertices();
Map<Vertex, Edge> getOutputVertices();
-
+
void setAdditionalInputs(List<RootInputLeafOutputProto> inputs);
void setAdditionalOutputs(List<RootInputLeafOutputProto> outputs);
-
+
Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> getAdditionalInputs();
Map<String, RootInputLeafOutputDescriptor<OutputDescriptor>> getAdditionalOutputs();
-
+
List<InputSpec> getInputSpecList(int taskIndex);
List<OutputSpec> getOutputSpecList(int taskIndex);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 0df7875..d16086b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -56,6 +56,7 @@ import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.DAGStatusBuilder;
import org.apache.tez.dag.api.client.ProgressBuilder;
+import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatusBuilder;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
@@ -510,7 +511,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
// monitoring apis
@Override
- public DAGStatusBuilder getDAGStatus() {
+ public DAGStatusBuilder getDAGStatus(Set<StatusGetOpts> statusOptions) {
DAGStatusBuilder status = new DAGStatusBuilder();
int totalTaskCount = 0;
int totalSucceededTaskCount = 0;
@@ -537,6 +538,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
status.setState(getState());
status.setDiagnostics(diagnostics);
status.setDAGProgress(dagProgress);
+ if (statusOptions.contains(StatusGetOpts.GET_COUNTERS)) {
+ status.setDAGCounters(getAllCounters());
+ }
return status;
} finally {
readLock.unlock();
@@ -544,12 +548,13 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
@Override
- public VertexStatusBuilder getVertexStatus(String vertexName) {
+ public VertexStatusBuilder getVertexStatus(String vertexName,
+ Set<StatusGetOpts> statusOptions) {
Vertex vertex = vertexMap.get(vertexName);
if(vertex == null) {
return null;
}
- return vertex.getVertexStatus();
+ return vertex.getVertexStatus(statusOptions);
}
@@ -1032,10 +1037,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
/**
* Set the terminationCause and send a kill-message to all vertices.
* The vertex-kill messages are only sent once.
- * @param the trigger that is causing the DAG to transition to KILLED/FAILED
- * @param event The type of kill event to send to the vertices.
*/
- void enactKill(DAGTerminationCause dagTerminationCause, VertexTerminationCause vertexTerminationCause) {
+ void enactKill(DAGTerminationCause dagTerminationCause,
+ VertexTerminationCause vertexTerminationCause) {
if(trySetTerminationCause(dagTerminationCause)){
for (Vertex v : vertices.values()) {
@@ -1100,7 +1104,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
job.numCompletedVertices++;
if (vertexEvent.getVertexState() == VertexState.SUCCEEDED) {
if (!job.reRunningVertices.contains(vertex.getVertexId())) {
- // vertex succeeded for the first time
+ // vertex succeeded for the first time
job.dagScheduler.vertexCompleted(vertex);
}
job.vertexSucceeded(vertex);
@@ -1117,7 +1121,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
job.vertexKilled(vertex);
forceTransitionToKillWait = true;
}
-
+
job.reRunningVertices.remove(vertex.getVertexId());
LOG.info("Vertex " + vertex.getVertexId() + " completed."
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 3d4703b..8a587c1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -109,11 +109,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
private long scheduledTime;
protected TaskLocationHint locationHint;
-
+
private List<TezEvent> tezEventsForTaskAttempts = new ArrayList<TezEvent>();
- private static final List<TezEvent> EMPTY_TASK_ATTEMPT_TEZ_EVENTS =
+ private static final List<TezEvent> EMPTY_TASK_ATTEMPT_TEZ_EVENTS =
new ArrayList(0);
-
+
// counts the number of attempts that are either running or in a state where
// they will come to be running when they get a Container
@@ -162,8 +162,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
.addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.SCHEDULED,
TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
- // When current attempt fails/killed and new attempt launched then
- // TODO Task should go back to SCHEDULED state TEZ-495
+ // When current attempt fails/killed and new attempt launched then
+ // TODO Task should go back to SCHEDULED state TEZ-495
// Transitions from RUNNING state
.addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
TaskEventType.T_ATTEMPT_LAUNCHED) //more attempts may start later
@@ -444,7 +444,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
readLock.unlock();
}
}
-
+
@Override
public List<TezEvent> getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
int fromEventId, int maxEvents) {
@@ -464,39 +464,39 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
events = Collections.unmodifiableList(new ArrayList<TezEvent>(
tezEventsForTaskAttempts.subList(fromEventId, toEventId)));
LOG.info("TaskAttempt:" + attemptID + " sent events: (" + fromEventId
- + "-" + toEventId + ")");
- // currently not modifying the events so that we dont have to create
+ + "-" + toEventId + ")");
+ // currently not modifying the events so that we dont have to create
// copies of events. e.g. if we have to set taskAttemptId into the TezEvent
// destination metadata then we will need to create a copy of the TezEvent
- // and then modify the metadata and then send the copy on the RPC. This
- // is important because TezEvents are only routed in the AM and not copied
- // during routing. So e.g. a broadcast edge will send the same event to
- // all consumers (like it should). If copies were created then re-routing
- // the events on parallelism changes would be difficult. We would have to
- // buffer the events in the Vertex until the parallelism was set and then
+ // and then modify the metadata and then send the copy on the RPC. This
+ // is important because TezEvents are only routed in the AM and not copied
+ // during routing. So e.g. a broadcast edge will send the same event to
+ // all consumers (like it should). If copies were created then re-routing
+ // the events on parallelism changes would be difficult. We would have to
+ // buffer the events in the Vertex until the parallelism was set and then
// route the events.
}
return events;
} finally {
readLock.unlock();
- }
+ }
}
-
- @Override
+
+ @Override
public List<TezEvent> getAndClearTaskTezEvents() {
readLock.lock();
try {
List<TezEvent> events = tezEventsForTaskAttempts;
- tezEventsForTaskAttempts = new ArrayList<TezEvent>();
+ tezEventsForTaskAttempts = new ArrayList<TezEvent>();
return events;
} finally {
readLock.unlock();
- }
+ }
}
@Override
public List<String> getDiagnostics() {
- List<String> diagnostics = new ArrayList<String>(5);
+ List<String> diagnostics = new ArrayList<String>(attempts.size());
readLock.lock();
try {
for (TaskAttempt att : attempts.values()) {
@@ -616,7 +616,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
if (ta == null) {
throw new TezUncheckedException("Unknown task for commit: " + taskAttemptID);
}
- // Its ok to get a non-locked state snapshot since we handle changes of
+ // Its ok to get a non-locked state snapshot since we handle changes of
// state in the task attempt. Dont want to deadlock here.
TaskAttemptState taState = ta.getStateNoLock();
if (taState == TaskAttemptState.RUNNING) {
@@ -624,7 +624,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
LOG.info(taskAttemptID + " given a go for committing the task output.");
return true;
} else {
- LOG.info(taskAttemptID + " with state: " + taState +
+ LOG.info(taskAttemptID + " with state: " + taState +
" given a no-go for commit because its not running.");
return false;
}
@@ -636,14 +636,14 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// Don't think this can be a pluggable decision, so simply raise an
// event for the TaskAttempt to delete its output.
// Wait for commit attempt to succeed. Dont kill this. If commit
- // attempt fails then choose a different committer. When commit attempt
+ // attempt fails then choose a different committer. When commit attempt
// succeeds then this and others will be killed
LOG.info(commitAttempt
+ " is current committer. Commit waiting for: "
+ taskAttemptID);
return false;
}
-
+
} finally {
writeLock.unlock();
}
@@ -654,7 +654,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
public boolean needsWaitAfterOutputConsumable() {
Vertex vertex = getVertex();
ProcessorDescriptor processorDescriptor = vertex.getProcessorDescriptor();
- if (processorDescriptor != null &&
+ if (processorDescriptor != null &&
processorDescriptor.getClassName().contains("InitialTaskWithInMemSort")) {
return true;
} else {
@@ -929,17 +929,17 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
@Override
public void transition(TaskImpl task, TaskEvent event) {
TezTaskAttemptID successTaId = ((TaskEventTAUpdate) event).getTaskAttemptID();
-
- if (task.commitAttempt != null &&
+
+ if (task.commitAttempt != null &&
!task.commitAttempt.equals(successTaId)) {
// The succeeded attempt is not the one that was selected to commit
// This is impossible and has to be a bug
- throw new TezUncheckedException("TA: " + successTaId
- + " succeeded but TA: " + task.commitAttempt
+ throw new TezUncheckedException("TA: " + successTaId
+ + " succeeded but TA: " + task.commitAttempt
+ " was expected to commit and succeed");
}
-
- task.handleTaskAttemptCompletion(successTaId,
+
+ task.handleTaskAttemptCompletion(successTaId,
TaskAttemptStateInternal.SUCCEEDED);
task.finishedAttempts++;
--task.numberUncompletedAttempts;
@@ -976,7 +976,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
@Override
public void transition(TaskImpl task, TaskEvent event) {
TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event;
- if (task.commitAttempt !=null &&
+ if (task.commitAttempt !=null &&
castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
task.commitAttempt = null;
}
@@ -1028,7 +1028,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
task.failedAttempts++;
TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event;
- if (task.commitAttempt != null &&
+ if (task.commitAttempt != null &&
castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
task.commitAttempt = null;
}
@@ -1096,17 +1096,17 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// fails, we have to let AttemptFailedTransition.transition
// believe that there's no redundancy.
unSucceed(task);
-
+
// fake values for code for super.transition
++task.numberUncompletedAttempts;
task.finishedAttempts--;
- TaskStateInternal returnState = super.transition(task, event);
-
+ TaskStateInternal returnState = super.transition(task, event);
+
if (returnState == TaskStateInternal.SCHEDULED) {
// tell the dag about the rescheduling
- task.eventHandler.handle(new VertexEventTaskReschedule(task.taskId));
+ task.eventHandler.handle(new VertexEventTaskReschedule(task.taskId));
}
-
+
return returnState;
}
@@ -1184,8 +1184,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
logMsg));
}
}
-
- private static class AddTezEventTransition
+
+ private static class AddTezEventTransition
implements SingleArcTransition<TaskImpl, TaskEvent> {
@Override
public void transition(TaskImpl task, TaskEvent event) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index df99bb7..257984f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -62,6 +62,7 @@ import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
import org.apache.tez.dag.api.client.ProgressBuilder;
+import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.client.VertexStatusBuilder;
import org.apache.tez.dag.api.committer.NullVertexOutputCommitter;
@@ -240,7 +241,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
.addTransition(VertexState.INITIALIZING, VertexState.ERROR,
VertexEventType.V_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
-
+
// Transitions from INITED state
// SOURCE_VERTEX_STARTED - for srces which detemrine parallelism, they must complete before this vertex can start.
.addTransition(VertexState.INITED, VertexState.INITED,
@@ -318,8 +319,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexState.SUCCEEDED,
VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
- .addTransition(VertexState.SUCCEEDED,
- EnumSet.of(VertexState.RUNNING, VertexState.FAILED),
+ .addTransition(VertexState.SUCCEEDED,
+ EnumSet.of(VertexState.RUNNING, VertexState.FAILED),
VertexEventType.V_TASK_RESCHEDULED,
new TaskRescheduledAfterVertexSuccessTransition())
@@ -329,7 +330,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
// after we are done reruns of source tasks should not affect
// us. These reruns may be triggered by other consumer vertices.
- // We should have been in RUNNING state if we had triggered the
+ // We should have been in RUNNING state if we had triggered the
// reruns.
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
// accumulate these in case we get restarted
@@ -432,9 +433,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
List<TezTaskAttemptID> pendingReportedSrcCompletions = Lists.newLinkedList();
private RootInputInitializerRunner rootInputInitializer;
-
+
private VertexScheduler vertexScheduler;
-
+
private boolean parallelismSet = false;
private VertexOutputCommitter committer;
@@ -558,10 +559,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
try {
// does it matter to create a duplicate list for efficiency
// instead of traversing the map
- // local assign to LinkedHashMap to ensure that sequential traversal
+ // local assign to LinkedHashMap to ensure that sequential traversal
// assumption is satisfied
LinkedHashMap<TezTaskID, Task> taskList = tasks;
- int i=0;
+ int i=0;
for(Map.Entry<TezTaskID, Task> entry : taskList.entrySet()) {
if(taskIndex == i) {
return entry.getValue();
@@ -681,13 +682,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
@Override
- public VertexStatusBuilder getVertexStatus() {
+ public VertexStatusBuilder getVertexStatus(
+ Set<StatusGetOpts> statusOptions) {
this.readLock.lock();
try {
VertexStatusBuilder status = new VertexStatusBuilder();
status.setState(getInternalState());
status.setDiagnostics(diagnostics);
status.setProgress(getVertexProgress());
+ if (statusOptions.contains(StatusGetOpts.GET_COUNTERS)) {
+ status.setVertexCounters(getAllCounters());
+ }
return status;
} finally {
this.readLock.unlock();
@@ -752,7 +757,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
- // TODO Create InputReadyVertexManager that schedules when there is something
+ // TODO Create InputReadyVertexManager that schedules when there is something
// to read and use that as default instead of ImmediateStart.TEZ-480
@Override
public void scheduleTasks(Collection<TezTaskID> taskIDs) {
@@ -775,7 +780,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
Preconditions.checkState(parallelismSet == false,
"Parallelism can only be set dynamically once per vertex");
parallelismSet = true;
-
+
// Input initializer expected to set parallelism.
if (numTasks == -1) {
Preconditions
@@ -789,7 +794,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// INITIALIZING state.
return;
}
-
+
if (parallelism >= numTasks) {
// not that hard to support perhaps. but checking right now since there
// is no use case for it and checking may catch other bugs.
@@ -800,16 +805,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
LOG.info("Ingoring setParallelism to current value: " + parallelism);
return;
}
-
+
// start buffering incoming events so that we can re-route existing events
for (Edge edge : sourceVertices.values()) {
edge.startEventBuffering();
}
-
+
// Use a set since the same event may have been sent to multiple tasks
// and we want to avoid duplicates
Set<TezEvent> pendingEvents = new HashSet<TezEvent>();
-
+
LOG.info("Vertex " + getVertexId() + " parallelism set to " + parallelism);
// assign to local variable of LinkedHashMap to make sure that changing
// type of task causes compile error. We depend on LinkedHashMap for order
@@ -842,27 +847,27 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
Vertex sourceVertex = entry.getKey();
EdgeManager edgeManager = entry.getValue();
Edge edge = sourceVertices.get(sourceVertex);
- LOG.info("Replacing edge manager for source:"
+ LOG.info("Replacing edge manager for source:"
+ sourceVertex.getVertexId() + " destination: " + getVertexId());
edge.setEdgeManager(edgeManager);
}
}
-
+
// Re-route all existing TezEvents according to new routing table
- // At this point only events attributed to source task attempts can be
- // re-routed. e.g. DataMovement or InputFailed events.
+ // At this point only events attributed to source task attempts can be
+ // re-routed. e.g. DataMovement or InputFailed events.
// This assumption is fine for now since these tasks haven't been started.
- // So they can only get events generated from source task attempts that
+ // So they can only get events generated from source task attempts that
// have already been started.
DAG dag = getDAG();
for(TezEvent event : pendingEvents) {
TezVertexID sourceVertexId = event.getSourceInfo().getTaskAttemptID()
- .getTaskID().getVertexID();
+ .getTaskID().getVertexID();
Vertex sourceVertex = dag.getVertex(sourceVertexId);
Edge sourceEdge = sourceVertices.get(sourceVertex);
sourceEdge.sendTezEventToDestinationTasks(event);
}
-
+
// stop buffering events
for (Edge edge : sourceVertices.values()) {
edge.stopEventBuffering();
@@ -871,7 +876,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
} finally {
writeLock.unlock();
}
-
+
}
public void setVertexLocationHint(VertexLocationHint vertexLocationHint) {
@@ -1104,7 +1109,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
return finalState;
}
-
+
VertexState finished(VertexState finalState) {
return finished(finalState, null);
}
@@ -1114,7 +1119,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// Answer: Do commit for every vertex
// for now, only for leaf vertices
// TODO TEZ-41 make commmitter type configurable per vertex
-
+
if (!this.additionalOutputSpecs.isEmpty()) {
committer = new MRVertexOutputCommitter();
}
@@ -1138,7 +1143,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
return VertexState.INITED;
}
-
+
/**
* If the number of tasks are greater than the configured value
* throw an exception that will fail job initialization
@@ -1146,7 +1151,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private void checkTaskLimits() {
// no code, for now
}
-
+
private void createTasks() {
Configuration conf = this.conf;
boolean useNullLocationHint = true;
@@ -1181,7 +1186,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
-
+
public static class InitTransition implements
MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
@@ -1259,7 +1264,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
vertex.checkTaskLimits();
-
+
// Create tasks based on initial configuration, but don't start them yet.
if (vertex.numTasks == -1) {
LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers"
@@ -1277,11 +1282,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
float waves = vertex.conf.getFloat(
TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
-
+
int numTasks = (int)((totalResource*waves)/taskResource);
-
- LOG.info("Vertex " + vertex.getVertexId() + " asking for " + numTasks
- + " tasks. Headroom: " + totalResource + " Task Resource: "
+
+ LOG.info("Vertex " + vertex.getVertexId() + " asking for " + numTasks
+ + " tasks. Headroom: " + totalResource + " Task Resource: "
+ taskResource + " waves: " + waves);
vertex.rootInputInitializer = vertex.createRootInputInitializerRunner(
@@ -1301,7 +1306,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
return vertex.initializeVertex();
}
} // end of InitTransition
-
+
@VisibleForTesting
protected RootInputInitializerRunner createRootInputInitializerRunner(
String dagName, String vertexName, TezVertexID vertexID,
@@ -1309,7 +1314,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
return new RootInputInitializerRunner(dagName, vertexName, vertexID,
eventHandler, numTasks);
}
-
+
public static class RootInputInitializedTransition implements
MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
@@ -1388,9 +1393,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
vertex.startTimeRequested = vertex.clock.getTime();
vertex.startSignalPending = true;
}
-
+
}
-
+
public static class StartTransition
implements SingleArcTransition<VertexImpl, VertexEvent> {
/**
@@ -1409,10 +1414,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
vertexScheduler.onVertexStarted(pendingReportedSrcCompletions);
pendingReportedSrcCompletions.clear();
logJobHistoryVertexStartedEvent();
-
+
// TODO: Metrics
//job.metrics.runningJob(job);
-
+
// default behavior is to start immediately. so send information about us
// starting to downstream vertices. If the connections/structure of this
// vertex is not fully defined yet then we could send this event later
@@ -1557,7 +1562,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
+ " attempt: " + completionEvent.getTaskAttemptId()
+ " with state: " + completionEvent.getTaskAttemptState()
+ " vertexState: " + vertex.getState());
-
+
if (TaskAttemptStateInternal.SUCCEEDED.equals(completionEvent
.getTaskAttemptState())) {
vertex.numSuccessSourceAttemptCompletions++;
@@ -1675,25 +1680,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
vertex.eventHandler.handle(new DAGEventVertexReRunning(vertex.getVertexId()));
return VertexState.RUNNING;
}
-
+
LOG.info(vertex.getVertexId() + " failed due to post-commit rescheduling of "
+ ((VertexEventTaskReschedule)event).getTaskID());
// terminate any running tasks
vertex.enactKill(VertexTerminationCause.OWN_TASK_FAILURE,
TaskTerminationCause.OWN_TASK_FAILURE);
- // since the DAG thinks this vertex is completed it must be notified of
+ // since the DAG thinks this vertex is completed it must be notified of
// an error
vertex.eventHandler.handle(new DAGEvent(vertex.getDAGId(),
DAGEventType.INTERNAL_ERROR));
return VertexState.FAILED;
}
}
-
+
private void addDiagnostic(String diag) {
diagnostics.add(diag);
}
-
- private static boolean isEventFromVertex(Vertex vertex,
+
+ private static boolean isEventFromVertex(Vertex vertex,
EventMetaData sourceMeta) {
if (!sourceMeta.getTaskVertexName().equals(vertex.getName())) {
return false;
@@ -1701,7 +1706,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
return true;
}
- private static void checkEventSourceMetadata(Vertex vertex,
+ private static void checkEventSourceMetadata(Vertex vertex,
EventMetaData sourceMeta) {
if (!isEventFromVertex(vertex, sourceMeta)) {
throw new TezUncheckedException("Bad routing of event"
@@ -1754,7 +1759,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
Vertex destVertex = vertex.getDAG().getVertex(sourceMeta.getEdgeVertexName());
Edge destEdge = vertex.targetVertices.get(destVertex);
if (destEdge == null) {
- throw new TezUncheckedException("Bad destination vertex: " +
+ throw new TezUncheckedException("Bad destination vertex: " +
sourceMeta.getEdgeVertexName() + " for event vertex: " +
vertex.getVertexId());
}
@@ -1766,7 +1771,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex(
sourceMeta.getTaskVertexName()));
if (srcEdge == null) {
- throw new TezUncheckedException("Bad source vertex: " +
+ throw new TezUncheckedException("Bad source vertex: " +
sourceMeta.getTaskVertexName() + " for destination vertex: " +
vertex.getVertexId());
}
@@ -1830,7 +1835,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
}
-
+
private static class InternalErrorTransition implements
SingleArcTransition<VertexImpl, VertexEvent> {
@Override
@@ -1861,7 +1866,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
"For now, only a single root input can be specified on a Vertex");
this.additionalInputs = Maps.newHashMapWithExpectedSize(inputs.size());
for (RootInputLeafOutputProto input : inputs) {
-
+
InputDescriptor id = DagTypeConverters
.convertInputDescriptorFromDAGPlan(input.getEntityDescriptor());
@@ -1872,7 +1877,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
InputSpec inputSpec = new InputSpec(input.getName(), id, 0);
additionalInputSpecs.add(inputSpec);
}
-
+
}
@Override
@@ -1880,7 +1885,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
LOG.info("setting additional outputs for vertex " + this.vertexName);
this.additionalOutputs = Maps.newHashMapWithExpectedSize(outputs.size());
for (RootInputLeafOutputProto output : outputs) {
-
+
OutputDescriptor od = DagTypeConverters
.convertOutputDescriptorFromDAGPlan(output.getEntityDescriptor());
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index 5299431..23f5c72 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -18,11 +18,20 @@
package org.apache.tez.mapreduce.examples;
+import java.io.IOException;
import java.text.DecimalFormat;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
import org.apache.hadoop.util.ProgramDriver;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.mapreduce.examples.terasort.TeraGen;
import org.apache.tez.mapreduce.examples.terasort.TeraSort;
import org.apache.tez.mapreduce.examples.terasort.TeraValidate;
@@ -85,7 +94,17 @@ public class ExampleDriver {
System.exit(exitCode);
}
- public static void printMRRDAGStatus(DAGStatus dagStatus) {
+ public static void printDAGStatus(DAGClient dagClient, String[] vertexNames)
+ throws IOException, TezException {
+ printDAGStatus(dagClient, vertexNames, false, false);
+ }
+
+ public static void printDAGStatus(DAGClient dagClient, String[] vertexNames,
+ boolean displayDAGCounters, boolean displayVertexCounters)
+ throws IOException, TezException {
+ Set<StatusGetOpts> opts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
+ DAGStatus dagStatus = dagClient.getDAGStatus(
+ (displayDAGCounters ? opts : null));
Progress progress = dagStatus.getDAGProgress();
double vProgressFloat = 0.0f;
if (progress != null) {
@@ -96,9 +115,10 @@ public class ExampleDriver {
+ (progress.getTotalTaskCount() < 0 ? formatter.format(0.0f) :
formatter.format((double)(progress.getSucceededTaskCount())
/progress.getTotalTaskCount())));
- final String[] vNames = { "initialmap", "ivertex1", "finalreduce" };
- for (String vertexName : vNames) {
- Progress vProgress = dagStatus.getVertexProgress().get(vertexName);
+ for (String vertexName : vertexNames) {
+ VertexStatus vStatus = dagClient.getVertexStatus(vertexName,
+ (displayVertexCounters ? opts : null));
+ Progress vProgress = vStatus.getProgress();
if (vProgress != null) {
vProgressFloat = 0.0f;
if (vProgress.getTotalTaskCount() == 0) {
@@ -113,6 +133,19 @@ public class ExampleDriver {
: vertexName)
+ " Progress: " + formatter.format(vProgressFloat));
}
+ if (displayVertexCounters) {
+ TezCounters counters = vStatus.getVertexCounters();
+ if (counters != null) {
+ System.out.println("Vertex Counters for " + vertexName + ": "
+ + counters);
+ }
+ }
+ }
+ }
+ if (displayDAGCounters) {
+ TezCounters counters = dagStatus.getDAGCounters();
+ if (counters != null) {
+ System.out.println("DAG Counters: " + counters);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index bd032e2..a73b3fc 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -93,7 +93,7 @@ public class FilterLinesByWord {
String [] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
boolean generateSplitsInClient = false;
-
+
SplitsInClientOptionParser splitCmdLineParser = new SplitsInClientOptionParser();
try {
generateSplitsInClient = splitCmdLineParser.parse(otherArgs, false);
@@ -196,7 +196,7 @@ public class FilterLinesByWord {
Map<String, String> stage1Env = new HashMap<String, String>();
MRHelpers.updateEnvironmentForMRTasks(stage1Conf, stage1Env, true);
stage1Vertex.setTaskEnvironment(stage1Env);
-
+
// Configure the Input for stage1
Class<? extends TezRootInputInitializer> initializerClazz = generateSplitsInClient ? null
: MRInputAMSplitGenerator.class;
@@ -214,7 +214,7 @@ public class FilterLinesByWord {
Map<String, String> stage2Env = new HashMap<String, String>();
MRHelpers.updateEnvironmentForMRTasks(stage2Conf, stage2Env, false);
stage2Vertex.setTaskEnvironment(stage2Env);
-
+
// Configure the Output for stage2
stage2Vertex.addOutput("MROutput",
new OutputDescriptor(MROutput.class.getName()).setUserPayload(MRHelpers
@@ -233,9 +233,10 @@ public class FilterLinesByWord {
LOG.info("Submitted DAG to Tez Session");
DAGStatus dagStatus = null;
+ String[] vNames = { "stage1", "stage2" };
try {
while (true) {
- dagStatus = dagClient.getDAGStatus();
+ dagStatus = dagClient.getDAGStatus(null);
if(dagStatus.getState() == DAGStatus.State.RUNNING ||
dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
dagStatus.getState() == DAGStatus.State.FAILED ||
@@ -252,13 +253,13 @@ public class FilterLinesByWord {
while (dagStatus.getState() == DAGStatus.State.RUNNING) {
try {
- ExampleDriver.printMRRDAGStatus(dagStatus);
+ ExampleDriver.printDAGStatus(dagClient, vNames);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// continue;
}
- dagStatus = dagClient.getDAGStatus();
+ dagStatus = dagClient.getDAGStatus(null);
} catch (TezException e) {
LOG.fatal("Failed to get application progress. Exiting");
System.exit(-1);
@@ -269,24 +270,24 @@ public class FilterLinesByWord {
tezSession.stop();
}
- ExampleDriver.printMRRDAGStatus(dagStatus);
+ ExampleDriver.printDAGStatus(dagClient, vNames);
LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
System.exit(dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1);
}
-
+
public static class TextLongPair implements Writable {
private Text text;
private LongWritable longWritable;
-
+
public TextLongPair() {
}
-
+
public TextLongPair(Text text, LongWritable longWritable) {
this.text = text;
this.longWritable = longWritable;
}
-
+
@Override
public void write(DataOutput out) throws IOException {
this.text.write(out);
@@ -300,10 +301,10 @@ public class FilterLinesByWord {
text.readFields(in);
longWritable.readFields(in);
}
-
+
@Override
public String toString() {
return text.toString() + "\t" + longWritable.get();
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
index 29d6db5..60ce3da 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
@@ -197,9 +197,10 @@ public class GroupByOrderByMRRTest {
ApplicationId appId = TypeConverter.toYarn(jobId).getAppId();
DAGClient dagClient = tezClient.getDAGClient(appId);
- DAGStatus dagStatus = null;
+ DAGStatus dagStatus;
+ String[] vNames = { "initialmap" , "ireduce1" , "finalreduce" };
while (true) {
- dagStatus = dagClient.getDAGStatus();
+ dagStatus = dagClient.getDAGStatus(null);
if(dagStatus.getState() == DAGStatus.State.RUNNING ||
dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
dagStatus.getState() == DAGStatus.State.FAILED ||
@@ -216,20 +217,20 @@ public class GroupByOrderByMRRTest {
while (dagStatus.getState() == DAGStatus.State.RUNNING) {
try {
- ExampleDriver.printMRRDAGStatus(dagStatus);
+ ExampleDriver.printDAGStatus(dagClient, vNames);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// continue;
}
- dagStatus = dagClient.getDAGStatus();
+ dagStatus = dagClient.getDAGStatus(null);
} catch (TezException e) {
LOG.fatal("Failed to get application progress. Exiting");
System.exit(-1);
}
}
- ExampleDriver.printMRRDAGStatus(dagStatus);
+ ExampleDriver.printDAGStatus(dagClient, vNames);
LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
System.exit(dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 8943dfa..edea15b 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -834,7 +834,7 @@ public class MRRSleepJob extends Configured implements Tool {
tezClient.submitDAGApplication(appId, dag, amConfig);
while (true) {
- DAGStatus status = dagClient.getDAGStatus();
+ DAGStatus status = dagClient.getDAGStatus(null);
LOG.info("DAG Status: " + status);
if (status.isCompleted()) {
break;