Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/08/27 19:57:33 UTC

[GitHub] [samza] PawasChhokra opened a new pull request #1422: SAMZA-2585: Modify shutdown sequence to handle orphaned AMs

PawasChhokra opened a new pull request #1422:
URL: https://github.com/apache/samza/pull/1422


   **Changes**: Modified the shutdown sequence in YarnClusterResourceManager so that the AM is brought down when the NM on which it resides dies. This prevents orphaned AMs.
   **API Changes**: None
   **Tests**: Tested the change by manually bringing down the NM on which the AM resided, using a YARN job deployment and LXC.
   **Upgrade Instructions**: None
   **Usage Instructions**: None


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] PawasChhokra commented on a change in pull request #1422: SAMZA-2585: Modify shutdown sequence to handle orphaned AMs

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1422:
URL: https://github.com/apache/samza/pull/1422#discussion_r480243707



##########
File path: samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
##########
@@ -68,7 +69,12 @@ class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, samzaApp
     //allowing the RM to restart it (potentially on a different host)
     if(samzaAppStatus != SamzaAppStatus.UNDEFINED) {
       info("Unregistering AM from the RM.")
-      amClient.unregisterApplicationMaster(yarnStatus, shutdownMessage, null)
+      try {
+        amClient.unregisterApplicationMaster(yarnStatus, shutdownMessage, null)

Review comment:
       Handled all the exceptions this can throw.







[GitHub] [samza] mynameborat merged pull request #1422: SAMZA-2585: Modify shutdown sequence to handle orphaned AMs

mynameborat merged pull request #1422:
URL: https://github.com/apache/samza/pull/1422


   





[GitHub] [samza] mynameborat commented on a change in pull request #1422: SAMZA-2585: Modify shutdown sequence to handle orphaned AMs

mynameborat commented on a change in pull request #1422:
URL: https://github.com/apache/samza/pull/1422#discussion_r480367391



##########
File path: samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
##########
@@ -68,8 +71,17 @@ class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, samzaApp
     //allowing the RM to restart it (potentially on a different host)
     if(samzaAppStatus != SamzaAppStatus.UNDEFINED) {
       info("Unregistering AM from the RM.")
-      amClient.unregisterApplicationMaster(yarnStatus, shutdownMessage, null)
-      info("Unregister complete.")
+      try {
+        amClient.unregisterApplicationMaster(yarnStatus, shutdownMessage, null)
+        info("Unregister complete.")
+      } catch {
+        case ex: InvalidApplicationMasterRequestException =>
+          // Once the AM dies, the corresponding app attempt ID is removed from the RM cache so that the RM can spin up a new AM and its containers.

Review comment:
       The AM isn't dead yet, right? Shouldn't this be `once the NM dies`? Which brings up another interesting question: have you validated the behavior when `yarn.nodemanager.recovery.enabled` is set to `true` vs. `false`?

##########
File path: samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
##########
@@ -19,28 +19,36 @@
 
 package org.apache.samza.job.yarn;
 
+import java.io.IOException;
 import java.time.Duration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.samza.clustermanager.ClusterResourceManager;
+import org.apache.samza.clustermanager.SamzaApplicationState;
 import org.apache.samza.clustermanager.SamzaResource;
 import org.apache.samza.config.Config;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyObject;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.*;

Review comment:
       nit: avoid `*` imports and import explicitly.







[GitHub] [samza] PawasChhokra commented on a change in pull request #1422: SAMZA-2585: Modify shutdown sequence to handle orphaned AMs

PawasChhokra commented on a change in pull request #1422:
URL: https://github.com/apache/samza/pull/1422#discussion_r480244117



##########
File path: samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
##########
@@ -103,4 +104,30 @@ public void testAllocatedResourceExpiryForYarn() {
 
     Assert.assertTrue(yarnClusterResourceManager.isResourceExpired(allocatedResource));
   }
+
+  @Test
+  public void testAMShutdownOnRMCallback() {
+    // create mocks
+    YarnConfiguration yarnConfiguration = mock(YarnConfiguration.class);
+    SamzaAppMasterMetrics metrics = mock(SamzaAppMasterMetrics.class);
+    Config config = mock(Config.class);
+    AMRMClientAsync asyncClient = mock(AMRMClientAsync.class);
+    YarnAppState yarnAppState = new YarnAppState(0, mock(ContainerId.class), "host", 8080, 8081);
+    SamzaYarnAppMasterLifecycle lifecycle = mock(SamzaYarnAppMasterLifecycle.class);
+    SamzaYarnAppMasterService service = mock(SamzaYarnAppMasterService.class);
+    NMClientAsync asyncNMClient = mock(NMClientAsync.class);
+    ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
+
+    // start the cluster manager
+    YarnClusterResourceManager yarnClusterResourceManager = new YarnClusterResourceManager(asyncClient, asyncNMClient,
+        callback, yarnAppState, lifecycle, service, metrics, yarnConfiguration, config);
+
+    yarnClusterResourceManager.onShutdownRequest();
+
+    verify(lifecycle, times(1)).onShutdown(SamzaApplicationState.SamzaAppStatus.FAILED);

Review comment:
       Done.







[GitHub] [samza] PawasChhokra commented on a change in pull request #1422: SAMZA-2585: Modify shutdown sequence to handle orphaned AMs

PawasChhokra commented on a change in pull request #1422:
URL: https://github.com/apache/samza/pull/1422#discussion_r480596684



##########
File path: samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
##########
@@ -68,8 +71,17 @@ class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, samzaApp
     //allowing the RM to restart it (potentially on a different host)
     if(samzaAppStatus != SamzaAppStatus.UNDEFINED) {
       info("Unregistering AM from the RM.")
-      amClient.unregisterApplicationMaster(yarnStatus, shutdownMessage, null)
-      info("Unregister complete.")
+      try {
+        amClient.unregisterApplicationMaster(yarnStatus, shutdownMessage, null)
+        info("Unregister complete.")
+      } catch {
+        case ex: InvalidApplicationMasterRequestException =>
+          // Once the AM dies, the corresponding app attempt ID is removed from the RM cache so that the RM can spin up a new AM and its containers.

Review comment:
       Fixed the log. Also validated with `yarn.nodemanager.recovery.enabled` set to `true`. It works as expected.







[GitHub] [samza] mynameborat commented on a change in pull request #1422: SAMZA-2585: Modify shutdown sequence to handle orphaned AMs

mynameborat commented on a change in pull request #1422:
URL: https://github.com/apache/samza/pull/1422#discussion_r478678631



##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -485,7 +485,15 @@ public void onContainersAllocated(List<Container> containers) {
   //nodes being updated. We always return 0 when asked for progress by Yarn.
   @Override
   public void onShutdownRequest() {
-    //not implemented currently.
+    log.info("Stopping the AM client on shutdown request.");
+    lifecycle.onShutdown(SamzaApplicationState.SamzaAppStatus.FAILED);
+    amClient.stop();
+    log.info("Stopping the NM client on shutdown request.");
+    nmClientAsync.stop();
+    log.info("Stopping the SamzaYarnAppMasterService service on shutdown request.");
+    service.onShutdown();
+    log.info("Stopping SamzaAppMasterMetrics on shutdown request.");
+    metrics.stop();

Review comment:
       Why not leverage `stop(SamzaAppStatus.FAILED)` up top? I like the additional logs, so maybe modify `stop(...)` to include these logs for better debuggability.
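The suggestion above can be sketched roughly as follows. This is a minimal, hypothetical sketch, not Samza code: `AppStatus`, `ShutdownLifecycle`, and `ClusterManagerSketch` are illustrative stand-ins, and each `Runnable` stands in for one of the calls from the PR (`amClient.stop()`, `nmClientAsync.stop()`, `service.onShutdown()`, `metrics.stop()`). The per-step logs move into a single `stop(...)` method, and `onShutdownRequest()` only delegates to `stop(FAILED)`.

```java
import java.util.logging.Logger;

// Hypothetical stand-in for SamzaApplicationState.SamzaAppStatus.
enum AppStatus { SUCCEEDED, FAILED, UNDEFINED }

// Hypothetical stand-in for the lifecycle object that unregisters the AM from the RM.
interface ShutdownLifecycle {
  void onShutdown(AppStatus status);
}

// Hypothetical sketch of the resource manager's shutdown path. Each Runnable stands in
// for one call from the PR: amClient.stop(), nmClientAsync.stop(), service.onShutdown()
// and metrics.stop().
class ClusterManagerSketch {
  private static final Logger LOG = Logger.getLogger(ClusterManagerSketch.class.getName());

  private final ShutdownLifecycle lifecycle;
  private final Runnable amClientStop;
  private final Runnable nmClientStop;
  private final Runnable serviceShutdown;
  private final Runnable metricsStop;

  ClusterManagerSketch(ShutdownLifecycle lifecycle, Runnable amClientStop, Runnable nmClientStop,
      Runnable serviceShutdown, Runnable metricsStop) {
    this.lifecycle = lifecycle;
    this.amClientStop = amClientStop;
    this.nmClientStop = nmClientStop;
    this.serviceShutdown = serviceShutdown;
    this.metricsStop = metricsStop;
  }

  // Single shutdown path: the per-step logs live here, so every caller of stop(...) gets them.
  void stop(AppStatus status) {
    LOG.info("Stopping the AM client with status " + status + ".");
    lifecycle.onShutdown(status);
    amClientStop.run();
    LOG.info("Stopping the NM client.");
    nmClientStop.run();
    LOG.info("Stopping the SamzaYarnAppMasterService service.");
    serviceShutdown.run();
    LOG.info("Stopping SamzaAppMasterMetrics.");
    metricsStop.run();
  }

  // RM callback: reuse stop(...) with FAILED instead of duplicating the shutdown sequence.
  void onShutdownRequest() {
    LOG.info("Received a shutdown request from the RM.");
    stop(AppStatus.FAILED);
  }
}
```

Keeping one shutdown path means any later change to the order of steps, or to their logging, applies to both a normal stop and an RM-initiated shutdown.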







[GitHub] [samza] mynameborat commented on a change in pull request #1422: SAMZA-2585: Modify shutdown sequence to handle orphaned AMs

mynameborat commented on a change in pull request #1422:
URL: https://github.com/apache/samza/pull/1422#discussion_r478680496



##########
File path: samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
##########
@@ -68,7 +69,12 @@ class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, samzaApp
     //allowing the RM to restart it (potentially on a different host)
     if(samzaAppStatus != SamzaAppStatus.UNDEFINED) {
       info("Unregistering AM from the RM.")
-      amClient.unregisterApplicationMaster(yarnStatus, shutdownMessage, null)
+      try {
+        amClient.unregisterApplicationMaster(yarnStatus, shutdownMessage, null)
+      } catch {
+        case ex: InvalidApplicationMasterRequestException =>
+          info("Removed application attempt from RM cache.")

Review comment:
       Can we add some comments explaining what this scenario (app attempt removed from the RM cache) means, and also update the log statement to reflect it? It will help folks who don't have much context on YARN.
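One way the requested comment and log could read, sketched under assumptions: `AttemptNotRegisteredException` and `AmUnregisterClient` are hypothetical stand-ins for YARN's `InvalidApplicationMasterRequestException` and the `unregisterApplicationMaster(...)` call (the real call can also throw `YarnException` and `IOException`, which this sketch omits). The explanation follows the PR thread: once the NM hosting the AM dies, the RM drops the application attempt so that it can start a new AM and its containers.

```java
import java.util.logging.Logger;

// Hypothetical stand-in for YARN's InvalidApplicationMasterRequestException.
class AttemptNotRegisteredException extends Exception {
  AttemptNotRegisteredException(String message) {
    super(message);
  }
}

// Hypothetical stand-in for the AM-RM client's unregister call.
interface AmUnregisterClient {
  void unregister(String finalStatus, String message) throws AttemptNotRegisteredException;
}

class UnregisterSketch {
  private static final Logger LOG = Logger.getLogger(UnregisterSketch.class.getName());

  /**
   * Returns true if the RM accepted the unregister request, or false if the RM had
   * already dropped this application attempt. Other exceptions propagate to the caller.
   */
  static boolean unregister(AmUnregisterClient client, String finalStatus, String message) {
    try {
      client.unregister(finalStatus, message);
      LOG.info("Unregister complete.");
      return true;
    } catch (AttemptNotRegisteredException e) {
      // When the NM hosting this AM dies, the RM removes the application attempt from its
      // cache so that it can start a new AM and its containers. An unregister request from
      // this (now orphaned) AM is then rejected; there is nothing left to unregister, so the
      // AM should just continue shutting down.
      LOG.info("The RM no longer tracks this application attempt (likely because the NM "
          + "hosting this AM was lost), so there is nothing to unregister: " + e.getMessage());
      return false;
    }
  }
}
```

The boolean return is only there so callers (and tests) can tell the two outcomes apart; the real code may well just log and move on.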

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -485,7 +485,15 @@ public void onContainersAllocated(List<Container> containers) {
   //nodes being updated. We always return 0 when asked for progress by Yarn.
   @Override
   public void onShutdownRequest() {
-    //not implemented currently.
+    log.info("Stopping the AM client on shutdown request.");
+    lifecycle.onShutdown(SamzaApplicationState.SamzaAppStatus.FAILED);
+    amClient.stop();
+    log.info("Stopping the NM client on shutdown request.");
+    nmClientAsync.stop();
+    log.info("Stopping the SamzaYarnAppMasterService service on shutdown request.");
+    service.onShutdown();
+    log.info("Stopping SamzaAppMasterMetrics on shutdown request.");
+    metrics.stop();

Review comment:
       Why not leverage `stop(SamzaAppStatus.FAILED)` up top?

##########
File path: samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
##########
@@ -68,7 +69,12 @@ class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, samzaApp
     //allowing the RM to restart it (potentially on a different host)
     if(samzaAppStatus != SamzaAppStatus.UNDEFINED) {
       info("Unregistering AM from the RM.")
-      amClient.unregisterApplicationMaster(yarnStatus, shutdownMessage, null)
+      try {
+        amClient.unregisterApplicationMaster(yarnStatus, shutdownMessage, null)

Review comment:
       Can this throw some other exception? What is our expectation if this is unsuccessful? Should we still try to shut down the rest of the components, or just crash the JVM?
   
   I'd prefer the former: handle any other kinds of exceptions and try to shut down the AM as cleanly as possible.
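The "former" option, continuing the shutdown even when a step fails, can be sketched as a best-effort runner. This assumes each shutdown step can be wrapped as a `Runnable`; `BestEffortShutdown` is a hypothetical helper, not part of Samza. Every step runs in order, a step that throws a `RuntimeException` is logged and recorded, and the remaining steps still run. Checked exceptions from real calls such as `unregisterApplicationMaster(...)` would have to be caught or wrapped inside the step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper: runs shutdown steps in order without letting one failure stop the rest.
class BestEffortShutdown {
  private static final Logger LOG = Logger.getLogger(BestEffortShutdown.class.getName());

  private static final class Step {
    final String name;
    final Runnable action;

    Step(String name, Runnable action) {
      this.name = name;
      this.action = action;
    }
  }

  private final List<Step> steps = new ArrayList<>();

  BestEffortShutdown add(String name, Runnable action) {
    steps.add(new Step(name, action));
    return this;
  }

  /** Runs every step and returns the names of the steps that threw. */
  List<String> run() {
    List<String> failed = new ArrayList<>();
    for (Step step : steps) {
      LOG.info("Running shutdown step: " + step.name);
      try {
        step.action.run();
      } catch (RuntimeException e) {
        // Log and keep going so that one failing component does not leave the others running.
        LOG.log(Level.WARNING, "Shutdown step failed: " + step.name, e);
        failed.add(step.name);
      }
    }
    return failed;
  }
}
```

Returning the failed step names leaves the final decision to the caller, for example to exit with a non-zero status after everything that can be stopped has been stopped.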

##########
File path: samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
##########
@@ -103,4 +104,30 @@ public void testAllocatedResourceExpiryForYarn() {
 
     Assert.assertTrue(yarnClusterResourceManager.isResourceExpired(allocatedResource));
   }
+
+  @Test
+  public void testAMShutdownOnRMCallback() {
+    // create mocks
+    YarnConfiguration yarnConfiguration = mock(YarnConfiguration.class);
+    SamzaAppMasterMetrics metrics = mock(SamzaAppMasterMetrics.class);
+    Config config = mock(Config.class);
+    AMRMClientAsync asyncClient = mock(AMRMClientAsync.class);
+    YarnAppState yarnAppState = new YarnAppState(0, mock(ContainerId.class), "host", 8080, 8081);
+    SamzaYarnAppMasterLifecycle lifecycle = mock(SamzaYarnAppMasterLifecycle.class);
+    SamzaYarnAppMasterService service = mock(SamzaYarnAppMasterService.class);
+    NMClientAsync asyncNMClient = mock(NMClientAsync.class);
+    ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
+
+    // start the cluster manager
+    YarnClusterResourceManager yarnClusterResourceManager = new YarnClusterResourceManager(asyncClient, asyncNMClient,
+        callback, yarnAppState, lifecycle, service, metrics, yarnConfiguration, config);
+
+    yarnClusterResourceManager.onShutdownRequest();
+
+    verify(lifecycle, times(1)).onShutdown(SamzaApplicationState.SamzaAppStatus.FAILED);

Review comment:
       Should we also verify that unregister is called? If we don't unregister, we will end up retrying, and it's important that this code path doesn't break when someone changes `lifecycle.onShutdown(...)` to be invoked with a different status.
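A dependency-free sketch of the check being asked for, assuming hand-written fakes in place of Mockito mocks (`ShutdownStatus`, `RecordingAmClient`, `LifecycleUnderTest`, and `ShutdownRequestHandler` are all hypothetical). The fake lifecycle mirrors the `if(samzaAppStatus != SamzaAppStatus.UNDEFINED)` guard from `SamzaYarnAppMasterLifecycle`, and it passes the status name as a plain string instead of mapping it to a YARN `FinalApplicationStatus`. A FAILED status leads to exactly one unregister, while switching to UNDEFINED would silently skip it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SamzaApplicationState.SamzaAppStatus.
enum ShutdownStatus { SUCCEEDED, FAILED, UNDEFINED }

// Fake AM-RM client that records unregister calls instead of contacting an RM.
class RecordingAmClient {
  final List<String> unregisterCalls = new ArrayList<>();

  void unregisterApplicationMaster(String finalStatus, String message) {
    unregisterCalls.add(finalStatus);
  }
}

// Mirrors the guard in SamzaYarnAppMasterLifecycle: UNDEFINED skips unregistering,
// which lets the RM restart the AM.
class LifecycleUnderTest {
  private final RecordingAmClient amClient;

  LifecycleUnderTest(RecordingAmClient amClient) {
    this.amClient = amClient;
  }

  void onShutdown(ShutdownStatus status) {
    if (status != ShutdownStatus.UNDEFINED) {
      amClient.unregisterApplicationMaster(status.name(), "Shutdown requested by the RM");
    }
  }
}

// Stand-in for YarnClusterResourceManager.onShutdownRequest(); the status is injectable
// only so that a test can simulate someone later changing it.
class ShutdownRequestHandler {
  private final LifecycleUnderTest lifecycle;
  private final ShutdownStatus status;

  ShutdownRequestHandler(LifecycleUnderTest lifecycle, ShutdownStatus status) {
    this.lifecycle = lifecycle;
    this.status = status;
  }

  void onShutdownRequest() {
    lifecycle.onShutdown(status);
  }
}
```

In the Mockito-based test, `lifecycle` is itself a mock, so its `onShutdown(...)` never reaches the AM client; verifying the unregister would likely require a real (or spied) `SamzaYarnAppMasterLifecycle` over the mocked `AMRMClientAsync`, followed by a `verify(...)` on `unregisterApplicationMaster(...)`.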

##########
File path: samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
##########
@@ -103,4 +104,30 @@ public void testAllocatedResourceExpiryForYarn() {
 
     Assert.assertTrue(yarnClusterResourceManager.isResourceExpired(allocatedResource));
   }
+
+  @Test
+  public void testAMShutdownOnRMCallback() {

Review comment:
       Perhaps add cases for unregister throwing exceptions too.
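A sketch of such cases, again with hypothetical fakes rather than the real YARN types (`AttemptRemovedException` stands in for `InvalidApplicationMasterRequestException`, and `ShutdownSequence` is an illustrative shutdown path, not Samza code). There is one case per unregister outcome (success, attempt already removed by the RM, any other failure), and each case checks that the outcome is recorded and that the remaining components are still stopped afterwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for YARN's InvalidApplicationMasterRequestException.
class AttemptRemovedException extends RuntimeException {
  AttemptRemovedException(String message) {
    super(message);
  }
}

// Hypothetical shutdown path: try to unregister, then stop the remaining components
// whether or not unregistering succeeded.
class ShutdownSequence {
  final List<String> events = new ArrayList<>();
  private final Runnable unregister;

  ShutdownSequence(Runnable unregister) {
    this.unregister = unregister;
  }

  void onShutdownRequest() {
    try {
      unregister.run();
      events.add("unregistered");
    } catch (AttemptRemovedException e) {
      events.add("attempt-already-removed");
    } catch (RuntimeException e) {
      events.add("unregister-failed");
    }
    events.add("nmClient-stopped");
    events.add("service-stopped");
    events.add("metrics-stopped");
  }
}

class UnregisterFailureCases {
  // Runs one case and fails if the recorded outcome or the follow-up shutdown steps differ.
  static void check(Runnable unregister, String expectedOutcome) {
    ShutdownSequence sequence = new ShutdownSequence(unregister);
    sequence.onShutdownRequest();
    List<String> expected =
        Arrays.asList(expectedOutcome, "nmClient-stopped", "service-stopped", "metrics-stopped");
    if (!sequence.events.equals(expected)) {
      throw new AssertionError("expected " + expected + " but got " + sequence.events);
    }
  }

  static void runAll() {
    check(() -> { }, "unregistered");
    check(() -> { throw new AttemptRemovedException("attempt not found"); }, "attempt-already-removed");
    check(() -> { throw new IllegalStateException("RM unreachable"); }, "unregister-failed");
  }
}
```

With Mockito, the same cases would typically be written by stubbing the mocked client with `doThrow(...)` for each exception type and then verifying that the other components' stop methods were still called.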



