You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/15 20:51:10 UTC

git commit: TEZ-1563. TezClient.submitDAGSession alters DAG local resources regardless of DAG submission (bikas)

Repository: tez
Updated Branches:
  refs/heads/master 7d1303fa6 -> 5d6b8fd56


TEZ-1563. TezClient.submitDAGSession alters DAG local resources regardless of DAG submission (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5d6b8fd5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5d6b8fd5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5d6b8fd5

Branch: refs/heads/master
Commit: 5d6b8fd5660dbe5828467d5678bbbd58c3702c7a
Parents: 7d1303f
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Sep 15 11:51:02 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Sep 15 11:51:02 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  4 +
 .../java/org/apache/tez/client/TezClient.java   |  5 +-
 .../org/apache/tez/client/TezClientUtils.java   | 15 +++-
 .../main/java/org/apache/tez/dag/api/DAG.java   |  9 ++-
 .../org/apache/tez/client/TestTezClient.java    | 77 +++++++++++++-------
 .../org/apache/tez/dag/app/TestPreemption.java  |  4 +-
 6 files changed, 80 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/5d6b8fd5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3198323..1520e70 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,6 +19,8 @@ ALL CHANGES:
   TEZ-1580. Change TestOrderedWordCount to optionally use MR configs.
   TEZ-1524. Resolve user group information only if ACLs are enabled.
   TEZ-1581. GroupByOrderByMRRTest no longer functional.
+  TEZ-1563. TezClient.submitDAGSession alters DAG local resources regardless
+  of DAG submission
 
 Release 0.5.1: Unreleased
 
@@ -36,6 +38,8 @@ ALL CHANGES
   TEZ-1519. TezTaskRunner should not initialize TezConfiguration in TezChild.
   TEZ-1534. Make client side configs available to AM and tasks.
   TEZ-1574. Support additional formats for the tez deployed archive
+  TEZ-1563. TezClient.submitDAGSession alters DAG local resources regardless
+  of DAG submission
 
 Release 0.5.0: 2014-09-03
 

http://git-wip-us.apache.org/repos/asf/tez/blob/5d6b8fd5/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 77ab20c..5cec8b0 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -366,10 +366,9 @@ public class TezClient {
     }
     
     Map<String, LocalResource> tezJarResources = getTezJarResources(sessionCredentials);
