Posted to commits@samza.apache.org by ra...@apache.org on 2019/10/08 20:41:11 UTC

[samza] branch master updated: Samza-2330: Handle expired resource request for Container allocator when host affinity is disabled (#1170)

This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ae3450  Samza-2330: Handle expired resource request for Container allocator when host affinity is disabled	 (#1170)
2ae3450 is described below

commit 2ae34502e3526cbb8e275d87c26c7bc4ce2d8ed4
Author: Sanil Jain <sa...@gmail.com>
AuthorDate: Tue Oct 8 13:41:06 2019 -0700

    Samza-2330: Handle expired resource request for Container allocator when host affinity is disabled	 (#1170)
    
    * Adding expiry check for unresponsive Cluster Manager when host affinity is off
    
    * Fixing after rebase
    
    * Addressing Ray's feedback
    
    * Updating javadocs
    
    * Nitpick improvements
---
 .../versioned/jobs/samza-configurations.md         |  2 +-
 .../samza/clustermanager/ContainerAllocator.java   | 19 ++++----
 .../TestContainerAllocatorWithHostAffinity.java    | 12 ++---
 .../TestContainerAllocatorWithoutHostAffinity.java | 55 +++++++++++++++++++++-
 .../TestContainerProcessManager.java               |  2 +-
 5 files changed, 70 insertions(+), 20 deletions(-)

diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index a247460..4ccc02b 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -299,7 +299,7 @@ Samza supports both standalone and clustered ([YARN](yarn-jobs.html)) [deploymen
 |cluster-manager.container.fail.job.after.retries|true|This configuration sets the behavior of the job after all `cluster-manager.container.retry.count`s are exhausted and each retry is within the `cluster-manager.container.retry.window.ms` period on any single container. If set to true, the whole job will fail if any container fails after the last retry. If set to false, the job will continue to run without the failed container. The typical use case for setting this to false is to aid i [...]
 |cluster-manager.jobcoordinator.jmx.enabled|true|This is deprecated in favor of `job.jmx.enabled`|
 |cluster-manager.allocator.sleep.ms|3600|The container allocator thread is responsible for matching requests to allocated containers. The sleep interval for this thread is configured using this property.|
-|cluster-manager.container.request.timeout.ms|5000|The allocator thread periodically checks the state of the container requests and allocated containers to determine the assignment of a container to an allocated resource. This property determines the number of milliseconds before a container request is considered to have expired / timed-out. When a request expires, it gets allocated to any available container that was returned by the cluster manager.|
+|cluster-manager.container.request.timeout.ms|5000|The allocator thread periodically checks the state of the container requests and allocated containers to determine the assignment of a container to an allocated resource. If no resource is obtained within `cluster-manager.container.request.timeout.ms`, the request is declared to be expired. When a request expires, it gets allocated to any available container that was returned by the cluster manager; if none is available, the existing resource [...]
 |task.execute|bin/run-container.sh|The command that starts a Samza container. The script must be included in the [job package](./packaging.html). There is usually no need to customize this.|
 |task.java.home| |The JAVA_HOME path for Samza containers. By setting this property, you can use a java version that is different from your cluster's java version. Remember to set the `yarn.am.java.home` as well.|
 |yarn.am.container.<br>memory.mb|1024|Each Samza job when running in Yarn has one special container, the [ApplicationMaster](../yarn/application-master.html) (AM), which manages the execution of the job. This property determines how much memory, in megabytes, to request from YARN for running the ApplicationMaster.|
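As a rough illustration of the `cluster-manager.container.request.timeout.ms` semantics in the table above, the following sketch shows the kind of age check the allocator thread could perform on each pass. The class and method names are hypothetical, and the strict `>` comparison is an assumption, not a quote of Samza's code.

```java
/**
 * Hypothetical helper (not Samza's API) showing how a request timeout such as
 * cluster-manager.container.request.timeout.ms can be applied to a pending request.
 */
public class RequestExpiry {

  /** Assumption: a request is expired once strictly more than timeoutMs have elapsed since creation. */
  static boolean isExpired(long requestCreatedAtMs, long nowMs, long timeoutMs) {
    return nowMs - requestCreatedAtMs > timeoutMs;
  }

  public static void main(String[] args) {
    long timeoutMs = 5000; // documented default for cluster-manager.container.request.timeout.ms
    long createdAt = 1_000L;
    // 4999 ms old: still waiting for the preferred resource
    System.out.println(isExpired(createdAt, createdAt + 4_999, timeoutMs));
    // 5001 ms old: expired, so the allocator falls back to ANY_HOST handling
    System.out.println(isExpired(createdAt, createdAt + 5_001, timeoutMs));
  }
}
```

After this commit the same check drives expiry handling whether or not host affinity is enabled.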
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index 5df5cd7..361d1eb 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -56,10 +56,10 @@ import org.slf4j.LoggerFactory;
  *    When host-affinity is disabled, the resource-request's preferredHost param is set to {@link ResourceRequestState#ANY_HOST}
  *  </li>
  *  <li>
- *    When host-affinity is enabled and a preferred resource has not been obtained after {@code requestExpiryTimeout}
- *    milliseconds of the request being made, the resource is declared expired. The expired request are handled by
- *    allocating them to *ANY* allocated resource if available. If no surplus resources are available the current preferred
- *    resource-request is cancelled and resource-request for ANY_HOST is issued
+ *    When the preferred resource has not been obtained after {@code requestExpiryTimeout} milliseconds of the request
+ *    being made, the request is declared expired. Expired requests are handled by allocating them to *ANY*
+ *    allocated resource, if one is available. If no surplus resources are available, the current resource-request
+ *    is cancelled and a resource-request for ANY_HOST is issued.
  *  </li>
  *  <li>
  *    When host-affinity is not enabled, this periodically wakes up to assign a processor to *ANY* allocated resource.
@@ -219,9 +219,7 @@ public class ContainerAllocator implements Runnable {
 
         if (expired) {
           updateExpiryMetrics(request);
-          if (hostAffinityEnabled) {
-            handleExpiredRequestWithHostAffinityEnabled(processorId, preferredHost, request);
-          }
+          handleExpiredRequest(processorId, preferredHost, request);
         } else {
         LOG.info("Request for Processor ID: {} on preferred host {} has not expired yet. "
                   + "Request creation time: {}. Current Time: {}. Request timeout: {} ms", processorId, preferredHost,
@@ -233,11 +231,12 @@ public class ContainerAllocator implements Runnable {
   }
 
   /**
-   * Handles an expired resource request when {@code hostAffinityEnabled} is true, in this case since the
-   * preferred host, we try to see if a surplus ANY_HOST is available in the request queue.
+   * Handles an expired resource request for both active and standby containers. Since the preferred host could not be
+   * obtained, this method checks whether a surplus ANY_HOST resource is available and, if so, launches the container on
+   * it; otherwise, it issues an ANY_HOST request. This behavior holds regardless of whether host affinity is enabled.
    */
   @VisibleForTesting
-  void handleExpiredRequestWithHostAffinityEnabled(String processorId, String preferredHost,
+  void handleExpiredRequest(String processorId, String preferredHost,
       SamzaResourceRequest request) {
     boolean resourceAvailableOnAnyHost = hasAllocatedResource(ResourceRequestState.ANY_HOST);
     if (standbyContainerManager.isPresent()) {
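The expired-request handling described in the javadoc of `handleExpiredRequest` can be modelled with a small, self-contained sketch. Everything here (`ExpiredRequestSketch`, the string-based resources, the action log) is a hypothetical stand-in for Samza's `ContainerAllocator`, `SamzaResourceRequest` and cluster-manager calls, and standby-container constraints are left out. What it illustrates is that, after this change, the decision no longer depends on a host-affinity flag.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of expired-request handling; not Samza's actual classes. */
public class ExpiredRequestSketch {
  static final String ANY_HOST = "ANY_HOST";

  /** Resources already returned by the cluster manager and not yet assigned (surplus). */
  final Deque<String> surplusAnyHostResources = new ArrayDeque<>();
  /** Log of the actions taken, so the outcome can be inspected. */
  final List<String> actions = new ArrayList<>();

  /** Same path whether host affinity is on (real preferred host) or off (preferredHost == ANY_HOST). */
  void handleExpiredRequest(String processorId, String preferredHost) {
    String resource = surplusAnyHostResources.poll(); // null when no surplus resource exists
    if (resource != null) {
      // A surplus resource is available: launch the processor on it right away.
      actions.add("run " + processorId + " on " + resource);
    } else {
      // Nothing available: cancel the expired request and ask for ANY_HOST instead.
      actions.add("cancel " + processorId + "@" + preferredHost);
      actions.add("request " + processorId + "@" + ANY_HOST);
    }
  }

  public static void main(String[] args) {
    ExpiredRequestSketch allocator = new ExpiredRequestSketch();
    allocator.surplusAnyHostResources.add("resource-a");
    allocator.handleExpiredRequest("0", "host-0"); // consumes resource-a
    allocator.handleExpiredRequest("1", ANY_HOST); // no surplus left: cancel and re-request
    allocator.actions.forEach(System.out::println);
  }
}
```

With host affinity disabled, the preferred host of every request is already ANY_HOST, so an expired request is cancelled and re-issued as another ANY_HOST request.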
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index bb9cb79..6bc4e49 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -410,8 +410,8 @@ public class TestContainerAllocatorWithHostAffinity {
     // Request Preferred Resources
     spyAllocator.requestResources(new HashMap<String, String>() {
       {
-        put("0", "abc");
-        put("1", "def");
+        put("0", "hostname-0");
+        put("1", "hostname-1");
       }
     });
 
@@ -425,9 +425,9 @@ public class TestContainerAllocatorWithHostAffinity {
     // Verify that all the requests that were created as preferred host requests expired
     assertTrue(state.preferredHostRequests.get() == 2);
     assertTrue(state.expiredPreferredHostRequests.get() == 2);
-    verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("abc"),
+    verify(spyAllocator, times(1)).handleExpiredRequest(eq("0"), eq("hostname-0"),
         any(SamzaResourceRequest.class));
-    verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("def"),
+    verify(spyAllocator, times(1)).handleExpiredRequest(eq("1"), eq("hostname-1"),
         any(SamzaResourceRequest.class));
 
     // Verify that preferred host request were cancelled and since no surplus resources were available
@@ -470,9 +470,9 @@ public class TestContainerAllocatorWithHostAffinity {
 
     // Verify that all the requests that were created as preferred host requests expired
     assertTrue(state.expiredPreferredHostRequests.get() == 2);
-    verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("abc"),
+    verify(spyAllocator, times(1)).handleExpiredRequest(eq("0"), eq("abc"),
         any(SamzaResourceRequest.class));
-    verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("def"),
+    verify(spyAllocator, times(1)).handleExpiredRequest(eq("1"), eq("def"),
         any(SamzaResourceRequest.class));
 
     // Verify that runStreamProcessor was invoked with already available ANY_HOST requests
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index 16eac0b..bbccbcb 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.JobModelManager;
@@ -87,6 +88,7 @@ public class TestContainerAllocatorWithoutHostAffinity {
         put("cluster-manager.container.count", "1");
         put("cluster-manager.container.retry.count", "1");
         put("cluster-manager.container.retry.window.ms", "1999999999");
+        put("cluster-manager.container.request.timeout.ms", "3");
         put("cluster-manager.allocator.sleep.ms", "10");
         put("cluster-manager.container.memory.mb", "512");
         put("yarn.package.path", "/foo");
@@ -282,12 +284,61 @@ public class TestContainerAllocatorWithoutHostAffinity {
     resourceRequestCaptor.getAllValues()
         .forEach(resourceRequest -> assertEquals(resourceRequest.getPreferredHost(), ResourceRequestState.ANY_HOST));
     assertTrue(state.anyHostRequests.get() == containersToHostMapping.size());
-    // Expiry currently is only handled for host affinity enabled cases
-    verify(spyAllocator, never()).handleExpiredRequestWithHostAffinityEnabled(anyString(), anyString(),
+    // Expired-request handling should not be invoked in this test
+    verify(spyAllocator, never()).handleExpiredRequest(anyString(), anyString(),
         any(SamzaResourceRequest.class));
     // Only updated when host affinity is enabled
     assertTrue(state.matchedResourceRequests.get() == 0);
     assertTrue(state.preferredHostRequests.get() == 0);
     spyAllocator.stop();
   }
+
+  @Test
+  public void testExpiredRequestAllocationOnAnyHost() throws Exception {
+    MockClusterResourceManager spyManager = spy(new MockClusterResourceManager(callback, state));
+    spyAllocator = Mockito.spy(
+        new ContainerAllocator(spyManager, config, state, false, Optional.empty()));
+
+    // Request Resources
+    spyAllocator.requestResources(new HashMap<String, String>() {
+      {
+        put("0", "host-0");
+        put("1", "host-1");
+      }
+    });
+
+    spyThread = new Thread(spyAllocator);
+    // Start the container allocator thread periodic assignment
+    spyThread.start();
+
+    // Let the requests expire; the expiration timeout is 3 ms
+    Thread.sleep(20);
+
+    // Host affinity is disabled, so every request should have been created
+    // as an ANY_HOST request rather than a preferred-host request
+    assertEquals(0, state.preferredHostRequests.get());
+    // At least 2 requests should expire, each re-issued as ANY_HOST: at least 4 ANY_HOST requests in total
+    assertTrue(state.anyHostRequests.get() >= 4);
+    assertTrue(state.expiredAnyHostRequests.get() >= 2);
+
+    verify(spyAllocator, atLeastOnce()).handleExpiredRequest(eq("0"), eq(ResourceRequestState.ANY_HOST),
+        any(SamzaResourceRequest.class));
+    verify(spyAllocator, atLeastOnce()).handleExpiredRequest(eq("1"), eq(ResourceRequestState.ANY_HOST),
+        any(SamzaResourceRequest.class));
+
+    // Verify that the expired requests were cancelled; since no surplus resources were available,
+    // each of them was re-issued as an ANY_HOST request
+    ArgumentCaptor<SamzaResourceRequest> cancelledRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);
+    // At least 2 expired ANY_HOST requests were cancelled
+    verify(spyManager, atLeast(2)).cancelResourceRequest(cancelledRequestCaptor.capture());
+    // Verify that all cancelled requests share a single preferred host, i.e. ANY_HOST
+    assertTrue(cancelledRequestCaptor.getAllValues()
+        .stream()
+        .map(resourceRequest -> resourceRequest.getPreferredHost())
+        .collect(Collectors.toSet())
+        .size() == 1);
+    spyAllocator.stop();
+
+  }
+
 }
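The lower bounds asserted in `testExpiredRequestAllocationOnAnyHost` follow from simple counting. The sketch below assumes, as the test's cancellation checks imply, that the mock cluster manager returns no resources during the 20 ms sleep, so every outstanding request expires, is cancelled, and is re-issued as an ANY_HOST request. `ExpiryCounters` is a hypothetical model of the counters, not part of Samza.

```java
/**
 * Hypothetical counting model for the new test. Assumption: no resource is ever
 * returned, so each expiry round expires, cancels and re-issues every request.
 */
public class ExpiryCounters {

  /** Returns {anyHostRequests, expiredAnyHostRequests, cancelledRequests}. */
  static int[] simulate(int processors, int expiryRounds) {
    int anyHostRequests = processors; // initial requests; host affinity off, so all are ANY_HOST
    int expired = 0;
    int cancelled = 0;
    for (int round = 0; round < expiryRounds; round++) {
      expired += processors;         // every outstanding request times out
      cancelled += processors;       // the expired request is cancelled
      anyHostRequests += processors; // and replaced by a fresh ANY_HOST request
    }
    return new int[] {anyHostRequests, expired, cancelled};
  }

  public static void main(String[] args) {
    // One expiry round for 2 processors gives the minimums the test asserts.
    int[] one = simulate(2, 1);
    System.out.printf("anyHost=%d expired=%d cancelled=%d%n", one[0], one[1], one[2]);
    // Further rounds only increase the counters.
    int[] three = simulate(2, 3);
    System.out.printf("anyHost=%d expired=%d cancelled=%d%n", three[0], three[1], three[2]);
  }
}
```

Because the 3 ms timeout can elapse more than once during the sleep, the real counters may exceed these minimums, which is why the test uses `>=`, `atLeast(2)` and `atLeastOnce()` rather than exact counts.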
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index d0ea463..827bddf 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -71,7 +71,7 @@ public class TestContainerProcessManager {
       put("cluster-manager.container.retry.count", "1");
       put("cluster-manager.container.retry.window.ms", "1999999999");
       put("cluster-manager.allocator.sleep.ms", "1");
-      put("cluster-manager.container.request.timeout.ms", "2");
+      put("cluster-manager.container.request.timeout.ms", "100");
       put("cluster-manager.container.memory.mb", "512");
       put("yarn.package.path", "/foo");
       put("task.inputs", "test-system.test-stream");