You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@tez.apache.org by hi...@apache.org on 2013/10/10 00:18:09 UTC
git commit: TEZ-537. Add support for session jars and pre-warming of
containers to be re-used across a session. (hitesh)
Updated Branches:
refs/heads/master d41471fc6 -> e2e9247e4
TEZ-537. Add support for session jars and pre-warming of containers to be re-used across a session. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/e2e9247e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/e2e9247e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/e2e9247e
Branch: refs/heads/master
Commit: e2e9247e494259fab7e00252f64fafbb91edaa2c
Parents: d41471f
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Oct 9 15:17:52 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Oct 9 15:17:52 2013 -0700
----------------------------------------------------------------------
.../org/apache/tez/client/TezClientUtils.java | 81 +++++++--
.../java/org/apache/tez/client/TezSession.java | 16 +-
.../tez/client/TezSessionConfiguration.java | 33 +++-
.../main/java/org/apache/tez/dag/api/DAG.java | 180 ++++++++++---------
.../apache/tez/dag/api/DagTypeConverters.java | 58 ++++++
.../apache/tez/dag/api/TezConfiguration.java | 33 ++++
tez-api/src/main/proto/DAGApiRecords.proto | 4 +
.../apache/hadoop/mapred/YarnTezDagChild.java | 21 +--
.../java/org/apache/tez/dag/app/AppContext.java | 4 +
.../org/apache/tez/dag/app/DAGAppMaster.java | 173 +++++++++++++++---
.../dag/app/TaskAttemptListenerImpTezDag.java | 6 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 9 +-
.../mapreduce/examples/OrderedWordCount.java | 22 +++
.../library/processor/SleepProcessor.java | 111 ++++++++++++
14 files changed, 594 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 93d51d1..87042e9 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.Level;
import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
@@ -74,6 +75,7 @@ import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
+import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
@@ -204,23 +206,18 @@ public class TezClientUtils {
/**
* Create an ApplicationSubmissionContext to launch a Tez AM
- * @param conf
- * @param appId
- * @param dag
- * @param appStagingDir
- * @param ts
- * @param amQueueName
- * @param amName
- * @param amArgs
- * @param amEnv
- * @param amLocalResources
- * @param appConf
+ * @param conf TezConfiguration
+ * @param appId Application Id
+ * @param dag DAG to be submitted
+ * @param amName Name for the application
+ * @param amConfig AM Configuration
+ * @param tezJarResources Resources to be used by the AM
* @return
* @throws IOException
* @throws YarnException
*/
static ApplicationSubmissionContext createApplicationSubmissionContext(
- Configuration conf, ApplicationId appId, DAG dag, String amName,
+ TezConfiguration conf, ApplicationId appId, DAG dag, String amName,
AMConfiguration amConfig,
Map<String, LocalResource> tezJarResources)
throws IOException, YarnException{
@@ -327,7 +324,8 @@ public class TezClientUtils {
localResources.putAll(tezJarResources);
// emit conf as PB file
- Configuration finalTezConf = createFinalTezConfForApp(amConfig.getAMConf());
+ Configuration finalTezConf = createFinalTezConfForApp(conf,
+ amConfig.getAMConf());
Path binaryConfPath = new Path(amConfig.getStagingDir(),
TezConfiguration.TEZ_PB_BINARY_CONF_NAME + "." + appId.toString());
FSDataOutputStream amConfPBOutBinaryStream = null;
@@ -359,8 +357,38 @@ public class TezClientUtils {
localResources.put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
binaryConfLRsrc);
+ // Create Session Jars definition to be sent to AM as a local resource
+ Path sessionJarsPath = new Path(amConfig.getStagingDir(),
+ TezConfiguration.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME + "."
+ + appId.toString());
+ FSDataOutputStream sessionJarsPBOutStream = null;
+ try {
+ Map<String, LocalResource> sessionJars =
+ new HashMap<String, LocalResource>(tezJarResources.size() + 1);
+ sessionJars.putAll(tezJarResources);
+ sessionJars.put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
+ binaryConfLRsrc);
+ DAGProtos.PlanLocalResourcesProto proto =
+ DagTypeConverters.convertFromLocalResources(sessionJars);
+ sessionJarsPBOutStream = FileSystem.create(fs, sessionJarsPath,
+ new FsPermission(TEZ_AM_FILE_PERMISSION));
+ proto.writeTo(sessionJarsPBOutStream);
+ } finally {
+ if (sessionJarsPBOutStream != null) {
+ sessionJarsPBOutStream.close();
+ }
+ }
+
+ LocalResource sessionJarsPBLRsrc =
+ TezClientUtils.createLocalResource(fs,
+ sessionJarsPath, LocalResourceType.FILE,
+ LocalResourceVisibility.APPLICATION);
+ localResources.put(
+ TezConfiguration.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME,
+ sessionJarsPBLRsrc);
+
if(dag != null) {
- // Add tez jars to vertices too
+
for (Vertex v : dag.getVertices()) {
v.getTaskLocalResources().putAll(tezJarResources);
v.getTaskLocalResources().put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
@@ -440,19 +468,36 @@ public class TezClientUtils {
+ "," + TezConfiguration.TEZ_CONTAINER_LOGGER_NAME);
}
- static Configuration createFinalTezConfForApp(TezConfiguration amConf) {
+ static Configuration createFinalTezConfForApp(TezConfiguration tezConf,
+ TezConfiguration amConf) {
Configuration conf = new Configuration(false);
conf.setQuietMode(true);
+ assert tezConf != null;
assert amConf != null;
- Iterator<Entry<String, String>> iter = amConf.iterator();
+
+ Entry<String, String> entry;
+ Iterator<Entry<String, String>> iter = tezConf.iterator();
+ while (iter.hasNext()) {
+ entry = iter.next();
+ // Copy all tez config parameters.
+ if (entry.getKey().startsWith(TezConfiguration.TEZ_PREFIX)) {
+ conf.set(entry.getKey(), entry.getValue());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding tez dag am parameter from conf: " + entry.getKey()
+ + ", with value: " + entry.getValue());
+ }
+ }
+ }
+
+ iter = amConf.iterator();
while (iter.hasNext()) {
- Entry<String, String> entry = iter.next();
+ entry = iter.next();
// Copy all tez config parameters.
if (entry.getKey().startsWith(TezConfiguration.TEZ_PREFIX)) {
conf.set(entry.getKey(), entry.getValue());
if (LOG.isDebugEnabled()) {
- LOG.debug("Adding tez dag am parameter: " + entry.getKey()
+ LOG.debug("Adding tez dag am parameter from amConf: " + entry.getKey()
+ ", with value: " + entry.getValue());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
index 45408e1..95ee1b5 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezSession.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -89,6 +89,11 @@ public class TezSession {
tezJarResources = TezClientUtils.setupTezJarsLocalResources(
sessionConfig.getTezConfiguration());
+ if (sessionConfig.getSessionResources() != null
+ && !sessionConfig.getSessionResources().isEmpty()) {
+ tezJarResources.putAll(sessionConfig.getSessionResources());
+ }
+
try {
if (applicationId == null) {
applicationId = yarnClient.createApplication().
@@ -134,14 +139,7 @@ public class TezSession {
LOG.info("Submitting dag to TezSession"
+ ", sessionName=" + sessionName
+ ", applicationId=" + applicationId);
- // Add tez jars to vertices too
- for (Vertex v : dag.getVertices()) {
- v.getTaskLocalResources().putAll(tezJarResources);
- if (null != tezConfPBLRsrc) {
- v.getTaskLocalResources().put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
- tezConfPBLRsrc);
- }
- }
+
DAGPlan dagPlan = dag.createDag(sessionConfig.getTezConfiguration());
SubmitDAGRequestProto requestProto =
SubmitDAGRequestProto.newBuilder().setDAGPlan(dagPlan).build();
@@ -153,7 +151,6 @@ public class TezSession {
TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS_DEFAULT);
long endTime = startTime + (timeout * 1000);
while (true) {
- // FIXME implement a max time to wait for submit
proxy = TezClientUtils.getSessionAMProxy(yarnClient,
sessionConfig.getYarnConfiguration(), applicationId);
if (proxy != null) {
@@ -269,4 +266,5 @@ public class TezSession {
}
return TezSessionStatus.INITIALIZING;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java b/tez-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java
index 61ca60b..2ac5b6c 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java
@@ -18,28 +18,49 @@
package org.apache.tez.client;
+import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tez.dag.api.TezConfiguration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.TreeMap;
+
public class TezSessionConfiguration {
private final AMConfiguration amConfiguration;
private final YarnConfiguration yarnConfig;
private final TezConfiguration tezConfig;
+ private final Map<String, LocalResource> sessionResources;
public TezSessionConfiguration(AMConfiguration amConfiguration,
TezConfiguration tezConfig) {
- this.amConfiguration = amConfiguration;
- this.tezConfig = tezConfig;
- this.yarnConfig = new YarnConfiguration(tezConfig);
+ this(amConfiguration, tezConfig, new YarnConfiguration(tezConfig));
+ }
+
+ TezSessionConfiguration(AMConfiguration amConfiguration,
+ TezConfiguration tezConfig,
+ YarnConfiguration yarnConf) {
+ this(amConfiguration, tezConfig, yarnConf,
+ new TreeMap<String, LocalResource>());
}
+ /**
+ * TezSessionConfiguration constructor
+ * @param amConfiguration AM Configuration @see AMConfiguration
+ * @param tezConfig Tez Configuration
+ * @param yarnConf Yarn Configuration
+ * @param sessionResources LocalResources accessible to all tasks that are
+ * launched within this session.
+ */
TezSessionConfiguration(AMConfiguration amConfiguration,
TezConfiguration tezConfig,
- YarnConfiguration yarnConf) {
+ YarnConfiguration yarnConf,
+ Map<String, LocalResource> sessionResources) {
this.amConfiguration = amConfiguration;
this.tezConfig = tezConfig;
this.yarnConfig = yarnConf;
+ this.sessionResources = sessionResources;
}
public AMConfiguration getAMConfiguration() {
@@ -54,4 +75,8 @@ public class TezSessionConfiguration {
return tezConfig;
}
+ public Map<String, LocalResource> getSessionResources() {
+ return Collections.unmodifiableMap(sessionResources);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 98ea91b..7ae7f16 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -64,12 +64,12 @@ public class DAG { // FIXME rename to Topology
public synchronized DAG addVertex(Vertex vertex) {
if (vertices.containsKey(vertex.getVertexName())) {
throw new IllegalStateException(
- "Vertex " + vertex.getVertexName() + " already defined!");
+ "Vertex " + vertex.getVertexName() + " already defined!");
}
vertices.put(vertex.getVertexName(), vertex);
return this;
}
-
+
public synchronized Vertex getVertex(String vertexName) {
return vertices.get(vertexName);
}
@@ -83,15 +83,15 @@ public class DAG { // FIXME rename to Topology
// Sanity checks
if (!vertices.containsValue(edge.getInputVertex())) {
throw new IllegalArgumentException(
- "Input vertex " + edge.getInputVertex() + " doesn't exist!");
+ "Input vertex " + edge.getInputVertex() + " doesn't exist!");
}
if (!vertices.containsValue(edge.getOutputVertex())) {
throw new IllegalArgumentException(
- "Output vertex " + edge.getOutputVertex() + " doesn't exist!");
+ "Output vertex " + edge.getOutputVertex() + " doesn't exist!");
}
if (edges.contains(edge)) {
throw new IllegalArgumentException(
- "Edge " + edge + " already defined!");
+ "Edge " + edge + " already defined!");
}
// Inform the vertices
@@ -101,7 +101,7 @@ public class DAG { // FIXME rename to Topology
edges.add(edge);
return this;
}
-
+
public String getName() {
return this.name;
}
@@ -116,11 +116,11 @@ public class DAG { // FIXME rename to Topology
int outDegree;
- private AnnotatedVertex(Vertex v){
- this.v = v;
- index = -1;
- lowlink = -1;
- outDegree = 0;
+ private AnnotatedVertex(Vertex v) {
+ this.v = v;
+ index = -1;
+ lowlink = -1;
+ outDegree = 0;
}
}
@@ -147,16 +147,16 @@ public class DAG { // FIXME rename to Topology
verify(true);
}
- public void verify(boolean restricted) throws IllegalStateException {
+ public void verify(boolean restricted) throws IllegalStateException {
if (vertices.isEmpty()) {
throw new IllegalStateException("Invalid dag containing 0 vertices");
}
Map<Vertex, List<Edge>> edgeMap = new HashMap<Vertex, List<Edge>>();
- for(Edge e : edges){
+ for (Edge e : edges) {
Vertex inputVertex = e.getInputVertex();
List<Edge> edgeList = edgeMap.get(inputVertex);
- if(edgeList == null){
+ if (edgeList == null) {
edgeList = new ArrayList<Edge>();
edgeMap.put(inputVertex, edgeList);
}
@@ -166,10 +166,10 @@ public class DAG { // FIXME rename to Topology
// check for valid vertices, duplicate vertex names,
// and prepare for cycle detection
Map<String, AnnotatedVertex> vertexMap = new HashMap<String, AnnotatedVertex>();
- for(Vertex v : vertices.values()){
- if(vertexMap.containsKey(v.getVertexName())){
- throw new IllegalStateException("DAG contains multiple vertices"
- + " with name: " + v.getVertexName());
+ for (Vertex v : vertices.values()) {
+ if (vertexMap.containsKey(v.getVertexName())) {
+ throw new IllegalStateException("DAG contains multiple vertices"
+ + " with name: " + v.getVertexName());
}
vertexMap.put(v.getVertexName(), new AnnotatedVertex(v));
}
@@ -180,12 +180,12 @@ public class DAG { // FIXME rename to Topology
for (RootInputLeafOutput<InputDescriptor> in : v.getInputs()) {
if (vertexMap.containsKey(in.getName())) {
throw new IllegalStateException(
- "DAG contains a vertex and an Input to a vertex with the same name: "
- + in.getName());
+ "DAG contains a vertex and an Input to a vertex with the same name: "
+ + in.getName());
}
if (namedIOs.contains(in.getName())) {
throw new IllegalStateException(
- "DAG contains an Input or an Output with a repeated name: " + in.getName());
+ "DAG contains an Input or an Output with a repeated name: " + in.getName());
} else {
namedIOs.add(in.getName());
}
@@ -193,38 +193,38 @@ public class DAG { // FIXME rename to Topology
for (RootInputLeafOutput<OutputDescriptor> out : v.getOutputs()) {
if (vertexMap.containsKey(out.getName())) {
throw new IllegalStateException(
- "DAG contains a vertex and an Output from a vertex with the same name: "
- + out.getName());
+ "DAG contains a vertex and an Output from a vertex with the same name: "
+ + out.getName());
}
if (namedIOs.contains(out.getName())) {
throw new IllegalStateException(
- "DAG contains an Input or an Output with a repeated name: " + out.getName());
+ "DAG contains an Input or an Output with a repeated name: " + out.getName());
} else {
namedIOs.add(out.getName());
}
}
}
-
+
detectCycles(edgeMap, vertexMap);
- if(restricted){
- for(Edge e : edges){
+ if (restricted) {
+ for (Edge e : edges) {
vertexMap.get(e.getInputVertex().getVertexName()).outDegree++;
if (e.getEdgeProperty().getDataSourceType() !=
- DataSourceType.PERSISTED) {
+ DataSourceType.PERSISTED) {
throw new IllegalStateException(
- "Unsupported source type on edge. " + e);
+ "Unsupported source type on edge. " + e);
}
if (e.getEdgeProperty().getSchedulingType() !=
- SchedulingType.SEQUENTIAL) {
+ SchedulingType.SEQUENTIAL) {
throw new IllegalStateException(
- "Unsupported scheduling type on edge. " + e);
+ "Unsupported scheduling type on edge. " + e);
}
}
- for(AnnotatedVertex av: vertexMap.values()){
+ for (AnnotatedVertex av : vertexMap.values()) {
if (av.outDegree > 1) {
throw new IllegalStateException("Vertex has outDegree>1: "
- + av.v.getVertexName());
+ + av.v.getVertexName());
}
}
}
@@ -233,11 +233,11 @@ public class DAG { // FIXME rename to Topology
// Adaptation of Tarjan's algorithm for connected components.
// http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
private void detectCycles(Map<Vertex, List<Edge>> edgeMap, Map<String, AnnotatedVertex> vertexMap)
- throws IllegalStateException{
+ throws IllegalStateException {
Integer nextIndex = 0; // boxed integer so it is passed by reference.
Stack<AnnotatedVertex> stack = new Stack<DAG.AnnotatedVertex>();
- for(AnnotatedVertex av: vertexMap.values()){
- if(av.index == -1){
+ for (AnnotatedVertex av : vertexMap.values()) {
+ if (av.index == -1) {
assert stack.empty();
strongConnect(av, vertexMap, edgeMap, stack, nextIndex);
}
@@ -247,10 +247,10 @@ public class DAG { // FIXME rename to Topology
// part of Tarjan's algorithm for connected components.
// http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
private void strongConnect(
- AnnotatedVertex av,
- Map<String, AnnotatedVertex> vertexMap,
- Map<Vertex, List<Edge>> edgeMap,
- Stack<AnnotatedVertex> stack, Integer nextIndex) throws IllegalStateException{
+ AnnotatedVertex av,
+ Map<String, AnnotatedVertex> vertexMap,
+ Map<Vertex, List<Edge>> edgeMap,
+ Stack<AnnotatedVertex> stack, Integer nextIndex) throws IllegalStateException {
av.index = nextIndex;
av.lowlink = nextIndex;
nextIndex++;
@@ -258,14 +258,13 @@ public class DAG { // FIXME rename to Topology
av.onstack = true;
List<Edge> edges = edgeMap.get(av.v);
- if(edges != null){
- for(Edge e : edgeMap.get(av.v)){
+ if (edges != null) {
+ for (Edge e : edgeMap.get(av.v)) {
AnnotatedVertex outVertex = vertexMap.get(e.getOutputVertex().getVertexName());
- if(outVertex.index == -1){
+ if (outVertex.index == -1) {
strongConnect(outVertex, vertexMap, edgeMap, stack, nextIndex);
av.lowlink = Math.min(av.lowlink, outVertex.lowlink);
- }
- else if(outVertex.onstack){
+ } else if (outVertex.onstack) {
// strongly connected component detected, but we will wait till later so that the full cycle can be displayed.
// update lowlink in case outputVertex should be considered the root of this component.
av.lowlink = Math.min(av.lowlink, outVertex.index);
@@ -273,21 +272,21 @@ public class DAG { // FIXME rename to Topology
}
}
- if(av.lowlink == av.index ){
- AnnotatedVertex pop = stack.pop();
- pop.onstack = false;
- if(pop != av){
- // there was something on the stack other than this "av".
- // this indicates there is a scc/cycle. It comprises all nodes from top of stack to "av"
- StringBuilder message = new StringBuilder();
- message.append(av.v.getVertexName() + " <- ");
- for( ; pop != av; pop = stack.pop()){
- message.append(pop.v.getVertexName() + " <- ");
- pop.onstack = false;
- }
- message.append(av.v.getVertexName());
- throw new IllegalStateException("DAG contains a cycle: " + message);
- }
+ if (av.lowlink == av.index) {
+ AnnotatedVertex pop = stack.pop();
+ pop.onstack = false;
+ if (pop != av) {
+ // there was something on the stack other than this "av".
+ // this indicates there is a scc/cycle. It comprises all nodes from top of stack to "av"
+ StringBuilder message = new StringBuilder();
+ message.append(av.v.getVertexName() + " <- ");
+ for (; pop != av; pop = stack.pop()) {
+ message.append(pop.v.getVertexName() + " <- ");
+ pop.onstack = false;
+ }
+ message.append(av.v.getVertexName());
+ throw new IllegalStateException("DAG contains a cycle: " + message);
+ }
}
}
@@ -301,12 +300,12 @@ public class DAG { // FIXME rename to Topology
dagBuilder.setName(this.name);
- for(Vertex vertex : vertices.values()){
+ for (Vertex vertex : vertices.values()) {
VertexPlan.Builder vertexBuilder = VertexPlan.newBuilder();
vertexBuilder.setName(vertex.getVertexName());
vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until TEZ-46.
vertexBuilder.setProcessorDescriptor(DagTypeConverters
- .convertToDAGPlan(vertex.getProcessorDescriptor()));
+ .convertToDAGPlan(vertex.getProcessorDescriptor()));
if (vertex.getInputs().size() > 0) {
for (RootInputLeafOutput<InputDescriptor> input : vertex.getInputs()) {
vertexBuilder.addInputs(DagTypeConverters.convertToDAGPlan(input));
@@ -328,31 +327,34 @@ public class DAG { // FIXME rename to Topology
taskConfigBuilder.setTaskModule(vertex.getVertexName());
PlanLocalResource.Builder localResourcesBuilder = PlanLocalResource.newBuilder();
- Map<String,LocalResource> lrs = vertex.getTaskLocalResources();
- for(Entry<String, LocalResource> entry : lrs.entrySet()){
- String key = entry.getKey();
- LocalResource lr = entry.getValue();
- localResourcesBuilder.setName(key);
- localResourcesBuilder.setUri(
+ if (vertex.getTaskLocalResources() != null) {
+ localResourcesBuilder.clear();
+ for (Entry<String, LocalResource> entry :
+ vertex.getTaskLocalResources().entrySet()) {
+ String key = entry.getKey();
+ LocalResource lr = entry.getValue();
+ localResourcesBuilder.setName(key);
+ localResourcesBuilder.setUri(
DagTypeConverters.convertToDAGPlan(lr.getResource()));
- localResourcesBuilder.setSize(lr.getSize());
- localResourcesBuilder.setTimeStamp(lr.getTimestamp());
- localResourcesBuilder.setType(
+ localResourcesBuilder.setSize(lr.getSize());
+ localResourcesBuilder.setTimeStamp(lr.getTimestamp());
+ localResourcesBuilder.setType(
DagTypeConverters.convertToDAGPlan(lr.getType()));
- localResourcesBuilder.setVisibility(
+ localResourcesBuilder.setVisibility(
DagTypeConverters.convertToDAGPlan(lr.getVisibility()));
- if(lr.getType() == LocalResourceType.PATTERN){
- if (lr.getPattern() == null || lr.getPattern().isEmpty()) {
- throw new TezUncheckedException("LocalResource type set to pattern"
+ if (lr.getType() == LocalResourceType.PATTERN) {
+ if (lr.getPattern() == null || lr.getPattern().isEmpty()) {
+ throw new TezUncheckedException("LocalResource type set to pattern"
+ " but pattern is null or empty");
+ }
+ localResourcesBuilder.setPattern(lr.getPattern());
}
- localResourcesBuilder.setPattern(lr.getPattern());
+ taskConfigBuilder.addLocalResource(localResourcesBuilder);
}
- taskConfigBuilder.addLocalResource(localResourcesBuilder);
}
- if(vertex.getTaskEnvironment() != null){
- for(String key : vertex.getTaskEnvironment().keySet()){
+ if (vertex.getTaskEnvironment() != null) {
+ for (String key : vertex.getTaskEnvironment().keySet()) {
PlanKeyValuePair.Builder envSettingBuilder = PlanKeyValuePair.newBuilder();
envSettingBuilder.setKey(key);
envSettingBuilder.setValue(vertex.getTaskEnvironment().get(key));
@@ -360,15 +362,15 @@ public class DAG { // FIXME rename to Topology
}
}
- if(vertex.getTaskLocationsHint() != null ){
- if(vertex.getTaskLocationsHint().getTaskLocationHints() != null){
- for(TaskLocationHint hint : vertex.getTaskLocationsHint().getTaskLocationHints()){
+ if (vertex.getTaskLocationsHint() != null) {
+ if (vertex.getTaskLocationsHint().getTaskLocationHints() != null) {
+ for (TaskLocationHint hint : vertex.getTaskLocationsHint().getTaskLocationHints()) {
PlanTaskLocationHint.Builder taskLocationHintBuilder = PlanTaskLocationHint.newBuilder();
- if(hint.getDataLocalHosts() != null){
+ if (hint.getDataLocalHosts() != null) {
taskLocationHintBuilder.addAllHost(hint.getDataLocalHosts());
}
- if(hint.getRacks() != null){
+ if (hint.getRacks() != null) {
taskLocationHintBuilder.addAllRack(hint.getRacks());
}
@@ -377,11 +379,11 @@ public class DAG { // FIXME rename to Topology
}
}
- for(String inEdgeId : vertex.getInputEdgeIds()){
+ for (String inEdgeId : vertex.getInputEdgeIds()) {
vertexBuilder.addInEdgeId(inEdgeId);
}
- for(String outEdgeId : vertex.getOutputEdgeIds()){
+ for (String outEdgeId : vertex.getOutputEdgeIds()) {
vertexBuilder.addOutEdgeId(outEdgeId);
}
@@ -389,7 +391,7 @@ public class DAG { // FIXME rename to Topology
dagBuilder.addVertex(vertexBuilder);
}
- for(Edge edge : edges){
+ for (Edge edge : edges) {
EdgePlan.Builder edgeBuilder = EdgePlan.newBuilder();
edgeBuilder.setId(edge.getId());
edgeBuilder.setInputVertexName(edge.getInputVertex().getVertexName());
@@ -402,10 +404,10 @@ public class DAG { // FIXME rename to Topology
dagBuilder.addEdge(edgeBuilder);
}
- if(dagConf != null) {
+ if (dagConf != null) {
Iterator<Entry<String, String>> iter = dagConf.iterator();
ConfigurationProto.Builder confProtoBuilder =
- ConfigurationProto.newBuilder();
+ ConfigurationProto.newBuilder();
while (iter.hasNext()) {
Entry<String, String> entry = iter.next();
PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 3247935..803c943 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -44,6 +44,7 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataSourceType;
import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSchedulingType;
import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResource;
+import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto;
import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceType;
import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceVisibility;
import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
@@ -322,4 +323,61 @@ public class DagTypeConverters {
+ " proto");
}
+
+ public static PlanLocalResourcesProto convertFromLocalResources(
+ Map<String, LocalResource> localResources) {
+ PlanLocalResourcesProto.Builder builder =
+ PlanLocalResourcesProto.newBuilder();
+ for (Map.Entry<String, LocalResource> entry : localResources.entrySet()) {
+ PlanLocalResource plr = convertLocalResourceToPlanLocalResource(
+ entry.getKey(), entry.getValue());
+ builder.addLocalResources(plr);
+ }
+ return builder.build();
+ }
+
+ public static Map<String, LocalResource> convertFromPlanLocalResources(
+ PlanLocalResourcesProto proto) {
+ Map<String, LocalResource> localResources =
+ new HashMap<String, LocalResource>(proto.getLocalResourcesCount());
+ for (PlanLocalResource plr : proto.getLocalResourcesList()) {
+ String name = plr.getName();
+ LocalResource lr = convertPlanLocalResourceToLocalResource(plr);
+ localResources.put(name, lr);
+ }
+ return localResources;
+ }
+
+ public static PlanLocalResource convertLocalResourceToPlanLocalResource(
+ String name, LocalResource lr) {
+ PlanLocalResource.Builder localResourcesBuilder = PlanLocalResource.newBuilder();
+ localResourcesBuilder.setName(name);
+ localResourcesBuilder.setUri(
+ DagTypeConverters.convertToDAGPlan(lr.getResource()));
+ localResourcesBuilder.setSize(lr.getSize());
+ localResourcesBuilder.setTimeStamp(lr.getTimestamp());
+ localResourcesBuilder.setType(
+ DagTypeConverters.convertToDAGPlan(lr.getType()));
+ localResourcesBuilder.setVisibility(
+ DagTypeConverters.convertToDAGPlan(lr.getVisibility()));
+ if (lr.getType() == LocalResourceType.PATTERN) {
+ if (lr.getPattern() == null || lr.getPattern().isEmpty()) {
+ throw new TezUncheckedException("LocalResource type set to pattern"
+ + " but pattern is null or empty");
+ }
+ localResourcesBuilder.setPattern(lr.getPattern());
+ }
+ return localResourcesBuilder.build();
+ }
+
+ public static LocalResource convertPlanLocalResourceToLocalResource(
+ PlanLocalResource plr) {
+ return LocalResource.newInstance(
+ ConverterUtils.getYarnUrlFromPath(new Path(plr.getUri())),
+ DagTypeConverters.convertFromDAGPlan(plr.getType()),
+ DagTypeConverters.convertFromDAGPlan(plr.getVisibility()),
+ plr.getSize(), plr.getTimeStamp(),
+ plr.hasPattern() ? plr.getPattern() : null);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index e807636..4efa6e2 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -227,6 +227,9 @@ public class TezConfiguration extends Configuration {
public static final String TEZ_SESSION_PREFIX =
TEZ_PREFIX + "session.";
+ public static final String TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME =
+ TEZ_SESSION_PREFIX + "local-resources.pb.file-name";
+
/**
* Time (in seconds) to wait for AM to come up when trying to submit a DAG
* from the client.
@@ -245,4 +248,34 @@ public class TezConfiguration extends Configuration {
public static final int TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS_DEFAULT =
300;
+ /**
+ * Session pre-warm related configuration options
+ */
+
+ public static final String TEZ_SESSION_PRE_WARM_PREFIX =
+ TEZ_SESSION_PREFIX + "pre-warm.";
+ public static final String TEZ_SESSION_PRE_WARM_ENABLED =
+ TEZ_SESSION_PRE_WARM_PREFIX + "enabled";
+ public static final boolean TEZ_SESSION_PRE_WARM_ENABLED_DEFAULT = false;
+
+ public static final String TEZ_SESSION_PRE_WARM_NUM_CONTAINERS =
+ TEZ_SESSION_PRE_WARM_PREFIX + "num-containers";
+ public static final String TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_MEMORY_MB =
+ TEZ_SESSION_PRE_WARM_PREFIX + "container.resource.memory.mb";
+ public static final int
+ TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_MEMORY_MB_DEFAULT = 1024;
+
+ public static final String TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_VCORES =
+ TEZ_SESSION_PRE_WARM_PREFIX + "container.resource.vcores";
+ public static final int
+ TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_VCORES_DEFAULT = 1;
+
+ public static final String TEZ_SESSION_PRE_WARM_CONTAINER_JAVA_OPTS =
+ TEZ_SESSION_PRE_WARM_PREFIX + "container.java.opts";
+ public static final String TEZ_SESSION_PRE_WARM_CONTAINER_ENVIRONMENT =
+ TEZ_SESSION_PRE_WARM_PREFIX + "container.environment";
+
+ public static final String TEZ_SESSION_PRE_WARM_PROCESSOR_NAME =
+ TEZ_SESSION_PRE_WARM_PREFIX + "processor";
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index 79e62ca..b948e60 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -189,3 +189,7 @@ message DAGStatusProto {
optional ProgressProto DAGProgress = 3;
repeated StringProgressPairProto vertexProgress = 4;
}
+
+// Local resources shared across a session. In session mode the AM parses
+// this message from the file named by
+// TezConfiguration.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME and adds its
+// entries to the local resources of every vertex it runs.
+message PlanLocalResourcesProto {
+ repeated PlanLocalResource localResources = 1;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index e87bbb0..5b4793b 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -164,9 +164,11 @@ public class YarnTezDagChild {
int eventCounter = 0;
int eventsRange = 0;
TezTaskAttemptID taskAttemptID = null;
+ List<TezEvent> events = new ArrayList<TezEvent>();
try {
taskLock.readLock().lock();
if (currentTask != null) {
+ eventsToSend.drainTo(events);
taskAttemptID = currentTaskAttemptID;
eventCounter = currentTask.getEventCounter();
eventsRange = maxEventsToGet;
@@ -175,24 +177,21 @@ public class YarnTezDagChild {
currentTask.getCounters(), currentTask.getProgress()),
new EventMetaData(EventProducerConsumerType.SYSTEM,
currentTask.getVertexName(), "", taskAttemptID));
- } else if (outOfBandEvents == null) {
+ events.add(updateEvent);
+ } else if (outOfBandEvents == null && events.isEmpty()) {
LOG.info("Setting TaskAttemptID to null as the task has already"
+ " completed. Caused by race-condition between the normal"
+ " heartbeat and out-of-band heartbeats");
taskAttemptID = null;
+ } else {
+ if (outOfBandEvents != null && !outOfBandEvents.isEmpty()) {
+ events.addAll(outOfBandEvents);
+ }
}
}
} finally {
taskLock.readLock().unlock();
}
- List<TezEvent> events = new ArrayList<TezEvent>();
- if (updateEvent != null) {
- events.add(updateEvent);
- }
- eventsToSend.drainTo(events);
- if (outOfBandEvents != null && !outOfBandEvents.isEmpty()) {
- events.addAll(outOfBandEvents);
- }
long reqId = requestCounter.incrementAndGet();
TezHeartbeatRequest request = new TezHeartbeatRequest(reqId, events,
@@ -244,9 +243,7 @@ public class YarnTezDagChild {
public static void main(String[] args) throws Throwable {
Thread.setDefaultUncaughtExceptionHandler(
new YarnUncaughtExceptionHandler());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Child starting");
- }
+ LOG.info("YarnTezDagChild starting");
final Configuration defaultConf = new Configuration();
TezUtils.addUserSpecifiedTezConfiguration(defaultConf);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 26c0992..1f3619e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.dag.app.dag.DAG;
@@ -74,4 +75,7 @@ public interface AppContext {
AMNodeMap getAllNodes();
TaskSchedulerEventHandler getTaskScheduler();
+
+ Map<String, LocalResource> getSessionResources();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 8d6fd1f..ced6ca2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -26,6 +26,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -63,25 +64,32 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.client.TezSessionStatus;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClientServer;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
+import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.DAGState;
@@ -116,6 +124,7 @@ import org.apache.tez.dag.history.avro.HistoryEventType;
import org.apache.tez.dag.history.events.AMStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.runtime.library.common.security.JobTokenSecretManager;
+import org.apache.tez.runtime.library.processor.SleepProcessor;
/**
* The Map-Reduce Application Master.
@@ -172,6 +181,8 @@ public class DAGAppMaster extends AbstractService {
private VertexEventDispatcher vertexEventDispatcher;
private TaskSchedulerEventHandler taskSchedulerEventHandler;
private HistoryEventHandler historyEventHandler;
+ private final Map<String, LocalResource> sessionResources =
+ new HashMap<String, LocalResource>();
private DAGAppMasterShutdownHandler shutdownHandler =
new DAGAppMasterShutdownHandler();
@@ -230,9 +241,7 @@ public class DAGAppMaster extends AbstractService {
}
@Override
- public void serviceInit(final Configuration conf) throws Exception {
-
- this.state = DAGAppMasterState.INITED;
+ public synchronized void serviceInit(final Configuration conf) throws Exception {
this.amConf = conf;
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
@@ -304,8 +313,27 @@ public class DAGAppMaster extends AbstractService {
TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS,
TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS_DEFAULT);
+ if (isSession) {
+ FileInputStream sessionResourcesStream = null;
+ try {
+ sessionResourcesStream = new FileInputStream(
+ TezConfiguration.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME);
+ PlanLocalResourcesProto localResourcesProto =
+ PlanLocalResourcesProto.parseFrom(sessionResourcesStream);
+ sessionResources.putAll(DagTypeConverters.convertFromPlanLocalResources(
+ localResourcesProto));
+ } finally {
+ if (sessionResourcesStream != null) {
+ sessionResourcesStream.close();
+ }
+ }
+ }
+
initServices(conf);
super.serviceInit(conf);
+
+ this.state = DAGAppMasterState.INITED;
+
}
protected Dispatcher createDispatcher() {
@@ -863,6 +891,11 @@ public class DAGAppMaster extends AbstractService {
}
@Override
+    public Map<String, LocalResource> getSessionResources() {
+      // Session resources are loaded once during serviceInit and merged into
+      // the local resources of every vertex; expose a read-only view so no
+      // caller can modify the shared session state.
+      return Collections.unmodifiableMap(sessionResources);
+    }
+
+ @Override
public Map<ApplicationAccessType, String> getApplicationACLs() {
if (getServiceState() != STATE.STARTED) {
throw new TezUncheckedException(
@@ -1066,14 +1099,12 @@ public class DAGAppMaster extends AbstractService {
@SuppressWarnings("unchecked")
@Override
- public void serviceStart() throws Exception {
+ public synchronized void serviceStart() throws Exception {
//start all the components
startServices();
super.serviceStart();
- this.state = DAGAppMasterState.IDLE;
-
// metrics system init is really init & start.
// It's more test friendly to put it here.
DefaultMetricsSystem.initialize("DAGAppMaster");
@@ -1089,19 +1120,124 @@ public class DAGAppMaster extends AbstractService {
if (!isSession) {
startDAG();
} else {
- LOG.info("In Session mode. Waiting for DAG over RPC");
- this.dagSubmissionTimer = new Timer(true);
- this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() {
- @Override
- public void run() {
- checkAndHandleSessionTimeout();
- }
- }, sessionTimeoutInterval, sessionTimeoutInterval/10);
+ boolean preWarmContainersEnabled = amConf.getBoolean(
+ TezConfiguration.TEZ_SESSION_PRE_WARM_ENABLED,
+ TezConfiguration.TEZ_SESSION_PRE_WARM_ENABLED_DEFAULT);
+
+ boolean ranPreWarmContainersDAG = false;
+ if (preWarmContainersEnabled) {
+ ranPreWarmContainersDAG = runPreWarmContainersDAG();
+ }
+
+ if (!ranPreWarmContainersDAG) {
+ LOG.info("In Session mode. Waiting for DAG over RPC");
+ this.state = DAGAppMasterState.IDLE;
+
+ this.dagSubmissionTimer = new Timer(true);
+ this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() {
+ @Override
+ public void run() {
+ checkAndHandleSessionTimeout();
+ }
+ }, sessionTimeoutInterval, sessionTimeoutInterval / 10);
+ }
}
}
+  /**
+   * Starts a single-vertex DAG whose only purpose is to obtain containers up
+   * front, so that DAGs submitted to the session later can re-use them.
+   *
+   * The vertex runs the processor named by
+   * TezConfiguration.TEZ_SESSION_PRE_WARM_PROCESSOR_NAME (SleepProcessor when
+   * unset) with TezConfiguration.TEZ_SESSION_PRE_WARM_NUM_CONTAINERS tasks.
+   *
+   * @return true if the pre-warm DAG was started; false if pre-warming was
+   *         skipped because the container count is not positive or the
+   *         container resource requirements are not explicitly configured
+   */
+  private boolean runPreWarmContainersDAG() {
+    int numContainers = amConf.getInt(
+        TezConfiguration.TEZ_SESSION_PRE_WARM_NUM_CONTAINERS, 0);
+    // Reject negative values too: they would become a negative vertex
+    // parallelism.
+    if (numContainers <= 0) {
+      LOG.info("Not pre-warming containers as "
+          + TezConfiguration.TEZ_SESSION_PRE_WARM_NUM_CONTAINERS
+          + " not specified or set to a non-positive value"
+          + ", value=" + numContainers);
+      return false;
+    }
+
+    // Resource requirements must be configured explicitly; the *_DEFAULT
+    // constants are only the getInt fallbacks below.
+    String memoryMb = amConf.get(
+        TezConfiguration.TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_MEMORY_MB);
+    String vcores = amConf.get(
+        TezConfiguration.TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_VCORES);
+    if (memoryMb == null || vcores == null) {
+      LOG.info("Not pre-warming containers as container resource"
+          + " requirements not specified"
+          + ", "
+          + TezConfiguration.TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_MEMORY_MB
+          + "=" + memoryMb
+          + ", "
+          + TezConfiguration.TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_VCORES
+          + "=" + vcores);
+      return false;
+    }
+
+    Resource containerResource = Resource.newInstance(
+        amConf.getInt(
+            TezConfiguration.TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_MEMORY_MB,
+            TezConfiguration.TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_MEMORY_MB_DEFAULT),
+        amConf.getInt(
+            TezConfiguration.TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_VCORES,
+            TezConfiguration.TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_VCORES_DEFAULT));
+
+    // Resolve the processor once so the descriptor and the log message below
+    // report the same class.
+    String processorClassName = amConf.get(
+        TezConfiguration.TEZ_SESSION_PRE_WARM_PROCESSOR_NAME,
+        SleepProcessor.class.getName());
+    ProcessorDescriptor processorDescriptor =
+        new ProcessorDescriptor(processorClassName);
+
+    // A single vertex with one task per container to be launched.
+    org.apache.tez.dag.api.DAG preWarmContainersDAG =
+        new org.apache.tez.dag.api.DAG("PreWarmContainersDAG");
+    Vertex preWarmVertex = new Vertex("PreWarmSleepVertex",
+        processorDescriptor, numContainers, containerResource);
+
+    // Task environment: user-supplied settings first, then classpath entries
+    // for the working directory, the jars in it, and the configured
+    // YarnConfiguration.YARN_APPLICATION_CLASSPATH entries.
+    Map<String, String> environment = new HashMap<String, String>();
+    String userEnvironment = amConf.get(
+        TezConfiguration.TEZ_SESSION_PRE_WARM_CONTAINER_ENVIRONMENT);
+    if (userEnvironment != null) {
+      Apps.setEnvFromInputString(environment, userEnvironment);
+    }
+    Apps.addToEnvironment(environment,
+        Environment.CLASSPATH.name(),
+        Environment.PWD.$());
+    Apps.addToEnvironment(environment,
+        Environment.CLASSPATH.name(),
+        Environment.PWD.$() + File.separator + "*");
+    for (String c : amConf.getStrings(
+        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+        YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+      Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
+          c.trim());
+    }
+    preWarmVertex.setTaskEnvironment(environment);
+
+    String javaOpts = amConf.get(
+        TezConfiguration.TEZ_SESSION_PRE_WARM_CONTAINER_JAVA_OPTS);
+    if (javaOpts != null) {
+      preWarmVertex.setJavaOpts(javaOpts);
+    }
+
+    preWarmContainersDAG.addVertex(preWarmVertex);
+
+    LOG.info("Starting DAG to pre-warm containers for AM using "
+        + processorClassName
+        + ", numContainers=" + numContainers
+        + ", containerResource=" + containerResource);
+    startDAG(preWarmContainersDAG.createDag(amConf));
+    return true;
+  }
+
@Override
- public void serviceStop() throws Exception {
+ public synchronized void serviceStop() throws Exception {
if (isSession) {
sessionStopped.set(true);
}
@@ -1276,12 +1412,9 @@ public class DAGAppMaster extends AbstractService {
DAGPlan dagPlan = null;
// Read the protobuf DAG
- DAGPlan.Builder dagPlanBuilder = DAGPlan.newBuilder();
dagPBBinaryStream = new FileInputStream(
TezConfiguration.TEZ_PB_PLAN_BINARY_NAME);
- dagPlanBuilder.mergeFrom(dagPBBinaryStream);
-
- dagPlan = dagPlanBuilder.build();
+ dagPlan = DAGPlan.parseFrom(dagPBBinaryStream);
startDAG(dagPlan);
@@ -1293,6 +1426,7 @@ public class DAGAppMaster extends AbstractService {
}
private void startDAG(DAGPlan dagPlan) {
+ this.state = DAGAppMasterState.RUNNING;
if (LOG.isDebugEnabled()) {
LOG.debug("Running a DAG with " + dagPlan.getVertexCount()
+ " vertices ");
@@ -1312,7 +1446,6 @@ public class DAGAppMaster extends AbstractService {
private void startDAG(DAG dag) {
currentDAG = dag;
- this.state = DAGAppMasterState.RUNNING;
// End of creating the job.
((RunningAppContext) context).setDAG(currentDAG);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 582d274..23fd394 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -525,8 +525,10 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
List<TezEvent> inEvents = request.getEvents();
- LOG.info("Ping from " + taskAttemptID.toString() +
- " events: " + (inEvents != null? inEvents.size() : -1));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ping from " + taskAttemptID.toString() +
+ " events: " + (inEvents != null? inEvents.size() : -1));
+ }
if(inEvents!=null && !inEvents.isEmpty()) {
TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
context.getEventHandler().handle(
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index ec37eb8..97034d4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -464,6 +464,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
this.localResources = DagTypeConverters
.createLocalResourceMapFromDAGPlan(vertexPlan.getTaskConfig()
.getLocalResourceList());
+ this.localResources.putAll(appContext.getSessionResources());
this.environment = DagTypeConverters
.createEnvironmentMapFromDAGPlan(vertexPlan.getTaskConfig()
.getEnvironmentSettingList());
@@ -1052,7 +1053,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// for now, only for leaf vertices
// TODO TEZ-41 make commmitter type configurable per vertex
- if (targetVertices.isEmpty()) {
+ if (!this.additionalOutputSpecs.isEmpty()) {
committer = new MRVertexOutputCommitter();
}
try {
@@ -1608,8 +1609,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventRouteEvent rEvent = (VertexEventRouteEvent) event;
List<TezEvent> tezEvents = rEvent.getEvents();
for(TezEvent tezEvent : tezEvents) {
- LOG.info("Vertex: " + vertex.getName() + " routing event: "
- + tezEvent.getEventType());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Vertex: " + vertex.getName() + " routing event: "
+ + tezEvent.getEventType());
+ }
EventMetaData sourceMeta = tezEvent.getSourceInfo();
switch(tezEvent.getEventType()) {
case DATA_MOVEMENT_EVENT:
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 4093b85..196e3d4 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -55,6 +55,7 @@ import org.apache.tez.client.TezClient;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.client.TezSession;
import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
@@ -368,6 +369,8 @@ public class OrderedWordCount {
DAGClient dagClient;
if (useTezSession) {
+ LOG.info("Waiting for TezSession to get into ready state");
+ waitForTezSessionReady(tezSession);
LOG.info("Submitting DAG to Tez Session, dagIndex=" + dagIndex);
dagClient = tezSession.submitDAG(dag);
LOG.info("Submitted DAG to Tez Session, dagIndex=" + dagIndex);
@@ -424,4 +427,23 @@ public class OrderedWordCount {
}
}
+  /**
+   * Polls the session every 100 ms until it reports READY.
+   *
+   * If the waiting thread is interrupted, the method returns early and
+   * leaves the thread's interrupt status set for the caller to observe.
+   *
+   * @param tezSession the session to wait on
+   * @throws RuntimeException if the session reports SHUTDOWN
+   * @throws IOException as declared for querying the session status
+   * @throws TezException as declared for querying the session status
+   */
+  private static void waitForTezSessionReady(TezSession tezSession)
+      throws IOException, TezException {
+    while (true) {
+      TezSessionStatus status = tezSession.getSessionStatus();
+      if (status == TezSessionStatus.SHUTDOWN) {
+        throw new RuntimeException("TezSession has already shutdown");
+      }
+      if (status == TezSessionStatus.READY) {
+        return;
+      }
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted while trying to check session status");
+        // Restore the flag instead of swallowing the interrupt.
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+  }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
new file mode 100644
index 0000000..2e3aba0
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.processor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalIOProcessor;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A simple sleep processor implementation that sleeps for the configured
+ * time in milliseconds. It reads no inputs and writes no outputs, so it can
+ * serve as a placeholder task (e.g. when pre-warming session containers).
+ *
+ * @see SleepProcessorConfig for configuring the SleepProcessor
+ */
+public class SleepProcessor implements LogicalIOProcessor {
+
+  private static final Log LOG = LogFactory.getLog(SleepProcessor.class);
+
+  private int timeToSleepMS;
+
+  /**
+   * Reads the sleep time from the processor user payload. A missing or
+   * empty payload falls back to a sleep time of 1 ms.
+   */
+  @Override
+  public void initialize(TezProcessorContext processorContext)
+      throws Exception {
+    byte[] userPayload = processorContext.getUserPayload();
+    if (userPayload == null || userPayload.length == 0) {
+      LOG.info("No processor user payload specified"
+          + ", using default timeToSleep of 1 ms");
+      timeToSleepMS = 1;
+    } else {
+      SleepProcessorConfig cfg = new SleepProcessorConfig();
+      cfg.fromUserPayload(userPayload);
+      timeToSleepMS = cfg.getTimeToSleepMS();
+    }
+    LOG.info("Initialized SleepProcessor, timeToSleepMS=" + timeToSleepMS);
+  }
+
+  /**
+   * Sleeps for the configured time; inputs and outputs are not touched.
+   * An interrupt ends the sleep early, and the thread's interrupt status is
+   * restored so code further up the stack can still observe it.
+   */
+  @Override
+  public void run(Map<String, LogicalInput> inputs,
+      Map<String, LogicalOutput> outputs) throws Exception {
+    LOG.info("Running the Sleep Processor, sleeping for "
+        + timeToSleepMS + " ms");
+    try {
+      Thread.sleep(timeToSleepMS);
+    } catch (InterruptedException ie) {
+      // Finishing early is fine for a sleep task, but the interrupt must not
+      // be swallowed.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @Override
+  public void handleEvents(List<Event> processorEvents) {
+    // Nothing to do
+  }
+
+  @Override
+  public void close() throws Exception {
+    // Nothing to cleanup
+  }
+
+  /**
+   * Configuration for the Sleep Processor.
+   * Only configuration option is time to sleep in milliseconds, which is
+   * serialized into the user payload as a decimal string.
+   */
+  public static class SleepProcessorConfig {
+    // The payload is ASCII digits; pin the charset so the encoding does not
+    // depend on the JVM's platform default.
+    private static final Charset PAYLOAD_CHARSET = Charset.forName("UTF-8");
+
+    private int timeToSleepMS;
+
+    public SleepProcessorConfig() {
+    }
+
+    /**
+     * @param timeToSleepMS Time to sleep in milliseconds; must not be negative
+     * @throws IllegalArgumentException if timeToSleepMS is negative
+     */
+    public SleepProcessorConfig(int timeToSleepMS) {
+      this.timeToSleepMS = checkTimeToSleep(timeToSleepMS);
+    }
+
+    /** Serializes the sleep time as its decimal string representation. */
+    public byte[] toUserPayload() {
+      return Integer.toString(timeToSleepMS).getBytes(PAYLOAD_CHARSET);
+    }
+
+    /**
+     * Restores the sleep time from a payload in the format written by
+     * {@link #toUserPayload()}. Surrounding whitespace is ignored.
+     *
+     * @throws NumberFormatException if the payload is not a decimal integer
+     * @throws IllegalArgumentException if the decoded value is negative
+     */
+    public void fromUserPayload(byte[] userPayload) {
+      String text = new String(userPayload, PAYLOAD_CHARSET).trim();
+      timeToSleepMS = checkTimeToSleep(Integer.parseInt(text));
+    }
+
+    public int getTimeToSleepMS() {
+      return timeToSleepMS;
+    }
+
+    // Thread.sleep rejects negative values, so reject them when configured
+    // rather than when the task runs.
+    private static int checkTimeToSleep(int timeToSleepMS) {
+      if (timeToSleepMS < 0) {
+        throw new IllegalArgumentException(
+            "timeToSleepMS must not be negative, got " + timeToSleepMS);
+      }
+      return timeToSleepMS;
+    }
+  }
+}