-    TezClientUtils.updateDAGVertices(dag, amConfig, tezJarResources,
+    DAGPlan dagPlan = TezClientUtils.prepareAndCreateDAGPlan(dag, amConfig, tezJarResources,
         TezClientUtils.usingTezLibsFromArchive(tezJarResources), sessionCredentials);
-    
-    DAGPlan dagPlan = dag.createDag(amConfig.getTezConfiguration());
+
     SubmitDAGRequestProto.Builder requestBuilder = SubmitDAGRequestProto.newBuilder();
     requestBuilder.setDAGPlan(dagPlan).build();
     if (!additionalLocalResources.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/5d6b8fd5/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 3297adc..917fcff 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -557,7 +557,7 @@ public class TezClientUtils {
 
     if(dag != null) {
       
-      updateDAGVertices(dag, amConfig, tezJarResources, tezLrsAsArchive, sessionCreds);
+      DAGPlan dagPB = prepareAndCreateDAGPlan(dag, amConfig, tezJarResources, tezLrsAsArchive, sessionCreds);
 
       // emit protobuf DAG file style
       Path binaryPath = TezCommonUtils.getTezBinPlanStagingPath(tezSysStagingPath);
@@ -567,8 +567,6 @@ public class TezClientUtils {
             + sessionJarsPath + " binaryPlanPath :" + binaryPath);
       }
 
-      DAGPlan dagPB = dag.createDag(amConfig.getTezConfiguration());
-
       FSDataOutputStream dagPBOutBinaryStream = null;
 
       try {
@@ -644,6 +642,17 @@ public class TezClientUtils {
     }
   }
   
+  static DAGPlan prepareAndCreateDAGPlan(DAG dag, AMConfiguration amConfig,
+      Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive,
+      Credentials credentials) throws IOException {
+    DAGPlan dagPB = dag.getCachedDAGPlan();
+    if (dagPB == null) {
+      updateDAGVertices(dag, amConfig, tezJarResources, tezLrsAsArchive, credentials);  
+      dagPB = dag.createDag(amConfig.getTezConfiguration());  
+    }    
+    return dagPB;
+  }
+  
   static void maybeAddDefaultLoggingJavaOpts(String logLevel, List<String> vargs) {
     if (vargs != null && !vargs.isEmpty()) {
       for (String arg : vargs) {

http://git-wip-us.apache.org/repos/asf/tez/blob/5d6b8fd5/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index d063e79..ffd2e83 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -89,6 +89,7 @@ public class DAG {
   Map<String, LocalResource> commonTaskLocalFiles = Maps.newHashMap();
   
   private Stack<String> topologicalVertexStack = new Stack<String>();
+  private DAGPlan cachedDAGPlan;
 
   private DAG(String name) {
     this.name = name;
@@ -574,6 +575,10 @@ public class DAG {
     }
   }
 
+  @Private
+  public DAGPlan getCachedDAGPlan() {
+    return cachedDAGPlan;
+  }
 
   // create protobuf message describing DAG
   @Private
@@ -787,6 +792,8 @@ public class DAG {
       dagBuilder.setCredentialsBinary(DagTypeConverters.convertCredentialsToProto(credentials));
       TezCommonUtils.logCredentials(LOG, credentials, "dag");
     }
-    return dagBuilder.build();
+    
+    cachedDAGPlan = dagBuilder.build();
+    return cachedDAGPlan;
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5d6b8fd5/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index 5044d55..fbb4486 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -20,6 +20,7 @@ package org.apache.tez.client;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -89,7 +90,16 @@ public class TestTezClient {
     }
   }
   
-  static void configure(TezClientForTest client) throws YarnException, IOException {
+  TezClientForTest configure() throws YarnException, IOException {
+    return configure(new HashMap<String, LocalResource>(), true);
+  }
+  
+  TezClientForTest configure(Map<String, LocalResource> lrs, boolean isSession) throws YarnException, IOException {
+    TezConfiguration conf = new TezConfiguration();
+    conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
+    conf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, isSession);
+    TezClientForTest client = new TezClientForTest("test", conf, lrs, null);
+
     ApplicationId appId1 = ApplicationId.newInstance(0, 1);
     YarnClient yarnClient = mock(YarnClient.class, RETURNS_DEEP_STUBS);
     when(yarnClient.createApplication().getNewApplicationResponse().getApplicationId()).thenReturn(appId1);
@@ -100,31 +110,27 @@ public class TestTezClient {
     client.mockTezYarnClient = new TezYarnClient(yarnClient);
     client.mockYarnClient = yarnClient;
     client.mockAppId = appId1;
+    
+    return client;    
   }
   
-  @Test
+  @Test (timeout = 5000)
   public void testTezclientApp() throws Exception {
     testTezClient(false);
   }
   
-  @Test
+  @Test (timeout = 5000)
   public void testTezclientSession() throws Exception {
     testTezClient(true);
   }
   
   public void testTezClient(boolean isSession) throws Exception {
-    TezConfiguration conf = new TezConfiguration();
-    conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
-    conf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, isSession);
-
     Map<String, LocalResource> lrs = Maps.newHashMap();
     String lrName1 = "LR1";
     lrs.put(lrName1, LocalResource.newInstance(URL.newInstance("file:///", "localhost", 0, "test"),
         LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
     
-    TezClientForTest client = new TezClientForTest("test", conf, lrs, null);
-    
-    configure(client);
+    TezClientForTest client = configure(lrs, isSession);
     
     ArgumentCaptor<ApplicationSubmissionContext> captor = ArgumentCaptor.forClass(ApplicationSubmissionContext.class);
     client.start();
@@ -229,13 +235,7 @@ public class TestTezClient {
   }
   
   public void testPreWarm() throws Exception {
-    TezConfiguration conf = new TezConfiguration();
-    conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
-    conf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
-    Map<String, LocalResource> lrs = Maps.newHashMap();
-    final TezClientForTest client = new TezClientForTest("test", conf, lrs, null);
-
-    configure(client);
+    TezClientForTest client = configure();
     client.start();
     PreWarmVertex vertex = PreWarmVertex.create("PreWarm", 1, Resource.newInstance(1, 1));
     client.preWarm(vertex);
@@ -248,16 +248,42 @@ public class TestTezClient {
     client.stop();
   }
   
+  @Test (timeout = 10000)
+  public void testMultipleSubmissions() throws Exception {
+    testMultipleSubmissionsJob(false);
+    testMultipleSubmissionsJob(true);
+  }
+  
+  public void testMultipleSubmissionsJob(boolean isSession) throws Exception {
+    TezClientForTest client1 = configure(new HashMap<String, LocalResource>(), isSession);
+    client1.start();
+    
+    String mockLR1Name = "LR1";
+    Map<String, LocalResource> lrDAG = Collections.singletonMap(mockLR1Name, LocalResource
+        .newInstance(URL.newInstance("file:///", "localhost", 0, "test"), LocalResourceType.FILE,
+            LocalResourceVisibility.PUBLIC, 1, 1));
+    Vertex vertex = Vertex.create("Vertex", ProcessorDescriptor.create("P"), 1,
+        Resource.newInstance(1, 1));
+    DAG dag = DAG.create("DAG").addVertex(vertex).addTaskLocalFiles(lrDAG);
+    // the dag resource will be added to the vertex once
+    client1.submitDAG(dag);
+    
+    TezClientForTest client2 = configure();
+    client2.start();
+    
+    // verify resubmission of same dag to new client (simulates submission error resulting in the
+    // creation of a new client and resubmission of the DAG)
+    // TEZ-1563 dag resource will not be added again to the vertex because it's cached,
+    // so resubmission works fine
+    client2.submitDAG(dag);
+    
+    client1.stop();
+    client2.stop();
+  }
+  
   @Test(timeout = 5000)
   public void testWaitTillReady_Interrupt() throws Exception {
-    TezConfiguration conf = new TezConfiguration();
-    conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
-    conf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
-    Map<String, LocalResource> lrs = Maps.newHashMap();
-    final TezClientForTest client = new TezClientForTest("test", conf, lrs, null);
-
-    configure(client);
-
+    final TezClientForTest client = configure();
     client.start();
 
     when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
@@ -278,6 +304,7 @@ public class TestTezClient {
     thread.interrupt();
     thread.join();
     Assert.assertThat(exceptionReference.get(),CoreMatchers. instanceOf(InterruptedException.class));
+    client.stop();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5d6b8fd5/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
index c7aacd4..cfb1c9f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
@@ -85,7 +85,7 @@ public class TestPreemption {
     return dag;
   }
   
-  @Test
+  @Test (timeout = 5000)
   public void testPreemptionWithoutSession() throws Exception {
     System.out.println("TestPreemptionWithoutSession");
     TezConfiguration tezconf = new TezConfiguration(defaultConf);
@@ -120,7 +120,7 @@ public class TestPreemption {
     tezClient.stop();
   }
   
-  @Test
+  @Test (timeout = 30000)
   public void testPreemptionWithSession() throws Exception {
     System.out.println("TestPreemptionWithSession");
     MockTezClient tezClient = createTezSession();