Posted to commits@samza.apache.org by lh...@apache.org on 2019/10/30 23:06:35 UTC

[samza] branch 1.3.0 updated: Revert "Samza-2330: Handle expired resource request for Container allocator when host affinity is disabled"

This is an automated email from the ASF dual-hosted git repository.

lhaiesp pushed a commit to branch 1.3.0
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/1.3.0 by this push:
     new 5478bee  Revert "Samza-2330: Handle expired resource request for Container allocator when host affinity is disabled"
5478bee is described below

commit 5478beeb811c599c0f8e1b849a5d6c8533a3b7ba
Author: Sanil Jain <sa...@gmail.com>
AuthorDate: Fri Oct 25 11:43:01 2019 -0700

    Revert "Samza-2330: Handle expired resource request for Container allocator when host affinity is disabled"
    
    This reverts commit 2ae34502e3526cbb8e275d87c26c7bc4ce2d8ed4.
---
 .../versioned/jobs/samza-configurations.md         |  2 +-
 .../samza/clustermanager/ContainerAllocator.java   | 16 ++++---
 .../TestContainerAllocatorWithHostAffinity.java    | 10 ++--
 .../TestContainerAllocatorWithoutHostAffinity.java | 55 +---------------------
 .../TestContainerProcessManager.java               |  2 +-
 5 files changed, 18 insertions(+), 67 deletions(-)

diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 4ccc02b..a247460 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -299,7 +299,7 @@ Samza supports both standalone and clustered ([YARN](yarn-jobs.html)) [deploymen
 |cluster-manager.container.fail.job.after.retries|true|This configuration sets the behavior of the job after all `cluster-manager.container.retry.count`s are exhausted and each retry is within the `cluster-manager.container.retry.window.ms` period on any single container. If set to true, the whole job will fail if any container fails after the last retry. If set to false, the job will continue to run without the failed container. The typical use cases of setting this to false is to aid i [...]
 |cluster-manager.jobcoordinator.jmx.enabled|true|This is deprecated in favor of `job.jmx.enabled`|
 |cluster-manager.allocator.sleep.ms|3600|The container allocator thread is responsible for matching requests to allocated containers. The sleep interval for this thread is configured using this property.|
-|cluster-manager.container.request.timeout.ms|5000|The allocator thread periodically checks the state of the container requests and allocated containers to determine the assignment of a container to an allocated resource. If no resource is obtained after cluster-manager.container.request.timeout.ms the request is declared to be expired.. When a request expires, it gets allocated to any available container that was returned by the cluster manager if none is available the existing resource [...]
+|cluster-manager.container.request.timeout.ms|5000|The allocator thread periodically checks the state of the container requests and allocated containers to determine the assignment of a container to an allocated resource. This property determines the number of milliseconds before a container request is considered to have expired / timed-out. When a request expires, it gets allocated to any available container that was returned by the cluster manager.|
 |task.execute|bin/run-container.sh|The command that starts a Samza container. The script must be included in the [job package](./packaging.html). There is usually no need to customize this.|
 |task.java.home| |The JAVA_HOME path for Samza containers. By setting this property, you can use a java version that is different from your cluster's java version. Remember to set the `yarn.am.java.home` as well.|
 |yarn.am.container.<br>memory.mb|1024|Each Samza job when running in Yarn has one special container, the [ApplicationMaster](../yarn/application-master.html) (AM), which manages the execution of the job. This property determines how much memory, in megabytes, to request from YARN for running the ApplicationMaster.|
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index 361d1eb..89855dc 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -56,10 +56,10 @@ import org.slf4j.LoggerFactory;
  *    When host-affinity is disabled, the resource-request's preferredHost param is set to {@link ResourceRequestState#ANY_HOST}
  *  </li>
  *  <li>
- *    When the preferred resource has not been obtained after {@code requestExpiryTimeout} milliseconds of the request
- *    being made, the resource is declared expired. Expired request are handled by allocating them to *ANY*
- *    allocated resource if available. If no surplus resources are available the current preferred resource-request
- *    is cancelled and resource-request for ANY_HOST is issued
+ *    When host-affinity is enabled and a preferred resource has not been obtained after {@code requestExpiryTimeout}
+ *    milliseconds of the request being made, the resource is declared expired. The expired request are handled by
+ *    allocating them to *ANY* allocated resource if available. If no surplus resources are available the current preferred
+ *    resource-request is cancelled and resource-request for ANY_HOST is issued
  *  </li>
  *  <li>
  *    When host-affinity is not enabled, this periodically wakes up to assign a processor to *ANY* allocated resource.
@@ -219,7 +219,9 @@ public class ContainerAllocator implements Runnable {
 
         if (expired) {
           updateExpiryMetrics(request);
-          handleExpiredRequest(processorId, preferredHost, request);
+          if (hostAffinityEnabled) {
+            handleExpiredRequestWithHostAffinityEnabled(processorId, preferredHost, request);
+          }
         } else {
           LOG.info("Request for Processor ID: {} on preferred host {} has not expired yet."
                   + "Request creation time: {}. Current Time: {}. Request timeout: {} ms", processorId, preferredHost,
@@ -233,10 +235,10 @@ public class ContainerAllocator implements Runnable {
   /**
    * Handles an expired resource request for both active and standby containers. Since a preferred host cannot be obtained
    * this method checks the availability of surplus ANY_HOST resources and launches the container if available. Otherwise
-   * issues an ANY_HOST request. This behavior holds regardless of host-affinity enabled or not.
+   * issues an ANY_HOST request.
    */
   @VisibleForTesting
-  void handleExpiredRequest(String processorId, String preferredHost,
+  void handleExpiredRequestWithHostAffinityEnabled(String processorId, String preferredHost,
       SamzaResourceRequest request) {
     boolean resourceAvailableOnAnyHost = hasAllocatedResource(ResourceRequestState.ANY_HOST);
     if (standbyContainerManager.isPresent()) {
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index 823191b..927df89 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -425,9 +425,9 @@ public class TestContainerAllocatorWithHostAffinity {
     // Verify that all the request that were created as preferred host requests expired
     assertTrue(state.preferredHostRequests.get() == 2);
     assertTrue(state.expiredPreferredHostRequests.get() == 2);
-    verify(spyAllocator, times(1)).handleExpiredRequest(eq("0"), eq("hostname-0"),
+    verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("hostname-0"),
         any(SamzaResourceRequest.class));
-    verify(spyAllocator, times(1)).handleExpiredRequest(eq("1"), eq("hostname-1"),
+    verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("hostname-1"),
         any(SamzaResourceRequest.class));
 
     // Verify that preferred host request were cancelled and since no surplus resources were available
@@ -469,10 +469,10 @@ public class TestContainerAllocatorWithHostAffinity {
     Thread.sleep(100);
 
     // Verify that all the request that were created as preferred host requests expired
-    assertEquals(state.expiredPreferredHostRequests.get(), 2);
-    verify(spyAllocator, times(1)).handleExpiredRequest(eq("0"), eq("hostname-0"),
+    assertTrue(state.expiredPreferredHostRequests.get() == 2);
+    verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("hostname-0"),
         any(SamzaResourceRequest.class));
-    verify(spyAllocator, times(1)).handleExpiredRequest(eq("1"), eq("hostname-1"),
+    verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("hostname-1"),
         any(SamzaResourceRequest.class));
 
     // Verify that runStreamProcessor was invoked with already available ANY_HOST requests
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index f30f800..16eac0b 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.JobModelManager;
@@ -88,7 +87,6 @@ public class TestContainerAllocatorWithoutHostAffinity {
         put("cluster-manager.container.count", "1");
         put("cluster-manager.container.retry.count", "1");
         put("cluster-manager.container.retry.window.ms", "1999999999");
-        put("cluster-manager.container.request.timeout.ms", "3");
         put("cluster-manager.allocator.sleep.ms", "10");
         put("cluster-manager.container.memory.mb", "512");
         put("yarn.package.path", "/foo");
@@ -284,61 +282,12 @@ public class TestContainerAllocatorWithoutHostAffinity {
     resourceRequestCaptor.getAllValues()
         .forEach(resourceRequest -> assertEquals(resourceRequest.getPreferredHost(), ResourceRequestState.ANY_HOST));
     assertTrue(state.anyHostRequests.get() == containersToHostMapping.size());
-    // Expiry currently should not be invoked
-    verify(spyAllocator, never()).handleExpiredRequest(anyString(), anyString(),
+    // Expiry currently is only handled for host affinity enabled cases
+    verify(spyAllocator, never()).handleExpiredRequestWithHostAffinityEnabled(anyString(), anyString(),
         any(SamzaResourceRequest.class));
     // Only updated when host affinity is enabled
     assertTrue(state.matchedResourceRequests.get() == 0);
     assertTrue(state.preferredHostRequests.get() == 0);
     spyAllocator.stop();
   }
-
-  @Test
-  public void testExpiredRequestAllocationOnAnyHost() throws Exception {
-    MockClusterResourceManager spyManager = spy(new MockClusterResourceManager(callback, state));
-    spyAllocator = Mockito.spy(
-        new ContainerAllocator(spyManager, config, state, false, Optional.empty()));
-
-    // Request Resources
-    spyAllocator.requestResources(new HashMap<String, String>() {
-      {
-        put("0", "host-0");
-        put("1", "host-1");
-      }
-    });
-
-    spyThread = new Thread(spyAllocator);
-    // Start the container allocator thread periodic assignment
-    spyThread.start();
-
-    // Let the request expire, expiration timeout is 3 ms
-    Thread.sleep(100);
-
-    // Verify that all the request that were created as ANY_HOST host
-    // and all created requests expired
-    assertEquals(state.preferredHostRequests.get(), 0);
-    // Atleast 2 requests should expire & 2 ANY_HOST requests should be generated
-    assertTrue(state.anyHostRequests.get() >= 4);
-    assertTrue(state.expiredAnyHostRequests.get() >= 2);
-
-    verify(spyAllocator, atLeastOnce()).handleExpiredRequest(eq("0"), eq(ResourceRequestState.ANY_HOST),
-        any(SamzaResourceRequest.class));
-    verify(spyAllocator, atLeastOnce()).handleExpiredRequest(eq("1"), eq(ResourceRequestState.ANY_HOST),
-        any(SamzaResourceRequest.class));
-
-    // Verify that preferred host request were cancelled and since no surplus resources were available
-    // requestResource was invoked with ANY_HOST requests
-    ArgumentCaptor<SamzaResourceRequest> cancelledRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);
-    // At least 2 preferred host requests were cancelled
-    verify(spyManager, atLeast(2)).cancelResourceRequest(cancelledRequestCaptor.capture());
-    // Verify all the request cancelled were ANY_HOST
-    assertTrue(cancelledRequestCaptor.getAllValues()
-        .stream()
-        .map(resourceRequest -> resourceRequest.getPreferredHost())
-        .collect(Collectors.toSet())
-        .size() == 1);
-    containerAllocator.stop();
-
-  }
-
 }
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 1ee68aa..f100393 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -70,7 +70,7 @@ public class TestContainerProcessManager {
       put("cluster-manager.container.retry.count", "1");
       put("cluster-manager.container.retry.window.ms", "1999999999");
       put("cluster-manager.allocator.sleep.ms", "1");
-      put("cluster-manager.container.request.timeout.ms", "100");
+      put("cluster-manager.container.request.timeout.ms", "2");
       put("cluster-manager.container.memory.mb", "512");
       put("yarn.package.path", "/foo");
       put("task.inputs", "test-system.test-stream");
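
For readers skimming the diff, the control flow restored by this revert in ContainerAllocator can be sketched roughly as follows. This is a minimal illustration, not Samza code: the class ExpirySketch, the Action enum, and the methods isExpired and decide are hypothetical names, and the strict "greater than" comparison against the timeout is an assumption, since the expiry computation itself is not part of this diff. What the diff does show is that expiry metrics are updated for every expired request, while the fallback handler runs only when host affinity is enabled.

```java
import java.time.Duration;

/** Hypothetical sketch, not Samza code: models the expiry decision restored by this revert. */
public class ExpirySketch {

  /** Outcomes of one allocator pass over a pending request (illustrative names). */
  enum Action { WAIT, RECORD_EXPIRY_ONLY, FALL_BACK_TO_ANY_HOST }

  /** Assumption: a request expires once it has been pending strictly longer than the timeout. */
  static boolean isExpired(long requestTimeMs, long nowMs, Duration timeout) {
    return nowMs - requestTimeMs > timeout.toMillis();
  }

  /** Mirrors the if (expired) { ... if (hostAffinityEnabled) { ... } } shape in the diff. */
  static Action decide(boolean hostAffinityEnabled, long requestTimeMs, long nowMs, Duration timeout) {
    if (!isExpired(requestTimeMs, nowMs, timeout)) {
      return Action.WAIT;
    }
    // Expiry metrics are updated in both branches; only the host-affinity
    // branch goes on to use or request an ANY_HOST resource.
    return hostAffinityEnabled ? Action.FALL_BACK_TO_ANY_HOST : Action.RECORD_EXPIRY_ONLY;
  }

  public static void main(String[] args) {
    // 5000 ms is the documented default of cluster-manager.container.request.timeout.ms
    Duration timeout = Duration.ofMillis(5000);
    System.out.println(decide(true, 0L, 6000L, timeout));
    System.out.println(decide(false, 0L, 6000L, timeout));
    System.out.println(decide(true, 0L, 1000L, timeout));
  }
}
```

With host affinity disabled, an expired request in this sketch is only counted, which matches the reverted test expectation that handleExpiredRequestWithHostAffinityEnabled is never invoked in that mode.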