You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ra...@apache.org on 2019/11/06 00:08:08 UTC

[samza] branch revert-1180-container-placement-service created (now badf814)

This is an automated email from the ASF dual-hosted git repository.

rayman pushed a change to branch revert-1180-container-placement-service
in repository https://gitbox.apache.org/repos/asf/samza.git.


      at badf814  Revert "SAMZA-2340: [Container Placements] Introduce ContainerManager for handling validation for failures & starts of active & standby containers"

This branch includes the following new commits:

     new badf814  Revert "SAMZA-2340: [Container Placements] Introduce ContainerManager for handling validation for failures & starts of active & standby containers"

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[samza] 01/01: Revert "SAMZA-2340: [Container Placements] Introduce ContainerManager for handling validation for failures & starts of active & standby containers"

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch revert-1180-container-placement-service
in repository https://gitbox.apache.org/repos/asf/samza.git

commit badf8144bdf6d78d232d58e0c333db8cdc22d98f
Author: rmatharu <40...@users.noreply.github.com>
AuthorDate: Tue Nov 5 16:08:02 2019 -0800

    Revert "SAMZA-2340: [Container Placements] Introduce ContainerManager for handling validation for failures & starts of active & standby containers"
---
 .../samza/clustermanager/ContainerAllocator.java   |  50 ++++++-
 .../samza/clustermanager/ContainerManager.java     | 164 ---------------------
 .../clustermanager/ContainerProcessManager.java    |  55 ++++---
 .../MockContainerAllocatorWithHostAffinity.java    |   5 +-
 .../MockContainerAllocatorWithoutHostAffinity.java |   9 +-
 .../TestContainerAllocatorWithHostAffinity.java    |  40 +++--
 .../TestContainerAllocatorWithoutHostAffinity.java |  15 +-
 .../TestContainerProcessManager.java               |  53 ++-----
 8 files changed, 129 insertions(+), 262 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index 38659f3..89855dc 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.clustermanager;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Map;
