You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/15 20:51:10 UTC
git commit: TEZ-1563. TezClient.submitDAGSession alters DAG local
resources regardless of DAG submission (bikas)
Repository: tez
Updated Branches:
refs/heads/master 7d1303fa6 -> 5d6b8fd56
TEZ-1563. TezClient.submitDAGSession alters DAG local resources regardless of DAG submission (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5d6b8fd5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5d6b8fd5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5d6b8fd5
Branch: refs/heads/master
Commit: 5d6b8fd5660dbe5828467d5678bbbd58c3702c7a
Parents: 7d1303f
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Sep 15 11:51:02 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Sep 15 11:51:02 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 4 +
.../java/org/apache/tez/client/TezClient.java | 5 +-
.../org/apache/tez/client/TezClientUtils.java | 15 +++-
.../main/java/org/apache/tez/dag/api/DAG.java | 9 ++-
.../org/apache/tez/client/TestTezClient.java | 77 +++++++++++++-------
.../org/apache/tez/dag/app/TestPreemption.java | 4 +-
6 files changed, 80 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/5d6b8fd5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3198323..1520e70 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,6 +19,8 @@ ALL CHANGES:
TEZ-1580. Change TestOrderedWordCount to optionally use MR configs.
TEZ-1524. Resolve user group information only if ACLs are enabled.
TEZ-1581. GroupByOrderByMRRTest no longer functional.
+ TEZ-1563. TezClient.submitDAGSession alters DAG local resources regardless
+ of DAG submission
Release 0.5.1: Unreleased
@@ -36,6 +38,8 @@ ALL CHANGES
TEZ-1519. TezTaskRunner should not initialize TezConfiguration in TezChild.
TEZ-1534. Make client side configs available to AM and tasks.
TEZ-1574. Support additional formats for the tez deployed archive
+ TEZ-1563. TezClient.submitDAGSession alters DAG local resources regardless
+ of DAG submission
Release 0.5.0: 2014-09-03
http://git-wip-us.apache.org/repos/asf/tez/blob/5d6b8fd5/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 77ab20c..5cec8b0 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -366,10 +366,9 @@ public class TezClient {
}
Map<String, LocalResource> tezJarResources = getTezJarResources(sessionCredentials);
- TezClientUtils.updateDAGVertices(dag, amConfig, tezJarResources,
+ DAGPlan dagPlan = TezClientUtils.prepareAndCreateDAGPlan(dag, amConfig, tezJarResources,
TezClientUtils.usingTezLibsFromArchive(tezJarResources), sessionCredentials);
-
- DAGPlan dagPlan = dag.createDag(amConfig.getTezConfiguration());
+
SubmitDAGRequestProto.Builder requestBuilder = SubmitDAGRequestProto.newBuilder();
requestBuilder.setDAGPlan(dagPlan).build();
if (!additionalLocalResources.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/tez/blob/5d6b8fd5/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 3297adc..917fcff 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -557,7 +557,7 @@ public class TezClientUtils {
if(dag != null) {
- updateDAGVertices(dag, amConfig, tezJarResources, tezLrsAsArchive, sessionCreds);
+ DAGPlan dagPB = prepareAndCreateDAGPlan(dag, amConfig, tezJarResources, tezLrsAsArchive, sessionCreds);
// emit protobuf DAG file style
Path binaryPath = TezCommonUtils.getTezBinPlanStagingPath(tezSysStagingPath);
@@ -567,8 +567,6 @@ public class TezClientUtils {
+ sessionJarsPath + " binaryPlanPath :" + binaryPath);
}
- DAGPlan dagPB = dag.createDag(amConfig.getTezConfiguration());
-
FSDataOutputStream dagPBOutBinaryStream = null;
try {
@@ -644,6 +642,17 @@ public class TezClientUtils {
}
}
+ static DAGPlan prepareAndCreateDAGPlan(DAG dag, AMConfiguration amConfig,
+ Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive,
+ Credentials credentials) throws IOException {
+ DAGPlan dagPB = dag.getCachedDAGPlan();
+ if (dagPB == null) {
+ updateDAGVertices(dag, amConfig, tezJarResources, tezLrsAsArchive, credentials);
+ dagPB = dag.createDag(amConfig.getTezConfiguration());
+ }
+ return dagPB;
+ }
+
static void maybeAddDefaultLoggingJavaOpts(String logLevel, List<String> vargs) {
if (vargs != null && !vargs.isEmpty()) {
for (String arg : vargs) {
http://git-wip-us.apache.org/repos/asf/tez/blob/5d6b8fd5/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index d063e79..ffd2e83 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -89,6 +89,7 @@ public class DAG {
Map<String, LocalResource> commonTaskLocalFiles = Maps.newHashMap();
private Stack<String> topologicalVertexStack = new Stack<String>();
+ private DAGPlan cachedDAGPlan;
private DAG(String name) {
this.name = name;
@@ -574,6 +575,10 @@ public class DAG {
}
}
+ @Private
+ public DAGPlan getCachedDAGPlan() {
+ return cachedDAGPlan;
+ }
// create protobuf message describing DAG
@Private
@@ -787,6 +792,8 @@ public class DAG {
dagBuilder.setCredentialsBinary(DagTypeConverters.convertCredentialsToProto(credentials));
TezCommonUtils.logCredentials(LOG, credentials, "dag");
}
- return dagBuilder.build();
+
+ cachedDAGPlan = dagBuilder.build();
+ return cachedDAGPlan;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5d6b8fd5/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index 5044d55..fbb4486 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -20,6 +20,7 @@ package org.apache.tez.client;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
@@ -89,7 +90,16 @@ public class TestTezClient {
}
}
- static void configure(TezClientForTest client) throws YarnException, IOException {
+ TezClientForTest configure() throws YarnException, IOException {
+ return configure(new HashMap<String, LocalResource>(), true);
+ }
+
+ TezClientForTest configure(Map<String, LocalResource> lrs, boolean isSession) throws YarnException, IOException {
+ TezConfiguration conf = new TezConfiguration();
+ conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
+ conf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, isSession);
+ TezClientForTest client = new TezClientForTest("test", conf, lrs, null);
+
ApplicationId appId1 = ApplicationId.newInstance(0, 1);
YarnClient yarnClient = mock(YarnClient.class, RETURNS_DEEP_STUBS);
when(yarnClient.createApplication().getNewApplicationResponse().getApplicationId()).thenReturn(appId1);
@@ -100,31 +110,27 @@ public class TestTezClient {
client.mockTezYarnClient = new TezYarnClient(yarnClient);
client.mockYarnClient = yarnClient;
client.mockAppId = appId1;
+
+ return client;
}
- @Test
+ @Test (timeout = 5000)
public void testTezclientApp() throws Exception {
testTezClient(false);
}
- @Test
+ @Test (timeout = 5000)
public void testTezclientSession() throws Exception {
testTezClient(true);
}
public void testTezClient(boolean isSession) throws Exception {
- TezConfiguration conf = new TezConfiguration();
- conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
- conf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, isSession);
-
Map<String, LocalResource> lrs = Maps.newHashMap();
String lrName1 = "LR1";
lrs.put(lrName1, LocalResource.newInstance(URL.newInstance("file:///", "localhost", 0, "test"),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
- TezClientForTest client = new TezClientForTest("test", conf, lrs, null);
-
- configure(client);
+ TezClientForTest client = configure(lrs, isSession);
ArgumentCaptor<ApplicationSubmissionContext> captor = ArgumentCaptor.forClass(ApplicationSubmissionContext.class);
client.start();
@@ -229,13 +235,7 @@ public class TestTezClient {
}
public void testPreWarm() throws Exception {
- TezConfiguration conf = new TezConfiguration();
- conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
- conf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
- Map<String, LocalResource> lrs = Maps.newHashMap();
- final TezClientForTest client = new TezClientForTest("test", conf, lrs, null);
-
- configure(client);
+ TezClientForTest client = configure();
client.start();
PreWarmVertex vertex = PreWarmVertex.create("PreWarm", 1, Resource.newInstance(1, 1));
client.preWarm(vertex);
@@ -248,16 +248,42 @@ public class TestTezClient {
client.stop();
}
+ @Test (timeout = 10000)
+ public void testMultipleSubmissions() throws Exception {
+ testMultipleSubmissionsJob(false);
+ testMultipleSubmissionsJob(true);
+ }
+
+ public void testMultipleSubmissionsJob(boolean isSession) throws Exception {
+ TezClientForTest client1 = configure(new HashMap<String, LocalResource>(), isSession);
+ client1.start();
+
+ String mockLR1Name = "LR1";
+ Map<String, LocalResource> lrDAG = Collections.singletonMap(mockLR1Name, LocalResource
+ .newInstance(URL.newInstance("file:///", "localhost", 0, "test"), LocalResourceType.FILE,
+ LocalResourceVisibility.PUBLIC, 1, 1));
+ Vertex vertex = Vertex.create("Vertex", ProcessorDescriptor.create("P"), 1,
+ Resource.newInstance(1, 1));
+ DAG dag = DAG.create("DAG").addVertex(vertex).addTaskLocalFiles(lrDAG);
+ // the dag resource will be added to the vertex once
+ client1.submitDAG(dag);
+
+ TezClientForTest client2 = configure();
+ client2.start();
+
+ // verify resubmission of same dag to new client (simulates submission error resulting in the
+ // creation of a new client and resubmission of the DAG)
+ // TEZ-1563 dag resource will not be added again to the vertex because its cached
+ // so resubmission works fine
+ client2.submitDAG(dag);
+
+ client1.stop();
+ client2.stop();
+ }
+
@Test(timeout = 5000)
public void testWaitTillReady_Interrupt() throws Exception {
- TezConfiguration conf = new TezConfiguration();
- conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
- conf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
- Map<String, LocalResource> lrs = Maps.newHashMap();
- final TezClientForTest client = new TezClientForTest("test", conf, lrs, null);
-
- configure(client);
-
+ final TezClientForTest client = configure();
client.start();
when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
@@ -278,6 +304,7 @@ public class TestTezClient {
thread.interrupt();
thread.join();
Assert.assertThat(exceptionReference.get(),CoreMatchers. instanceOf(InterruptedException.class));
+ client.stop();
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5d6b8fd5/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
index c7aacd4..cfb1c9f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
@@ -85,7 +85,7 @@ public class TestPreemption {
return dag;
}
- @Test
+ @Test (timeout = 5000)
public void testPreemptionWithoutSession() throws Exception {
System.out.println("TestPreemptionWithoutSession");
TezConfiguration tezconf = new TezConfiguration(defaultConf);
@@ -120,7 +120,7 @@ public class TestPreemption {
tezClient.stop();
}
- @Test
+ @Test (timeout = 30000)
public void testPreemptionWithSession() throws Exception {
System.out.println("TestPreemptionWithSession");
MockTezClient tezClient = createTezSession();