You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/08/24 18:59:34 UTC

[samza] branch master updated: SAMZA-2579: Force restart feature for Container Placements (#1414)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 29f2ef7  SAMZA-2579: Force restart feature for Container Placements (#1414)
29f2ef7 is described below

commit 29f2ef7cd9169ce8e0b474228c59a3a1da71e5c2
Author: Sanil Jain <sa...@gmail.com>
AuthorDate: Mon Aug 24 11:59:26 2020 -0700

    SAMZA-2579: Force restart feature for Container Placements (#1414)
    
    Changes: The current restart ability for container placements works in the following way:
    
    Tries to fetch resources on a host
    Stops the active container if resources are accrued
    Tries to start the container on the host where resources were accrued
    In production at LinkedIn, we have observed the following:
    
    Some jobs are configured to use resources for the peak which leads to no headroom left on a host for requesting additional resources
    This leads to restart requests failing because resources cannot be acquired on that host
    A fix for this is to implement a force-restart utility: in this version we stop the container first and then accrue resources. The upside is that we at least free up the resources on the host before issuing the resource request; the downside is that bringing the container back up on that host is best-effort
    
    API Changes: Added new param values to destinationHost param for container placement request message
    
    LAST_SEEN: Tries to restart a container on last seen host with RESERVE -> STOP -> MOVE policy
    
    FORCE_RESTART_LAST_SEEN: Tries to restart a container on last seen host with STOP -> RESERVE -> MOVE policy
---
 .../samza/clustermanager/ContainerManager.java     | 39 ++++++++++++++++++++--
 1 file changed, 37 insertions(+), 2 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
index 70a050c..2730c0c 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
@@ -54,6 +54,8 @@ public class ContainerManager {
 
   private static final Logger LOG = LoggerFactory.getLogger(ContainerManager.class);
   private static final String ANY_HOST = ResourceRequestState.ANY_HOST;
+  private static final String LAST_SEEN = "LAST_SEEN";
+  private static final String FORCE_RESTART_LAST_SEEN = "FORCE_RESTART_LAST_SEEN";
   private static final int UUID_CACHE_SIZE = 20000;
 
   /**
@@ -366,9 +368,18 @@ public class ContainerManager {
    * Container placement requests are tied to deploymentId which is currently {@link org.apache.samza.config.ApplicationConfig#APP_RUN_ID}
    * On job restarts container placement requests queued for the previous deployment are deleted using this
    *
+   * All kinds of container placement requests, except those whose destination host is "FORCE_RESTART_LAST_SEEN", work with
+   * a RESERVE - STOP - START policy, which means resources are accrued first before issuing a container stop; failure to
+   * do so will leave the running container untouched. Requests with destination host "FORCE_RESTART_LAST_SEEN" work with
+   * a STOP - RESERVE - START policy, which means the running container is stopped first and then resource requests are issued; this case
+   * is equivalent to doing a kill -9 on a container
+   *
    * @param requestMessage request containing logical processor id 0,1,2 and host where container is desired to be moved,
-   *                       acceptable values of this param are any valid hostname or "ANY_HOST"(in this case the request
-   *                       is sent to resource manager for any host)
+   *                       acceptable values of this param are
+   *                       - valid hostname
+   *                       - "ANY_HOST" in this case the request is sent to resource manager for any host
+   *                       - "LAST_SEEN" in this case request is sent to resource manager for last seen host
+   *                       - "FORCE_RESTART_LAST_SEEN" in this case request is sent to resource manager for last seen host
    * @param containerAllocator to request physical resources
    */
   public void registerContainerPlacementAction(ContainerPlacementRequestMessage requestMessage, ContainerAllocator containerAllocator) {
@@ -391,6 +402,30 @@ public class ContainerManager {
       return;
     }
 
+    /*
+     * When destination host is {@code FORCE_RESTART_LAST_SEEN} it is treated as equivalent to a kill -9 operation for the container.
+     * In this scenario the container is stopped first and we fall back to the normal restart path, so the policy here is
+     * stop - reserve - move
+     */
+    if (destinationHost.equals(FORCE_RESTART_LAST_SEEN)) {
+      LOG.info("Issuing a force restart for Processor ID: {} for ContainerPlacement action request {}", processorId, requestMessage);
+      clusterResourceManager.stopStreamProcessor(samzaApplicationState.runningProcessors.get(processorId));
+      writeContainerPlacementResponseMessage(requestMessage, ContainerPlacementMessage.StatusCode.SUCCEEDED,
+          "Successfully issued a stop container request falling back to normal restart path");
+      return;
+    }
+
+    /**
+     * When destination host is {@code LAST_SEEN} it is treated as a restart request on the host where the container is running
+     * or was last seen; the policy here is reserve - stop - move, which means resources are reserved first and,
+     * only if they are accrued, the active container is stopped and a start is issued for it on the acquired resources
+     */
+    if (destinationHost.equals(LAST_SEEN)) {
+      String lastSeenHost = getSourceHostForContainer(requestMessage);
+      LOG.info("Changing the requested host for placement action to {} because requested host is LAST_SEEN", lastSeenHost);
+      destinationHost = lastSeenHost;
+    }
+
     // TODO: SAMZA-2457: Allow host affinity disabled jobs to move containers to specific host
     if (!hostAffinityEnabled) {
       LOG.info("Changing the requested host for placement action to {} because host affinity is disabled", ResourceRequestState.ANY_HOST);