@@ -114,13 +115,13 @@ public class ContainerAllocator implements Runnable {
    */
   private final int requestExpiryTimeout;
 
-  private final ContainerManager containerManager;
+  private final Optional<StandbyContainerManager> standbyContainerManager;
 
   public ContainerAllocator(ClusterResourceManager clusterResourceManager,
       Config config,
       SamzaApplicationState state,
       boolean hostAffinityEnabled,
-      ContainerManager containerManager) {
+      Optional<StandbyContainerManager> standbyContainerManager) {
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
     this.clusterResourceManager = clusterResourceManager;
     this.allocatorSleepIntervalMs = clusterManagerConfig.getAllocatorSleepTime();
@@ -131,7 +132,7 @@ public class ContainerAllocator implements Runnable {
     this.state = state;
     this.config = config;
     this.hostAffinityEnabled = hostAffinityEnabled;
-    this.containerManager = containerManager;
+    this.standbyContainerManager = standbyContainerManager;
     this.requestExpiryTimeout = clusterManagerConfig.getContainerRequestTimeout();
   }
 
@@ -203,7 +204,12 @@ public class ContainerAllocator implements Runnable {
           state.matchedResourceRequests.incrementAndGet();
         }
 
-        containerManager.handleContainerLaunch(request, preferredHost, peekAllocatedResource(preferredHost), resourceRequestState, this);
+        // If hot-standby is enabled, check standby constraints are met before launching a processor
+        if (this.standbyContainerManager.isPresent()) {
+          checkStandByContrainsAndRunStreamProcessor(request, preferredHost);
+        } else {
+          runStreamProcessor(request, preferredHost);
+        }
 
       } else {
 
@@ -214,7 +220,7 @@ public class ContainerAllocator implements Runnable {
         if (expired) {
           updateExpiryMetrics(request);
           if (hostAffinityEnabled) {
-            containerManager.handleExpiredRequestWithHostAffinityEnabled(processorId, preferredHost, request, this, resourceRequestState);
+            handleExpiredRequestWithHostAffinityEnabled(processorId, preferredHost, request);
           }
         } else {
           LOG.info("Request for Processor ID: {} on preferred host {} has not expired yet."
@@ -227,6 +233,30 @@ public class ContainerAllocator implements Runnable {
   }
 
   /**
+   * Handles an expired resource request for both active and standby containers. Since a preferred host cannot be obtained
+   * this method checks the availability of surplus ANY_HOST resources and launches the container if available. Otherwise
+   * issues an ANY_HOST request.
+   */
+  @VisibleForTesting
+  void handleExpiredRequestWithHostAffinityEnabled(String processorId, String preferredHost,
+      SamzaResourceRequest request) {
+    boolean resourceAvailableOnAnyHost = hasAllocatedResource(ResourceRequestState.ANY_HOST);
+    if (standbyContainerManager.isPresent()) {
+      standbyContainerManager.get()
+          .handleExpiredResourceRequest(processorId, request,
+              Optional.ofNullable(peekAllocatedResource(ResourceRequestState.ANY_HOST)), this, resourceRequestState);
+    } else if (resourceAvailableOnAnyHost) {
+      LOG.info("Request for Processor ID: {} on host: {} has expired. Running on ANY_HOST", processorId, preferredHost);
+      runStreamProcessor(request, ResourceRequestState.ANY_HOST);
+    } else {
+      LOG.info("Request for Processor ID: {} on host: {} has expired. Requesting additional resources on ANY_HOST.",
+          processorId, preferredHost);
+      resourceRequestState.cancelResourceRequest(request);
+      requestResource(processorId, ResourceRequestState.ANY_HOST);
+    }
+  }
+
+  /**
    * Updates the request state and runs a processor on the specified host. Assumes a resource
    * is available on the preferred host, so the caller must verify that before invoking this method.
    *
@@ -261,6 +291,16 @@ public class ContainerAllocator implements Runnable {
   }
 
   /**
+   * If {@code StandbyContainerManager} is present check standBy constraints are met before attempting to launch
+   * @param request outstanding request which has an allocated resource
+   * @param preferredHost to run the request
+   */
+  private void checkStandByContrainsAndRunStreamProcessor(SamzaResourceRequest request, String preferredHost) {
+    standbyContainerManager.get().checkStandbyConstraintsAndRunStreamProcessor(request, preferredHost,
+        peekAllocatedResource(preferredHost), this, resourceRequestState);
+  }
+
+  /**
    * Called during initial request for resources
    *
    * @param processorToHostMapping A Map of [processorId, hostName], where processorId is the ID of the Samza processor
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
deleted file mode 100644
index 6b29bc8..0000000
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.clustermanager;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.time.Duration;
-import java.util.Optional;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * ContainerManager is a centralized entity that manages control actions like start, stop for both active and standby containers
- * ContainerManager acts as a brain for validating and issuing any actions on containers in the Job Coordinator.
- *
- * The requests to allocate resources resources made by {@link ContainerAllocator} can either expire or succeed.
- * When the requests succeeds the ContainerManager validates those requests before starting the container
- * When the requests expires the ContainerManager decides the next set of actions for the pending request.
- *
- * Callbacks issued from  {@link ClusterResourceManager} aka {@link ContainerProcessManager} are intercepted
- * by ContainerManager to handle container failure and completions for both active and standby containers
- */
-public class ContainerManager {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ContainerManager.class);
-
-  /**
-   * Resource-manager, used to stop containers
-   */
-  private final ClusterResourceManager clusterResourceManager;
-  private final SamzaApplicationState samzaApplicationState;
-
-  private final Optional<StandbyContainerManager> standbyContainerManager;
-
-  public ContainerManager(SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
-      Boolean standByEnabled) {
-    this.samzaApplicationState = samzaApplicationState;
-    this.clusterResourceManager = clusterResourceManager;
-    // Enable standby container manager if required
-    if (standByEnabled) {
-      this.standbyContainerManager =
-          Optional.of(new StandbyContainerManager(samzaApplicationState, clusterResourceManager));
-    } else {
-      this.standbyContainerManager = Optional.empty();
-    }
-  }
-
-  /**
-   * Handles the container start action for both active & standby containers.
-   *
-   * @param request pending request for the preferred host
-   * @param preferredHost preferred host to start the container
-   * @param allocatedResource resource allocated from {@link ClusterResourceManager}
-   * @param resourceRequestState state of request in {@link ContainerAllocator}
-   * @param allocator to request resources from @{@link ClusterResourceManager}
-   */
-  void handleContainerLaunch(SamzaResourceRequest request, String preferredHost, SamzaResource allocatedResource,
-      ResourceRequestState resourceRequestState, ContainerAllocator allocator) {
-    if (this.standbyContainerManager.isPresent()) {
-      standbyContainerManager.get()
-          .checkStandbyConstraintsAndRunStreamProcessor(request, preferredHost, allocatedResource, allocator,
-              resourceRequestState);
-    } else {
-      allocator.runStreamProcessor(request, preferredHost);
-    }
-  }
-
-  /**
-   * Handles the action to be taken after the container has been stopped.
-   * Case 1. When standby is enabled, refer to {@link StandbyContainerManager#handleContainerStop} to check constraints.
-   * Case 2. When standby is disabled there are two cases according to host-affinity being enabled
-   *    Case 2.1. When host-affinity is enabled resources are requested on host where container was last seen
-   *    Case 2.2. When host-affinity is disabled resources are requested for ANY_HOST
-   *
-   * @param processorId logical id of the container
-   * @param containerId last known id of the container deployed
-   * @param preferredHost host on which container was last deployed
-   * @param exitStatus exit code returned by the container
-   * @param preferredHostRetryDelay delay to be incurred before requesting resources
-   * @param containerAllocator allocator for requesting resources
-   */
-  void handleContainerStop(String processorId, String containerId, String preferredHost, int exitStatus,
-      Duration preferredHostRetryDelay, ContainerAllocator containerAllocator) {
-    if (standbyContainerManager.isPresent()) {
-      standbyContainerManager.get()
-          .handleContainerStop(processorId, containerId, preferredHost, exitStatus, containerAllocator,
-              preferredHostRetryDelay);
-    } else {
-      // If StandbyTasks are not enabled, we simply make a request for the preferredHost
-      containerAllocator.requestResourceWithDelay(processorId, preferredHost, preferredHostRetryDelay);
-    }
-  }
-
-  /**
-   * Handle the container launch failure for active containers and standby (if enabled).
-   * Case 1. When standby is enabled, refer to {@link StandbyContainerManager#handleContainerLaunchFail} to check behavior
-   * Case 2. When standby is disabled the allocator issues a request for ANY_HOST resources
-   *
-   * @param processorId logical id of the container
-   * @param containerId last known id of the container deployed
-   * @param preferredHost host on which container is requested to be deployed
-   * @param containerAllocator allocator for requesting resources
-   */
-  void handleContainerLaunchFail(String processorId, String containerId, String preferredHost,
-      ContainerAllocator containerAllocator) {
-    if (processorId != null && standbyContainerManager.isPresent()) {
-      standbyContainerManager.get().handleContainerLaunchFail(processorId, containerId, containerAllocator);
-    } else if (processorId != null) {
-      LOG.info("Falling back to ANY_HOST for Processor ID: {} since launch failed for Container ID: {} on host: {}",
-          processorId, containerId, preferredHost);
-      containerAllocator.requestResource(processorId, ResourceRequestState.ANY_HOST);
-    } else {
-      LOG.warn("Did not find a pending Processor ID for Container ID: {} on host: {}. "
-          + "Ignoring invalid/redundant notification.", containerId, preferredHost);
-    }
-  }
-
-  /**
-   * Handles an expired resource request for both active and standby containers. Since a preferred host cannot be obtained
-   * this method checks the availability of surplus ANY_HOST resources and launches the container if available. Otherwise
-   * issues an ANY_HOST request. Only applies to HOST_AFFINITY enabled cases
-   *
-   * @param processorId logical id of the container
-   * @param preferredHost host on which container is requested to be deployed
-   * @param request pending request for the preferred host
-   * @param allocator allocator for requesting resources
-   * @param resourceRequestState state of request in {@link ContainerAllocator}
-   */
-  @VisibleForTesting
-  void handleExpiredRequestWithHostAffinityEnabled(String processorId, String preferredHost,
-      SamzaResourceRequest request, ContainerAllocator allocator, ResourceRequestState resourceRequestState) {
-    boolean resourceAvailableOnAnyHost = allocator.hasAllocatedResource(ResourceRequestState.ANY_HOST);
-    if (standbyContainerManager.isPresent()) {
-      standbyContainerManager.get()
-          .handleExpiredResourceRequest(processorId, request,
-              Optional.ofNullable(allocator.peekAllocatedResource(ResourceRequestState.ANY_HOST)), allocator,
-              resourceRequestState);
-    } else if (resourceAvailableOnAnyHost) {
-      LOG.info("Request for Processor ID: {} on host: {} has expired. Running on ANY_HOST", processorId, preferredHost);
-      allocator.runStreamProcessor(request, ResourceRequestState.ANY_HOST);
-    } else {
-      LOG.info("Request for Processor ID: {} on host: {} has expired. Requesting additional resources on ANY_HOST.",
-          processorId, preferredHost);
-      resourceRequestState.cancelResourceRequest(request);
-      allocator.requestResource(processorId, ResourceRequestState.ANY_HOST);
-    }
-  }
-}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 398ed6d..cb0e537 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -95,8 +95,8 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
   private final ContainerAllocator containerAllocator;
   private final Thread allocatorThread;
 
-  // The ContainerManager manages control actions for both active & standby containers
-  private final ContainerManager containerManager;
+  // The StandbyContainerManager manages standby-aware allocation and failover of containers
+  private final Optional<StandbyContainerManager> standbyContainerManager;
 
   private final Option<DiagnosticsManager> diagnosticsManager;
 
@@ -158,9 +158,14 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     // Wire all metrics to all reporters
     this.metricsReporters.values().forEach(reporter -> reporter.register(METRICS_SOURCE_NAME, registry));
 
-    this.containerManager = new ContainerManager(state, clusterResourceManager, jobConfig.getStandbyTasksEnabled());
+    // Enable standby container manager if required
+    if (jobConfig.getStandbyTasksEnabled()) {
+      this.standbyContainerManager = Optional.of(new StandbyContainerManager(state, clusterResourceManager));
+    } else {
+      this.standbyContainerManager = Optional.empty();
+    }
 
-    this.containerAllocator = new ContainerAllocator(this.clusterResourceManager, config, state, hostAffinityEnabled, this.containerManager);
+    this.containerAllocator = new ContainerAllocator(this.clusterResourceManager, config, state, hostAffinityEnabled, this.standbyContainerManager);
     this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
     LOG.info("Finished container process manager initialization.");
   }
@@ -170,8 +175,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
       SamzaApplicationState state,
       MetricsRegistryMap registry,
       ClusterResourceManager resourceManager,
-      Optional<ContainerAllocator> allocator,
-      ContainerManager containerManager) {
+      Optional<ContainerAllocator> allocator) {
     this.state = state;
     this.clusterManagerConfig = clusterManagerConfig;
     this.jobConfig = new JobConfig(clusterManagerConfig);
@@ -179,11 +183,11 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled();
 
     this.clusterResourceManager = resourceManager;
-    this.containerManager = containerManager;
+    this.standbyContainerManager = Optional.empty();
     this.diagnosticsManager = Option.empty();
     this.containerAllocator = allocator.orElseGet(
         () -> new ContainerAllocator(this.clusterResourceManager, clusterManagerConfig, state,
-            hostAffinityEnabled, this.containerManager));
+            hostAffinityEnabled, this.standbyContainerManager));
     this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
     LOG.info("Finished container process manager initialization");
   }
