You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/05/28 23:43:00 UTC
git commit: TEZ-1158. Disable multiple AM attempts if recovery is disabled. (hitesh)
Repository: incubator-tez
Updated Branches:
refs/heads/master ae267de54 -> b3a9ec0e7
TEZ-1158. Disable multiple AM attempts if recovery is disabled. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/b3a9ec0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/b3a9ec0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/b3a9ec0e
Branch: refs/heads/master
Commit: b3a9ec0e7026ec3e5aea26a499dee5b7824c310e
Parents: ae267de
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed May 28 14:42:41 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed May 28 14:42:41 2014 -0700
----------------------------------------------------------------------
.../java/org/apache/tez/client/TezSession.java | 8 +++
.../org/apache/tez/test/TestDAGRecovery2.java | 53 +++++++++++++++-----
2 files changed, 49 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3a9ec0e/tez-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
index 5950c2e..a085774 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezSession.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -129,6 +129,14 @@ public class TezSession {
sessionConfig.getTezConfiguration(), applicationId,
null, sessionName, sessionConfig.getAMConfiguration(),
tezJarResources, sessionCredentials);
+
+ // Set Tez Sessions to not retry on AM crashes if recovery is disabled
+ if (!sessionConfig.getAMConfiguration().getAMConf().getBoolean(
+ TezConfiguration.DAG_RECOVERY_ENABLED,
+ TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) {
+ appContext.setMaxAppAttempts(1);
+ }
+
yarnClient.submitApplication(appContext);
} catch (YarnException e) {
throw new TezException(e);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3a9ec0e/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
index d6d78ee..b1eeea4 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
@@ -32,7 +32,6 @@ import org.apache.tez.client.TezSession;
import org.apache.tez.client.TezSessionConfiguration;
import org.apache.tez.client.TezSessionStatus;
import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
@@ -40,8 +39,6 @@ import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.DAGStatus.State;
import org.apache.tez.test.dag.MultiAttemptDAG;
-import org.apache.tez.test.dag.MultiAttemptDAG.FailingInputInitializer;
-import org.apache.tez.test.dag.MultiAttemptDAG.NoOpInput;
import org.apache.tez.test.dag.SimpleVTestDAG;
import org.junit.After;
import org.junit.AfterClass;
@@ -117,12 +114,7 @@ public class TestDAGRecovery2 {
}
}
- @Before
- public void setup() throws Exception {
- Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, String
- .valueOf(new Random().nextInt(100000))));
- TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
-
+ private TezConfiguration createSessionConfig(Path remoteStagingDir) {
TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 10);
tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG");
@@ -132,6 +124,16 @@ public class TestDAGRecovery2 {
tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS, " -Xmx256m");
+ return tezConf;
+ }
+
+ @Before
+ public void setup() throws Exception {
+ Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, String
+ .valueOf(new Random().nextInt(100000))));
+ TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
+
+ TezConfiguration tezConf = createSessionConfig(remoteStagingDir);
AMConfiguration amConfig = new AMConfiguration(
new HashMap<String, String>(), new HashMap<String, LocalResource>(),
@@ -157,16 +159,21 @@ public class TestDAGRecovery2 {
}
void runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception {
- TezSessionStatus status = tezSession.getSessionStatus();
+ runDAGAndVerify(dag, finalState, tezSession);
+ }
+
+ void runDAGAndVerify(DAG dag, DAGStatus.State finalState,
+ TezSession session) throws Exception {
+ TezSessionStatus status = session.getSessionStatus();
while (status != TezSessionStatus.READY && status != TezSessionStatus.SHUTDOWN) {
LOG.info("Waiting for session to be ready. Current: " + status);
Thread.sleep(100);
- status = tezSession.getSessionStatus();
+ status = session.getSessionStatus();
}
if (status == TezSessionStatus.SHUTDOWN) {
throw new TezUncheckedException("Unexpected Session shutdown");
}
- DAGClient dagClient = tezSession.submitDAG(dag);
+ DAGClient dagClient = session.submitDAG(dag);
DAGStatus dagStatus = dagClient.getDAGStatus(null);
while (!dagStatus.isCompleted()) {
LOG.info("Waiting for dag to complete. Sleeping for 500ms."
@@ -193,4 +200,26 @@ public class TestDAGRecovery2 {
runDAGAndVerify(dag, State.FAILED);
}
+ @Test(timeout=120000)
+ public void testSessionDisableMultiAttempts() throws Exception { // checks a session with DAG recovery disabled is limited to a single AM attempt
+ tezSession.stop(); // stops the shared session field; NOTE(review): presumably created in setup() and stopped again in an @After method - confirm stop() tolerates a second call
+ Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, String
+ .valueOf(new Random().nextInt(100000))));
+ TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
+ TezConfiguration tezConf = createSessionConfig(remoteStagingDir);
+ tezConf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false); // with recovery off, TezSession now calls setMaxAppAttempts(1) despite TEZ_AM_MAX_APP_ATTEMPTS=4
+ AMConfiguration amConfig = new AMConfiguration(
+ new HashMap<String, String>(), new HashMap<String, LocalResource>(),
+ tezConf, null);
+ TezSessionConfiguration tezSessionConfig =
+ new TezSessionConfiguration(amConfig, tezConf);
+ TezSession session = new TezSession("TestDAGRecovery2SingleAttemptOnly", tezSessionConfig);
+ session.start(); // NOTE(review): this session is never stopped; wrap the rest of the test in try { ... } finally { session.stop(); } to avoid leaking the YARN app
+
+ // DAG should fail as it never completes on the first attempt
+ DAG dag = MultiAttemptDAG.createDAG("TestSingleAttemptDAG", null);
+ runDAGAndVerify(dag, State.FAILED, session); // uses the 3-arg overload so the DAG goes to the new single-attempt session, not tezSession
+ }
+
+
}