You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/02/01 05:38:21 UTC

samza git commit: SAMZA-1552: Host affinity improvements - Improve matching of hosts to allocated resources

Repository: samza
Updated Branches:
  refs/heads/master 7e68e4b10 -> 03e5026cf


SAMZA-1552: Host affinity improvements - Improve matching of hosts to allocated resources

Author: Jagadish <jv...@linkedin.com>

Reviewers: Prateek <pr...@cs.utexas.edu>

Closes #401 from vjagadish1989/host-affinity-fix


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/03e5026c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/03e5026c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/03e5026c

Branch: refs/heads/master
Commit: 03e5026cf56d955c203e701c80e28c42d792fbce
Parents: 7e68e4b
Author: Jagadish <jv...@linkedin.com>
Authored: Wed Jan 31 21:38:14 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed Jan 31 21:38:14 2018 -0800

----------------------------------------------------------------------
 .../clustermanager/ResourceRequestState.java    |  29 +++-
 .../MockHostAwareContainerAllocator.java        |  68 +++++++++
 .../TestContainerProcessManager.java            | 138 ++++++++++++++-----
 .../TestHostAwareContainerAllocator.java        |  40 ++++++
 4 files changed, 234 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/03e5026c/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
index 77c192e..fe2067c 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
@@ -171,7 +171,9 @@ public class ResourceRequestState {
   public void updateStateAfterAssignment(SamzaResourceRequest request, String assignedHost, SamzaResource samzaResource) {
     synchronized (lock) {
       requestsQueue.remove(request);
+      // A reference to the resource could be held either in the preferred-host buffer or in the ANY_HOST buffer.
       allocatedResources.get(assignedHost).remove(samzaResource);
+      allocatedResources.get(ANY_HOST).remove(samzaResource);
       if (hostAffinityEnabled) {
         // assignedHost may not always be the preferred host.
         // Hence, we should safely decrement the counter for the preferredHost
@@ -266,18 +268,33 @@ public class ResourceRequestState {
   /**
    * Retrieves, but does not remove, the first allocated resource on the specified host.
    *
-   * @param host  the host for which a resource is needed.
-   * @return      the first {@link SamzaResource} allocated for the specified host or {@code null} if there isn't one.
+   * @param host the host for which a resource is needed.
+   * @return a {@link SamzaResource} allocated for the specified host or {@code null} if there isn't one.
    */
-
   public SamzaResource peekResource(String host) {
     synchronized (lock) {
-      List<SamzaResource> resourcesOnTheHost = this.allocatedResources.get(host);
+      List<SamzaResource> resourcesOnPreferredHostBuffer = this.allocatedResources.get(host);
+      List<SamzaResource> resourcesOnAnyHostBuffer = this.allocatedResources.get(ANY_HOST);
 
-      if (resourcesOnTheHost == null || resourcesOnTheHost.isEmpty()) {
+      // First, search the preferred-host buffer
+      if (resourcesOnPreferredHostBuffer != null && !resourcesOnPreferredHostBuffer.isEmpty()) {
+        SamzaResource resource = resourcesOnPreferredHostBuffer.get(0);
+        log.info("Returning a buffered resource: {} for {} from preferred-host buffer.", resource.getResourceID(), host);
+        return resource;
+      } else if (resourcesOnAnyHostBuffer != null && !resourcesOnAnyHostBuffer.isEmpty()) {
+        // If preferred host buffers are empty, scan the ANY_HOST buffer
+        log.debug("No resources on preferred-host buffer. Scanning ANY_HOST buffer");
+        SamzaResource resource = resourcesOnAnyHostBuffer.stream()
+            .filter(resrc -> resrc.getHost().equals(host))
+            .findAny().orElse(null);
+        if (resource != null) {
+          log.info("Returning a buffered resource: {} for {} from ANY_HOST buffer.", resource.getResourceID(), host);
+        }
+        return resource;
+      } else {
+        log.debug("Cannot find any resource in the ANY_HOST buffer for {} because both buffers are empty", host);
         return null;
       }
-      return resourcesOnTheHost.get(0);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/03e5026c/samza-core/src/test/java/org/apache/samza/clustermanager/MockHostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockHostAwareContainerAllocator.java
new file mode 100644
index 0000000..ea05fff
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockHostAwareContainerAllocator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.config.Config;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+public class MockHostAwareContainerAllocator extends HostAwareContainerAllocator {
+  private static final int ALLOCATOR_TIMEOUT_MS = 10000;
+  private Semaphore semaphore = new Semaphore(0);
+
+  public MockHostAwareContainerAllocator(ClusterResourceManager manager,
+      Config config, SamzaApplicationState state) {
+    super(manager, ALLOCATOR_TIMEOUT_MS, config, state);
+  }
+
+  /**
+   * Causes the current thread to block until the expected number of containers have started.
+   *
+   * @param numExpectedContainers the number of containers expected to start
+   * @param timeout the maximum time to wait
+   * @param unit the time unit of the {@code timeout} argument
+   *
+   * @return a boolean that specifies whether containers started within the timeout.
+   * @throws InterruptedException  if the current thread is interrupted while waiting
+   */
+  boolean awaitContainersStart(int numExpectedContainers, long timeout, TimeUnit unit) throws InterruptedException {
+    return semaphore.tryAcquire(numExpectedContainers, timeout, unit);
+  }
+
+  @Override
+  public void requestResources(Map<String, String> containerToHostMappings) {
+    super.requestResources(containerToHostMappings);
+  }
+
+  public ResourceRequestState getContainerRequestState() throws Exception {
+    Field field = AbstractContainerAllocator.class.getDeclaredField("resourceRequestState");
+    field.setAccessible(true);
+
+    return (ResourceRequestState) field.get(this);
+  }
+
+  @Override
+  protected void runStreamProcessor(SamzaResourceRequest request, String preferredHost) {
+    super.runStreamProcessor(request, preferredHost);
+    semaphore.release();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/03e5026c/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 5c2fe4a..d648a80 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.clustermanager;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
@@ -52,7 +53,7 @@ public class TestContainerProcessManager {
 
   private static volatile boolean isRunning = false;
 
-  private Map<String, String> configVals = new HashMap<String, String>()  {
+  private Map<String, String> configVals = new HashMap<String, String>() {
     {
       put("cluster-manager.container.count", "1");
       put("cluster-manager.container.retry.count", "1");
@@ -86,15 +87,16 @@ public class TestContainerProcessManager {
 
   private SamzaApplicationState state = null;
 
-  private JobModelManager getJobModelManagerWithHostAffinity(int containerCount) {
+  private JobModelManager getJobModelManagerWithHostAffinity(Map<String, String> containerIdToHost) {
     Map<String, Map<String, String>> localityMap = new HashMap<>();
-    localityMap.put("0", new HashMap<String, String>() { {
-        put(SetContainerHostMapping.HOST_KEY, "abc");
-      } });
+    containerIdToHost.forEach((containerId, host) -> {
+        localityMap.put(containerId, ImmutableMap.of(SetContainerHostMapping.HOST_KEY, containerIdToHost.get(containerId)));
+      });
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
     when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);
 
-    return JobModelManagerTestUtil.getJobModelManagerWithLocalityManager(getConfig(), containerCount, mockLocalityManager, this.server);
+    return JobModelManagerTestUtil.getJobModelManagerWithLocalityManager(getConfig(),
+        containerIdToHost.size(), mockLocalityManager, this.server);
   }
 
   private JobModelManager getJobModelManagerWithoutHostAffinity(int containerCount) {
@@ -140,7 +142,7 @@ public class TestContainerProcessManager {
     conf.put("cluster-manager.container.memory.mb", "500");
     conf.put("cluster-manager.container.cpu.cores", "5");
 
-    state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(1));
+    state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host1")));
     taskManager = new ContainerProcessManager(
         new MapConfig(conf),
         state,
@@ -203,7 +205,7 @@ public class TestContainerProcessManager {
     Config conf = getConfig();
     state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
 
-    ContainerProcessManager taskManager =  new ContainerProcessManager(
+    ContainerProcessManager taskManager = new ContainerProcessManager(
         new MapConfig(conf),
         state,
         new MetricsRegistryMap(),
@@ -228,16 +230,16 @@ public class TestContainerProcessManager {
     state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
 
     ContainerProcessManager taskManager = new ContainerProcessManager(
-            new MapConfig(conf),
-            state,
-            new MetricsRegistryMap(),
+        new MapConfig(conf),
+        state,
+        new MetricsRegistryMap(),
         clusterResourceManager
     );
 
     MockContainerAllocator allocator = new MockContainerAllocator(
         clusterResourceManager,
-            conf,
-            state);
+        conf,
+        state);
 
     getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
 
@@ -250,7 +252,7 @@ public class TestContainerProcessManager {
     assertFalse(taskManager.shouldShutdown());
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
 
-    SamzaResource container = new SamzaResource(1, 1024, "abc", "id0");
+    SamzaResource container = new SamzaResource(1, 1024, "host1", "id0");
     taskManager.onResourceAllocated(container);
 
     // Allow container to run and update state
@@ -299,7 +301,7 @@ public class TestContainerProcessManager {
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
 
 
-    SamzaResource container = new SamzaResource(1, 1024, "abc", "id0");
+    SamzaResource container = new SamzaResource(1, 1024, "host1", "id0");
     taskManager.onResourceAllocated(container);
 
     // Allow container to run and update state
@@ -353,16 +355,16 @@ public class TestContainerProcessManager {
     state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
 
     ContainerProcessManager taskManager = new ContainerProcessManager(
-            new MapConfig(conf),
-            state,
-            new MetricsRegistryMap(),
+        new MapConfig(conf),
+        state,
+        new MetricsRegistryMap(),
         clusterResourceManager
     );
 
     MockContainerAllocator allocator = new MockContainerAllocator(
         clusterResourceManager,
-            conf,
-            state);
+        conf,
+        state);
     getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
 
     Thread thread = new Thread(allocator);
@@ -371,7 +373,7 @@ public class TestContainerProcessManager {
     // Start the task clusterResourceManager
     taskManager.start();
 
-    SamzaResource container = new SamzaResource(1, 1000, "abc", "id1");
+    SamzaResource container = new SamzaResource(1, 1000, "host1", "id1");
     taskManager.onResourceAllocated(container);
 
     // Allow container to run and update state
@@ -391,7 +393,7 @@ public class TestContainerProcessManager {
 
   @Test
   public void testRerequestOnAnyHostIfContainerStartFails() throws Exception {
-    state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(1));
+    state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("1", "host1")));
     Map<String, String> configMap = new HashMap<>();
     configMap.putAll(getConfig());
 
@@ -404,7 +406,7 @@ public class TestContainerProcessManager {
         clusterResourceManager);
 
     manager.start();
-    SamzaResource resource = new SamzaResource(1, 1024, "abc", "resource-1");
+    SamzaResource resource = new SamzaResource(1, 1024, "host1", "resource-1");
     state.pendingContainers.put("1", resource);
     Assert.assertEquals(clusterResourceManager.resourceRequests.size(), 1);
     manager.onStreamProcessorLaunchFailure(resource, new Exception("cannot launch container!"));
@@ -413,6 +415,72 @@ public class TestContainerProcessManager {
   }
 
   @Test
+  public void testAllBufferedResourcesAreUtilized() throws Exception {
+    Map<String, String> config = new HashMap<>();
+    config.putAll(getConfigWithHostAffinity());
+    config.put("cluster-manager.container.count", "2");
+    Config cfg = new MapConfig(config);
+    // 1. Request two containers on hosts - host1 and host2
+    state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host1",
+        "1", "host2")));
+
+    ContainerProcessManager taskManager = new ContainerProcessManager(
+        cfg,
+        state,
+        new MetricsRegistryMap(),
+        clusterResourceManager
+    );
+
+    MockHostAwareContainerAllocator allocator = new MockHostAwareContainerAllocator(
+        clusterResourceManager,
+        cfg,
+        state);
+    getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
+
+    Thread thread = new Thread(allocator);
+    getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, thread);
+
+    taskManager.start();
+    assertFalse(taskManager.shouldShutdown());
+    // 2. Once the task manager has started, there should be one pending request each for host1 and host2
+    assertEquals(2, allocator.getContainerRequestState().numPendingRequests());
+
+    // 3. Allocate an extra resource on host1 and no resource on host2 yet.
+    SamzaResource resource1 = new SamzaResource(1, 1000, "host1", "id1");
+    SamzaResource resource2 = new SamzaResource(1, 1000, "host1", "id2");
+    taskManager.onResourceAllocated(resource1);
+    taskManager.onResourceAllocated(resource2);
+
+    // 4. Wait for the container to start on host1 and immediately fail
+    if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
+      fail("timed out waiting for the containers to start");
+    }
+    taskManager.onStreamProcessorLaunchSuccess(resource1);
+    assertEquals("host2", allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
+    assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
+
+    taskManager.onResourceCompleted(new SamzaResourceStatus(resource1.getResourceID(), "App Error", 1));
+    assertEquals(2, allocator.getContainerRequestState().numPendingRequests());
+
+    assertFalse(taskManager.shouldShutdown());
+    assertFalse(state.jobHealthy.get());
+    assertEquals(3, clusterResourceManager.resourceRequests.size());
+    assertEquals(0, clusterResourceManager.releasedResources.size());
+
+    // 5. Do not allocate any further resource on host1, and verify that the re-run of the container on host1 uses the
+    // previously allocated extra resource
+    SamzaResource resource3 = new SamzaResource(1, 1000, "host2", "id3");
+    taskManager.onResourceAllocated(resource3);
+
+    if (!allocator.awaitContainersStart(2, 2, TimeUnit.SECONDS)) {
+      fail("timed out waiting for the containers to start");
+    }
+    taskManager.onStreamProcessorLaunchSuccess(resource3);
+
+    assertTrue(state.jobHealthy.get());
+  }
+
+  @Test
   public void testDuplicateNotificationsDoNotAffectJobHealth() throws Exception {
     Config conf = getConfig();
 
@@ -421,16 +489,16 @@ public class TestContainerProcessManager {
     state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
 
     ContainerProcessManager taskManager = new ContainerProcessManager(
-            new MapConfig(conf),
-            state,
-            new MetricsRegistryMap(),
+        new MapConfig(conf),
+        state,
+        new MetricsRegistryMap(),
         clusterResourceManager
     );
 
     MockContainerAllocator allocator = new MockContainerAllocator(
         clusterResourceManager,
-            conf,
-            state);
+        conf,
+        state);
     getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
 
     Thread thread = new Thread(allocator);
@@ -441,7 +509,7 @@ public class TestContainerProcessManager {
     assertFalse(taskManager.shouldShutdown());
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
 
-    SamzaResource container1 = new SamzaResource(1, 1000, "abc", "id1");
+    SamzaResource container1 = new SamzaResource(1, 1000, "host1", "id1");
     taskManager.onResourceAllocated(container1);
 
     // Allow container to run and update state
@@ -462,7 +530,7 @@ public class TestContainerProcessManager {
     assertEquals(0, clusterResourceManager.releasedResources.size());
     assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
 
-    SamzaResource container2 = new SamzaResource(1, 1000, "abc", "id2");
+    SamzaResource container2 = new SamzaResource(1, 1000, "host1", "id2");
     taskManager.onResourceAllocated(container2);
 
     // Allow container to run and update state
@@ -515,7 +583,7 @@ public class TestContainerProcessManager {
     assertFalse(taskManager.shouldShutdown());
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
 
-    SamzaResource container1 = new SamzaResource(1, 1000, "abc", "id1");
+    SamzaResource container1 = new SamzaResource(1, 1000, "host1", "id1");
     taskManager.onResourceAllocated(container1);
 
     // Allow container to run and update state
@@ -525,7 +593,7 @@ public class TestContainerProcessManager {
     assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
     taskManager.onStreamProcessorLaunchSuccess(container1);
     // Create container failure - with ContainerExitStatus.DISKS_FAILED
-    taskManager.onResourceCompleted(new SamzaResourceStatus(container1.getResourceID(), "Disk failure", SamzaResourceStatus.DISK_FAIL));
+    taskManager.onResourceCompleted(new SamzaResourceStatus(container1.getResourceID(), "App error", 1));
 
     // The above failure should trigger a container request
     assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
@@ -535,7 +603,7 @@ public class TestContainerProcessManager {
     assertEquals(0, clusterResourceManager.releasedResources.size());
     assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
 
-    SamzaResource container2 = new SamzaResource(1, 1000, "abc", "id2");
+    SamzaResource container2 = new SamzaResource(1, 1000, "host1", "id2");
     taskManager.onResourceAllocated(container2);
 
     // Allow container to run and update state
@@ -545,7 +613,7 @@ public class TestContainerProcessManager {
     taskManager.onStreamProcessorLaunchSuccess(container2);
 
     // Create container failure - with ContainerExitStatus.PREEMPTED
-    taskManager.onResourceCompleted(new SamzaResourceStatus(container2.getResourceID(), "Preemption",  SamzaResourceStatus.PREEMPTED));
+    taskManager.onResourceCompleted(new SamzaResourceStatus(container2.getResourceID(), "Preemption", SamzaResourceStatus.PREEMPTED));
     assertEquals(3, clusterResourceManager.resourceRequests.size());
 
     // The above failure should trigger a container request
@@ -553,7 +621,7 @@ public class TestContainerProcessManager {
     assertFalse(taskManager.shouldShutdown());
     assertFalse(state.jobHealthy.get());
     assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
-    SamzaResource container3 = new SamzaResource(1, 1000, "abc", "id3");
+    SamzaResource container3 = new SamzaResource(1, 1000, "host1", "id3");
     taskManager.onResourceAllocated(container3);
 
     // Allow container to run and update state

http://git-wip-us.apache.org/repos/asf/samza/blob/03e5026c/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
index 6260b71..c26d727 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
@@ -138,6 +138,46 @@ public class TestHostAwareContainerAllocator {
     assertEquals("ID2", requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).get(0).getResourceID());
   }
 
+  /**
+   * Test that extra resources are buffered under ANY_HOST
+   */
+  @Test
+  public void testSurplusResourcesAreBufferedUnderAnyHost() throws Exception {
+    containerAllocator.requestResources(new HashMap<String, String>() {
+      {
+        put("0", "abc");
+        put("1", "xyz");
+      }
+    });
+
+    assertNotNull(requestState.getResourcesOnAHost("abc"));
+    assertEquals(0, requestState.getResourcesOnAHost("abc").size());
+
+    assertNotNull(requestState.getResourcesOnAHost("xyz"));
+    assertEquals(0, requestState.getResourcesOnAHost("xyz").size());
+
+    assertNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST));
+
+    containerAllocator.addResource(new SamzaResource(1, 10, "abc", "ID1"));
+    containerAllocator.addResource(new SamzaResource(1, 10, "xyz", "ID2"));
+    // surplus resources for host - "abc"
+    containerAllocator.addResource(new SamzaResource(1, 10, "abc", "ID3"));
+    containerAllocator.addResource(new SamzaResource(1, 10, "abc", "ID4"));
+    containerAllocator.addResource(new SamzaResource(1, 10, "abc", "ID5"));
+    containerAllocator.addResource(new SamzaResource(1, 10, "abc", "ID6"));
+
+    assertNotNull(requestState.getResourcesOnAHost("abc"));
+    assertEquals(1, requestState.getResourcesOnAHost("abc").size());
+
+    assertNotNull(requestState.getResourcesOnAHost("xyz"));
+    assertEquals(1, requestState.getResourcesOnAHost("xyz").size());
+
+    assertNotNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST));
+    // assert that the surplus resources go to the ANY_HOST buffer
+    assertTrue(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).size() == 4);
+    assertEquals("ID3", requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).get(0).getResourceID());
+  }
+
   @Test
   public void testAllocatorReleasesExtraContainers() throws Exception {
     final SamzaResource resource0 = new SamzaResource(1, 1024, "abc", "id1");