You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/23 07:25:56 UTC
git commit: TEZ-1433. Invalid credentials can be used when a DAG is
submitted to a session which has timed out (bikas)
Repository: tez
Updated Branches:
refs/heads/master 9159e1170 -> 06fa79ae5
TEZ-1433. Invalid credentials can be used when a DAG is submitted to a session which has timed out (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/06fa79ae
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/06fa79ae
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/06fa79ae
Branch: refs/heads/master
Commit: 06fa79ae599187d7976ed5e9cccf9c66fc91eff7
Parents: 9159e11
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Sep 22 22:25:48 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Sep 22 22:25:48 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/tez/client/TezClientUtils.java | 49 ++-----
.../main/java/org/apache/tez/dag/api/DAG.java | 135 +++++++++---------
.../apache/tez/dag/api/DagTypeConverters.java | 29 ++++
tez-api/src/main/proto/DAGApiRecords.proto | 1 +
.../org/apache/tez/client/TestTezClient.java | 6 +-
.../org/apache/tez/dag/api/TestDAGPlan.java | 8 +-
.../org/apache/tez/dag/api/TestDAGVerify.java | 79 ++++++-----
.../java/org/apache/tez/dag/app/dag/DAG.java | 2 +
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 11 ++
.../app/rm/container/AMContainerHelpers.java | 9 +-
.../dag/app/rm/container/AMContainerImpl.java | 8 +-
.../apache/tez/dag/app/MockDAGAppMaster.java | 18 ++-
.../tez/dag/app/TestMockDAGAppMaster.java | 138 +++++++++++++++++++
.../tez/dag/app/dag/impl/TestDAGImpl.java | 2 +-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 2 +-
.../tez/dag/history/utils/TestDAGUtils.java | 2 +-
17 files changed, 348 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f3b2ed0..998bb03 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -57,6 +57,8 @@ ALL CHANGES
Windows
TEZ-1554. Failing tests in TestMRHelpers related to environment on Windows
TEZ-978. Enhance auto parallelism tuning for queries having empty outputs or data skewness
+ TEZ-1433. Invalid credentials can be used when a DAG is submitted to a
+ session which has timed out
Release 0.5.0: 2014-09-03
http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 917fcff..1e01138 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -334,17 +334,13 @@ public class TezClientUtils {
* @throws IOException
*/
@Private
- static void setupDAGCredentials(DAG dag, Credentials sessionCredentials,
+ static Credentials setupDAGCredentials(DAG dag, Credentials sessionCredentials,
Configuration conf) throws IOException {
Preconditions.checkNotNull(sessionCredentials);
TezCommonUtils.logCredentials(LOG, sessionCredentials, "session");
- Credentials dagCredentials = dag.getCredentials();
- if (dagCredentials == null) {
- dagCredentials = new Credentials();
- dag.setCredentials(dagCredentials);
- }
+ Credentials dagCredentials = new Credentials();
// All session creds are required for the DAG.
dagCredentials.mergeAll(sessionCredentials);
@@ -361,6 +357,10 @@ public class TezClientUtils {
lrPaths.add(ConverterUtils.getPathFromYarnURL(lr.getResource()));
}
}
+
+ for (LocalResource lr: dag.getTaskLocalFiles().values()) {
+ lrPaths.add(ConverterUtils.getPathFromYarnURL(lr.getResource()));
+ }
Path[] paths = lrPaths.toArray(new Path[lrPaths.size()]);
TokenCache.obtainTokensForFileSystems(dagCredentials, paths, conf);
@@ -368,6 +368,8 @@ public class TezClientUtils {
} catch (URISyntaxException e) {
throw new IOException(e);
}
+
+ return dagCredentials;
}
@Private
@@ -622,35 +624,12 @@ public class TezClientUtils {
}
- static void updateDAGVertices(DAG dag, AMConfiguration amConfig,
- Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive,
- Credentials credentials) throws IOException {
- setupDAGCredentials(dag, credentials, amConfig.getTezConfiguration());
- for (Vertex v : dag.getVertices()) {
- if (tezJarResources != null) {
- v.getTaskLocalFiles().putAll(tezJarResources);
- }
- v.getTaskLocalFiles().put(TezConstants.TEZ_PB_BINARY_CONF_NAME,
- amConfig.getBinaryConfLR());
-
- Map<String, String> taskEnv = v.getTaskEnvironment();
- TezYARNUtils.setupDefaultEnv(taskEnv, amConfig.getTezConfiguration(),
- TezConfiguration.TEZ_TASK_LAUNCH_ENV,
- TezConfiguration.TEZ_TASK_LAUNCH_ENV_DEFAULT, tezLrsAsArchive);
-
- setDefaultLaunchCmdOpts(v, amConfig.getTezConfiguration());
- }
- }
-
static DAGPlan prepareAndCreateDAGPlan(DAG dag, AMConfiguration amConfig,
Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive,
Credentials credentials) throws IOException {
- DAGPlan dagPB = dag.getCachedDAGPlan();
- if (dagPB == null) {
- updateDAGVertices(dag, amConfig, tezJarResources, tezLrsAsArchive, credentials);
- dagPB = dag.createDag(amConfig.getTezConfiguration());
- }
- return dagPB;
+ Credentials dagCredentials = setupDAGCredentials(dag, credentials, amConfig.getTezConfiguration());
+ return dag.createDag(amConfig.getTezConfiguration(), dagCredentials, tezJarResources,
+ amConfig.getBinaryConfLR(), true);
}
static void maybeAddDefaultLoggingJavaOpts(String logLevel, List<String> vargs) {
@@ -678,8 +657,8 @@ public class TezClientUtils {
return StringUtils.join(vargs, " ").trim();
}
- static void setDefaultLaunchCmdOpts(Vertex v, TezConfiguration conf) {
- String vOpts = v.getTaskLaunchCmdOpts();
+ @Private
+ public static String addDefaultsToTaskLaunchCmdOpts(String vOpts, Configuration conf) {
String vConfigOpts = conf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS,
TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS_DEFAULT);
if (vConfigOpts != null && vConfigOpts.length() > 0) {
@@ -689,7 +668,7 @@ public class TezClientUtils {
vOpts = maybeAddDefaultLoggingJavaOpts(conf.get(
TezConfiguration.TEZ_TASK_LOG_LEVEL,
TezConfiguration.TEZ_TASK_LOG_LEVEL_DEFAULT), vOpts);
- v.setTaskLaunchCmdOpts(vOpts);
+ return vOpts;
}
@Private
http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index ffd2e83..c28f210 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -39,10 +39,11 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.client.TezClientUtils;
import org.apache.tez.common.security.DAGAccessControls;
import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezYARNUtils;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
@@ -53,7 +54,6 @@ import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
-import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResource;
import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
import org.apache.tez.dag.api.records.DAGProtos.PlanVertexGroupInfo;
@@ -89,7 +89,6 @@ public class DAG {
Map<String, LocalResource> commonTaskLocalFiles = Maps.newHashMap();
private Stack<String> topologicalVertexStack = new Stack<String>();
- private DAGPlan cachedDAGPlan;
private DAG(String name) {
this.name = name;
@@ -112,7 +111,7 @@ public class DAG {
* elements of the map.
* @return {@link DAG}
*/
- public DAG addTaskLocalFiles(Map<String, LocalResource> localFiles) {
+ public synchronized DAG addTaskLocalFiles(Map<String, LocalResource> localFiles) {
Preconditions.checkNotNull(localFiles);
TezCommonUtils.addAdditionalLocalResources(localFiles, commonTaskLocalFiles);
return this;
@@ -298,6 +297,11 @@ public class DAG {
return this.name;
}
+ @Private
+ public Map<String, LocalResource> getTaskLocalFiles() {
+ return commonTaskLocalFiles;
+ }
+
void checkAndInferOneToOneParallelism() {
// infer all 1-1 via dependencies
// collect all 1-1 edges where the source parallelism is set
@@ -575,14 +579,11 @@ public class DAG {
}
}
- @Private
- public DAGPlan getCachedDAGPlan() {
- return cachedDAGPlan;
- }
-
// create protobuf message describing DAG
@Private
- public DAGPlan createDag(Configuration dagConf) {
+ public DAGPlan createDag(Configuration dagConf, Credentials extraCredentials,
+ Map<String, LocalResource> tezJarResources, LocalResource binaryConfig,
+ boolean tezLrsAsArchive) {
verify(true);
DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
@@ -608,6 +609,15 @@ public class DAG {
}
}
+ Credentials dagCredentials = new Credentials();
+ if (extraCredentials != null) {
+ dagCredentials.mergeAll(extraCredentials);
+ }
+ dagCredentials.mergeAll(credentials);
+ if (!commonTaskLocalFiles.isEmpty()) {
+ dagBuilder.addAllLocalResource(DagTypeConverters.convertToDAGPlan(commonTaskLocalFiles));
+ }
+
Preconditions.checkArgument(topologicalVertexStack.size() == vertices.size(),
"size of topologicalVertexStack is:" + topologicalVertexStack.size() +
" while size of vertices is:" + vertices.size() +
@@ -615,38 +625,57 @@ public class DAG {
while(!topologicalVertexStack.isEmpty()) {
Vertex vertex = vertices.get(topologicalVertexStack.pop());
// infer credentials, resources and parallelism from data source
- if (vertex.getTaskResource() == null) {
- vertex.setTaskResource(Resource.newInstance(dagConf.getInt(
+ Resource vertexTaskResource = vertex.getTaskResource();
+ if (vertexTaskResource == null) {
+ vertexTaskResource = Resource.newInstance(dagConf.getInt(
TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB,
TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT), dagConf.getInt(
TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES,
- TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT)));
+ TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT));
}
+ Map<String, LocalResource> vertexLRs = Maps.newHashMap();
+ vertexLRs.putAll(vertex.getTaskLocalFiles());
List<DataSourceDescriptor> dataSources = vertex.getDataSources();
for (DataSourceDescriptor dataSource : dataSources) {
if (dataSource.getCredentials() != null) {
- credentials.addAll(dataSource.getCredentials());
+ dagCredentials.addAll(dataSource.getCredentials());
}
vertex.addTaskLocalFiles(dataSource.getAdditionalLocalFiles());
+ if (dataSource.getAdditionalLocalFiles() != null) {
+ TezCommonUtils.addAdditionalLocalResources(dataSource.getAdditionalLocalFiles(), vertexLRs);
+ }
+ }
+ if (tezJarResources != null) {
+ TezCommonUtils.addAdditionalLocalResources(tezJarResources, vertexLRs);
}
+ if (binaryConfig != null) {
+ vertexLRs.put(TezConstants.TEZ_PB_BINARY_CONF_NAME, binaryConfig);
+ }
+ int vertexParallelism = vertex.getParallelism();
+ VertexLocationHint vertexLocationHint = vertex.getLocationHint();
if (dataSources.size() == 1) {
DataSourceDescriptor dataSource = dataSources.get(0);
- if (vertex.getParallelism() == -1 && dataSource.getNumberOfShards() > -1) {
- vertex.setParallelism(dataSource.getNumberOfShards());
+ if (vertexParallelism == -1 && dataSource.getNumberOfShards() > -1) {
+ vertexParallelism = dataSource.getNumberOfShards();
}
- if (vertex.getLocationHint() == null && dataSource.getLocationHint() != null) {
- vertex.setLocationHint(dataSource.getLocationHint());
+ if (vertexLocationHint == null && dataSource.getLocationHint() != null) {
+ vertexLocationHint = dataSource.getLocationHint();
}
}
+ if (vertexParallelism == -1) {
+ Preconditions.checkState(vertexLocationHint == null,
+ "Cannot specify vertex location hint without specifying vertex parallelism. Vertex: "
+ + vertex.getName());
+ } else if (vertexLocationHint != null) {
+ Preconditions.checkState(vertexParallelism == vertexLocationHint.getTaskLocationHints().size(),
+ "vertex task location hint must equal vertex parallelism. Vertex: " + vertex.getName());
+ }
for (DataSinkDescriptor dataSink : vertex.getDataSinks()) {
if (dataSink.getCredentials() != null) {
- credentials.addAll(dataSink.getCredentials());
+ dagCredentials.addAll(dataSink.getCredentials());
}
}
- // add common task files for this DAG
- vertex.addTaskLocalFiles(commonTaskLocalFiles);
-
VertexPlan.Builder vertexBuilder = VertexPlan.newBuilder();
vertexBuilder.setName(vertex.getName());
vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until TEZ-46.
@@ -665,48 +694,31 @@ public class DAG {
//task config
PlanTaskConfiguration.Builder taskConfigBuilder = PlanTaskConfiguration.newBuilder();
- Resource resource = vertex.getTaskResource();
- taskConfigBuilder.setNumTasks(vertex.getParallelism());
- taskConfigBuilder.setMemoryMb(resource.getMemory());
- taskConfigBuilder.setVirtualCores(resource.getVirtualCores());
- taskConfigBuilder.setJavaOpts(vertex.getTaskLaunchCmdOpts());
+ taskConfigBuilder.setNumTasks(vertexParallelism);
+ taskConfigBuilder.setMemoryMb(vertexTaskResource.getMemory());
+ taskConfigBuilder.setVirtualCores(vertexTaskResource.getVirtualCores());
+ taskConfigBuilder.setJavaOpts(
+ TezClientUtils.addDefaultsToTaskLaunchCmdOpts(vertex.getTaskLaunchCmdOpts(), dagConf));
taskConfigBuilder.setTaskModule(vertex.getName());
- PlanLocalResource.Builder localResourcesBuilder = PlanLocalResource.newBuilder();
- localResourcesBuilder.clear();
- for (Entry<String, LocalResource> entry :
- vertex.getTaskLocalFiles().entrySet()) {
- String key = entry.getKey();
- LocalResource lr = entry.getValue();
- localResourcesBuilder.setName(key);
- localResourcesBuilder.setUri(
- DagTypeConverters.convertToDAGPlan(lr.getResource()));
- localResourcesBuilder.setSize(lr.getSize());
- localResourcesBuilder.setTimeStamp(lr.getTimestamp());
- localResourcesBuilder.setType(
- DagTypeConverters.convertToDAGPlan(lr.getType()));
- localResourcesBuilder.setVisibility(
- DagTypeConverters.convertToDAGPlan(lr.getVisibility()));
- if (lr.getType() == LocalResourceType.PATTERN) {
- if (lr.getPattern() == null || lr.getPattern().isEmpty()) {
- throw new TezUncheckedException("LocalResource type set to pattern"
- + " but pattern is null or empty");
- }
- localResourcesBuilder.setPattern(lr.getPattern());
- }
- taskConfigBuilder.addLocalResource(localResourcesBuilder);
+ if (!vertexLRs.isEmpty()) {
+ taskConfigBuilder.addAllLocalResource(DagTypeConverters.convertToDAGPlan(vertexLRs));
}
-
- for (String key : vertex.getTaskEnvironment().keySet()) {
+
+ Map<String, String> taskEnv = Maps.newHashMap(vertex.getTaskEnvironment());
+ TezYARNUtils.setupDefaultEnv(taskEnv, dagConf,
+ TezConfiguration.TEZ_TASK_LAUNCH_ENV,
+ TezConfiguration.TEZ_TASK_LAUNCH_ENV_DEFAULT, tezLrsAsArchive);
+ for (Map.Entry<String, String> entry : taskEnv.entrySet()) {
PlanKeyValuePair.Builder envSettingBuilder = PlanKeyValuePair.newBuilder();
- envSettingBuilder.setKey(key);
- envSettingBuilder.setValue(vertex.getTaskEnvironment().get(key));
+ envSettingBuilder.setKey(entry.getKey());
+ envSettingBuilder.setValue(entry.getValue());
taskConfigBuilder.addEnvironmentSetting(envSettingBuilder);
}
- if (vertex.getLocationHint() != null) {
- if (vertex.getLocationHint().getTaskLocationHints() != null) {
- for (TaskLocationHint hint : vertex.getLocationHint().getTaskLocationHints()) {
+ if (vertexLocationHint != null) {
+ if (vertexLocationHint.getTaskLocationHints() != null) {
+ for (TaskLocationHint hint : vertexLocationHint.getTaskLocationHints()) {
PlanTaskLocationHint.Builder taskLocationHintBuilder = PlanTaskLocationHint.newBuilder();
if (hint.getAffinitizedContainer() != null) {
@@ -788,12 +800,11 @@ public class DAG {
dagBuilder.setDagKeyValues(confProtoBuilder); // This does not seem to be used anywhere
// should this replace BINARY_PB_CONF???
- if (credentials != null) {
- dagBuilder.setCredentialsBinary(DagTypeConverters.convertCredentialsToProto(credentials));
- TezCommonUtils.logCredentials(LOG, credentials, "dag");
+ if (dagCredentials != null) {
+ dagBuilder.setCredentialsBinary(DagTypeConverters.convertCredentialsToProto(dagCredentials));
+ TezCommonUtils.logCredentials(LOG, dagCredentials, "dag");
}
- cachedDAGPlan = dagBuilder.build();
- return cachedDAGPlan;
+ return dagBuilder.build();
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 6594f02..179f3cc 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
+import java.util.Map.Entry;
import javax.annotation.Nullable;
@@ -72,6 +73,7 @@ import org.apache.tez.dag.api.records.DAGProtos.TezCountersProto;
import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexLocationHintProto;
+import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.ByteString.Output;
@@ -86,6 +88,33 @@ public class DagTypeConverters {
default : throw new RuntimeException("unknown 'visibility': " + visibility);
}
}
+
+ public static List<PlanLocalResource> convertToDAGPlan(Map<String, LocalResource> lrs) {
+ List<PlanLocalResource> planLrs = Lists.newArrayListWithCapacity(lrs.size());
+ for (Entry<String, LocalResource> entry : lrs.entrySet()) {
+ PlanLocalResource.Builder localResourcesBuilder = PlanLocalResource.newBuilder();
+ String key = entry.getKey();
+ LocalResource lr = entry.getValue();
+ localResourcesBuilder.setName(key);
+ localResourcesBuilder.setUri(
+ DagTypeConverters.convertToDAGPlan(lr.getResource()));
+ localResourcesBuilder.setSize(lr.getSize());
+ localResourcesBuilder.setTimeStamp(lr.getTimestamp());
+ localResourcesBuilder.setType(
+ DagTypeConverters.convertToDAGPlan(lr.getType()));
+ localResourcesBuilder.setVisibility(
+ DagTypeConverters.convertToDAGPlan(lr.getVisibility()));
+ if (lr.getType() == LocalResourceType.PATTERN) {
+ if (lr.getPattern() == null || lr.getPattern().isEmpty()) {
+ throw new TezUncheckedException("LocalResource type set to pattern"
+ + " but pattern is null or empty");
+ }
+ localResourcesBuilder.setPattern(lr.getPattern());
+ }
+ planLrs.add(localResourcesBuilder.build());
+ }
+ return planLrs;
+ }
public static LocalResourceVisibility convertFromDAGPlan(PlanLocalResourceVisibility visibility){
switch(visibility){
http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index 54cd191..5b1f518 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -159,6 +159,7 @@ message DAGPlan {
optional ConfigurationProto dagKeyValues = 4;
optional bytes credentials_binary = 5;
repeated PlanVertexGroupInfo vertex_groups = 6;
+ repeated PlanLocalResource local_resource = 7;
}
// DAG monitoring messages
http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index 163b042..0ce6dff 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -161,11 +161,7 @@ public class TestTezClient {
Resource.newInstance(1, 1));
DAG dag = DAG.create("DAG").addVertex(vertex).addTaskLocalFiles(lrDAG);
DAGClient dagClient = client.submitDAG(dag);
-
- // verify that both DAG and TezClient localResources are added to the vertex
- Map<String, LocalResource> vertexLR = vertex.getTaskLocalFiles();
- Assert.assertTrue(vertexLR.containsKey(mockLR1Name));
-
+
Assert.assertTrue(dagClient.getExecutionContext().contains(client.mockAppId.toString()));
if (isSession) {
http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
index 22fe3a1..8cbd611 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -123,7 +123,7 @@ public class TestDAGPlan {
dag.addVertex(v1).addVertex(v2).addEdge(edge);
- DAGPlan dagProto = dag.createDag(new TezConfiguration());
+ DAGPlan dagProto = dag.createDag(new TezConfiguration(), null, null, null, true);
EdgeProperty edgeProperty = DagTypeConverters.createEdgePropertyMapFromDAGPlan(dagProto
.getEdgeList().get(0));
@@ -158,7 +158,7 @@ public class TestDAGPlan {
dag.addVertex(v1).addVertex(v2).addEdge(edge);
- DAGPlan dagProto = dag.createDag(new TezConfiguration());
+ DAGPlan dagProto = dag.createDag(new TezConfiguration(), null, null, null, true);
assertEquals(2, dagProto.getVertexCount());
assertEquals(1, dagProto.getEdgeCount());
@@ -224,7 +224,7 @@ public class TestDAGPlan {
dag.addVertex(v1).addVertex(v2).addEdge(edge).addVertex(v3);
- DAGPlan dagProto = dag.createDag(new TezConfiguration());
+ DAGPlan dagProto = dag.createDag(new TezConfiguration(), null, null, null, true);
assertEquals(3, dagProto.getVertexCount());
assertEquals(1, dagProto.getEdgeCount());
@@ -300,7 +300,7 @@ public class TestDAGPlan {
dag.setCredentials(dagCredentials);
- DAGPlan dagProto = dag.createDag(new TezConfiguration());
+ DAGPlan dagProto = dag.createDag(new TezConfiguration(), null, null, null, true);
assertTrue(dagProto.hasCredentialsBinary());
http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index ba89a47..0697584 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -19,7 +19,9 @@
package org.apache.tez.dag.api;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
@@ -35,10 +37,14 @@ import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
+import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.junit.Assert;
import org.junit.Test;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
public class TestDAGVerify {
@@ -901,51 +907,51 @@ public class TestDAGVerify {
dag.addVertex(v1);
- dag.createDag(new TezConfiguration());
+ dag.createDag(new TezConfiguration(), null, null, null, true);
}
@Test(timeout = 5000)
- public void testVerifyCommonFiles() {
- Vertex v1 = Vertex.create("v1",
- ProcessorDescriptor.create(dummyProcessorClassName),
- dummyTaskCount, dummyTaskResource);
- Vertex v2 = Vertex.create("v2",
- ProcessorDescriptor.create("MapProcessor"),
- dummyTaskCount, dummyTaskResource);
- Edge e1 = Edge.create(v1, v2,
- EdgeProperty.create(DataMovementType.SCATTER_GATHER,
- DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
- OutputDescriptor.create(dummyOutputClassName),
- InputDescriptor.create(dummyInputClassName)));
- Map<String, LocalResource> lrs = Maps.newHashMap();
+ public void testDAGCreateDataInference() {
+ Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create(dummyProcessorClassName));
+ Map<String, LocalResource> lrs1 = Maps.newHashMap();
String lrName1 = "LR1";
- lrs.put(lrName1, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"),
+ lrs1.put(lrName1, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"),
+ LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
+ Map<String, LocalResource> lrs2 = Maps.newHashMap();
+ String lrName2 = "LR2";
+ lrs2.put(lrName2, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test1"),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
+ Set<String> hosts = Sets.newHashSet();
+ hosts.add("h1");
+ hosts.add("h2");
+ List<TaskLocationHint> taskLocationHints = Lists.newLinkedList();
+ taskLocationHints.add(TaskLocationHint.createTaskLocationHint(hosts, null));
+ taskLocationHints.add(TaskLocationHint.createTaskLocationHint(hosts, null));
+ VertexLocationHint vLoc = VertexLocationHint.create(taskLocationHints);
+ DataSourceDescriptor ds = DataSourceDescriptor.create(InputDescriptor.create("I.class"),
+ null, dummyTaskCount, null, vLoc, lrs2);
+ v1.addDataSource("i1", ds);
+
DAG dag = DAG.create("testDag");
dag.addVertex(v1);
- dag.addVertex(v2);
- dag.addEdge(e1);
- dag.addTaskLocalFiles(lrs);
- dag.createDag(new TezConfiguration());
- Assert.assertTrue(v1.getTaskLocalFiles().containsKey(lrName1));
- Assert.assertTrue(v2.getTaskLocalFiles().containsKey(lrName1));
+ dag.addTaskLocalFiles(lrs1);
+ DAGPlan dagPlan = dag.createDag(new TezConfiguration(), null, null, null, true);
+ Assert.assertEquals(lrName1, dagPlan.getLocalResource(0).getName());
+ VertexPlan vPlan = dagPlan.getVertex(0);
+ PlanTaskConfiguration taskPlan = vPlan.getTaskConfig();
+ Assert.assertEquals(dummyTaskCount, taskPlan.getNumTasks());
+ Assert.assertEquals(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT, taskPlan.getMemoryMb());
+ Assert.assertEquals(lrName2, taskPlan.getLocalResource(0).getName());
+ Assert.assertEquals(dummyTaskCount, vPlan.getTaskLocationHintCount());
}
@Test(timeout = 5000)
- public void testVerifyCommonFilesFail() {
+ public void testInferredFilesFail() {
Vertex v1 = Vertex.create("v1",
ProcessorDescriptor.create(dummyProcessorClassName),
dummyTaskCount, dummyTaskResource);
- Vertex v2 = Vertex.create("v2",
- ProcessorDescriptor.create("MapProcessor"),
- dummyTaskCount, dummyTaskResource);
- Edge e1 = Edge.create(v1, v2,
- EdgeProperty.create(DataMovementType.SCATTER_GATHER,
- DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
- OutputDescriptor.create(dummyOutputClassName),
- InputDescriptor.create(dummyInputClassName)));
Map<String, LocalResource> lrs = Maps.newHashMap();
String lrName1 = "LR1";
lrs.put(lrName1, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"),
@@ -957,10 +963,13 @@ public class TestDAGVerify {
} catch (TezUncheckedException e) {
Assert.assertTrue(e.getMessage().contains("Attempting to add duplicate resource"));
}
+
+ DataSourceDescriptor ds = DataSourceDescriptor.create(InputDescriptor.create("I.class"),
+ null, -1, null, null, lrs);
+ v1.addDataSource("i1", ds);
+
DAG dag = DAG.create("testDag");
dag.addVertex(v1);
- dag.addVertex(v2);
- dag.addEdge(e1);
dag.addTaskLocalFiles(lrs);
try {
dag.addTaskLocalFiles(lrs);
@@ -969,8 +978,8 @@ public class TestDAGVerify {
Assert.assertTrue(e.getMessage().contains("Attempting to add duplicate resource"));
}
try {
- // dag will add duplicate common files to vertex
- dag.createDag(new TezConfiguration());
+ // data source will add duplicate common files to vertex
+ dag.createDag(new TezConfiguration(), null, null, null, true);
Assert.fail();
} catch (TezUncheckedException e) {
Assert.assertTrue(e.getMessage().contains("Attempting to add duplicate resource"));
@@ -993,7 +1002,7 @@ public class TestDAGVerify {
dag.setAccessControls(dagAccessControls);
Configuration conf = new Configuration(false);
- DAGPlan dagPlan = dag.createDag(conf);
+ DAGPlan dagPlan = dag.createDag(conf, null, null, null, true);
Assert.assertNull(conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
Assert.assertNull(conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 2fae860..a2f04ab 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -25,6 +25,7 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.client.DAGStatusBuilder;
import org.apache.tez.dag.api.client.StatusGetOpts;
@@ -41,6 +42,7 @@ import org.apache.tez.dag.records.TezVertexID;
public interface DAG {
TezDAGID getID();
+ Map<String, LocalResource> getLocalResources();
String getName();
DAGState getState();
DAGReport getReport();
http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 23f9096..c4e16e2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -167,6 +168,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
public final Configuration conf;
private final DAGPlan jobPlan;
+
+ Map<String, LocalResource> localResources;
private final List<String> diagnostics = new ArrayList<String>();
@@ -421,6 +424,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
+
+ this.localResources = DagTypeConverters.createLocalResourceMapFromDAGPlan(jobPlan
+ .getLocalResourceList());
this.credentials = dagCredentials;
if (this.credentials == null) {
@@ -452,6 +458,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
public TezDAGID getID() {
return dagId;
}
+ /** Returns the localResources field (the map itself, not a copy) that is assigned from the DAG plan local-resource list. */
+ @Override
+ public Map<String, LocalResource> getLocalResources() {
+ return localResources;
+ }
// TODO maybe removed after TEZ-74
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 5eb1e67..b776349 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -90,11 +90,7 @@ public class AMContainerHelpers {
*/
private static ContainerLaunchContext createCommonContainerLaunchContext(
Map<ApplicationAccessType, String> applicationACLs,
- Credentials credentials) {
-
- // Application resources
- Map<String, LocalResource> localResources =
- new HashMap<String, LocalResource>();
+ Credentials credentials, Map<String, LocalResource> localResources) {
// Application environment
Map<String, String> environment = new HashMap<String, String>();
@@ -141,6 +137,7 @@ public class AMContainerHelpers {
@VisibleForTesting
public static ContainerLaunchContext createContainerLaunchContext(
TezDAGID tezDAGID,
+ Map<String, LocalResource> commonDAGLRs,
Map<ApplicationAccessType, String> acls,
ContainerId containerId,
Map<String, LocalResource> localResources,
@@ -154,7 +151,7 @@ public class AMContainerHelpers {
synchronized (commonContainerSpecLock) {
if (!commonContainerSpecs.containsKey(tezDAGID)) {
commonContainerSpec =
- createCommonContainerLaunchContext(acls, credentials);
+ createCommonContainerLaunchContext(acls, credentials, commonDAGLRs);
commonContainerSpecs.put(tezDAGID, commonContainerSpec);
} else {
commonContainerSpec = commonContainerSpecs.get(tezDAGID);
http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index df1b65d..a0f9cb7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -496,8 +496,14 @@ public class AMContainerImpl implements AMContainer {
container.credentials = containerContext.getCredentials();
container.credentialsChanged = true;
+ TezDAGID dagId = null;
+ Map<String, LocalResource> dagLocalResources = null;
+ if (container.appContext.getCurrentDAG() != null) {
+ dagId = container.appContext.getCurrentDAG().getID();
+ dagLocalResources = container.appContext.getCurrentDAG().getLocalResources();
+ }
ContainerLaunchContext clc = AMContainerHelpers.createContainerLaunchContext(
- container.appContext.getCurrentDAGID(),
+ dagId, dagLocalResources,
container.appContext.getApplicationACLs(),
container.getContainerId(),
containerContext.getLocalResources(),
http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 9fe9c4d..7c7e091 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.common.ContainerContext;
@@ -84,16 +85,19 @@ public class MockDAGAppMaster extends DAGAppMaster {
ContainerId cId;
TezTaskAttemptID taId;
String vName;
+ ContainerLaunchContext launchContext;
boolean completed;
- public ContainerData(ContainerId cId) {
+ public ContainerData(ContainerId cId, ContainerLaunchContext context) {
this.cId = cId;
+ this.launchContext = context;
}
void clear() { // resets every field of this ContainerData except cId
taId = null;
vName = null;
completed = false;
+ launchContext = null; // also drop the launch context that was handed to the constructor
}
}
@@ -126,6 +130,9 @@ public class MockDAGAppMaster extends DAGAppMaster {
void waitToGo() {
+ if (goFlag == null) {
+ return;
+ }
synchronized (goFlag) {
goFlag.set(true);
goFlag.notify();
@@ -164,9 +171,16 @@ public class MockDAGAppMaster extends DAGAppMaster {
void launch(NMCommunicatorLaunchRequestEvent event) {
// launch container by putting it in simulated container list; the added lines also keep the launch context from the event
- containers.put(event.getContainerId(), new ContainerData(event.getContainerId()));
+ containers.put(event.getContainerId(), new ContainerData(event.getContainerId(),
+ event.getContainerLaunchContext()));
getContext().getEventHandler().handle(new AMContainerEventLaunched(event.getContainerId())); // then send AMContainerEventLaunched for that container
}
+ /** Polls every 50 ms until the simulated container list is non-empty; has no timeout of its own, so the caller must bound the wait. */
+ public void waitTillContainersLaunched() throws InterruptedException {
+ while (containers.isEmpty()) {
+ Thread.sleep(50);
+ }
+ }
@Override
public void run() {
http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
new file mode 100644
index 0000000..1bab0d2
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -0,0 +1,138 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
+import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher.ContainerData;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+/** Drives DAG submissions through MockTezClient and its MockDAGAppMaster with TEZ_LOCAL_MODE enabled. */
+@SuppressWarnings("deprecation")
+public class TestMockDAGAppMaster {
+ static Configuration defaultConf;
+ static FileSystem localFs;
+ static Path workDir;
+ // Shared fixture: a Configuration(false) pointed at file:/// with Tez local mode on. NOTE(review): localFs and workDir are not read by either test in this class.
+ static {
+ try {
+ defaultConf = new Configuration(false);
+ defaultConf.set("fs.defaultFS", "file:///");
+ defaultConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+ localFs = FileSystem.getLocal(defaultConf);
+ workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+ "TestDAGAppMaster").makeQualified(localFs);
+ } catch (IOException e) {
+ throw new RuntimeException("init failure", e);
+ }
+ }
+ /** Checks that the launch context of a launched container carries both the DAG-level (LR1) and the vertex-level (LR2) local resources. */
+ @Test (timeout = 5000)
+ public void testLocalResourceSetup() throws Exception {
+ TezConfiguration tezconf = new TezConfiguration(defaultConf);
+
+ MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null);
+ tezClient.start();
+
+ MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+ MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
+ mockLauncher.startScheduling(false); // switched back with startScheduling(true) only after the launch context has been checked
+ // One resource registered on the DAG (LR1) and a different one on the vertex (LR2), so the two sources can be told apart.
+ Map<String, LocalResource> lrDAG = Maps.newHashMap();
+ String lrName1 = "LR1";
+ lrDAG.put(lrName1, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"),
+ LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
+ Map<String, LocalResource> lrVertex = Maps.newHashMap();
+ String lrName2 = "LR2";
+ lrVertex.put(lrName2, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test1"),
+ LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
+ // DAG named test with a single vertex A; lrDAG is attached to the DAG and lrVertex to the vertex.
+ DAG dag = DAG.create("test").addTaskLocalFiles(lrDAG);
+ Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5).addTaskLocalFiles(lrVertex);
+ dag.addVertex(vA);
+ // Submit, wait until the mock launcher has recorded a container, then inspect the first recorded launch context.
+ DAGClient dagClient = tezClient.submitDAG(dag);
+ mockLauncher.waitTillContainersLaunched();
+ ContainerData cData = mockLauncher.getContainers().values().iterator().next();
+ ContainerLaunchContext launchContext = cData.launchContext;
+ Map<String, LocalResource> taskLR = launchContext.getLocalResources();
+ // verify tasks are launched with both DAG and task resources.
+ Assert.assertTrue(taskLR.containsKey(lrName1));
+ Assert.assertTrue(taskLR.containsKey(lrName2));
+ // Re-enable scheduling, wait for the DAG to finish and require SUCCEEDED.
+ mockLauncher.startScheduling(true);
+ dagClient.waitForCompletion();
+ Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+ tezClient.stop(); // NOTE(review): not reached when an assertion above fails; consider try/finally
+ }
+ /** Submits the same DAG object through two successive MockTezClient instances and expects SUCCEEDED both times. */
+ @Test (timeout = 10000)
+ public void testMultipleSubmissions() throws Exception {
+ Map<String, LocalResource> lrDAG = Maps.newHashMap();
+ String lrName1 = "LR1";
+ lrDAG.put(lrName1, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"),
+ LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
+ Map<String, LocalResource> lrVertex = Maps.newHashMap();
+ String lrName2 = "LR2";
+ lrVertex.put(lrName2, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test1"),
+ LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
+
+ DAG dag = DAG.create("test").addTaskLocalFiles(lrDAG);
+ Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5).addTaskLocalFiles(lrVertex);
+ dag.addVertex(vA);
+ // First submission: start a client, run the DAG to completion, stop the client.
+ TezConfiguration tezconf = new TezConfiguration(defaultConf);
+
+ MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null);
+ tezClient.start();
+ DAGClient dagClient = tezClient.submitDAG(dag);
+ dagClient.waitForCompletion();
+ Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+ tezClient.stop();
+
+ // submit the same DAG again to verify it can be done.
+ tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null);
+ tezClient.start();
+ dagClient = tezClient.submitDAG(dag);
+ dagClient.waitForCompletion();
+ Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+ tezClient.stop(); // NOTE(review): as above, skipped if an assertion fails first
+
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 03aedef..2d7cf65 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -378,7 +378,7 @@ public class TestDAGImpl {
dag.addVertex(v2);
dag.addVertex(v3);
dag.addEdge(e1);
- return dag.createDag(conf);
+ return dag.createDag(conf, null, null, null, true);
}
public static DAGPlan createTestDAGPlan() {
http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 5c00fec..a1b9847 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -429,7 +429,7 @@ public class TestVertexImpl {
SchedulingType.SEQUENTIAL, OutputDescriptor.create("out.class"),
InputDescriptor.create("out.class"))));
- return dag.createDag(conf);
+ return dag.createDag(conf, null, null, null, true);
}
private DAGPlan createDAGPlanWithInitializer0Tasks(String initializerClassName) {
http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
index 081ed10..0be67ad 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
@@ -87,7 +87,7 @@ public class TestDAGUtils {
dag.addVertex(v2);
dag.addVertex(v3);
dag.addEdge(e1);
- return dag.createDag(conf);
+ return dag.createDag(conf, null, null, null, true);
}
@Test