@@ -423,7 +427,16 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
 
     // 3. Re-request resources on ANY_HOST in case of launch failures on the preferred host, if standby are not enabled
     // otherwise calling standbyContainerManager
-    containerManager.handleContainerLaunchFail(processorId, containerId, containerHost, containerAllocator);
+    if (processorId != null && standbyContainerManager.isPresent()) {
+      standbyContainerManager.get().handleContainerLaunchFail(processorId, containerId, containerAllocator);
+    } else if (processorId != null) {
+      LOG.info("Falling back to ANY_HOST for Processor ID: {} since launch failed for Container ID: {} on host: {}",
+          processorId, containerId, containerHost);
+      containerAllocator.requestResource(processorId, ResourceRequestState.ANY_HOST);
+    } else {
+      LOG.warn("Did not find a pending Processor ID for Container ID: {} on host: {}. " +
+          "Ignoring invalid/redundant notification.", containerId, containerHost);
+    }
   }
 
   /**
@@ -591,25 +604,25 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
   /**
    * Obtains the ID of the Samza processor pending launch on the provided resource (container).
    *
-   * ContainerProcessManager [INFO] Container ID: container_e66_1569376389369_0221_01_000049 matched pending Processor ID: 0 on host: ltx1-app0772.stg.linkedin.com
-   *
-   * @param containerId last known id of the container deployed
-   * @return the logical processorId of the processor (e.g., 0, 1, 2 ...)
+   * @param resourceId the ID of the resource (container)
+   * @return the ID of the Samza processor on this resource
    */
