You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/09/01 22:26:32 UTC
[samza] branch master updated: SAMZA-2585: Modify shutdown sequence
to handle orphaned AMs (#1422)
This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new a66fcd1 SAMZA-2585: Modify shutdown sequence to handle orphaned AMs (#1422)
a66fcd1 is described below
commit a66fcd16f5b9ed163798be67afb3f3caba6360f0
Author: Pawas Chhokra <pa...@gmail.com>
AuthorDate: Tue Sep 1 15:26:21 2020 -0700
SAMZA-2585: Modify shutdown sequence to handle orphaned AMs (#1422)
Changes: Modified the shutdown sequence inside YarnClusterResourceManager in order to bring down the AM when the NM on which it resides dies. This prevents the existence of orphaned AMs.
API Changes: None
Tests: Tested the change manually, using a YARN job deployment and LXC, by bringing down the NM on which the AM resided.
Upgrade Instructions: None
Usage Instructions: None
---
.../samza/job/yarn/YarnClusterResourceManager.java | 9 ++-
.../job/yarn/SamzaYarnAppMasterLifecycle.scala | 16 +++-
.../job/yarn/TestYarnClusterResourceManager.java | 94 ++++++++++++++++++----
3 files changed, 97 insertions(+), 22 deletions(-)
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 46507d9..7a971a9 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -386,13 +386,14 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
*/
@Override
public void stop(SamzaApplicationState.SamzaAppStatus status) {
- log.info("Stopping AM client.");
+ log.info("Stopping the AM client on shutdown request.");
lifecycle.onShutdown(status);
amClient.stop();
- log.info("Stopping the AM service.");
+ log.info("Stopping the NM client on shutdown request.");
nmClientAsync.stop();
- log.info("Stopping the NM service.");
+ log.info("Stopping the SamzaYarnAppMasterService service on shutdown request.");
service.onShutdown();
+ log.info("Stopping SamzaAppMasterMetrics on shutdown request.");
metrics.stop();
if (status != SamzaApplicationState.SamzaAppStatus.UNDEFINED) {
@@ -485,7 +486,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
//nodes being updated. We always return 0 when asked for progress by Yarn.
@Override
public void onShutdownRequest() {
- //not implemented currently.
+ stop(SamzaApplicationState.SamzaAppStatus.FAILED);
}
@Override
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
index c9c1e18..5c0dfac 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
@@ -19,9 +19,12 @@
package org.apache.samza.job.yarn
+import java.io.IOException
+
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
+import org.apache.hadoop.yarn.exceptions.{InvalidApplicationMasterRequestException, YarnException}
import org.apache.samza.SamzaException
import org.apache.samza.clustermanager.SamzaApplicationState
import SamzaApplicationState.SamzaAppStatus
@@ -68,8 +71,17 @@ class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, samzaApp
//allowing the RM to restart it (potentially on a different host)
if(samzaAppStatus != SamzaAppStatus.UNDEFINED) {
info("Unregistering AM from the RM.")
- amClient.unregisterApplicationMaster(yarnStatus, shutdownMessage, null)
- info("Unregister complete.")
+ try {
+ amClient.unregisterApplicationMaster(yarnStatus, shutdownMessage, null)
+ info("Unregister complete.")
+ } catch {
+ case ex: InvalidApplicationMasterRequestException =>
+ // Once the NM dies, the corresponding app attempt ID is removed from the RM cache so that the RM can spin up a new AM and its containers.
+ // Hence, this throws InvalidApplicationMasterRequestException since that AM is unregistered with the RM already.
+ info("Removed application attempt from RM cache because the AM died. Unregister complete.")
+ case ex @ (_ : YarnException | _ : IOException) =>
+ error("Caught an exception while trying to unregister AM. Trying to stop other components.", ex)
+ }
}
else {
info("Not unregistering AM from the RM. This will enable RM retries")
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
index 0898800..1ed1d09 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
@@ -19,11 +19,13 @@
package org.apache.samza.job.yarn;
+import java.io.IOException;
import java.time.Duration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -31,16 +33,22 @@ import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.samza.clustermanager.ClusterResourceManager;
+import org.apache.samza.clustermanager.SamzaApplicationState;
import org.apache.samza.clustermanager.SamzaResource;
import org.apache.samza.config.Config;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.*;
+
public class TestYarnClusterResourceManager {
@Test
@@ -58,24 +66,19 @@ public class TestYarnClusterResourceManager {
ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
// start the cluster manager
- YarnClusterResourceManager yarnClusterResourceManager = new YarnClusterResourceManager(asyncClient, asyncNMClient,
- callback, yarnAppState, lifecycle, service, metrics, yarnConfiguration, config);
-
- yarnAppState.pendingProcessors.put(String.valueOf(samzaContainerId),
- new YarnContainer(Container.newInstance(
- ContainerId.newContainerId(
- ApplicationAttemptId.newInstance(
- ApplicationId.newInstance(10000L, 1), 1), 1),
- NodeId.newInstance("host1", 8088), "http://host1",
- Resource.newInstance(1024, 1), Priority.newInstance(1),
- Token.newInstance("id".getBytes(), "read", "password".getBytes(), "service"))));
+ YarnClusterResourceManager yarnClusterResourceManager =
+ new YarnClusterResourceManager(asyncClient, asyncNMClient, callback, yarnAppState, lifecycle, service, metrics,
+ yarnConfiguration, config);
+
+ yarnAppState.pendingProcessors.put(String.valueOf(samzaContainerId), new YarnContainer(Container.newInstance(
+ ContainerId.newContainerId(ApplicationAttemptId.newInstance(ApplicationId.newInstance(10000L, 1), 1), 1),
+ NodeId.newInstance("host1", 8088), "http://host1", Resource.newInstance(1024, 1), Priority.newInstance(1),
+ Token.newInstance("id".getBytes(), "read", "password".getBytes(), "service"))));
yarnClusterResourceManager.start();
assertEquals(1, yarnAppState.pendingProcessors.size());
- yarnClusterResourceManager.onStartContainerError(ContainerId.newContainerId(
- ApplicationAttemptId.newInstance(
- ApplicationId.newInstance(10000L, 1), 1), 1),
+ yarnClusterResourceManager.onStartContainerError(ContainerId.newContainerId(ApplicationAttemptId.newInstance(ApplicationId.newInstance(10000L, 1), 1), 1),
new Exception());
assertEquals(0, yarnAppState.pendingProcessors.size());
@@ -95,12 +98,71 @@ public class TestYarnClusterResourceManager {
ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
// start the cluster manager
- YarnClusterResourceManager yarnClusterResourceManager = new YarnClusterResourceManager(asyncClient, asyncNMClient,
- callback, yarnAppState, lifecycle, service, metrics, yarnConfiguration, config);
+ YarnClusterResourceManager yarnClusterResourceManager =
+ new YarnClusterResourceManager(asyncClient, asyncNMClient, callback, yarnAppState, lifecycle, service, metrics,
+ yarnConfiguration, config);
SamzaResource allocatedResource = mock(SamzaResource.class);
when(allocatedResource.getTimestamp()).thenReturn(System.currentTimeMillis() - Duration.ofMinutes(10).toMillis());
Assert.assertTrue(yarnClusterResourceManager.isResourceExpired(allocatedResource));
}
+
+ @Test
+ public void testAMShutdownOnRMCallback() throws IOException, YarnException {
+ // create mocks; lifecycle is a spy over a real SamzaYarnAppMasterLifecycle so its call into asyncClient can be verified below
+ YarnConfiguration yarnConfiguration = mock(YarnConfiguration.class);
+ SamzaAppMasterMetrics metrics = mock(SamzaAppMasterMetrics.class);
+ Config config = mock(Config.class);
+ AMRMClientAsync asyncClient = mock(AMRMClientAsync.class);
+ YarnAppState yarnAppState = new YarnAppState(0, mock(ContainerId.class), "host", 8080, 8081);
+ SamzaYarnAppMasterLifecycle lifecycle = Mockito.spy(new SamzaYarnAppMasterLifecycle(512, 2, mock(SamzaApplicationState.class), yarnAppState, asyncClient));
+ SamzaYarnAppMasterService service = mock(SamzaYarnAppMasterService.class);
+ NMClientAsync asyncNMClient = mock(NMClientAsync.class);
+ ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
+
+ // construct the cluster manager under test (start() is never called in this test)
+ YarnClusterResourceManager yarnClusterResourceManager =
+ new YarnClusterResourceManager(asyncClient, asyncNMClient, callback, yarnAppState, lifecycle, service, metrics,
+ yarnConfiguration, config);
+
+ yarnClusterResourceManager.onShutdownRequest(); // invoke the shutdown-request callback under test
+
+ verify(lifecycle, times(1)).onShutdown(SamzaApplicationState.SamzaAppStatus.FAILED); // shutdown is expected to carry FAILED status
+ verify(asyncClient, times(1)).unregisterApplicationMaster(FinalApplicationStatus.FAILED, null, null); // reached through the spied lifecycle
+ verify(asyncClient, times(1)).stop(); // each remaining component is expected to be stopped exactly once
+ verify(asyncNMClient, times(1)).stop();
+ verify(service, times(1)).onShutdown();
+ verify(metrics, times(1)).stop();
+ }
+
+ @Test
+ public void testAMShutdownThrowingExceptionOnRMCallback() throws IOException, YarnException {
+ // create mocks; lifecycle is a spy over a real SamzaYarnAppMasterLifecycle so its call into asyncClient can be verified below
+ YarnConfiguration yarnConfiguration = mock(YarnConfiguration.class);
+ SamzaAppMasterMetrics metrics = mock(SamzaAppMasterMetrics.class);
+ Config config = mock(Config.class);
+ AMRMClientAsync asyncClient = mock(AMRMClientAsync.class);
+ YarnAppState yarnAppState = new YarnAppState(0, mock(ContainerId.class), "host", 8080, 8081);
+ SamzaYarnAppMasterLifecycle lifecycle = Mockito.spy(new SamzaYarnAppMasterLifecycle(512, 2, mock(SamzaApplicationState.class), yarnAppState, asyncClient));
+ SamzaYarnAppMasterService service = mock(SamzaYarnAppMasterService.class);
+ NMClientAsync asyncNMClient = mock(NMClientAsync.class);
+ ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
+
+ doThrow(InvalidApplicationMasterRequestException.class).when(asyncClient).unregisterApplicationMaster(FinalApplicationStatus.FAILED, null, null); // make the unregister call fail
+
+ // construct the cluster manager under test (start() is never called in this test)
+ YarnClusterResourceManager yarnClusterResourceManager =
+ new YarnClusterResourceManager(asyncClient, asyncNMClient, callback, yarnAppState, lifecycle, service, metrics,
+ yarnConfiguration, config);
+
+ yarnClusterResourceManager.onShutdownRequest(); // must not propagate the stubbed exception, otherwise the test fails here
+
+ verify(lifecycle, times(1)).onShutdown(SamzaApplicationState.SamzaAppStatus.FAILED);
+ verify(asyncClient, times(1)).unregisterApplicationMaster(FinalApplicationStatus.FAILED, null, null); // the failing call was attempted once
+ verify(asyncClient, times(1)).stop(); // the remaining components are still expected to be stopped after the failure
+ verify(asyncNMClient, times(1)).stop();
+ verify(service, times(1)).onShutdown();
+ verify(metrics, times(1)).stop();
+ }
}
\ No newline at end of file