You are viewing a plain-text version of this content; the canonical link to the original was not preserved in this copy.
Posted to commits@tez.apache.org by ml...@apache.org on 2013/05/13 20:21:44 UTC
[1/2] TEZ-25: Change DAG representation from Configuration object to
structured protobuf message
Updated Branches:
refs/heads/TEZ-1 97e1fa967 -> cb5758b42
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag/src/test/java/org/apache/tez/dag/api/TestVertexLocationHint.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/TestVertexLocationHint.java b/tez-dag/src/test/java/org/apache/tez/dag/api/TestVertexLocationHint.java
deleted file mode 100644
index eb8c781..0000000
--- a/tez-dag/src/test/java/org/apache/tez/dag/api/TestVertexLocationHint.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestVertexLocationHint {
-
- private DataInput in;
- private DataOutput out;
- private ByteArrayOutputStream bOut;
-
- @Before
- public void setup() {
- bOut = new ByteArrayOutputStream();
- out = new DataOutputStream(bOut);
- }
-
- @After
- public void teardown() {
- in = null;
- out = null;
- bOut = null;
- }
-
- @Test
- public void testNullTaskLocationHintSerDes() throws IOException {
- TaskLocationHint expected = new TaskLocationHint(null, null);
- expected.write(out);
- in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
- TaskLocationHint actual = new TaskLocationHint();
- actual.readFields(in);
- Assert.assertNull(actual.getDataLocalHosts());
- Assert.assertNull(actual.getRacks());
- }
-
- @Test
- public void testTaskLocationHintSerDes() throws IOException {
- String[] hosts = { "h1", "h2", "", null };
- String[] racks = { "r1", "r2" };
- TaskLocationHint expected = new TaskLocationHint(hosts, racks);
- expected.write(out);
- in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
- TaskLocationHint actual = new TaskLocationHint();
- actual.readFields(in);
- Assert.assertNotNull(actual.getDataLocalHosts());
- Assert.assertNotNull(actual.getRacks());
- Assert.assertArrayEquals(hosts, actual.getDataLocalHosts());
- Assert.assertArrayEquals(racks, actual.getRacks());
- }
-
- @Test
- public void testTaskLocationHintSerDes2() throws IOException {
- String[] hosts = null;
- String[] racks = { "r1", "r2" };
- TaskLocationHint expected = new TaskLocationHint(hosts, racks);
- expected.write(out);
- in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
- TaskLocationHint actual = new TaskLocationHint();
- actual.readFields(in);
- Assert.assertNull(actual.getDataLocalHosts());
- Assert.assertNotNull(actual.getRacks());
- Assert.assertArrayEquals(racks, actual.getRacks());
- }
-
- @Test
- public void testEmptyVertexLocationHintSerDes() throws IOException {
- VertexLocationHint expected = new VertexLocationHint(0);
- expected.write(out);
- in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
- VertexLocationHint actual = new VertexLocationHint();
- actual.readFields(in);
- Assert.assertEquals(0, actual.getNumTasks());
- Assert.assertNotNull(actual.getTaskLocationHints());
- Assert.assertEquals(0, actual.getTaskLocationHints().length);
- }
-
- @Test
- public void testVertexLocationHintSerDes() throws IOException {
- String[] hosts = { "h1", "h2", "", null };
- String[] racks = { "r1", "r2" };
- VertexLocationHint expected = new VertexLocationHint(4);
- expected.getTaskLocationHints()[0] = new TaskLocationHint(hosts, racks);
- expected.getTaskLocationHints()[1] = null;
- expected.getTaskLocationHints()[2] = new TaskLocationHint(null, racks);
- expected.getTaskLocationHints()[3] = new TaskLocationHint(hosts, null);
- expected.write(out);
- in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
- VertexLocationHint actual = new VertexLocationHint();
- actual.readFields(in);
-
- Assert.assertEquals(4, actual.getNumTasks());
- Assert.assertNotNull(actual.getTaskLocationHints());
- Assert.assertEquals(4, actual.getTaskLocationHints().length);
-
- Assert.assertNotNull(actual.getTaskLocationHints()[0]);
- Assert.assertNull(actual.getTaskLocationHints()[1]);
- Assert.assertNotNull(actual.getTaskLocationHints()[2]);
- Assert.assertNotNull(actual.getTaskLocationHints()[3]);
-
- Assert.assertArrayEquals(
- expected.getTaskLocationHints()[0].getDataLocalHosts(),
- actual.getTaskLocationHints()[0].getDataLocalHosts());
- Assert.assertArrayEquals(
- expected.getTaskLocationHints()[0].getRacks(),
- actual.getTaskLocationHints()[0].getRacks());
- Assert.assertNull(
- actual.getTaskLocationHints()[2].getDataLocalHosts());
- Assert.assertArrayEquals(
- expected.getTaskLocationHints()[2].getRacks(),
- actual.getTaskLocationHints()[2].getRacks());
- Assert.assertArrayEquals(
- expected.getTaskLocationHints()[3].getDataLocalHosts(),
- actual.getTaskLocationHints()[3].getDataLocalHosts());
- Assert.assertNull(
- actual.getTaskLocationHints()[3].getRacks());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index 715a364..9042846 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -99,7 +99,7 @@ import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.DAGPlan.JobPlan;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.TezConfiguration;
@@ -737,6 +737,14 @@ public class YARNRunner implements ClientProtocol {
List<String> vargs = new ArrayList<String>(8);
vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
+//[Debug AppMaster] Current simplest way to attach debugger to AppMaster
+// Uncomment the following, then launch a regular job, eg
+// >hadoop jar {path}\hadoop-mapreduce-examples-3.0.0-SNAPSHOT.jar pi 2 2
+// LOG.error(" !!!!!!!!!");
+// LOG.error(" !!!!!!!!! Launching AppMaster in debug/suspend mode. Attach to port 8002");
+// LOG.error(" !!!!!!!!!");
+// vargs.add("-Xdebug -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=8002,server=y,suspend=y");
+
// FIXME set up logging related properties
// TODO -Dtez.root.logger??
// MRApps.addLog4jSystemProperties(logLevel, logSize, vargs);
@@ -795,24 +803,38 @@ public class YARNRunner implements ClientProtocol {
setDAGParamsFromMRConf(dag);
- // FIXME add serialized dag conf
- DAGConfiguration dagConf = dag.serializeDag();
-
- Path dagConfFilePath = new Path(jobSubmitDir,
- TezConfiguration.DAG_AM_PLAN_CONFIG_XML);
- FSDataOutputStream dagConfOut =
- FileSystem.create(fs, dagConfFilePath,
- new FsPermission(DAG_FILE_PERMISSION));
+ // emit protobuf DAG file style
+ JobPlan dagPB = dag.createDag();
+ FSDataOutputStream dagPBOutBinaryStream = null;
+ FSDataOutputStream dagPBOutTextStream = null;
+ Path binaryPath = new Path(jobSubmitDir, TezConfiguration.DAG_AM_PLAN_PB_BINARY);
+ Path textPath = new Path(jobSubmitDir, TezConfiguration.DAG_AM_PLAN_PB_TEXT);
try {
- dagConf.writeXml(dagConfOut);
+ //binary output
+ dagPBOutBinaryStream = FileSystem.create(fs, binaryPath,
+ new FsPermission(DAG_FILE_PERMISSION));
+ dagPB.writeTo(dagPBOutBinaryStream);
+
+ // text / human-readable output
+ dagPBOutTextStream = FileSystem.create(fs, textPath,
+ new FsPermission(DAG_FILE_PERMISSION));
+ dagPBOutTextStream.writeUTF(dagPB.toString());
} finally {
- dagConfOut.close();
+ if(dagPBOutBinaryStream != null){
+ dagPBOutBinaryStream.close();
+ }
+ if(dagPBOutTextStream != null){
+ dagPBOutTextStream.close();
+ }
}
- localResources.put(TezConfiguration.DAG_AM_PLAN_CONFIG_XML,
+
+ localResources.put(TezConfiguration.DAG_AM_PLAN_PB_BINARY,
createApplicationResource(defaultFileContext,
- dagConfFilePath, LocalResourceType.FILE));
+ binaryPath, LocalResourceType.FILE));
- // FIXME add tez conf if needed
+ localResources.put(TezConfiguration.DAG_AM_PLAN_PB_TEXT,
+ createApplicationResource(defaultFileContext,
+ textPath, LocalResourceType.FILE));
// FIXME are we using MR acls for tez?
Map<ApplicationAccessType, String> acls
[2/2] git commit: TEZ-25: Change DAG representation from
Configuration object to structured protobuf message
Posted by ml...@apache.org.
TEZ-25: Change DAG representation from Configuration object to structured protobuf message
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/cb5758b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/cb5758b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/cb5758b4
Branch: refs/heads/TEZ-1
Commit: cb5758b424a80c5a17e0d9983a7b6ff2982dcd31
Parents: 97e1fa9
Author: mikelid <mi...@microsoft.com>
Authored: Mon May 13 11:21:27 2013 -0700
Committer: mikelid <mi...@microsoft.com>
Committed: Mon May 13 11:21:27 2013 -0700
----------------------------------------------------------------------
pom.xml | 5 +
tez-dag-api/pom.xml | 26 +
.../src/main/java/org/apache/tez/dag/api/DAG.java | 128 ++++-
.../org/apache/tez/dag/api/DAGConfiguration.java | 492 ---------------
.../org/apache/tez/dag/api/DAGLocationHint.java | 95 ---
.../org/apache/tez/dag/api/DagTypeConverters.java | 221 +++++++
.../org/apache/tez/dag/api/TezConfiguration.java | 3 +-
.../org/apache/tez/dag/api/VertexLocationHint.java | 83 +---
tez-dag-api/src/main/proto/DAGPlan.proto | 119 ++++
.../java/org/apache/tez/dag/api/TestDAGPlan.java | 82 +++
.../java/org/apache/tez/dag/app/DAGAppMaster.java | 82 ++--
.../org/apache/tez/dag/app/MRRExampleHelper.java | 33 +-
.../main/java/org/apache/tez/dag/app/dag/DAG.java | 7 +-
.../java/org/apache/tez/dag/app/dag/Vertex.java | 3 +
.../org/apache/tez/dag/app/dag/impl/DAGImpl.java | 101 ++--
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 33 +-
.../apache/tez/dag/utils/TezEngineChildJVM.java | 6 +
.../apache/tez/dag/api/TestDAGLocationHint.java | 114 ----
.../apache/tez/dag/api/TestVertexLocationHint.java | 147 -----
.../java/org/apache/tez/mapreduce/YARNRunner.java | 50 +-
20 files changed, 736 insertions(+), 1094 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 586eb18..5c545c5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -248,6 +248,11 @@
<artifactId>avro</artifactId>
<version>1.5.3</version>
</dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>2.4.0a</version>
+ </dependency>
</dependencies>
</dependencyManagement>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag-api/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dag-api/pom.xml b/tez-dag-api/pom.xml
index c084f01..2b6aebb 100644
--- a/tez-dag-api/pom.xml
+++ b/tez-dag-api/pom.xml
@@ -46,6 +46,32 @@
<configuration>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-maven-plugins</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+ <executions>
+ <execution>
+ <id>compile-protoc</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>protoc</goal>
+ </goals>
+ <configuration>
+ <imports>
+ <param>${basedir}/src/main/proto</param>
+ </imports>
+ <source>
+ <directory>${basedir}/src/main/proto</directory>
+ <includes>
+ <include>DAGPlan.proto</include>
+ </includes>
+ </source>
+ <output>${project.build.directory}/generated-sources/java</output>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 185f9a4..c18c720 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -18,8 +18,24 @@
package org.apache.tez.dag.api;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.*;
+import org.apache.tez.dag.api.DAGPlan.PlanKeyValuePair.Builder;
+import org.apache.tez.dag.api.DAGPlan.PlanTaskLocationHint;
+import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
+import org.apache.tez.dag.api.EdgeProperty.SourceType;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.DAGPlan.*;
+
public class DAG { // FIXME rename to Topology
List<Vertex> vertices;
@@ -76,28 +92,102 @@ public class DAG { // FIXME rename to Topology
//for each vertex if not specified?
}
- // FIXME DAGConfiguration is not public API
- public DAGConfiguration serializeDag() {
- DAGConfiguration dagConf = new DAGConfiguration();
-
- dagConf.setName(name);
- dagConf.setVertices(vertices);
- dagConf.setEdgeProperties(edges);
-
- for(Vertex vertex : vertices) {
- if(vertex.getInputVertices() != null) {
- dagConf.setInputVertices(vertex.getVertexName(), vertex.getInputVertices());
- dagConf.setInputEdgeIds(vertex.getVertexName(), vertex.getInputEdgeIds());
+ // create protobuf message describing DAG
+ public JobPlan createDag(){
+ JobPlan.Builder jobBuilder = JobPlan.newBuilder();
+
+ jobBuilder.setName(this.name);
+
+ for(Vertex vertex : vertices){
+ VertexPlan.Builder vertexBuilder = VertexPlan.newBuilder();
+ vertexBuilder.setName(vertex.getVertexName());
+ vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until TEZ-46.
+ vertexBuilder.setProcessorName(vertex.getProcessorName());
+
+ //task config
+ PlanTaskConfiguration.Builder taskConfigBuilder = PlanTaskConfiguration.newBuilder();
+ Resource resource = vertex.getTaskResource();
+ taskConfigBuilder.setNumTasks(vertex.getParallelism());
+ taskConfigBuilder.setMemoryMb(resource.getMemory());
+ taskConfigBuilder.setVirtualCores(resource.getVirtualCores());
+ taskConfigBuilder.setJavaOpts(vertex.getJavaOpts());
+
+ taskConfigBuilder.setTaskModule(vertex.getVertexName());
+ PlanLocalResource.Builder localResourcesBuilder = PlanLocalResource.newBuilder();
+ Map<String,LocalResource> lrs = vertex.getTaskLocalResources();
+ for(String key : lrs.keySet()){
+ LocalResource lr = lrs.get(key);
+ localResourcesBuilder.setName(key);
+ localResourcesBuilder.setUri(DagTypeConverters.convertToDAGPlan(lr.getResource()));
+ localResourcesBuilder.setSize(lr.getSize());
+ localResourcesBuilder.setTimeStamp(lr.getTimestamp());
+ localResourcesBuilder.setType(DagTypeConverters.convertToDAGPlan(lr.getType()));
+ localResourcesBuilder.setVisibility(DagTypeConverters.convertToDAGPlan(lr.getVisibility()));
+ if(lr.getType() == LocalResourceType.PATTERN){
+ assert lr.getPattern() != null : "resourceType=PATTERN but pattern is null";
+ localResourcesBuilder.setPattern(lr.getPattern());
+ }
+ taskConfigBuilder.addLocalResource(localResourcesBuilder);
}
- if(vertex.getOutputVertices() != null) {
- dagConf.setOutputVertices(vertex.getVertexName(), vertex.getOutputVertices());
- dagConf.setOutputEdgeIds(vertex.getVertexName(), vertex.getOutputEdgeIds());
+
+ if(vertex.getTaskEnvironment() != null){
+ for(String key : vertex.getTaskEnvironment().keySet()){
+ PlanKeyValuePair.Builder envSettingBuilder = PlanKeyValuePair.newBuilder();
+ envSettingBuilder.setKey(key);
+ envSettingBuilder.setValue(vertex.getTaskEnvironment().get(key));
+ taskConfigBuilder.addEnvironmentSetting(envSettingBuilder);
+ }
}
+
+ if(vertex.getTaskLocationsHint() != null ){
+ if(vertex.getTaskLocationsHint().getTaskLocationHints() != null){
+ for(TaskLocationHint hint : vertex.getTaskLocationsHint().getTaskLocationHints()){
+ PlanTaskLocationHint.Builder taskLocationHintBuilder = PlanTaskLocationHint.newBuilder();
+
+ if(hint.getDataLocalHosts() != null){
+ taskLocationHintBuilder.addAllHost(Arrays.asList(hint.getDataLocalHosts()));
+ }
+ if(hint.getRacks() != null){
+ taskLocationHintBuilder.addAllRack(Arrays.asList(hint.getRacks()));
+ }
+
+ vertexBuilder.addTaskLocationHint(taskLocationHintBuilder);
+ }
+ }
+ }
+
+ for(String inEdgeId : vertex.getInputEdgeIds()){
+ vertexBuilder.addInEdgeId(inEdgeId);
+ }
+
+ for(String outEdgeId : vertex.getOutputEdgeIds()){
+ vertexBuilder.addOutEdgeId(outEdgeId);
+ }
+
+ vertexBuilder.setTaskConfig(taskConfigBuilder);
+ jobBuilder.addVertex(vertexBuilder);
+ }
+
+ for(Edge edge : edges){
+ EdgePlan.Builder edgeBuilder = EdgePlan.newBuilder();
+ edgeBuilder.setId(edge.getId());
+ edgeBuilder.setInputVertexName(edge.getInputVertex().getVertexName());
+ edgeBuilder.setOutputVertexName(edge.getOutputVertex().getVertexName());
+ edgeBuilder.setConnectionPattern(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().connectionPattern));
+ edgeBuilder.setSourceType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getSourceType()));
+ edgeBuilder.setInputClass(edge.getEdgeProperty().inputClass);
+ edgeBuilder.setOutputClass(edge.getEdgeProperty().outputClass);
+
+ jobBuilder.addEdge(edgeBuilder);
}
- dagConf.setConfig(config);
-
- return dagConf;
+ for(Entry<String, String> entry : this.config.entrySet()){
+ PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
+ kvp.setKey(entry.getKey());
+ kvp.setValue(entry.getValue());
+ jobBuilder.addJobSetting(kvp);
+ }
+
+ return jobBuilder.build();
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java
deleted file mode 100644
index 93ed118..0000000
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java
+++ /dev/null
@@ -1,492 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
-import java.util.TreeMap;
-import java.util.Map.Entry;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Stable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-
-// FIXME dag conf should not be public API???
-public class DAGConfiguration extends Configuration {
-
- private static final Log LOG = LogFactory.getLog(DAGConfiguration.class);
-
- public DAGConfiguration(Configuration conf) {
- super(conf);
- if (! (conf instanceof DAGConfiguration)) {
- this.reloadConfiguration();
- }
- }
-
- public DAGConfiguration() {
- super();
- }
-
- public final static String DAG = "tez.dag.";
-
- public final static String DAG_AM = DAG + "am.";
-
- public final static String VERTEX = DAG + "vertex.";
-
- public final static String TASK = DAG + "task.";
-
- public final static String TASK_ATTEMPT = DAG + "attempt.";
-
- public final static String TEZ_DAG_VERTICES = DAG + "vertices";
-
- public final static String TEZ_DAG_EDGES = DAG + "edges";
-
- public final static String EDGE = DAG + "edge.";
-
- private final static String SEPARATOR = "|";
-
- public final static String TEZ_DAG_CONFIG_KEYS = DAG + "keys";
- public final static String TEZ_DAG_CONFIG_VALUES = DAG + "values";
-
- @Private
- public void setConfig(Map<String, String> config) {
- if(!config.isEmpty()) {
- String[] key = new String[config.size()];
- String[] value = new String[config.size()];
- int i=0;
- for(Entry<String, String> entry : config.entrySet()) {
- key[i] = entry.getKey();
- value[i] = entry.getValue();
- i++;
- }
- setStrings(TEZ_DAG_CONFIG_KEYS, key);
- setStrings(TEZ_DAG_CONFIG_VALUES, value);
- }
- }
-
- @Private
- public Map<String, String> getConfig() {
- HashMap<String, String> config = new HashMap<String, String>();
- String[] key = getStrings(TEZ_DAG_CONFIG_KEYS);
- if(key != null) {
- String[] value = getStrings(TEZ_DAG_CONFIG_VALUES);
- assert value.length == key.length;
- for(int i=0; i<key.length; ++i) {
- config.put(key[i], value[i]);
- }
- }
- return config;
- }
-
- @Private
- public void setEdgeProperties(List<Edge> edges) {
- String[] edgeIds = new String[edges.size()];
- for(int i=0; i<edges.size(); ++i) {
- edgeIds[i] = edges.get(i).getId();
- }
- setStrings(TEZ_DAG_EDGES, edgeIds);
- for (Edge edge : edges) {
- setEdgeProperty(edge.getEdgeProperty(), edge.getId());
- }
- }
-
- public Map<String, EdgeProperty> getEdgeProperties() {
- String[] edgeIds = getStrings(TEZ_DAG_EDGES);
- if (edgeIds == null) {
- return new TreeMap<String, EdgeProperty>();
- }
- Map<String, EdgeProperty> edgeProperties =
- new HashMap<String, EdgeProperty>(edgeIds.length);
- for(int i=0; i<edgeIds.length; ++i) {
- edgeProperties.put(edgeIds[i], getEdgeProperty(edgeIds[i]));
- }
- return edgeProperties;
- }
-
- private void setEdgeProperty(EdgeProperty edgeProperty, String edgeId) {
- String[] edgeStrs = new String[4];
- edgeStrs[0] = edgeProperty.getConnectionPattern().name();
- edgeStrs[1] = edgeProperty.getSourceType().name();
- edgeStrs[2] = edgeProperty.inputClass;
- edgeStrs[3] = edgeProperty.outputClass;
-
- setStrings(EDGE + edgeId, edgeStrs);
- }
-
- private EdgeProperty getEdgeProperty(String edgeId) {
- String[] edgeStr = getStrings(EDGE + edgeId);
- assert edgeStr.length == 4;
- return new EdgeProperty(EdgeProperty.ConnectionPattern.valueOf(edgeStr[0]),
- EdgeProperty.SourceType.valueOf(edgeStr[1]),
- edgeStr[2],
- edgeStr[3]);
- }
-
- public final static String TEZ_DAG_VERTEX_TASKS = VERTEX + "num-tasks";
- public final static int DEFAULT_TEZ_DAG_VERTEX_TASKS = 0;
-
- public int getNumVertexTasks(String vertexName) {
- return getInt(
- TEZ_DAG_VERTEX_TASKS + "." + vertexName,
- DEFAULT_TEZ_DAG_VERTEX_TASKS);
- }
-
- public void setNumVertexTasks(String vertexName, int numTasks) {
- setInt(
- TEZ_DAG_VERTEX_TASKS + "." + vertexName,
- numTasks);
- }
-
- public final String TEZ_DAG_VERTEX_TASK_MEMORY = TASK + "memory-mb";
-
- public final int DEFAULT_TEZ_DAG_VERTEX_TASK_MEMORY = 1024;
-
- public int getVertexTaskMemory(String vertexName) {
- return getInt(
- TEZ_DAG_VERTEX_TASK_MEMORY + "." + vertexName,
- DEFAULT_TEZ_DAG_VERTEX_TASK_MEMORY);
- }
-
- public void setVertexTaskMemory(String vertexName, int memory) {
- setInt(
- TEZ_DAG_VERTEX_TASK_MEMORY + "." + vertexName,
- memory);
- }
-
- public final String TEZ_DAG_VERTEX_TASK_CPU = TASK + "cpu-vcores";
-
- public final int DEFAULT_TEZ_DAG_VERTEX_TASK_CORES = 1;
-
- public int getVertexTaskCores(String vertexName) {
- return getInt(
- TEZ_DAG_VERTEX_TASK_CPU + "." + vertexName,
- DEFAULT_TEZ_DAG_VERTEX_TASK_CORES);
- }
- public void setVertexTaskCores(String vertexName, int cores) {
- setInt(
- TEZ_DAG_VERTEX_TASK_CPU + "." + vertexName,
- cores);
- }
-
- private final String[] EMPTY = new String[0];
-
- public String[] getVertices() {
- String[] vertices = getStrings(TEZ_DAG_VERTICES, EMPTY);
- return vertices == null? EMPTY : vertices;
- }
-
- void setVertexResource(Vertex vertex) {
- Resource resource = vertex.getTaskResource();
- if (resource == null) {
- return;
- }
- setVertexTaskCores(vertex.getVertexName(), resource.getVirtualCores());
- setVertexTaskMemory(vertex.getVertexName(), resource.getMemory());
- }
-
- public Resource getVertexResource(String vertexName) {
- int memory = getVertexTaskMemory(vertexName);
- int vCores = getVertexTaskCores(vertexName);
- return BuilderUtils.newResource(memory, vCores);
- }
-
- // FIXME we are serializing YarnURL which is not same as serializing a URL
- public final String TEZ_DAG_VERTEX_TASK_LOCAL_RESOURCE = TASK + "local-resource.";
- void setVertexLocalResource(Vertex vertex) {
- Map<String,LocalResource> lrs = vertex.getTaskLocalResources();
- if (lrs == null) {
- return;
- }
- String[] lrStrs = new String[lrs.size()];
- int i=0;
- for(Map.Entry<String,LocalResource> entry : lrs.entrySet()) {
- LocalResource lr = entry.getValue();
- try {
- String lrStr = StringUtils.escapeString(entry.getKey(),
- StringUtils.ESCAPE_CHAR,
- SEPARATOR.charAt(0))
- + SEPARATOR
- + StringUtils.escapeString(
- ConverterUtils.getPathFromYarnURL(lr.getResource()).toString(),
- StringUtils.ESCAPE_CHAR,
- SEPARATOR.charAt(0))
- + SEPARATOR
- + StringUtils.escapeString(String.valueOf(lr.getSize()),
- StringUtils.ESCAPE_CHAR, SEPARATOR.charAt(0))
- + SEPARATOR
- + StringUtils.escapeString(String.valueOf(lr.getTimestamp()),
- StringUtils.ESCAPE_CHAR, SEPARATOR.charAt(0))
- + SEPARATOR
- + StringUtils.escapeString(lr.getType().name(),
- StringUtils.ESCAPE_CHAR, SEPARATOR.charAt(0))
- + SEPARATOR
- + StringUtils.escapeString(lr.getVisibility().name(),
- StringUtils.ESCAPE_CHAR, SEPARATOR.charAt(0))
- + SEPARATOR
- + StringUtils.escapeString(
- (lr.getPattern() == null ? "" : lr.getPattern()),
- StringUtils.ESCAPE_CHAR, SEPARATOR.charAt(0));
- lrStrs[i++] = StringUtils.escapeString(lrStr);
- } catch (URISyntaxException e) {
- throw new RuntimeException(e);
- }
- }
- setStrings(TEZ_DAG_VERTEX_TASK_LOCAL_RESOURCE + vertex.getVertexName(),
- lrStrs);
- }
-
- public Map<String, LocalResource> getVertexLocalResources(String vertexName) {
- String[] lrStrs = StringUtils.split(get(
- TEZ_DAG_VERTEX_TASK_LOCAL_RESOURCE + vertexName, ""));
- Map<String, LocalResource> localResources =
- new TreeMap<String, LocalResource>();
- if (lrStrs == null) {
- return localResources;
- }
- LOG.info("XXXX Found " + lrStrs.length + " local resources");
- for (String lrStr : lrStrs) {
- String[] tokens =
- StringUtils.split(
- lrStr, StringUtils.ESCAPE_CHAR, SEPARATOR.charAt(0));
- if (tokens.length != 6
- && tokens.length != 7) {
- LOG.warn("Invalid token count in serialized LocalResource"
- + ", serializedString=" + lrStr
- + ", tokenCount=" + tokens.length);
- }
- String resourceName = tokens[0];
- LocalResource lRsrc = Records.newRecord(LocalResource.class);
- lRsrc.setResource(ConverterUtils.getYarnUrlFromPath(
- new Path(tokens[1])));
- lRsrc.setSize(Long.valueOf(tokens[2]));
- lRsrc.setTimestamp(Long.valueOf(tokens[3]));
- lRsrc.setType(LocalResourceType.valueOf(tokens[4]));
- lRsrc.setVisibility(LocalResourceVisibility.valueOf(
- tokens[5]));
- if (tokens.length == 7) {
- lRsrc.setPattern(tokens[6]);
- }
- try {
- LOG.info("XXXX Adding local resource"
- + ", vertexName=" + vertexName
- + ", resourceName=" + resourceName
- + ", resourceUrl"
- + ConverterUtils.getPathFromYarnURL(
- lRsrc.getResource()).toString());
- } catch (URISyntaxException e) {
- // Ignore
- // FIXME
- }
- localResources.put(resourceName, lRsrc);
- }
- return localResources;
- }
-
- public final String TEZ_DAG_VERTEX_TASK_ENV = TASK + "env.";
- void setVertexEnv(Vertex vertex) {
- Map<String, String> env = vertex.getTaskEnvironment();
- if (env == null) {
- return;
- }
- String[] envStrs = new String[env.size()];
- int i=0;
- for(Map.Entry<String,String> entry : env.entrySet()) {
- String envStr = entry.getKey() + SEPARATOR + entry.getValue();
- envStrs[i++] = StringUtils.escapeString(envStr);
- }
- set(TEZ_DAG_VERTEX_TASK_ENV + vertex.getVertexName(),
- StringUtils.join(",", envStrs));
- }
-
- public Map<String,String> getVertexEnv(String vertexName) {
- String[] envStrs = StringUtils.split(
- get(TEZ_DAG_VERTEX_TASK_ENV + vertexName, ""));
- Map<String,String> env = new HashMap<String,String>();
- if(envStrs == null) {
- return env;
- }
-
- LOG.info("XXXX Found " + envStrs.length + " environment");
- for(String envStr : envStrs ) {
- LOG.info("XXXX Parsing env from " + envStr);
- StringTokenizer tokenizer = new StringTokenizer (envStr, SEPARATOR);
- String envName = tokenizer.nextToken();
- String envValue = tokenizer.nextToken();
- env.put(StringUtils.unEscapeString(envName),
- StringUtils.unEscapeString(envValue));
- }
-
- return env;
- }
-
- public final String TEZ_DAG_NAME = DAG + "name";
- @Private
- public void setName(String name) {
- set(TEZ_DAG_NAME, name);
- }
-
- @Public
- @Stable
- public String getName() {
- String name = get(TEZ_DAG_NAME);
- return name;
- }
-
- @Private
- public void setVertices(List<Vertex> vertices) {
- setVertices(TEZ_DAG_VERTICES, vertices);
- for(Vertex vertex : vertices) {
- // set num tasks
- setNumVertexTasks(vertex.getVertexName(), vertex.getParallelism());
- // set resource
- setVertexResource(vertex);
- // set localResource
- setVertexLocalResource(vertex);
- // set environment
- setVertexEnv(vertex);
- // set processor name
- setVertexTaskModuleClassName(vertex);
- //set javaOpts
- setVertexJavaOpts(vertex.getVertexName(), vertex.getJavaOpts());
- }
- }
-
- public final String TEZ_DAG_VERTEX_INPUT_VERTICES = VERTEX + "input-vertices";
- @Public
- @Stable
- public String[] getInputVertices(String vertexName) {
- String[] vertices =
- getStrings(TEZ_DAG_VERTEX_INPUT_VERTICES + "." + vertexName, EMPTY);
- return vertices == null ? EMPTY : vertices;
- }
- @Private
- public void setInputVertices(String vertexName, List<Vertex> inputVertices) {
- setVertices(TEZ_DAG_VERTEX_INPUT_VERTICES + "." + vertexName,
- inputVertices);
- }
-
- private void setVertices(String key, List<Vertex> vertices) {
- String[] verticesNames = new String[vertices.size()];
- for (int i = 0; i < vertices.size(); ++i) {
- verticesNames[i] = vertices.get(i).getVertexName();
- }
- setStrings(key, verticesNames);
- }
-
- public final String TEZ_DAG_VERTEX_OUTPUT_VERTICES = VERTEX
- + "output-vertices";
- @Public
- @Stable
- public String[] getOutputVertices(String vertexName) {
- String[] vertices =
- getStrings(TEZ_DAG_VERTEX_OUTPUT_VERTICES + "." + vertexName, EMPTY);
- return vertices == null? EMPTY : vertices;
- }
- @Private
- public void setOutputVertices(String vertexName,
- List<Vertex> outputVertices) {
- setVertices(TEZ_DAG_VERTEX_OUTPUT_VERTICES + "." + vertexName,
- outputVertices);
- }
-
- public final String TEZ_DAG_VERTEX_INPUT_EDGES = VERTEX + "input-edges";
- @Public
- @Stable
- public List<String> getInputEdgeIds(String vertexName) {
- return getEdgeIds(TEZ_DAG_VERTEX_INPUT_EDGES + "." + vertexName);
- }
- public void setInputEdgeIds(String vertexName, List<String> edgeIds) {
- setEdgeIds(TEZ_DAG_VERTEX_INPUT_EDGES + "." + vertexName, edgeIds);
- }
-
- public final String TEZ_DAG_VERTEX_OUTPUT_EDGES = VERTEX + "output-edges";
- @Public
- @Stable
- public List<String> getOutputEdgeIds(String vertexName) {
- return getEdgeIds(TEZ_DAG_VERTEX_OUTPUT_EDGES + "." + vertexName);
- }
- @Private
- public void setOutputEdgeIds(String vertexName, List<String> edgeIds) {
- setEdgeIds(TEZ_DAG_VERTEX_OUTPUT_EDGES + "." + vertexName, edgeIds);
- }
-
- private List<String> getEdgeIds(String key) {
- String[] edgeIds = getStrings(key, EMPTY);
- if (edgeIds == null) {
- return new ArrayList<String>();
- }
- return Arrays.asList(edgeIds);
- }
-
- private void setEdgeIds(String key, List<String> edgeIds) {
- setStrings(key, edgeIds.toArray(new String[]{}));
- }
-
- private void setVertexTaskModuleClassName(Vertex vertex) {
- setVertexTaskModuleClassName(vertex.getVertexName(),
- vertex.getProcessorName());
- }
-
- public final String TEZ_DAG_VERTEX_TASK_MODULE= VERTEX + "task-module";
- @Private
- public String getVertexTaskModuleClassName(String vertexName) {
- return get(TEZ_DAG_VERTEX_TASK_MODULE + "." + vertexName);
- }
- @Private
- public void setVertexTaskModuleClassName(String vertexName,
- String taskModule) {
- set(TEZ_DAG_VERTEX_TASK_MODULE + "." + vertexName, taskModule);
- }
-
- public final String TEZ_DAG_VERTEX_JAVAOPTS= VERTEX + "java-opts";
- @Private
- public String getVertexJavaOpts(String vertexName) {
- String opts = get(TEZ_DAG_VERTEX_JAVAOPTS + "." + vertexName);
- return opts == null? "" : opts;
- }
-
- @Private
- public void setVertexJavaOpts(String vertexName, String javaOpts){
- set(TEZ_DAG_VERTEX_JAVAOPTS + "." + vertexName, javaOpts);
- }
-
- /// File used for storing location hints that are passed to the DAG
- public static final String DAG_LOCATION_HINT_RESOURCE_FILE =
- DAG + "location-hint-resource-file";
- public static final String DEFAULT_DAG_LOCATION_HINT_RESOURCE_FILE =
- "tezdaglocationhint.info";
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGLocationHint.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGLocationHint.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGLocationHint.java
deleted file mode 100644
index 9f48f2c..0000000
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGLocationHint.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-public class DAGLocationHint implements Writable {
-
- private Map<String, VertexLocationHint> vertexLocationHints;
-
- public DAGLocationHint() {
- vertexLocationHints = new TreeMap<String, VertexLocationHint>();
- }
-
- public Map<String, VertexLocationHint> getVertexLocationHints() {
- return vertexLocationHints;
- }
-
- public VertexLocationHint getVertexLocationHint(String vertexName) {
- return vertexLocationHints.get(vertexName);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(vertexLocationHints.size());
- for (Entry<String, VertexLocationHint> entry :
- vertexLocationHints.entrySet()) {
- Text.writeString(out, entry.getKey());
- out.writeBoolean(entry.getValue() != null);
- if (entry.getValue() != null) {
- entry.getValue().write(out);
- }
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- int entryCount = in.readInt();
- vertexLocationHints = new TreeMap<String, VertexLocationHint>();
- for (int i = 0; i < entryCount; ++i) {
- String vertexName = Text.readString(in);
- if (!in.readBoolean()) {
- vertexLocationHints.put(vertexName, null);
- } else {
- VertexLocationHint hint = new VertexLocationHint();
- hint.readFields(in);
- vertexLocationHints.put(vertexName, hint);
- }
- }
- }
-
- public static DAGLocationHint initDAGDagLocationHint(
- String locationHintFile) throws IOException {
- DataInput in = new DataInputStream(new FileInputStream(locationHintFile));
- DAGLocationHint dagLocationHint = new DAGLocationHint();
- dagLocationHint.readFields(in);
- return dagLocationHint;
- }
-
- public static void writeDAGDagLocationHint(
- DAGLocationHint dagLocationHint,
- String locationHintFile) throws IOException {
- DataOutput out = new DataOutputStream(new FileOutputStream(
- locationHintFile));
- dagLocationHint.write(out);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
new file mode 100644
index 0000000..04404ba
--- /dev/null
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.api;
+
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.api.DAGPlan.EdgePlan;
+import org.apache.tez.dag.api.DAGPlan.PlanKeyValuePair;
+import org.apache.tez.dag.api.DAGPlan.PlanLocalResource;
+import org.apache.tez.dag.api.DAGPlan.PlanTaskConfiguration;
+import org.apache.tez.dag.api.DAGPlan.PlanTaskLocationHint;
+import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
+import org.apache.tez.dag.api.EdgeProperty.SourceType;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.DAGPlan.*;
+
+public class DagTypeConverters {
+ // Static converters between the Tez/YARN API types and their DAGPlan protobuf counterparts.
+ public static PlanLocalResourceVisibility convertToDAGPlan(LocalResourceVisibility visibility){
+ switch(visibility){
+ case PUBLIC : return PlanLocalResourceVisibility.PUBLIC;
+ case PRIVATE : return PlanLocalResourceVisibility.PRIVATE;
+ case APPLICATION : return PlanLocalResourceVisibility.APPLICATION;
+ default : throw new IllegalArgumentException("unknown 'visibility'");
+ }
+ }
+
+ public static LocalResourceVisibility convertFromDAGPlan(PlanLocalResourceVisibility visibility){
+ switch(visibility){
+ case PUBLIC : return LocalResourceVisibility.PUBLIC;
+ case PRIVATE : return LocalResourceVisibility.PRIVATE;
+ case APPLICATION : return LocalResourceVisibility.APPLICATION;
+ default : throw new IllegalArgumentException("unknown 'visibility'");
+ }
+ }
+
+ public static PlanEdgeSourceType convertToDAGPlan(SourceType sourceType){
+ switch(sourceType){
+ case STABLE : return PlanEdgeSourceType.STABLE;
+ case STABLE_PERSISTED : return PlanEdgeSourceType.STABLE_PERSISTED;
+ case STREAMING : return PlanEdgeSourceType.STREAMING;
+ default : throw new IllegalArgumentException("unknown 'sourceType'");
+ }
+ }
+
+ public static SourceType convertFromDAGPlan(PlanEdgeSourceType sourceType){
+ switch(sourceType){
+ case STABLE : return SourceType.STABLE;
+ case STABLE_PERSISTED : return SourceType.STABLE_PERSISTED;
+ case STREAMING : return SourceType.STREAMING;
+ default : throw new IllegalArgumentException("unknown 'sourceType'");
+ }
+ }
+
+ public static PlanEdgeConnectionPattern convertToDAGPlan(ConnectionPattern pattern){
+ switch(pattern){
+ case ONE_TO_ONE : return PlanEdgeConnectionPattern.ONE_TO_ONE;
+ case ONE_TO_ALL : return PlanEdgeConnectionPattern.ONE_TO_ALL;
+ case BIPARTITE : return PlanEdgeConnectionPattern.BIPARTITE;
+ default : throw new IllegalArgumentException("unknown 'pattern'");
+ }
+ }
+
+ public static ConnectionPattern convertFromDAGPlan(PlanEdgeConnectionPattern pattern){
+ switch(pattern){
+ case ONE_TO_ONE : return ConnectionPattern.ONE_TO_ONE;
+ case ONE_TO_ALL : return ConnectionPattern.ONE_TO_ALL;
+ case BIPARTITE : return ConnectionPattern.BIPARTITE;
+ default : throw new IllegalArgumentException("unknown 'pattern'");
+ }
+ }
+
+ public static PlanLocalResourceType convertToDAGPlan(LocalResourceType type) {
+ switch(type){
+ case ARCHIVE : return PlanLocalResourceType.ARCHIVE;
+ case FILE : return PlanLocalResourceType.FILE;
+ case PATTERN : return PlanLocalResourceType.PATTERN;
+ default : throw new IllegalArgumentException("unknown 'type'");
+ }
+ }
+
+ public static LocalResourceType convertFromDAGPlan(PlanLocalResourceType type) {
+ switch(type){
+ case ARCHIVE : return LocalResourceType.ARCHIVE;
+ case FILE : return LocalResourceType.FILE;
+ case PATTERN : return LocalResourceType.PATTERN;
+ default : throw new IllegalArgumentException("unknown 'type'");
+ }
+ }
+
+ public static VertexLocationHint convertFromDAGPlan(
+ List<PlanTaskLocationHint> locationHints) {
+ // one TaskLocationHint is produced per PlanTaskLocationHint, in input order; numTasks is the hint count
+ List<TaskLocationHint> outputList = new ArrayList<TaskLocationHint>();
+
+ for(PlanTaskLocationHint inputHint : locationHints){
+ TaskLocationHint outputHint = new TaskLocationHint();
+ outputHint.setRacks(inputHint.getRackList().toArray(new String[inputHint.getRackList().size()]));
+ outputHint.setDataLocalHosts(inputHint.getHostList().toArray(new String[inputHint.getHostList().size()]));
+ outputList.add(outputHint);
+ }
+ return new VertexLocationHint(outputList.size(), outputList.toArray(new TaskLocationHint[outputList.size()]));
+ }
+
+ // notes re HDFS URL handling:
+ // Resource URLs in the protobuf message are strings of the form hdfs://host:port/path
+ // org.apache.hadoop.fs.Path is actually a URI type that allows any scheme
+ // org.apache.hadoop.yarn.api.records.URL is a URL type used by YARN.
+ // java.net.URL cannot be used out of the box as it rejects unknown schemes such as HDFS.
+
+ public static String convertToDAGPlan(URL resource) {
+ // see above notes on HDFS URL handling; a missing host or non-positive port is omitted (assumes unset port is -1 as in java.net.URI - confirm)
+ String authority = resource.getHost() == null ? ""
+ : resource.getHost() + (resource.getPort() > 0 ? ":" + resource.getPort() : "");
+ return resource.getScheme() + "://" + authority + resource.getFile();
+ }
+
+ public static Map<String, LocalResource> createLocalResourceMapFromDAGPlan(
+ List<PlanLocalResource> localResourcesList) {
+ Map<String, LocalResource> map = new HashMap<String, LocalResource>();
+ for(PlanLocalResource res : localResourcesList){
+ LocalResource r = new LocalResourcePBImpl();
+
+ //NOTE: have to check every optional field in protobuf generated classes for existence before accessing
+ //else we will receive a default value back, eg ""
+ if(res.hasPattern()){
+ r.setPattern(res.getPattern());
+ }
+ r.setResource(ConverterUtils.getYarnUrlFromPath(new Path(res.getUri()))); // see above notes on HDFS URL handling
+ r.setSize(res.getSize());
+ r.setTimestamp(res.getTimeStamp());
+ r.setType(DagTypeConverters.convertFromDAGPlan(res.getType()));
+ r.setVisibility(DagTypeConverters.convertFromDAGPlan(res.getVisibility()));
+ map.put(res.getName(), r);
+ }
+ return map;
+ }
+
+ public static Map<String, String> createEnvironmentMapFromDAGPlan(
+ List<PlanKeyValuePair> environmentSettingList) {
+ // later entries overwrite earlier ones that share a key
+ Map<String, String> map = new HashMap<String, String>();
+ for(PlanKeyValuePair setting : environmentSettingList){
+ map.put(setting.getKey(), setting.getValue());
+ }
+
+ return map;
+ }
+
+ public static Map<String, EdgePlan> createEdgePlanMapFromDAGPlan(List<EdgePlan> edgeList){
+ Map<String, EdgePlan> edgePlanMap =
+ new HashMap<String, EdgePlan>();
+ for(EdgePlan edgePlanItem : edgeList){
+ edgePlanMap.put(edgePlanItem.getId(), edgePlanItem);
+ }
+ return edgePlanMap;
+ }
+
+ public static Map<String, EdgeProperty> createEdgePropertyMapFromDAGPlan(
+ List<EdgePlan> edgeList) {
+ // keyed by edge id; each EdgePlan is converted to the API-level EdgeProperty
+ Map<String, EdgeProperty> map = new HashMap<String, EdgeProperty>();
+ for(EdgePlan edge: edgeList){
+ map.put(edge.getId(),
+ new EdgeProperty(
+ convertFromDAGPlan(edge.getConnectionPattern()),
+ convertFromDAGPlan(edge.getSourceType()),
+ edge.getInputClass(),
+ edge.getOutputClass()
+ )
+ );
+ }
+
+ return map;
+ }
+
+ public static Resource CreateResourceRequestFromTaskConfig(
+ PlanTaskConfiguration taskConfig) {
+ return BuilderUtils.newResource(taskConfig.getMemoryMb(), taskConfig.getVirtualCores());
+ }
+
+ public static Map<String, String> createSettingsMapFromDAGPlan(
+ List<PlanKeyValuePair> settingList) {
+ Map<String, String> map = new HashMap<String, String>();
+ for(PlanKeyValuePair setting: settingList){
+ map.put(setting.getKey(), setting.getValue());
+ }
+ return map;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index be03cd8..597fb2f 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -98,5 +98,6 @@ private static final String TEZ_CONF_DIR_ENV = "TEZ_CONF_DIR";
TEZ_HOME_ENV + "/lib/*"
};
- public static final String DAG_AM_PLAN_CONFIG_XML = "tez-dag.xml";
+ public static final String DAG_AM_PLAN_PB_BINARY = "tez-dag.pb";
+ public static final String DAG_AM_PLAN_PB_TEXT = "tez-dag.pb.txt";
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
index 8571252..381b4ba 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
@@ -18,14 +18,7 @@
package org.apache.tez.dag.api;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-public class VertexLocationHint implements Writable {
+public class VertexLocationHint {
private int numTasks;
private TaskLocationHint[] taskLocationHints;
@@ -56,7 +49,7 @@ public class VertexLocationHint implements Writable {
this.taskLocationHints = taskLocationHints;
}
- public static class TaskLocationHint implements Writable {
+ public static class TaskLocationHint {
// Host names if any to be used
private String[] hosts;
@@ -84,77 +77,5 @@ public class VertexLocationHint implements Writable {
public void setRacks(String[] racks) {
this.racks = racks;
}
-
- private void writeStringArray(DataOutput out, String[] array)
- throws IOException {
- if (array == null) {
- out.writeInt(-1);
- return;
- }
- out.writeInt(array.length);
- for (String entry : array) {
- out.writeBoolean(entry != null);
- if (entry != null) {
- Text.writeString(out, entry);
- }
- }
- }
-
- private String[] readStringArray(DataInput in)
- throws IOException {
- int arrayLen = in.readInt();
- if (arrayLen == -1) {
- return null;
- }
- String[] array = new String[arrayLen];
- for (int i = 0; i < arrayLen; ++i) {
- if (!in.readBoolean()) {
- array[i] = null;
- } else {
- array[i] = Text.readString(in);
- }
- }
- return array;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- writeStringArray(out, hosts);
- writeStringArray(out, racks);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- hosts = readStringArray(in);
- racks = readStringArray(in);
- }
-
-
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(numTasks);
- for (int i = 0; i < numTasks; ++i) {
- out.writeBoolean(taskLocationHints[i] != null);
- if (taskLocationHints[i] != null) {
- taskLocationHints[i].write(out);
- }
- }
}
-
- @Override
- public void readFields(DataInput in) throws IOException {
- numTasks = in.readInt();
- taskLocationHints = new TaskLocationHint[numTasks];
- for (int i = 0; i < numTasks; ++i) {
- if (!in.readBoolean()) {
- taskLocationHints[i] = null;
- } else {
- taskLocationHints[i] = new TaskLocationHint(null, null);
- taskLocationHints[i].readFields(in);
- }
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag-api/src/main/proto/DAGPlan.proto
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/proto/DAGPlan.proto b/tez-dag-api/src/main/proto/DAGPlan.proto
new file mode 100644
index 0000000..38e8971
--- /dev/null
+++ b/tez-dag-api/src/main/proto/DAGPlan.proto
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tez.dag.api";
+option java_outer_classname = "DAGPlan";
+option java_generate_equals_and_hash = true;
+package org.apache.tez.dag.api;
+
+// Many of these types have a dual in the Tez-api. To reduce confusion, these types have prefix or suffix
+// of "Plan" to indicate they are to be used in the dag-plan.
+// The big types use a suffix: JobPlan, VertexPlan, EdgePlan
+// --> these get more direct use in the runtime and the naming is natural.
+// The enums and utility types use prefix: PlanVertexType, PlanEdgeConnectionPattern, etc
+// --> there is no great naming choice for these that avoids ambiguity, but this one seems acceptable.
+
+enum PlanVertexType {
+ INPUT = 0;
+ NORMAL = 1;
+ OUTPUT = 2;
+}
+
+enum PlanEdgeConnectionPattern {
+ ONE_TO_ONE = 0;
+ ONE_TO_ALL = 1;
+ BIPARTITE = 2;
+}
+
+enum PlanEdgeSourceType {
+ STABLE = 0;
+ STABLE_PERSISTED = 1;
+ STREAMING = 2;
+}
+
+message PlanKeyValuePair {
+ required string key = 1;
+ required string value = 2;
+}
+
+enum PlanLocalResourceType {
+ FILE = 0;
+ ARCHIVE = 1;
+ PATTERN = 2;
+}
+
+enum PlanLocalResourceVisibility {
+ PUBLIC = 0;
+ PRIVATE = 1;
+ APPLICATION = 2;
+}
+
+message PlanLocalResource {
+ required string name = 1;
+ required string uri = 2;
+ required int64 size = 3;
+ required int64 timeStamp = 4;
+ required PlanLocalResourceType type = 5;
+ required PlanLocalResourceVisibility visibility = 6;
+ optional string pattern = 7; // only used if type=PATTERN
+}
+
+// Each taskLocationHint represents a single split in the input.
+// It is the list of [{rack,machines}] that host a replica of each particular split.
+// For now it is represented as pair-of-arrays rather than array-of-pairs.
+message PlanTaskLocationHint {
+ repeated string rack = 1;
+ repeated string host = 2;
+}
+
+message PlanTaskConfiguration {
+ required int32 numTasks = 1;
+ required int32 memoryMb = 2;
+ required int32 virtualCores = 3;
+ required string javaOpts = 4;
+ required string taskModule = 5;
+ repeated PlanLocalResource localResource = 6;
+ repeated PlanKeyValuePair environmentSetting = 8;
+}
+
+message VertexPlan {
+ required string name = 1;
+ required PlanVertexType type = 2;
+ optional string processorName = 3;
+ required PlanTaskConfiguration taskConfig = 4;
+ repeated PlanTaskLocationHint taskLocationHint = 7;
+ repeated string inEdgeId = 8;
+ repeated string outEdgeId = 9;
+}
+
+message EdgePlan {
+ required string id = 1;
+ required string inputVertexName = 2;
+ required string outputVertexName = 3;
+ required PlanEdgeConnectionPattern connectionPattern = 4;
+ required PlanEdgeSourceType sourceType = 5;
+ required string inputClass = 6;
+ required string outputClass = 7;
+}
+
+message JobPlan {
+ required string name = 1;
+ repeated VertexPlan vertex = 3;
+ repeated EdgePlan edge = 4;
+ repeated PlanKeyValuePair jobSetting = 5;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
new file mode 100644
index 0000000..f89a73e
--- /dev/null
+++ b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.DAGPlan.*;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+// based on TestDAGLocationHint
+public class TestDAGPlan {
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder(); //TODO: doesn't seem to be deleting this folder automatically as expected.
+
+ @Test
+ public void testBasicJobPlanSerde() throws IOException {
+ // Build a one-vertex JobPlan, write it to a temp file, read it back, and expect an equal message.
+ JobPlan job = JobPlan.newBuilder()
+ .setName("test")
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("vertex1")
+ .setType(PlanVertexType.NORMAL)
+ .addTaskLocationHint(PlanTaskLocationHint.newBuilder().addHost("machineName").addRack("rack1").build())
+ .setTaskConfig(
+ DAGPlan.PlanTaskConfiguration.newBuilder()
+ .setNumTasks(2)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x.y")
+ .build())
+ .build())
+ .build();
+ File file = tempFolder.newFile("jobPlan");
+ FileOutputStream outStream = null;
+ try {
+ outStream = new FileOutputStream(file);
+ job.writeTo(outStream);
+ }
+ finally {
+ if(outStream != null){
+ outStream.close();
+ }
+ }
+ // Read the plan back through a separate input stream.
+ JobPlan inJob;
+ FileInputStream inputStream = null;
+ try {
+ inputStream = new FileInputStream(file);
+ inJob = JobPlan.newBuilder().mergeFrom(inputStream).build();
+ }
+ finally {
+ if(inputStream != null){ inputStream.close(); } // close the reader; the writer was already closed above
+ }
+
+ Assert.assertEquals(job, inJob);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 37587e4..57003c1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -19,6 +19,7 @@
package org.apache.tez.dag.app;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.EnumSet;
@@ -62,8 +63,9 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tez.dag.api.DAGConfiguration;
-import org.apache.tez.dag.api.DAGLocationHint;
+import org.apache.tez.dag.api.DAGPlan.JobPlan;
+import org.apache.tez.dag.api.DAGPlan.VertexPlan;
+import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.client.ClientService;
import org.apache.tez.dag.app.client.impl.TezClientService;
@@ -131,7 +133,7 @@ public class DAGAppMaster extends CompositeService {
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
private Clock clock;
- private final DAGConfiguration dagPlan;
+ private final JobPlan jobPlan;
private long dagsStartTime;
private final long startTime;
private final long appSubmitTime;
@@ -170,8 +172,6 @@ public class DAGAppMaster extends CompositeService {
private TaskSchedulerEventHandler taskSchedulerEventHandler;
private HistoryEventHandler historyEventHandler;
- private DAGLocationHint dagLocationHint;
-
private DAGAppMasterState state;
private DAG dag;
@@ -180,16 +180,16 @@ public class DAGAppMaster extends CompositeService {
public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
- long appSubmitTime, DAGConfiguration dagPlan) {
+ long appSubmitTime, JobPlan dagPB) {
this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
- new SystemClock(), appSubmitTime, dagPlan);
+ new SystemClock(), appSubmitTime, dagPB);
}
public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
- Clock clock, long appSubmitTime, DAGConfiguration dagPlan) {
+ Clock clock, long appSubmitTime, JobPlan dagPB) {
super(DAGAppMaster.class.getName());
- this.dagPlan = dagPlan;
+ this.jobPlan = dagPB;
this.clock = clock;
this.startTime = clock.getTime();
this.appSubmitTime = appSubmitTime;
@@ -215,13 +215,12 @@ public class DAGAppMaster extends CompositeService {
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
downloadTokensAndSetupUGI(conf);
- setupDAGLocationHint(dagPlan);
context = new RunningAppContext(conf);
// Job name is the same as the app name util we support DAG of jobs
// for an app later
- appName = dagPlan.getName();
+ appName = jobPlan.getName();
dagId = new TezDAGID(appAttemptID.getApplicationId(), 1);
@@ -521,11 +520,11 @@ public class DAGAppMaster extends CompositeService {
// }
/** Create and initialize (but don't start) a single dag. */
- protected DAG createDAG(DAGConfiguration dagPlan) {
+ protected DAG createDAG(JobPlan dagPB) {
// create single job
DAG newDag =
- new DAGImpl(dagId, appAttemptID, conf, dagPlan, dispatcher.getEventHandler(),
+ new DAGImpl(dagId, appAttemptID, conf, dagPB, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
// TODO Recovery
//completedTasksFromPreviousRun,
@@ -534,7 +533,7 @@ public class DAGAppMaster extends CompositeService {
//committer, newApiCommitter,
currentUser.getShortUserName(), appSubmitTime,
//amInfos,
- taskHeartbeatHandler, context, dagLocationHint);
+ taskHeartbeatHandler, context);
((RunningAppContext) context).setDAG(newDag);
dispatcher.register(DAGFinishEvent.Type.class,
@@ -578,23 +577,6 @@ public class DAGAppMaster extends CompositeService {
}
}
- protected void setupDAGLocationHint(DAGConfiguration conf) {
- try {
- String dagLocationHintFile =
- conf.get(DAGConfiguration.DAG_LOCATION_HINT_RESOURCE_FILE,
- DAGConfiguration.DEFAULT_DAG_LOCATION_HINT_RESOURCE_FILE);
- File f = new File(dagLocationHintFile);
- if (f.exists()) {
- this.dagLocationHint = DAGLocationHint.initDAGDagLocationHint(
- dagLocationHintFile);
- } else {
- this.dagLocationHint = new DAGLocationHint();
- }
- } catch (IOException e) {
- throw new YarnException(e);
- }
- }
-
protected void addIfService(Object object) {
if (object instanceof Service) {
addService((Service) object);
@@ -846,7 +828,7 @@ public class DAGAppMaster extends CompositeService {
*/
// /////////////////// Create the job itself.
- dag = createDAG(dagPlan);
+ dag = createDAG(jobPlan);
// End of creating the job.
@@ -973,27 +955,41 @@ public class DAGAppMaster extends CompositeService {
// Default to running mr if nothing specified.
// TODO change this once the client is ready.
String type;
- DAGConfiguration dagPlan = null;
TezConfiguration conf = new TezConfiguration(new YarnConfiguration());
+
+ JobPlan jobPlan = null;
if (cliParser.hasOption(OPT_PREDEFINED)) {
LOG.info("Running with PreDefined configuration");
type = cliParser.getOptionValue(OPT_PREDEFINED, "mr");
LOG.info("Running job type: " + type);
if (type.equals("mr")) {
- dagPlan = (DAGConfiguration)MRRExampleHelper.createDAGConfigurationForMR();
+ jobPlan = MRRExampleHelper.createDAGConfigurationForMR();
} else if (type.equals("mrr")) {
- dagPlan = (DAGConfiguration)MRRExampleHelper.createDAGConfigurationForMRR();
+ jobPlan = MRRExampleHelper.createDAGConfigurationForMRR();
+ }
+ }
+ else {
+ // Read the protobuf DAG
+ JobPlan.Builder dagPlanBuilder = JobPlan.newBuilder();
+ FileInputStream dagPBBinaryStream = null;
+ try {
+ dagPBBinaryStream = new FileInputStream(TezConfiguration.DAG_AM_PLAN_PB_BINARY);
+ dagPlanBuilder.mergeFrom(dagPBBinaryStream);
}
- } else {
- dagPlan = new DAGConfiguration();
- dagPlan.addResource(TezConfiguration.DAG_AM_PLAN_CONFIG_XML);
+ finally {
+ if(dagPBBinaryStream != null){
+ dagPBBinaryStream.close();
+ }
+ }
+
+ jobPlan = dagPlanBuilder.build();
}
LOG.info("XXXX Running a DAG with "
- + dagPlan.getVertices().length + " vertices ");
- for (String v : dagPlan.getVertices()) {
- LOG.info("XXXX DAG has vertex " + v);
+ + jobPlan.getVertexCount() + " vertices ");
+ for (VertexPlan v : jobPlan.getVertexList()) {
+ LOG.info("XXXX DAG has vertex " + v.getName());
}
String jobUserName = System
@@ -1004,7 +1000,7 @@ public class DAGAppMaster extends CompositeService {
// the objects myself.
conf.setBoolean("fs.automatic.close", false);
- Map<String, String> config = dagPlan.getConfig();
+ Map<String, String> config = DagTypeConverters.createSettingsMapFromDAGPlan(jobPlan.getJobSettingList());
for(Entry<String, String> entry : config.entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
@@ -1012,7 +1008,7 @@ public class DAGAppMaster extends CompositeService {
DAGAppMaster appMaster =
new DAGAppMaster(applicationAttemptId, containerId, nodeHostString,
Integer.parseInt(nodePortString),
- Integer.parseInt(nodeHttpPortString), appSubmitTime, dagPlan);
+ Integer.parseInt(nodeHttpPortString), appSubmitTime, jobPlan);
ShutdownHookManager.get().addShutdownHook(
new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java
index 1b69919..b50887a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java
@@ -17,7 +17,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.DAGPlan.JobPlan;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
@@ -84,7 +84,10 @@ public class MRRExampleHelper {
return resourceNames;
}
- static Configuration createDAGConfigurationForMRR() throws IOException {
+ // TODO: these preconfigured jobs seem to require User and perhaps some other work.
+ // -> not tested with new DagPB system.
+
+ static JobPlan createDAGConfigurationForMRR() throws IOException {
org.apache.tez.dag.api.DAG dag = new org.apache.tez.dag.api.DAG();
Vertex mapVertex = new Vertex("map",
"org.apache.tez.mapreduce.task.InitialTask", 6);
@@ -143,16 +146,15 @@ public class MRRExampleHelper {
dag.addEdge(edge1);
dag.addEdge(edge2);
dag.verify();
- DAGConfiguration dagConf = dag.serializeDag();
-
- dagConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
- dagConf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
-
- return dagConf;
+ dag.addConfiguration(MRJobConfig.MAP_SPECULATIVE, new Boolean(false).toString());
+ dag.addConfiguration(MRJobConfig.REDUCE_SPECULATIVE, new Boolean(false).toString());
+
+ JobPlan dagPB = dag.createDag();
+ return dagPB;
}
// TODO remove once client is in place
- static Configuration createDAGConfigurationForMR() throws IOException {
+ static JobPlan createDAGConfigurationForMR() throws IOException {
org.apache.tez.dag.api.DAG dag = new org.apache.tez.dag.api.DAG();
Vertex mapVertex = new Vertex("map",
"org.apache.tez.mapreduce.task.InitialTask", 6);
@@ -194,12 +196,13 @@ public class MRRExampleHelper {
dag.addVertex(reduceVertex);
dag.addEdge(edge);
dag.verify();
- DAGConfiguration dagConf = dag.serializeDag();
-
- dagConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
- dagConf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
-
- return dagConf;
+
+ dag.addConfiguration(MRJobConfig.MAP_SPECULATIVE, new Boolean(false).toString());
+ dag.addConfiguration(MRJobConfig.REDUCE_SPECULATIVE, new Boolean(false).toString());
+
+ JobPlan dagPB = dag.createDag();
+
+ return dagPB;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 03bf570..39cfc1a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -24,7 +24,7 @@ import java.util.Map;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.DAGPlan.JobPlan;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.engine.records.TezDAGID;
@@ -60,8 +60,8 @@ public interface DAG {
TezConfiguration getConf();
- DAGConfiguration getDagPlan();
-
+ JobPlan getJobPlan();
+
/**
* @return the ACLs for this job for each type of JobACL given.
*/
@@ -75,5 +75,4 @@ public interface DAG {
boolean checkAccess(UserGroupInformation callerUGI, ApplicationAccessType jobOperation);
- VertexLocationHint getVertexLocationHint(TezVertexID vertexId);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 5d8b8f7..8d9fbc3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -28,6 +28,7 @@ import org.apache.tez.common.InputSpec;
import org.apache.tez.common.OutputSpec;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.DAGPlan.VertexPlan;
import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
import org.apache.tez.engine.records.TezTaskID;
import org.apache.tez.engine.records.TezVertexID;
@@ -39,6 +40,8 @@ import org.apache.tez.engine.records.TezVertexID;
public interface Vertex extends Comparable<Vertex> {
TezVertexID getVertexId();
+ public VertexPlan getVertexPlan();
+
int getDistanceFromRoot();
String getName();
VertexState getState();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 0e1206f..2e20692 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@@ -53,13 +52,15 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.DAGConfiguration;
-import org.apache.tez.dag.api.DAGLocationHint;
+import org.apache.tez.dag.api.DAGPlan.EdgePlan;
+import org.apache.tez.dag.api.DAGPlan.JobPlan;
+import org.apache.tez.dag.api.DAGPlan.VertexPlan;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.impl.TezBuilderUtils;
+import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
@@ -124,16 +125,14 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private final long appSubmitTime;
private final AppContext appContext;
- volatile Map<TezVertexID, Vertex> vertices =
- new HashMap<TezVertexID, Vertex>();
- private Map<String, EdgeProperty> edgeProperties =
- new HashMap<String, EdgeProperty>();
+ volatile Map<TezVertexID, Vertex> vertices = new HashMap<TezVertexID, Vertex>();
+ private Map<String, EdgeProperty> edges = new HashMap<String, EdgeProperty>();
private TezCounters dagCounters = new TezCounters();
private Object fullCountersLock = new Object();
private TezCounters fullCounters = null;
public final TezConfiguration conf;
- public final DAGConfiguration dagPlan;
+ private final JobPlan jobPlan;
//fields initialized in init
private FileSystem fs;
@@ -151,9 +150,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private static final DAGSchedulerUpdateTransition
DAG_SCHEDULER_UPDATE_TRANSITION = new DAGSchedulerUpdateTransition();
- // Location hints for all vertices in DAG
- private final DAGLocationHint dagLocationHint;
-
protected static final
StateMachineFactory<DAGImpl, DAGState, DAGEventType, DAGEvent>
stateMachineFactory
@@ -315,7 +311,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
public DAGImpl(TezDAGID dagId, ApplicationAttemptId applicationAttemptId,
TezConfiguration conf,
- DAGConfiguration dagPlan,
+ JobPlan jobPlan,
EventHandler eventHandler,
TaskAttemptListener taskAttemptListener,
JobTokenSecretManager jobTokenSecretManager,
@@ -327,14 +323,13 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
// TODO Recovery
//List<AMInfo> amInfos,
TaskHeartbeatHandler thh,
- AppContext appContext,
- DAGLocationHint dagLocationHint) {
+ AppContext appContext) {
this.applicationAttemptId = applicationAttemptId;
this.dagId = dagId;
- this.dagPlan = dagPlan;
+ this.jobPlan = jobPlan;
this.conf = conf;
- this.dagName = (dagPlan.getName() != null) ? dagPlan.getName() :
- "<missing app name>";
+ this.dagName = (jobPlan.getName() != null) ? jobPlan.getName() : "<missing app name>";
+
this.userName = appUserName;
// TODO Metrics
//this.metrics = metrics;
@@ -357,8 +352,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
this.aclsManager = new ApplicationACLsManager(conf);
- this.dagLocationHint = dagLocationHint;
-
// This "this leak" is okay because the retained pointer is in an
// instance variable.
stateMachine = stateMachineFactory.make(this);
@@ -380,8 +373,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
@Override
- public DAGConfiguration getDagPlan() {
- return dagPlan;
+ public JobPlan getJobPlan() {
+ return jobPlan;
}
EventHandler getEventHandler() {
@@ -812,18 +805,19 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
*/
// create the vertices
- String[] vertexNames = dag.getDagPlan().getVertices();
- dag.numVertices = vertexNames.length;
+ dag.numVertices = dag.getJobPlan().getVertexCount();
for (int i=0; i < dag.numVertices; ++i) {
- VertexImpl v = createVertex(dag, vertexNames[i], i);
+ String vertexName = dag.getJobPlan().getVertex(i).getName();
+ VertexImpl v = createVertex(dag, vertexName, i);
dag.addVertex(v);
}
- dag.edgeProperties = dag.getDagPlan().getEdgeProperties();
-
+ dag.edges = DagTypeConverters.createEdgePropertyMapFromDAGPlan(dag.getJobPlan().getEdgeList());
+ Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(dag.getJobPlan().getEdgeList());
+
// setup the dag
for (Vertex v : dag.vertices.values()) {
- parseVertexEdges(dag, v);
+ parseVertexEdges(dag, edgePlans, v);
}
dag.dagScheduler = new DAGSchedulerNaturalOrder(dag, dag.eventHandler);
@@ -846,39 +840,43 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) {
TezVertexID vertexId = TezBuilderUtils.newVertexID(dag.getID(), vId);
+
+ VertexPlan vertexPlan = dag.getJobPlan().getVertex(vId);
+ VertexLocationHint vertexLocationHint = DagTypeConverters.convertFromDAGPlan(vertexPlan.getTaskLocationHintList());
+
return new VertexImpl(
- vertexId, vertexName, dag.conf,
+ vertexId, vertexPlan, vertexName, dag.conf,
dag.eventHandler, dag.taskAttemptListener,
dag.jobToken, dag.fsTokens, dag.clock,
dag.taskHeartbeatHandler, dag.appContext,
- dag.dagLocationHint.getVertexLocationHint(vertexName));
+ vertexLocationHint);
}
- private void parseVertexEdges(DAGImpl dag, Vertex vertex) {
- String[] inVerticesNames =
- dag.getDagPlan().getInputVertices(vertex.getName());
- List<String> inEdges =
- dag.getDagPlan().getInputEdgeIds(vertex.getName());
+ // hooks up this VertexImpl to input and output EdgeProperties
+ private void parseVertexEdges(DAGImpl dag, Map<String, EdgePlan> edgePlans, Vertex vertex) {
+ VertexPlan vertexPlan = vertex.getVertexPlan();
+
Map<Vertex, EdgeProperty> inVertices =
new HashMap<Vertex, EdgeProperty>();
- for (int i=0; i < inVerticesNames.length; ++i) {
- String vertexName = inVerticesNames[i];
- inVertices.put(dag.getVertex(vertexName),
- dag.edgeProperties.get(inEdges.get(i)));
- }
- vertex.setInputVertices(inVertices);
- String[] outVerticesNames =
- dag.getDagPlan().getOutputVertices(vertex.getName());
- List<String> outEdges =
- dag.getDagPlan().getOutputEdgeIds(vertex.getName());
Map<Vertex, EdgeProperty> outVertices =
new HashMap<Vertex, EdgeProperty>();
- for (int i=0; i < outVerticesNames.length; ++i) {
- String vertexName = outVerticesNames[i];
- outVertices.put(dag.getVertex(vertexName),
- dag.edgeProperties.get(outEdges.get(i)));
+
+ for(String inEdgeId : vertexPlan.getInEdgeIdList()){
+ EdgePlan edgePlan = edgePlans.get(inEdgeId);
+ Vertex inVertex = dag.vertexMap.get(edgePlan.getInputVertexName());
+ EdgeProperty edgeProp = dag.edges.get(inEdgeId);
+ inVertices.put(inVertex, edgeProp);
}
+
+ for(String outEdgeId : vertexPlan.getOutEdgeIdList()){
+ EdgePlan edgePlan = edgePlans.get(outEdgeId);
+ Vertex outVertex = dag.vertexMap.get(edgePlan.getOutputVertexName());
+ EdgeProperty edgeProp = dag.edges.get(outEdgeId);
+ outVertices.put(outVertex, edgeProp);
+ }
+
+ vertex.setInputVertices(inVertices);
vertex.setOutputVertices(outVertices);
}
@@ -1145,11 +1143,4 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
job.finished(DAGState.ERROR);
}
}
-
- @Override
- public VertexLocationHint getVertexLocationHint(TezVertexID vertexId) {
- return dagLocationHint.getVertexLocationHint(
- getVertex(vertexId).getName());
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 80de958..a423db1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -55,7 +55,8 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.tez.common.InputSpec;
import org.apache.tez.common.OutputSpec;
import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.DAGPlan.VertexPlan;
+import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
import org.apache.tez.dag.api.EdgeProperty;
@@ -324,7 +325,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private Credentials fsTokens;
private Token<JobTokenIdentifier> jobToken;
- private final TezVertexID vertexId;
+ private final TezVertexID vertexId; //runtime assigned id.
+ private final VertexPlan vertexPlan;
private final String vertexName;
private final String processorName;
@@ -341,7 +343,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private Map<String, String> environment;
private String javaOpts;
- public VertexImpl(TezVertexID vertexId, String vertexName,
+ public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan, String vertexName,
TezConfiguration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener,
Token<JobTokenIdentifier> jobToken,
@@ -353,6 +355,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
TaskHeartbeatHandler thh,
AppContext appContext, VertexLocationHint vertexLocationHint) {
this.vertexId = vertexId;
+ this.vertexPlan = vertexPlan;
this.vertexName = vertexName;
this.conf = conf;
//this.metrics = metrics;
@@ -373,12 +376,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
this.committer = new NullVertexOutputCommitter();
this.vertexLocationHint = vertexLocationHint;
- this.taskResource = getDAGPlan().getVertexResource(getName());
- this.processorName = getDAGPlan().getVertexTaskModuleClassName(getName());
- this.localResources = getDAGPlan().getVertexLocalResources(getName());
- this.environment = getDAGPlan().getVertexEnv(getName());
-
- this.javaOpts = getDAGPlan().getVertexJavaOpts(getName());
+ this.taskResource = DagTypeConverters.CreateResourceRequestFromTaskConfig(vertexPlan.getTaskConfig());
+ this.processorName = vertexPlan.hasProcessorName() ? vertexPlan.getProcessorName() : null;
+ this.localResources = DagTypeConverters.createLocalResourceMapFromDAGPlan(vertexPlan.getTaskConfig().getLocalResourceList());
+ this.environment = DagTypeConverters.createEnvironmentMapFromDAGPlan(vertexPlan.getTaskConfig().getEnvironmentSettingList());
+ this.javaOpts = vertexPlan.getTaskConfig().hasJavaOpts() ? vertexPlan.getTaskConfig().getJavaOpts() : null;
// This "this leak" is okay because the retained pointer is in an
// instance variable.
@@ -395,6 +397,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
@Override
+ public VertexPlan getVertexPlan() {
+ return vertexPlan;
+ }
+
+ @Override
public int getDistanceFromRoot() {
return distanceFromRoot;
}
@@ -727,7 +734,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// TODODAGAM
// TODO: Splits?
- vertex.numTasks = vertex.getDAGPlan().getNumVertexTasks(vertex.getName());
+
+ vertex.numTasks = vertex.getVertexPlan().getTaskConfig().getNumTasks();
+
/*
TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
job.numMapTasks = taskSplitMetaInfo.length;
@@ -1217,10 +1226,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
return appContext.getDAG();
}
- DAGConfiguration getDAGPlan() {
- return getDAG().getDagPlan();
- }
-
// TODO Eventually remove synchronization.
@Override
public synchronized List<InputSpec> getInputSpecList() {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
index 331e2a3..b3db770 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
@@ -82,6 +82,12 @@ public class TezEngineChildJVM {
//set custom javaOpts
vargs.add(javaOpts);
+//[Debug Task] Current simplest way to attach debugger to Tez Child Task
+// Uncomment the following, then launch a regular job
+// Works best on one-box configured with a single container (hence one task at a time).
+// LOG.error(" !!!!!!!!! Launching Child-Task in debug/suspend mode. Attach to port 8003 !!!!!!!!");
+// vargs.add("-Xdebug -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=8003,server=y,suspend=y");
+
Path childTmpDir = new Path(Environment.PWD.$(),
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
vargs.add("-Djava.io.tmpdir=" + childTmpDir);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag/src/test/java/org/apache/tez/dag/api/TestDAGLocationHint.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/TestDAGLocationHint.java b/tez-dag/src/test/java/org/apache/tez/dag/api/TestDAGLocationHint.java
deleted file mode 100644
index 3deaee7..0000000
--- a/tez-dag/src/test/java/org/apache/tez/dag/api/TestDAGLocationHint.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestDAGLocationHint {
-
- private DataInput in;
- private DataOutput out;
- private ByteArrayOutputStream bOut;
-
- @Before
- public void setup() {
- bOut = new ByteArrayOutputStream();
- out = new DataOutputStream(bOut);
- }
-
- @After
- public void teardown() {
- in = null;
- out = null;
- bOut = null;
- }
-
- @Test
- public void testNullDAGLocationHintSerDes() throws IOException {
- DAGLocationHint expected = new DAGLocationHint();
- expected.write(out);
- in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
- DAGLocationHint actual = new DAGLocationHint();
- actual.readFields(in);
- Assert.assertNotNull(actual.getVertexLocationHints());
- Assert.assertEquals(0, actual.getVertexLocationHints().size());
- }
-
- @Test
- public void testDAGLocationHintSerDes() throws IOException {
- String[] hosts = { "h1", "h2", "", null };
- String[] racks = { "r1", "r2" };
-
- VertexLocationHint vertexLocationHint = new VertexLocationHint(4);
- vertexLocationHint.getTaskLocationHints()[0] =
- new TaskLocationHint(hosts, racks);
- DAGLocationHint expected = new DAGLocationHint();
- expected.getVertexLocationHints().put("v1", null);
- expected.getVertexLocationHints().put("v2", new VertexLocationHint());
- expected.getVertexLocationHints().put("v3", vertexLocationHint);
- expected.write(out);
-
- in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
- DAGLocationHint actual = new DAGLocationHint();
- actual.readFields(in);
- Assert.assertNotNull(actual.getVertexLocationHints());
- Assert.assertEquals(3, actual.getVertexLocationHints().size());
-
- Assert.assertNull(actual.getVertexLocationHint("v1"));
- Assert.assertNotNull(actual.getVertexLocationHint("v2"));
- Assert.assertNotNull(actual.getVertexLocationHint("v3"));
-
- Assert.assertEquals(0, actual.getVertexLocationHint("v2").getNumTasks());
- Assert.assertEquals(0,
- actual.getVertexLocationHint("v2").getTaskLocationHints().length);
-
- Assert.assertEquals(4, actual.getVertexLocationHint("v3").getNumTasks());
- Assert.assertEquals(4,
- actual.getVertexLocationHint("v3").getTaskLocationHints().length);
- Assert.assertNotNull(
- actual.getVertexLocationHint("v3").getTaskLocationHints()[0]);
- Assert.assertArrayEquals(racks,
- actual.getVertexLocationHint("v3").getTaskLocationHints()[0].
- getRacks());
- Assert.assertArrayEquals(hosts,
- actual.getVertexLocationHint("v3").getTaskLocationHints()[0].
- getDataLocalHosts());
- Assert.assertNull(
- actual.getVertexLocationHint("v3").getTaskLocationHints()[1]);
- Assert.assertNull(
- actual.getVertexLocationHint("v3").getTaskLocationHints()[2]);
- Assert.assertNull(
- actual.getVertexLocationHint("v3").getTaskLocationHints()[3]);
- }
-
-
-}
-