Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/08/27 20:42:25 UTC

[GitHub] [samza] mynameborat commented on a change in pull request #1422: SAMZA-2585: Modify shutdown sequence to handle orphaned AMs

mynameborat commented on a change in pull request #1422:
URL: https://github.com/apache/samza/pull/1422#discussion_r478680496



##########
File path: samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
##########
@@ -68,7 +69,12 @@ class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, samzaApp
     //allowing the RM to restart it (potentially on a different host)
     if(samzaAppStatus != SamzaAppStatus.UNDEFINED) {
       info("Unregistering AM from the RM.")
-      amClient.unregisterApplicationMaster(yarnStatus, shutdownMessage, null)
+      try {
+        amClient.unregisterApplicationMaster(yarnStatus, shutdownMessage, null)
+      } catch {
+        case ex: InvalidApplicationMasterRequestException =>
+          info("Removed application attempt from RM cache.")

Review comment:
       Can we add a comment explaining what this scenario (the app attempt being removed from the RM cache) means, and update the log statement to reflect it? That would help folks who don't have much context on YARN.

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -485,7 +485,15 @@ public void onContainersAllocated(List<Container> containers) {
   //nodes being updated. We always return 0 when asked for progress by Yarn.
   @Override
   public void onShutdownRequest() {
-    //not implemented currently.
+    log.info("Stopping the AM client on shutdown request.");
+    lifecycle.onShutdown(SamzaApplicationState.SamzaAppStatus.FAILED);
+    amClient.stop();
+    log.info("Stopping the NM client on shutdown request.");
+    nmClientAsync.stop();
+    log.info("Stopping the SamzaYarnAppMasterService service on shutdown request.");
+    service.onShutdown();
+    log.info("Stopping SamzaAppMasterMetrics on shutdown request.");
+    metrics.stop();

Review comment:
       Why not leverage `stop(SamzaAppStatus.FAILED)` up top?
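
   A minimal sketch of the delegation suggested above. The names `AppStatus` and `ResourceManagerSketch` are hypothetical stand-ins, not Samza classes, and the sketch assumes `stop(...)` already performs the same sequence shown in the diff; it records step names instead of calling YARN.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for SamzaAppStatus; only the values used here. */
enum AppStatus { SUCCEEDED, FAILED }

/** Simplified stand-in for YarnClusterResourceManager; records steps instead of calling YARN. */
class ResourceManagerSketch {
  final List<String> steps = new ArrayList<>();

  /** Single shutdown path shared by every caller (assumed to match the diff's sequence). */
  void stop(AppStatus status) {
    steps.add("lifecycle.onShutdown(" + status + ")");
    steps.add("amClient.stop()");
    steps.add("nmClientAsync.stop()");
    steps.add("service.onShutdown()");
    steps.add("metrics.stop()");
  }

  /** RM-initiated shutdown reuses stop(...) rather than repeating the sequence. */
  void onShutdownRequest() {
    stop(AppStatus.FAILED);
  }
}
```

   With this shape, any later change to the shutdown order only has to be made in one place.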

##########
File path: samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
##########
@@ -68,7 +69,12 @@ class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, samzaApp
     //allowing the RM to restart it (potentially on a different host)
     if(samzaAppStatus != SamzaAppStatus.UNDEFINED) {
       info("Unregistering AM from the RM.")
-      amClient.unregisterApplicationMaster(yarnStatus, shutdownMessage, null)
+      try {
+        amClient.unregisterApplicationMaster(yarnStatus, shutdownMessage, null)

Review comment:
       Can this throw some other exception? What is our expectation if this is unsuccessful? Should we still try to shut down the rest of the components, or just crash the JVM?
   
   I'd prefer the former: handle any other kind of exception and try to shut down the AM as cleanly as possible.
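
   One way the "shut down as cleanly as possible" option could look, as a rough sketch only. `BestEffortShutdown` and `Step` are hypothetical helpers, not part of Samza or YARN; the idea is that a failure in one step (for example, unregistering) is recorded and the remaining steps still run.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: runs every shutdown step even if an earlier one throws. */
class BestEffortShutdown {
  /** A shutdown step that may fail with any exception. */
  interface Step {
    void run() throws Exception;
  }

  /** Failures collected so the caller can log or report them after all steps ran. */
  final List<String> failures = new ArrayList<>();

  /** Runs one step, recording a failure rather than propagating it. */
  void attempt(String name, Step step) {
    try {
      step.run();
    } catch (Exception e) {
      failures.add(name + ": " + e.getMessage());
    }
  }
}
```

   A caller would wrap each component in turn, e.g. `shutdown.attempt("unregister", () -> ...)` followed by the client and service stops, then inspect `failures` at the end.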

##########
File path: samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
##########
@@ -103,4 +104,30 @@ public void testAllocatedResourceExpiryForYarn() {
 
     Assert.assertTrue(yarnClusterResourceManager.isResourceExpired(allocatedResource));
   }
+
+  @Test
+  public void testAMShutdownOnRMCallback() {
+    // create mocks
+    YarnConfiguration yarnConfiguration = mock(YarnConfiguration.class);
+    SamzaAppMasterMetrics metrics = mock(SamzaAppMasterMetrics.class);
+    Config config = mock(Config.class);
+    AMRMClientAsync asyncClient = mock(AMRMClientAsync.class);
+    YarnAppState yarnAppState = new YarnAppState(0, mock(ContainerId.class), "host", 8080, 8081);
+    SamzaYarnAppMasterLifecycle lifecycle = mock(SamzaYarnAppMasterLifecycle.class);
+    SamzaYarnAppMasterService service = mock(SamzaYarnAppMasterService.class);
+    NMClientAsync asyncNMClient = mock(NMClientAsync.class);
+    ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
+
+    // start the cluster manager
+    YarnClusterResourceManager yarnClusterResourceManager = new YarnClusterResourceManager(asyncClient, asyncNMClient,
+        callback, yarnAppState, lifecycle, service, metrics, yarnConfiguration, config);
+
+    yarnClusterResourceManager.onShutdownRequest();
+
+    verify(lifecycle, times(1)).onShutdown(SamzaApplicationState.SamzaAppStatus.FAILED);

Review comment:
       Should we also verify that unregister is called? If we don't unregister, we will end up retrying, and it's important that this code path doesn't get broken when someone changes the status used to invoke `lifecycle.onShutdown(...)`.
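
   To illustrate the concern, here is a self-contained sketch using a hand-rolled fake rather than the Mockito mocks in the test. `Status`, `FakeAmClient`, and `LifecycleSketch` are hypothetical names; the guard mirrors the `samzaAppStatus != SamzaAppStatus.UNDEFINED` check quoted in the lifecycle diff.

```java
/** Hypothetical stand-in for SamzaAppStatus. */
enum Status { UNDEFINED, SUCCEEDED, FAILED }

/** Hand-rolled fake for the AM client; counts unregister calls. */
class FakeAmClient {
  int unregisterCalls = 0;

  void unregisterApplicationMaster() {
    unregisterCalls++;
  }
}

/** Simplified lifecycle mirroring the guard in the diff: UNDEFINED skips unregistration. */
class LifecycleSketch {
  private final FakeAmClient amClient;

  LifecycleSketch(FakeAmClient amClient) {
    this.amClient = amClient;
  }

  void onShutdown(Status status) {
    if (status != Status.UNDEFINED) {
      amClient.unregisterApplicationMaster();
    }
  }
}
```

   A test that asserts on `unregisterCalls` would fail if the shutdown path were later changed to pass `UNDEFINED`, which a check on the `onShutdown(...)` argument alone would not catch.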

##########
File path: samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
##########
@@ -103,4 +104,30 @@ public void testAllocatedResourceExpiryForYarn() {
 
     Assert.assertTrue(yarnClusterResourceManager.isResourceExpired(allocatedResource));
   }
+
+  @Test
+  public void testAMShutdownOnRMCallback() {

Review comment:
       Perhaps add cases for unregister throwing exceptions too.



