You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2017/01/23 22:48:46 UTC
tez git commit: TEZ-3584. amKeepAliveService in TezClient should
shutdown in case of AM failure. Contributed by Zhiyuan Yang.
Repository: tez
Updated Branches:
refs/heads/master 6d431469b -> b31cf3351
TEZ-3584. amKeepAliveService in TezClient should shutdown in case of AM
failure. Contributed by Zhiyuan Yang.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b31cf335
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b31cf335
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b31cf335
Branch: refs/heads/master
Commit: b31cf33518ca02a3e49743a0d349db293a1144e3
Parents: 6d43146
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Jan 23 14:48:26 2017 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Jan 23 14:48:26 2017 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../java/org/apache/tez/client/TezClient.java | 16 ++++++++--
.../org/apache/tez/client/TestTezClient.java | 33 ++++++++++++++++++++
.../java/org/apache/tez/test/TestTezJobs.java | 2 +-
4 files changed, 49 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/b31cf335/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 303f7f3..d6c4d49 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3584. amKeepAliveService in TezClient should shutdown in case of AM failure.
TEZ-3462. Task attempt failure during container shutdown loses useful container diagnostics
TEZ-3579. Wrong configuration key for max slow start fraction in CartesianProductVertexManager.
TEZ-3458. Auto grouping for cartesian product edge(unpartitioned case).
http://git-wip-us.apache.org/repos/asf/tez/blob/b31cf335/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index f4e9f10..65ce0fb 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -484,6 +484,9 @@ public class TezClient {
proxy = waitForProxy();
} catch (InterruptedException e) {
LOG.debug("Interrupted while trying to create a connection to the AM", e);
+ } catch (SessionNotRunning e) {
+ LOG.error("Cannot create a connection to the AM, stopping heartbeat to AM", e);
+ cancelAMKeepAlive(false);
}
}
if (proxy != null) {
@@ -1104,12 +1107,21 @@ public class TezClient {
@VisibleForTesting
@Private
- public synchronized void cancelAMKeepAlive() {
+ public synchronized void cancelAMKeepAlive(boolean shutdownNow) {
if (amKeepAliveService != null) {
- amKeepAliveService.shutdownNow();
+ if (shutdownNow) {
+ amKeepAliveService.shutdownNow();
+ } else {
+ amKeepAliveService.shutdown();
+ }
}
}
+ @VisibleForTesting
+ protected synchronized ScheduledExecutorService getAMKeepAliveService() {
+ return amKeepAliveService;
+ }
+
/**
* A builder for setting up an instance of {@link org.apache.tez.client.TezClient}
*/
http://git-wip-us.apache.org/repos/asf/tez/blob/b31cf335/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index dbbd619..c1f7fd1 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -830,4 +830,37 @@ public class TestTezClient {
}
+ @Test(timeout = 10000)
+ public void testAMHeartbeatFailOnGetAMProxy() throws Exception {
+ int amHeartBeatTimeoutSecs = 3;
+ TezConfiguration conf = new TezConfiguration();
+ conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS, amHeartBeatTimeoutSecs);
+
+ final TezClientForTest client = configureAndCreateTezClient(conf);
+ client.callRealGetSessionAMProxy = true;
+ client.start();
+
+ when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
+ .thenReturn(YarnApplicationState.FAILED);
+ Thread.sleep(2 * amHeartBeatTimeoutSecs * 1000);
+ assertTrue(client.getAMKeepAliveService().isTerminated());
+ }
+
+ @Test(timeout = 12000)
+ public void testAMHeartbeatFailOnGetAMStatus() throws Exception {
+ int amHeartBeatTimeoutSecs = 3;
+ TezConfiguration conf = new TezConfiguration();
+ conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS, amHeartBeatTimeoutSecs);
+
+ final TezClientForTest client = configureAndCreateTezClient(conf);
+ client.start();
+
+ when(client.sessionAmProxy.getAMStatus(any(RpcController.class),
+ any(GetAMStatusRequestProto.class))).thenThrow(new ServiceException("error"));
+ client.callRealGetSessionAMProxy = true;
+ when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
+ .thenReturn(YarnApplicationState.FAILED);
+ Thread.sleep(3 * amHeartBeatTimeoutSecs * 1000);
+ assertTrue(client.getAMKeepAliveService().isTerminated());
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b31cf335/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index 479509d..5c50a34 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -1210,7 +1210,7 @@ public class TestTezJobs {
tezConf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS, 5);
TezClient tezClient = TezClient.create("testAMClientHeartbeatTimeout", tezConf, true);
tezClient.start();
- tezClient.cancelAMKeepAlive();
+ tezClient.cancelAMKeepAlive(true);
ApplicationId appId = tezClient.getAppMasterApplicationId();