You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/09/01 22:26:32 UTC

[samza] branch master updated: SAMZA-2585: Modify shutdown sequence to handle orphaned AMs (#1422)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new a66fcd1  SAMZA-2585: Modify shutdown sequence to handle orphaned AMs (#1422)
a66fcd1 is described below

commit a66fcd16f5b9ed163798be67afb3f3caba6360f0
Author: Pawas Chhokra <pa...@gmail.com>
AuthorDate: Tue Sep 1 15:26:21 2020 -0700

    SAMZA-2585: Modify shutdown sequence to handle orphaned AMs (#1422)
    
    Changes: Modified the shutdown sequence inside YarnClusterResourceManager in order to bring down the AM when the NM on which it resides dies. This prevents the existence of orphaned AMs.
    API Changes: None
    Tests: Tested manually by deploying a YARN job on LXC and bringing down the NM on which the AM resided.
    Upgrade Instructions: None
    Usage Instructions: None
---
 .../samza/job/yarn/YarnClusterResourceManager.java |  9 ++-
 .../job/yarn/SamzaYarnAppMasterLifecycle.scala     | 16 +++-
 .../job/yarn/TestYarnClusterResourceManager.java   | 94 ++++++++++++++++++----
 3 files changed, 97 insertions(+), 22 deletions(-)

diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 46507d9..7a971a9 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -386,13 +386,14 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
    */
   @Override
   public void stop(SamzaApplicationState.SamzaAppStatus status) {
-    log.info("Stopping AM client.");
+    log.info("Stopping the AM client on shutdown request.");
     lifecycle.onShutdown(status);
     amClient.stop();
-    log.info("Stopping the AM service.");
+    log.info("Stopping the NM client on shutdown request.");
     nmClientAsync.stop();
-    log.info("Stopping the NM service.");
+    log.info("Stopping the SamzaYarnAppMasterService service on shutdown request.");
     service.onShutdown();
+    log.info("Stopping SamzaAppMasterMetrics on shutdown request.");
     metrics.stop();
 
     if (status != SamzaApplicationState.SamzaAppStatus.UNDEFINED) {
@@ -485,7 +486,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
   //nodes being updated. We always return 0 when asked for progress by Yarn.
   @Override
   public void onShutdownRequest() {
-    //not implemented currently.
+    stop(SamzaApplicationState.SamzaAppStatus.FAILED);
   }
 
   @Override
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
index c9c1e18..5c0dfac 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
@@ -19,9 +19,12 @@
 
 package org.apache.samza.job.yarn
 
+import java.io.IOException
+
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
+import org.apache.hadoop.yarn.exceptions.{InvalidApplicationMasterRequestException, YarnException}
 import org.apache.samza.SamzaException
 import org.apache.samza.clustermanager.SamzaApplicationState
 import SamzaApplicationState.SamzaAppStatus
@@ -68,8 +71,17 @@ class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, samzaApp
     //allowing the RM to restart it (potentially on a different host)
     if(samzaAppStatus != SamzaAppStatus.UNDEFINED) {
       info("Unregistering AM from the RM.")
-      amClient.unregisterApplicationMaster(yarnStatus, shutdownMessage, null)
-      info("Unregister complete.")
+      try {
+        amClient.unregisterApplicationMaster(yarnStatus, shutdownMessage, null)
+        info("Unregister complete.")
+      } catch {
+        case ex: InvalidApplicationMasterRequestException =>
+          // Once the NM dies, the corresponding app attempt ID is removed from the RM cache so that the RM can spin up a new AM and its containers.
+          // Hence, this throws InvalidApplicationMasterRequestException since that AM is unregistered with the RM already.
+          info("Removed application attempt from RM cache because the AM died. Unregister complete.")
+        case ex @ (_ : YarnException | _ : IOException) =>
+          error("Caught an exception while trying to unregister AM. Trying to stop other components.", ex)
+      }
     }
     else {
       info("Not unregistering AM from the RM. This will enable RM retries")
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
index 0898800..1ed1d09 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
@@ -19,11 +19,13 @@
 
 package org.apache.samza.job.yarn;
 
+import java.io.IOException;
 import java.time.Duration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -31,16 +33,22 @@ import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.samza.clustermanager.ClusterResourceManager;
+import org.apache.samza.clustermanager.SamzaApplicationState;
 import org.apache.samza.clustermanager.SamzaResource;
 import org.apache.samza.config.Config;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.*;
 
+
 public class TestYarnClusterResourceManager {
 
   @Test
@@ -58,24 +66,19 @@ public class TestYarnClusterResourceManager {
     ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
 
     // start the cluster manager
-    YarnClusterResourceManager yarnClusterResourceManager = new YarnClusterResourceManager(asyncClient, asyncNMClient,
-        callback, yarnAppState, lifecycle, service, metrics, yarnConfiguration, config);
-
-    yarnAppState.pendingProcessors.put(String.valueOf(samzaContainerId),
-        new YarnContainer(Container.newInstance(
-            ContainerId.newContainerId(
-                ApplicationAttemptId.newInstance(
-                    ApplicationId.newInstance(10000L, 1), 1), 1),
-            NodeId.newInstance("host1", 8088), "http://host1",
-            Resource.newInstance(1024, 1), Priority.newInstance(1),
-            Token.newInstance("id".getBytes(), "read", "password".getBytes(), "service"))));
+    YarnClusterResourceManager yarnClusterResourceManager =
+        new YarnClusterResourceManager(asyncClient, asyncNMClient, callback, yarnAppState, lifecycle, service, metrics,
+            yarnConfiguration, config);
+
+    yarnAppState.pendingProcessors.put(String.valueOf(samzaContainerId), new YarnContainer(Container.newInstance(
+        ContainerId.newContainerId(ApplicationAttemptId.newInstance(ApplicationId.newInstance(10000L, 1), 1), 1),
+        NodeId.newInstance("host1", 8088), "http://host1", Resource.newInstance(1024, 1), Priority.newInstance(1),
+        Token.newInstance("id".getBytes(), "read", "password".getBytes(), "service"))));
 
     yarnClusterResourceManager.start();
     assertEquals(1, yarnAppState.pendingProcessors.size());
 
-    yarnClusterResourceManager.onStartContainerError(ContainerId.newContainerId(
-        ApplicationAttemptId.newInstance(
-            ApplicationId.newInstance(10000L, 1), 1), 1),
+    yarnClusterResourceManager.onStartContainerError(ContainerId.newContainerId(ApplicationAttemptId.newInstance(ApplicationId.newInstance(10000L, 1), 1), 1),
         new Exception());
 
     assertEquals(0, yarnAppState.pendingProcessors.size());
@@ -95,12 +98,71 @@ public class TestYarnClusterResourceManager {
     ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
 
     // start the cluster manager
-    YarnClusterResourceManager yarnClusterResourceManager = new YarnClusterResourceManager(asyncClient, asyncNMClient,
-        callback, yarnAppState, lifecycle, service, metrics, yarnConfiguration, config);
+    YarnClusterResourceManager yarnClusterResourceManager =
+        new YarnClusterResourceManager(asyncClient, asyncNMClient, callback, yarnAppState, lifecycle, service, metrics,
+            yarnConfiguration, config);
 
     SamzaResource allocatedResource = mock(SamzaResource.class);
     when(allocatedResource.getTimestamp()).thenReturn(System.currentTimeMillis() - Duration.ofMinutes(10).toMillis());
 
     Assert.assertTrue(yarnClusterResourceManager.isResourceExpired(allocatedResource));
   }
+
+  @Test
+  public void testAMShutdownOnRMCallback() throws IOException, YarnException {
+    // create mocks
+    YarnConfiguration yarnConfiguration = mock(YarnConfiguration.class);
+    SamzaAppMasterMetrics metrics = mock(SamzaAppMasterMetrics.class);
+    Config config = mock(Config.class);
+    AMRMClientAsync asyncClient = mock(AMRMClientAsync.class);
+    YarnAppState yarnAppState = new YarnAppState(0, mock(ContainerId.class), "host", 8080, 8081);
+    SamzaYarnAppMasterLifecycle lifecycle = Mockito.spy(new SamzaYarnAppMasterLifecycle(512, 2, mock(SamzaApplicationState.class), yarnAppState, asyncClient));
+    SamzaYarnAppMasterService service = mock(SamzaYarnAppMasterService.class);
+    NMClientAsync asyncNMClient = mock(NMClientAsync.class);
+    ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
+
+    // start the cluster manager
+    YarnClusterResourceManager yarnClusterResourceManager =
+        new YarnClusterResourceManager(asyncClient, asyncNMClient, callback, yarnAppState, lifecycle, service, metrics,
+            yarnConfiguration, config);
+
+    yarnClusterResourceManager.onShutdownRequest();
+
+    verify(lifecycle, times(1)).onShutdown(SamzaApplicationState.SamzaAppStatus.FAILED);
+    verify(asyncClient, times(1)).unregisterApplicationMaster(FinalApplicationStatus.FAILED, null, null);
+    verify(asyncClient, times(1)).stop();
+    verify(asyncNMClient, times(1)).stop();
+    verify(service, times(1)).onShutdown();
+    verify(metrics, times(1)).stop();
+  }
+
+  @Test
+  public void testAMShutdownThrowingExceptionOnRMCallback() throws IOException, YarnException {
+    // create mocks
+    YarnConfiguration yarnConfiguration = mock(YarnConfiguration.class);
+    SamzaAppMasterMetrics metrics = mock(SamzaAppMasterMetrics.class);
+    Config config = mock(Config.class);
+    AMRMClientAsync asyncClient = mock(AMRMClientAsync.class);
+    YarnAppState yarnAppState = new YarnAppState(0, mock(ContainerId.class), "host", 8080, 8081);
+    SamzaYarnAppMasterLifecycle lifecycle = Mockito.spy(new SamzaYarnAppMasterLifecycle(512, 2, mock(SamzaApplicationState.class), yarnAppState, asyncClient));
+    SamzaYarnAppMasterService service = mock(SamzaYarnAppMasterService.class);
+    NMClientAsync asyncNMClient = mock(NMClientAsync.class);
+    ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
+
+    doThrow(InvalidApplicationMasterRequestException.class).when(asyncClient).unregisterApplicationMaster(FinalApplicationStatus.FAILED, null, null);
+
+    // start the cluster manager
+    YarnClusterResourceManager yarnClusterResourceManager =
+        new YarnClusterResourceManager(asyncClient, asyncNMClient, callback, yarnAppState, lifecycle, service, metrics,
+            yarnConfiguration, config);
+
+    yarnClusterResourceManager.onShutdownRequest();
+
+    verify(lifecycle, times(1)).onShutdown(SamzaApplicationState.SamzaAppStatus.FAILED);
+    verify(asyncClient, times(1)).unregisterApplicationMaster(FinalApplicationStatus.FAILED, null, null);
+    verify(asyncClient, times(1)).stop();
+    verify(asyncNMClient, times(1)).stop();
+    verify(service, times(1)).onShutdown();
+    verify(metrics, times(1)).stop();
+  }
 }
\ No newline at end of file