-  private String getPendingProcessorId(String containerId) {
+  private String getPendingProcessorId(String resourceId) {
     for (Map.Entry<String, SamzaResource> entry: state.pendingProcessors.entrySet()) {
-      if (entry.getValue().getContainerId().equals(containerId)) {
-        LOG.info("Container ID: {} matched pending Processor ID: {} on host: {}", containerId, entry.getKey(), entry.getValue().getHost());
+      if (entry.getValue().getContainerId().equals(resourceId)) {
+        LOG.info("Container ID: {} matched pending Processor ID: {} on host: {}", resourceId, entry.getKey(), entry.getValue().getHost());
         return entry.getKey();
       }
     }
     return null;
   }
 
-  /**
-   * Request {@link ContainerManager#handleContainerStop} to determine next step of actions for the stopped container
-   */
-  private void handleContainerStop(String processorId, String containerId, String preferredHost, int exitStatus, Duration preferredHostRetryDelay) {
-    containerManager.handleContainerStop(processorId, containerId, preferredHost, exitStatus, preferredHostRetryDelay, containerAllocator);
+  private void handleContainerStop(String processorId, String resourceID, String preferredHost, int exitStatus, Duration preferredHostRetryDelay) {
+    if (standbyContainerManager.isPresent()) {
+      standbyContainerManager.get().handleContainerStop(processorId, resourceID, preferredHost, exitStatus, containerAllocator, preferredHostRetryDelay);
+    } else {
+      // If StandbyTasks are not enabled, we simply make a request for the preferredHost
+      containerAllocator.requestResourceWithDelay(processorId, preferredHost, preferredHostRetryDelay);
+    }
   }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithHostAffinity.java
index 649435a..99ef433 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithHostAffinity.java
@@ -22,6 +22,7 @@ import org.apache.samza.config.Config;
 
 import java.lang.reflect.Field;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -29,8 +30,8 @@ public class MockContainerAllocatorWithHostAffinity extends ContainerAllocator {
   private Semaphore semaphore = new Semaphore(0);
 
   public MockContainerAllocatorWithHostAffinity(ClusterResourceManager manager,
-      Config config, SamzaApplicationState state, ContainerManager containerManager) {
-    super(manager, config, state, true, containerManager);
+      Config config, SamzaApplicationState state) {
+    super(manager, config, state, true, Optional.empty());
   }
 
   /**
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
index 7448e57..0b3ff80 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
@@ -18,9 +18,11 @@
  */
 package org.apache.samza.clustermanager;
 
+import java.util.Optional;
 import org.apache.samza.config.Config;
 
 import java.lang.reflect.Field;
+
 import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -29,9 +31,10 @@ public class MockContainerAllocatorWithoutHostAffinity extends ContainerAllocato
   public int requestedContainers = 0;
   private Semaphore semaphore = new Semaphore(0);
 
-  public MockContainerAllocatorWithoutHostAffinity(ClusterResourceManager resourceManager,
-                                Config config, SamzaApplicationState state, ContainerManager containerManager) {
-    super(resourceManager, config, state, false, containerManager);
+  public MockContainerAllocatorWithoutHostAffinity(ClusterResourceManager manager,
+                                Config config,
+                                SamzaApplicationState state) {
+    super(manager, config, state, false, Optional.empty());
   }
 
   /**
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index 3451c4c..927df89 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -24,6 +24,7 @@ import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -60,7 +61,6 @@ public class TestContainerAllocatorWithHostAffinity {
   private final SamzaApplicationState state = new SamzaApplicationState(jobModelManager);
 
   private final MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
-  private final ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
 
   private JobModelManager initializeJobModelManager(Config config, int containerCount) {
     Map<String, Map<String, String>> localityMap = new HashMap<>();
@@ -83,8 +83,7 @@ public class TestContainerAllocatorWithHostAffinity {
 
   @Before
   public void setup() throws Exception {
-    containerAllocator =
-        new ContainerAllocator(clusterResourceManager, config, state, true, containerManager);
+    containerAllocator = new ContainerAllocator(clusterResourceManager, config, state, true, Optional.empty());
     requestState = new MockContainerRequestState(clusterResourceManager, true);
     Field requestStateField = containerAllocator.getClass().getDeclaredField("resourceRequestState");
     requestStateField.setAccessible(true);
@@ -360,8 +359,6 @@ public class TestContainerAllocatorWithHostAffinity {
   @Test
   public void testRequestAllocationOnPreferredHostWithRunStreamProcessor() throws Exception {
     ClusterResourceManager.Callback mockCPM = mock(MockClusterResourceManagerCallback.class);
-    ClusterResourceManager mockClusterResourceManager = new MockClusterResourceManager(mockCPM, state);
-    ContainerManager containerManager = new ContainerManager(state, mockClusterResourceManager, false);
     // Mock the callback from ClusterManager to add resources to the allocator
     doAnswer((InvocationOnMock invocation) -> {
         SamzaResource resource = (SamzaResource) invocation.getArgumentAt(0, List.class).get(0);
@@ -370,7 +367,8 @@ public class TestContainerAllocatorWithHostAffinity {
       }).when(mockCPM).onResourcesAvailable(anyList());
 
     spyAllocator = Mockito.spy(
-        new ContainerAllocator(mockClusterResourceManager, config, state, true, containerManager));
+        new ContainerAllocator(new MockClusterResourceManager(mockCPM, state), config, state, true,
+            Optional.empty()));
 
     // Request Resources
     spyAllocator.requestResources(new HashMap<String, String>() {
@@ -407,9 +405,8 @@ public class TestContainerAllocatorWithHostAffinity {
   @Test
   public void testExpiredRequestAllocationOnAnyHost() throws Exception {
     MockClusterResourceManager spyManager = spy(new MockClusterResourceManager(callback, state));
-    ContainerManager spyContainerManager = spy(new ContainerManager(state, spyManager, false));
-    spyAllocator = Mockito.spy(
-        new ContainerAllocator(spyManager, config, state, true, spyContainerManager));
+    spyAllocator = Mockito.spy(new ContainerAllocator(spyManager, config, state, true, Optional.empty()));
+
     // Request Preferred Resources
     spyAllocator.requestResources(new HashMap<String, String>() {
       {
@@ -428,10 +425,10 @@ public class TestContainerAllocatorWithHostAffinity {
     // Verify that all the request that were created as preferred host requests expired
     assertTrue(state.preferredHostRequests.get() == 2);
     assertTrue(state.expiredPreferredHostRequests.get() == 2);
-    verify(spyContainerManager, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("hostname-0"),
-        any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
-    verify(spyContainerManager, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("hostname-1"),
-        any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
+    verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("hostname-0"),
+        any(SamzaResourceRequest.class));
+    verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("hostname-1"),
+        any(SamzaResourceRequest.class));
 
     // Verify that preferred host request were cancelled and since no surplus resources were available
     // requestResource was invoked with ANY_HOST requests
@@ -449,11 +446,10 @@ public class TestContainerAllocatorWithHostAffinity {
   @Test
   public void testExpiredRequestAllocationOnSurplusAnyHostWithRunStreamProcessor() throws Exception {
     // Add Extra Resources
-    MockClusterResourceManager spyClusterResourceManager = spy(new MockClusterResourceManager(callback, state));
-    ContainerManager spyContainerManager = spy(new ContainerManager(state, spyClusterResourceManager, false));
-
     spyAllocator = Mockito.spy(
-        new ContainerAllocator(spyClusterResourceManager, config, state, true, spyContainerManager));
+        new ContainerAllocator(new MockClusterResourceManager(callback, state), config, state, true,
+            Optional.empty()));
+
     spyAllocator.addResource(new SamzaResource(1, 1000, "xyz", "id1"));
     spyAllocator.addResource(new SamzaResource(1, 1000, "zzz", "id2"));
 
@@ -473,11 +469,11 @@ public class TestContainerAllocatorWithHostAffinity {
     Thread.sleep(100);
 
     // Verify that all the request that were created as preferred host requests expired
-    assertEquals(state.expiredPreferredHostRequests.get(), 2);
-    verify(spyContainerManager, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("hostname-0"),
-        any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
-    verify(spyContainerManager, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("hostname-1"),
-        any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
+    assertTrue(state.expiredPreferredHostRequests.get() == 2);
+    verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("hostname-0"),
+        any(SamzaResourceRequest.class));
+    verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("hostname-1"),
+        any(SamzaResourceRequest.class));
 
     // Verify that runStreamProcessor was invoked with already available ANY_HOST requests
     ArgumentCaptor<SamzaResourceRequest> resourceRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index 9b0d1b7..16eac0b 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -23,6 +23,7 @@ import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.JobModelManager;
@@ -66,8 +67,7 @@ public class TestContainerAllocatorWithoutHostAffinity {
 
   @Before
   public void setup() throws Exception {
-    containerAllocator = new ContainerAllocator(manager, config, state, false,
-        new ContainerManager(state, manager, false));
+    containerAllocator = new ContainerAllocator(manager, config, state, false, Optional.empty());
     requestState = new MockContainerRequestState(manager, false);
     Field requestStateField = containerAllocator.getClass().getDeclaredField("resourceRequestState");
     requestStateField.setAccessible(true);
@@ -261,10 +261,9 @@ public class TestContainerAllocatorWithoutHostAffinity {
     };
 
     ClusterResourceManager.Callback mockCPM = mock(ClusterResourceManager.Callback.class);
-    ClusterResourceManager mockManager = new MockClusterResourceManager(mockCPM, state);
-    ContainerManager spyContainerManager = spy(new ContainerManager(state, mockManager, false));
     spyAllocator = Mockito.spy(
-        new ContainerAllocator(mockManager, config, state, false, spyContainerManager));
+        new ContainerAllocator(new MockClusterResourceManager(mockCPM, state), config, state, false,
+            Optional.empty()));
     // Mock the callback from ClusterManager to add resources to the allocator
     doAnswer((InvocationOnMock invocation) -> {
         SamzaResource resource = (SamzaResource) invocation.getArgumentAt(0, List.class).get(0);
@@ -283,9 +282,9 @@ public class TestContainerAllocatorWithoutHostAffinity {
     resourceRequestCaptor.getAllValues()
         .forEach(resourceRequest -> assertEquals(resourceRequest.getPreferredHost(), ResourceRequestState.ANY_HOST));
     assertTrue(state.anyHostRequests.get() == containersToHostMapping.size());
-    // Expiry currently should not be invoked for host affinity enabled cases only
-    verify(spyContainerManager, never()).handleExpiredRequestWithHostAffinityEnabled(anyString(), anyString(),
-        any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
+    // Expiry currently is only handled for host affinity enabled cases
+    verify(spyAllocator, never()).handleExpiredRequestWithHostAffinityEnabled(anyString(), anyString(),
+        any(SamzaResourceRequest.class));
     // Only updated when host affinity is enabled
     assertTrue(state.matchedResourceRequests.get() == 0);
     assertTrue(state.preferredHostRequests.get() == 0);
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 639b247..f100393 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -141,7 +141,7 @@ public class TestContainerProcessManager {
     SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
+
     ContainerProcessManager cpm =
         buildContainerProcessManager(new ClusterManagerConfig(new MapConfig(conf)), state, clusterResourceManager, Optional.empty());
 
@@ -165,8 +165,7 @@ public class TestContainerProcessManager {
         state,
         new MetricsRegistryMap(),
         clusterResourceManager,
-        Optional.empty(),
-        containerManager
+        Optional.empty()
     );
 
     allocator =
@@ -184,7 +183,6 @@ public class TestContainerProcessManager {
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     ClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
 
     ContainerProcessManager cpm =
         buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.empty());
@@ -192,8 +190,7 @@ public class TestContainerProcessManager {
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
         conf,
-        state,
-        containerManager);
+        state);
 
     getPrivateFieldFromCpm("containerAllocator", cpm).set(cpm, allocator);
     CountDownLatch latch = new CountDownLatch(1);
@@ -250,13 +247,11 @@ public class TestContainerProcessManager {
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
         conf,
-        state,
-        containerManager);
+        state);
 
     ContainerProcessManager cpm =
         spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
@@ -296,13 +291,11 @@ public class TestContainerProcessManager {
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
         conf,
-        state,
-        containerManager);
+        state);
 
     ContainerProcessManager cpm = spy(
         buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
@@ -393,13 +386,11 @@ public class TestContainerProcessManager {
     SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
         clusterManagerConfig,
-        state,
-        containerManager);
+        state);
 
     ContainerProcessManager cpm =
         buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator));
@@ -477,13 +468,11 @@ public class TestContainerProcessManager {
     SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
         clusterManagerConfig,
-        state,
-        containerManager);
+        state);
 
     ContainerProcessManager cpm =
         buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator));
@@ -585,13 +574,11 @@ public class TestContainerProcessManager {
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
         conf,
-        state,
-        containerManager);
+        state);
 
     ContainerProcessManager cpm =
         spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
@@ -625,17 +612,15 @@ public class TestContainerProcessManager {
     configMap.putAll(getConfig());
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
         new MapConfig(config),
-        state,
-        containerManager);
+        state);
 
     ContainerProcessManager manager =
-        new ContainerProcessManager(new ClusterManagerConfig(config), state, new MetricsRegistryMap(), clusterResourceManager,
-            Optional.of(allocator), containerManager);
+        new ContainerProcessManager(new ClusterManagerConfig(config), state, new MetricsRegistryMap(),
+            clusterResourceManager, Optional.of(allocator));
 
     manager.start();
     SamzaResource resource = new SamzaResource(1, 1024, "host1", "resource-1");
@@ -659,13 +644,11 @@ public class TestContainerProcessManager {
         "1", "host2")));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
 
     MockContainerAllocatorWithHostAffinity allocator = new MockContainerAllocatorWithHostAffinity(
         clusterResourceManager,
         cfg,
-        state,
-        containerManager);
+        state);
 
     ContainerProcessManager cpm =
         spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, clusterResourceManager, Optional.of(allocator)));
@@ -722,13 +705,11 @@ public class TestContainerProcessManager {
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(new MapConfig(conf)));
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
         conf,
-        state,
-        containerManager);
+        state);
 
     ContainerProcessManager cpm =
         spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
@@ -795,13 +776,11 @@ public class TestContainerProcessManager {
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(new MapConfig(config)));
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
         conf,
-        state,
-        containerManager);
+        state);
 
     ContainerProcessManager cpm =
         spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
@@ -887,7 +866,7 @@ public class TestContainerProcessManager {
 
   private ContainerProcessManager buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, SamzaApplicationState state,
       ClusterResourceManager clusterResourceManager, Optional<ContainerAllocator> allocator) {
-    return new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(), clusterResourceManager, allocator,
-         new ContainerManager(state, clusterResourceManager, false));
+    return new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(), clusterResourceManager,
+        allocator);
   }
 }