You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/05/28 23:43:00 UTC

git commit: TEZ-1158. Disable multiple AM attempts if recovery is disabled. (hitesh)

Repository: incubator-tez
Updated Branches:
  refs/heads/master ae267de54 -> b3a9ec0e7


TEZ-1158. Disable multiple AM attempts if recovery is disabled. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/b3a9ec0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/b3a9ec0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/b3a9ec0e

Branch: refs/heads/master
Commit: b3a9ec0e7026ec3e5aea26a499dee5b7824c310e
Parents: ae267de
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed May 28 14:42:41 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed May 28 14:42:41 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/tez/client/TezSession.java  |  8 +++
 .../org/apache/tez/test/TestDAGRecovery2.java   | 53 +++++++++++++++-----
 2 files changed, 49 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3a9ec0e/tez-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
index 5950c2e..a085774 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezSession.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -129,6 +129,14 @@ public class TezSession {
               sessionConfig.getTezConfiguration(), applicationId,
               null, sessionName, sessionConfig.getAMConfiguration(),
               tezJarResources, sessionCredentials);
+
+      // With DAG recovery disabled, allow only one AM attempt so that a crashed AM is not relaunched
+      if (!sessionConfig.getAMConfiguration().getAMConf().getBoolean(
+          TezConfiguration.DAG_RECOVERY_ENABLED,
+          TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) {
+        appContext.setMaxAppAttempts(1);
+      }
+
       yarnClient.submitApplication(appContext);
     } catch (YarnException e) {
       throw new TezException(e);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3a9ec0e/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
index d6d78ee..b1eeea4 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
@@ -32,7 +32,6 @@ import org.apache.tez.client.TezSession;
 import org.apache.tez.client.TezSessionConfiguration;
 import org.apache.tez.client.TezSessionStatus;
 import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -40,8 +39,6 @@ import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.DAGStatus.State;
 import org.apache.tez.test.dag.MultiAttemptDAG;
-import org.apache.tez.test.dag.MultiAttemptDAG.FailingInputInitializer;
-import org.apache.tez.test.dag.MultiAttemptDAG.NoOpInput;
 import org.apache.tez.test.dag.SimpleVTestDAG;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -117,12 +114,7 @@ public class TestDAGRecovery2 {
     }
   }
 
-  @Before
-  public void setup()  throws Exception {
-    Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, String
-        .valueOf(new Random().nextInt(100000))));
-    TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
-
+  private TezConfiguration createSessionConfig(Path remoteStagingDir) {
     TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
     tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 10);
     tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG");
@@ -132,6 +124,16 @@ public class TestDAGRecovery2 {
     tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
     tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
     tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS, " -Xmx256m");
+    return tezConf;
+  }
+
+  @Before
+  public void setup()  throws Exception {
+    Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, String
+        .valueOf(new Random().nextInt(100000))));
+    TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
+
+    TezConfiguration tezConf = createSessionConfig(remoteStagingDir);
 
     AMConfiguration amConfig = new AMConfiguration(
         new HashMap<String, String>(), new HashMap<String, LocalResource>(),
@@ -157,16 +159,21 @@ public class TestDAGRecovery2 {
   }
 
   void runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception {
-    TezSessionStatus status = tezSession.getSessionStatus();
+    runDAGAndVerify(dag, finalState, tezSession);
+  }
+
+  void runDAGAndVerify(DAG dag, DAGStatus.State finalState,
+                       TezSession session) throws Exception {
+    TezSessionStatus status = session.getSessionStatus();
     while (status != TezSessionStatus.READY && status != TezSessionStatus.SHUTDOWN) {
       LOG.info("Waiting for session to be ready. Current: " + status);
       Thread.sleep(100);
-      status = tezSession.getSessionStatus();
+      status = session.getSessionStatus();
     }
     if (status == TezSessionStatus.SHUTDOWN) {
       throw new TezUncheckedException("Unexpected Session shutdown");
     }
-    DAGClient dagClient = tezSession.submitDAG(dag);
+    DAGClient dagClient = session.submitDAG(dag);
     DAGStatus dagStatus = dagClient.getDAGStatus(null);
     while (!dagStatus.isCompleted()) {
       LOG.info("Waiting for dag to complete. Sleeping for 500ms."
@@ -193,4 +200,26 @@ public class TestDAGRecovery2 {
     runDAGAndVerify(dag, State.FAILED);
   }
 
+  @Test(timeout=120000)
+  public void testSessionDisableMultiAttempts() throws Exception {
+    tezSession.stop();
+    Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, String
+        .valueOf(new Random().nextInt(100000))));
+    TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
+    TezConfiguration tezConf = createSessionConfig(remoteStagingDir);
+    tezConf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false);
+    AMConfiguration amConfig = new AMConfiguration(
+        new HashMap<String, String>(), new HashMap<String, LocalResource>(),
+        tezConf, null);
+    TezSessionConfiguration tezSessionConfig =
+        new TezSessionConfiguration(amConfig, tezConf);
+    TezSession session = new TezSession("TestDAGRecovery2SingleAttemptOnly", tezSessionConfig);
+    session.start();
+
+    // Recovery is off, so only one AM attempt runs; this DAG cannot finish in one attempt and must fail
+    DAG dag = MultiAttemptDAG.createDAG("TestSingleAttemptDAG", null);
+    runDAGAndVerify(dag, State.FAILED, session);
+  }
+
+
 }