You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ra...@apache.org on 2019/11/06 19:37:19 UTC

[samza] branch master updated: Revert "SAMZA-2340: [Container Placements] Introduce ContainerManager for handling validation for failures & starts of active & standby containers" (#1208)

This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a9896b  Revert "SAMZA-2340: [Container Placements] Introduce ContainerManager for handling validation for failures & starts of active & standby containers" (#1208)
6a9896b is described below

commit 6a9896bd2c1b5c08f3dc707baa81ed73de38c647
Author: rmatharu <40...@users.noreply.github.com>
AuthorDate: Wed Nov 6 11:37:14 2019 -0800

    Revert "SAMZA-2340: [Container Placements] Introduce ContainerManager for handling validation for failures & starts of active & standby containers" (#1208)
---
 .../samza/clustermanager/ContainerAllocator.java   |  50 ++++++-
 .../samza/clustermanager/ContainerManager.java     | 164 ---------------------
 .../clustermanager/ContainerProcessManager.java    |  55 ++++---
 .../MockContainerAllocatorWithHostAffinity.java    |   5 +-
 .../MockContainerAllocatorWithoutHostAffinity.java |   9 +-
 .../TestContainerAllocatorWithHostAffinity.java    |  40 +++--
 .../TestContainerAllocatorWithoutHostAffinity.java |  15 +-
 .../TestContainerProcessManager.java               |  53 ++-----
 8 files changed, 129 insertions(+), 262 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index 38659f3..89855dc 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.clustermanager;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Map;
@@ -114,13 +115,13 @@ public class ContainerAllocator implements Runnable {
    */
   private final int requestExpiryTimeout;
 
-  private final ContainerManager containerManager;
+  private final Optional<StandbyContainerManager> standbyContainerManager;
 
   public ContainerAllocator(ClusterResourceManager clusterResourceManager,
       Config config,
       SamzaApplicationState state,
       boolean hostAffinityEnabled,
-      ContainerManager containerManager) {
+      Optional<StandbyContainerManager> standbyContainerManager) {
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
     this.clusterResourceManager = clusterResourceManager;
     this.allocatorSleepIntervalMs = clusterManagerConfig.getAllocatorSleepTime();
@@ -131,7 +132,7 @@ public class ContainerAllocator implements Runnable {
     this.state = state;
     this.config = config;
     this.hostAffinityEnabled = hostAffinityEnabled;
-    this.containerManager = containerManager;
+    this.standbyContainerManager = standbyContainerManager;
     this.requestExpiryTimeout = clusterManagerConfig.getContainerRequestTimeout();
   }
 
@@ -203,7 +204,12 @@ public class ContainerAllocator implements Runnable {
           state.matchedResourceRequests.incrementAndGet();
         }
 
-        containerManager.handleContainerLaunch(request, preferredHost, peekAllocatedResource(preferredHost), resourceRequestState, this);
+        // If hot-standby is enabled, check standby constraints are met before launching a processor
+        if (this.standbyContainerManager.isPresent()) {
+          checkStandByContrainsAndRunStreamProcessor(request, preferredHost);
+        } else {
+          runStreamProcessor(request, preferredHost);
+        }
 
       } else {
 
@@ -214,7 +220,7 @@ public class ContainerAllocator implements Runnable {
         if (expired) {
           updateExpiryMetrics(request);
           if (hostAffinityEnabled) {
-            containerManager.handleExpiredRequestWithHostAffinityEnabled(processorId, preferredHost, request, this, resourceRequestState);
+            handleExpiredRequestWithHostAffinityEnabled(processorId, preferredHost, request);
           }
         } else {
           LOG.info("Request for Processor ID: {} on preferred host {} has not expired yet."
@@ -227,6 +233,30 @@ public class ContainerAllocator implements Runnable {
   }
 
   /**
+   * Handles an expired resource request for both active and standby containers. Since a preferred host cannot be obtained,
+   * this method checks the availability of surplus ANY_HOST resources and launches the container if one is available.
+   * Otherwise, it issues an ANY_HOST request.
+   */
+  @VisibleForTesting
+  void handleExpiredRequestWithHostAffinityEnabled(String processorId, String preferredHost,
+      SamzaResourceRequest request) {
+    boolean resourceAvailableOnAnyHost = hasAllocatedResource(ResourceRequestState.ANY_HOST);
+    if (standbyContainerManager.isPresent()) {
+      standbyContainerManager.get()
+          .handleExpiredResourceRequest(processorId, request,
+              Optional.ofNullable(peekAllocatedResource(ResourceRequestState.ANY_HOST)), this, resourceRequestState);
+    } else if (resourceAvailableOnAnyHost) {
+      LOG.info("Request for Processor ID: {} on host: {} has expired. Running on ANY_HOST", processorId, preferredHost);
+      runStreamProcessor(request, ResourceRequestState.ANY_HOST);
+    } else {
+      LOG.info("Request for Processor ID: {} on host: {} has expired. Requesting additional resources on ANY_HOST.",
+          processorId, preferredHost);
+      resourceRequestState.cancelResourceRequest(request);
+      requestResource(processorId, ResourceRequestState.ANY_HOST);
+    }
+  }
+
+  /**
    * Updates the request state and runs a processor on the specified host. Assumes a resource
    * is available on the preferred host, so the caller must verify that before invoking this method.
    *
@@ -261,6 +291,16 @@ public class ContainerAllocator implements Runnable {
   }
 
   /**
+   * If {@code StandbyContainerManager} is present, checks that the standby constraints are met before attempting to launch.
+   * @param request outstanding request which has an allocated resource
+   * @param preferredHost to run the request
+   */
+  private void checkStandByContrainsAndRunStreamProcessor(SamzaResourceRequest request, String preferredHost) {
+    standbyContainerManager.get().checkStandbyConstraintsAndRunStreamProcessor(request, preferredHost,
+        peekAllocatedResource(preferredHost), this, resourceRequestState);
+  }
+
+  /**
    * Called during initial request for resources
    *
    * @param processorToHostMapping A Map of [processorId, hostName], where processorId is the ID of the Samza processor
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
deleted file mode 100644
index 6b29bc8..0000000
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.clustermanager;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.time.Duration;
-import java.util.Optional;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * ContainerManager is a centralized entity that manages control actions like start, stop for both active and standby containers
- * ContainerManager acts as a brain for validating and issuing any actions on containers in the Job Coordinator.
- *
- * The requests to allocate resources resources made by {@link ContainerAllocator} can either expire or succeed.
- * When the requests succeeds the ContainerManager validates those requests before starting the container
- * When the requests expires the ContainerManager decides the next set of actions for the pending request.
- *
- * Callbacks issued from  {@link ClusterResourceManager} aka {@link ContainerProcessManager} are intercepted
- * by ContainerManager to handle container failure and completions for both active and standby containers
- */
-public class ContainerManager {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ContainerManager.class);
-
-  /**
-   * Resource-manager, used to stop containers
-   */
-  private final ClusterResourceManager clusterResourceManager;
-  private final SamzaApplicationState samzaApplicationState;
-
-  private final Optional<StandbyContainerManager> standbyContainerManager;
-
-  public ContainerManager(SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
-      Boolean standByEnabled) {
-    this.samzaApplicationState = samzaApplicationState;
-    this.clusterResourceManager = clusterResourceManager;
-    // Enable standby container manager if required
-    if (standByEnabled) {
-      this.standbyContainerManager =
-          Optional.of(new StandbyContainerManager(samzaApplicationState, clusterResourceManager));
-    } else {
-      this.standbyContainerManager = Optional.empty();
-    }
-  }
-
-  /**
-   * Handles the container start action for both active & standby containers.
-   *
-   * @param request pending request for the preferred host
-   * @param preferredHost preferred host to start the container
-   * @param allocatedResource resource allocated from {@link ClusterResourceManager}
-   * @param resourceRequestState state of request in {@link ContainerAllocator}
-   * @param allocator to request resources from @{@link ClusterResourceManager}
-   */
-  void handleContainerLaunch(SamzaResourceRequest request, String preferredHost, SamzaResource allocatedResource,
-      ResourceRequestState resourceRequestState, ContainerAllocator allocator) {
-    if (this.standbyContainerManager.isPresent()) {
-      standbyContainerManager.get()
-          .checkStandbyConstraintsAndRunStreamProcessor(request, preferredHost, allocatedResource, allocator,
-              resourceRequestState);
-    } else {
-      allocator.runStreamProcessor(request, preferredHost);
-    }
-  }
-
-  /**
-   * Handles the action to be taken after the container has been stopped.
-   * Case 1. When standby is enabled, refer to {@link StandbyContainerManager#handleContainerStop} to check constraints.
-   * Case 2. When standby is disabled there are two cases according to host-affinity being enabled
-   *    Case 2.1. When host-affinity is enabled resources are requested on host where container was last seen
-   *    Case 2.2. When host-affinity is disabled resources are requested for ANY_HOST
-   *
-   * @param processorId logical id of the container
-   * @param containerId last known id of the container deployed
-   * @param preferredHost host on which container was last deployed
-   * @param exitStatus exit code returned by the container
-   * @param preferredHostRetryDelay delay to be incurred before requesting resources
-   * @param containerAllocator allocator for requesting resources
-   */
-  void handleContainerStop(String processorId, String containerId, String preferredHost, int exitStatus,
-      Duration preferredHostRetryDelay, ContainerAllocator containerAllocator) {
-    if (standbyContainerManager.isPresent()) {
-      standbyContainerManager.get()
-          .handleContainerStop(processorId, containerId, preferredHost, exitStatus, containerAllocator,
-              preferredHostRetryDelay);
-    } else {
-      // If StandbyTasks are not enabled, we simply make a request for the preferredHost
-      containerAllocator.requestResourceWithDelay(processorId, preferredHost, preferredHostRetryDelay);
-    }
-  }
-
-  /**
-   * Handle the container launch failure for active containers and standby (if enabled).
-   * Case 1. When standby is enabled, refer to {@link StandbyContainerManager#handleContainerLaunchFail} to check behavior
-   * Case 2. When standby is disabled the allocator issues a request for ANY_HOST resources
-   *
-   * @param processorId logical id of the container
-   * @param containerId last known id of the container deployed
-   * @param preferredHost host on which container is requested to be deployed
-   * @param containerAllocator allocator for requesting resources
-   */
-  void handleContainerLaunchFail(String processorId, String containerId, String preferredHost,
-      ContainerAllocator containerAllocator) {
-    if (processorId != null && standbyContainerManager.isPresent()) {
-      standbyContainerManager.get().handleContainerLaunchFail(processorId, containerId, containerAllocator);
-    } else if (processorId != null) {
-      LOG.info("Falling back to ANY_HOST for Processor ID: {} since launch failed for Container ID: {} on host: {}",
-          processorId, containerId, preferredHost);
-      containerAllocator.requestResource(processorId, ResourceRequestState.ANY_HOST);
-    } else {
-      LOG.warn("Did not find a pending Processor ID for Container ID: {} on host: {}. "
-          + "Ignoring invalid/redundant notification.", containerId, preferredHost);
-    }
-  }
-
-  /**
-   * Handles an expired resource request for both active and standby containers. Since a preferred host cannot be obtained
-   * this method checks the availability of surplus ANY_HOST resources and launches the container if available. Otherwise
-   * issues an ANY_HOST request. Only applies to HOST_AFFINITY enabled cases
-   *
-   * @param processorId logical id of the container
-   * @param preferredHost host on which container is requested to be deployed
-   * @param request pending request for the preferred host
-   * @param allocator allocator for requesting resources
-   * @param resourceRequestState state of request in {@link ContainerAllocator}
-   */
-  @VisibleForTesting
-  void handleExpiredRequestWithHostAffinityEnabled(String processorId, String preferredHost,
-      SamzaResourceRequest request, ContainerAllocator allocator, ResourceRequestState resourceRequestState) {
-    boolean resourceAvailableOnAnyHost = allocator.hasAllocatedResource(ResourceRequestState.ANY_HOST);
-    if (standbyContainerManager.isPresent()) {
-      standbyContainerManager.get()
-          .handleExpiredResourceRequest(processorId, request,
-              Optional.ofNullable(allocator.peekAllocatedResource(ResourceRequestState.ANY_HOST)), allocator,
-              resourceRequestState);
-    } else if (resourceAvailableOnAnyHost) {
-      LOG.info("Request for Processor ID: {} on host: {} has expired. Running on ANY_HOST", processorId, preferredHost);
-      allocator.runStreamProcessor(request, ResourceRequestState.ANY_HOST);
-    } else {
-      LOG.info("Request for Processor ID: {} on host: {} has expired. Requesting additional resources on ANY_HOST.",
-          processorId, preferredHost);
-      resourceRequestState.cancelResourceRequest(request);
-      allocator.requestResource(processorId, ResourceRequestState.ANY_HOST);
-    }
-  }
-}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 398ed6d..cb0e537 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -95,8 +95,8 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
   private final ContainerAllocator containerAllocator;
   private final Thread allocatorThread;
 
-  // The ContainerManager manages control actions for both active & standby containers
-  private final ContainerManager containerManager;
+  // The StandbyContainerManager manages standby-aware allocation and failover of containers
+  private final Optional<StandbyContainerManager> standbyContainerManager;
 
   private final Option<DiagnosticsManager> diagnosticsManager;
 
@@ -158,9 +158,14 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     // Wire all metrics to all reporters
     this.metricsReporters.values().forEach(reporter -> reporter.register(METRICS_SOURCE_NAME, registry));
 
-    this.containerManager = new ContainerManager(state, clusterResourceManager, jobConfig.getStandbyTasksEnabled());
+    // Enable standby container manager if required
+    if (jobConfig.getStandbyTasksEnabled()) {
+      this.standbyContainerManager = Optional.of(new StandbyContainerManager(state, clusterResourceManager));
+    } else {
+      this.standbyContainerManager = Optional.empty();
+    }
 
-    this.containerAllocator = new ContainerAllocator(this.clusterResourceManager, config, state, hostAffinityEnabled, this.containerManager);
+    this.containerAllocator = new ContainerAllocator(this.clusterResourceManager, config, state, hostAffinityEnabled, this.standbyContainerManager);
     this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
     LOG.info("Finished container process manager initialization.");
   }
@@ -170,8 +175,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
       SamzaApplicationState state,
       MetricsRegistryMap registry,
       ClusterResourceManager resourceManager,
-      Optional<ContainerAllocator> allocator,
-      ContainerManager containerManager) {
+      Optional<ContainerAllocator> allocator) {
     this.state = state;
     this.clusterManagerConfig = clusterManagerConfig;
     this.jobConfig = new JobConfig(clusterManagerConfig);
@@ -179,11 +183,11 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled();
 
     this.clusterResourceManager = resourceManager;
-    this.containerManager = containerManager;
+    this.standbyContainerManager = Optional.empty();
     this.diagnosticsManager = Option.empty();
     this.containerAllocator = allocator.orElseGet(
         () -> new ContainerAllocator(this.clusterResourceManager, clusterManagerConfig, state,
-            hostAffinityEnabled, this.containerManager));
+            hostAffinityEnabled, this.standbyContainerManager));
     this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
     LOG.info("Finished container process manager initialization");
   }
@@ -423,7 +427,16 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
 
     // 3. Re-request resources on ANY_HOST in case of launch failures on the preferred host, if standby are not enabled
     // otherwise calling standbyContainerManager
-    containerManager.handleContainerLaunchFail(processorId, containerId, containerHost, containerAllocator);
+    if (processorId != null && standbyContainerManager.isPresent()) {
+      standbyContainerManager.get().handleContainerLaunchFail(processorId, containerId, containerAllocator);
+    } else if (processorId != null) {
+      LOG.info("Falling back to ANY_HOST for Processor ID: {} since launch failed for Container ID: {} on host: {}",
+          processorId, containerId, containerHost);
+      containerAllocator.requestResource(processorId, ResourceRequestState.ANY_HOST);
+    } else {
+      LOG.warn("Did not find a pending Processor ID for Container ID: {} on host: {}. " +
+          "Ignoring invalid/redundant notification.", containerId, containerHost);
+    }
   }
 
   /**
@@ -591,25 +604,25 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
   /**
    * Obtains the ID of the Samza processor pending launch on the provided resource (container).
    *
-   * ContainerProcessManager [INFO] Container ID: container_e66_1569376389369_0221_01_000049 matched pending Processor ID: 0 on host: ltx1-app0772.stg.linkedin.com
-   *
-   * @param containerId last known id of the container deployed
-   * @return the logical processorId of the processor (e.g., 0, 1, 2 ...)
+   * @param resourceId the ID of the resource (container)
+   * @return the ID of the Samza processor on this resource
    */
-  private String getPendingProcessorId(String containerId) {
+  private String getPendingProcessorId(String resourceId) {
     for (Map.Entry<String, SamzaResource> entry: state.pendingProcessors.entrySet()) {
-      if (entry.getValue().getContainerId().equals(containerId)) {
-        LOG.info("Container ID: {} matched pending Processor ID: {} on host: {}", containerId, entry.getKey(), entry.getValue().getHost());
+      if (entry.getValue().getContainerId().equals(resourceId)) {
+        LOG.info("Container ID: {} matched pending Processor ID: {} on host: {}", resourceId, entry.getKey(), entry.getValue().getHost());
         return entry.getKey();
       }
     }
     return null;
   }
 
-  /**
-   * Request {@link ContainerManager#handleContainerStop} to determine next step of actions for the stopped container
-   */
-  private void handleContainerStop(String processorId, String containerId, String preferredHost, int exitStatus, Duration preferredHostRetryDelay) {
-    containerManager.handleContainerStop(processorId, containerId, preferredHost, exitStatus, preferredHostRetryDelay, containerAllocator);
+  private void handleContainerStop(String processorId, String resourceID, String preferredHost, int exitStatus, Duration preferredHostRetryDelay) {
+    if (standbyContainerManager.isPresent()) {
+      standbyContainerManager.get().handleContainerStop(processorId, resourceID, preferredHost, exitStatus, containerAllocator, preferredHostRetryDelay);
+    } else {
+      // If StandbyTasks are not enabled, we simply make a request for the preferredHost
+      containerAllocator.requestResourceWithDelay(processorId, preferredHost, preferredHostRetryDelay);
+    }
   }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithHostAffinity.java
index 649435a..99ef433 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithHostAffinity.java
@@ -22,6 +22,7 @@ import org.apache.samza.config.Config;
 
 import java.lang.reflect.Field;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -29,8 +30,8 @@ public class MockContainerAllocatorWithHostAffinity extends ContainerAllocator {
   private Semaphore semaphore = new Semaphore(0);
 
   public MockContainerAllocatorWithHostAffinity(ClusterResourceManager manager,
-      Config config, SamzaApplicationState state, ContainerManager containerManager) {
-    super(manager, config, state, true, containerManager);
+      Config config, SamzaApplicationState state) {
+    super(manager, config, state, true, Optional.empty());
   }
 
   /**
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
index 7448e57..0b3ff80 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
@@ -18,9 +18,11 @@
  */
 package org.apache.samza.clustermanager;
 
+import java.util.Optional;
 import org.apache.samza.config.Config;
 
 import java.lang.reflect.Field;
+
 import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -29,9 +31,10 @@ public class MockContainerAllocatorWithoutHostAffinity extends ContainerAllocato
   public int requestedContainers = 0;
   private Semaphore semaphore = new Semaphore(0);
 
-  public MockContainerAllocatorWithoutHostAffinity(ClusterResourceManager resourceManager,
-                                Config config, SamzaApplicationState state, ContainerManager containerManager) {
-    super(resourceManager, config, state, false, containerManager);
+  public MockContainerAllocatorWithoutHostAffinity(ClusterResourceManager manager,
+                                Config config,
+                                SamzaApplicationState state) {
+    super(manager, config, state, false, Optional.empty());
   }
 
   /**
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index 3451c4c..927df89 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -24,6 +24,7 @@ import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -60,7 +61,6 @@ public class TestContainerAllocatorWithHostAffinity {
   private final SamzaApplicationState state = new SamzaApplicationState(jobModelManager);
 
   private final MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
-  private final ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
 
   private JobModelManager initializeJobModelManager(Config config, int containerCount) {
     Map<String, Map<String, String>> localityMap = new HashMap<>();
@@ -83,8 +83,7 @@ public class TestContainerAllocatorWithHostAffinity {
 
   @Before
   public void setup() throws Exception {
-    containerAllocator =
-        new ContainerAllocator(clusterResourceManager, config, state, true, containerManager);
+    containerAllocator = new ContainerAllocator(clusterResourceManager, config, state, true, Optional.empty());
     requestState = new MockContainerRequestState(clusterResourceManager, true);
     Field requestStateField = containerAllocator.getClass().getDeclaredField("resourceRequestState");
     requestStateField.setAccessible(true);
@@ -360,8 +359,6 @@ public class TestContainerAllocatorWithHostAffinity {
   @Test
   public void testRequestAllocationOnPreferredHostWithRunStreamProcessor() throws Exception {
     ClusterResourceManager.Callback mockCPM = mock(MockClusterResourceManagerCallback.class);
-    ClusterResourceManager mockClusterResourceManager = new MockClusterResourceManager(mockCPM, state);
-    ContainerManager containerManager = new ContainerManager(state, mockClusterResourceManager, false);
     // Mock the callback from ClusterManager to add resources to the allocator
     doAnswer((InvocationOnMock invocation) -> {
         SamzaResource resource = (SamzaResource) invocation.getArgumentAt(0, List.class).get(0);
@@ -370,7 +367,8 @@ public class TestContainerAllocatorWithHostAffinity {
       }).when(mockCPM).onResourcesAvailable(anyList());
 
     spyAllocator = Mockito.spy(
-        new ContainerAllocator(mockClusterResourceManager, config, state, true, containerManager));
+        new ContainerAllocator(new MockClusterResourceManager(mockCPM, state), config, state, true,
+            Optional.empty()));
 
     // Request Resources
     spyAllocator.requestResources(new HashMap<String, String>() {
@@ -407,9 +405,8 @@ public class TestContainerAllocatorWithHostAffinity {
   @Test
   public void testExpiredRequestAllocationOnAnyHost() throws Exception {
     MockClusterResourceManager spyManager = spy(new MockClusterResourceManager(callback, state));
-    ContainerManager spyContainerManager = spy(new ContainerManager(state, spyManager, false));
-    spyAllocator = Mockito.spy(
-        new ContainerAllocator(spyManager, config, state, true, spyContainerManager));
+    spyAllocator = Mockito.spy(new ContainerAllocator(spyManager, config, state, true, Optional.empty()));
+
     // Request Preferred Resources
     spyAllocator.requestResources(new HashMap<String, String>() {
       {
@@ -428,10 +425,10 @@ public class TestContainerAllocatorWithHostAffinity {
     // Verify that all the request that were created as preferred host requests expired
     assertTrue(state.preferredHostRequests.get() == 2);
     assertTrue(state.expiredPreferredHostRequests.get() == 2);
-    verify(spyContainerManager, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("hostname-0"),
-        any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
-    verify(spyContainerManager, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("hostname-1"),
-        any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
+    verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("hostname-0"),
+        any(SamzaResourceRequest.class));
+    verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("hostname-1"),
+        any(SamzaResourceRequest.class));
 
     // Verify that preferred host request were cancelled and since no surplus resources were available
     // requestResource was invoked with ANY_HOST requests
@@ -449,11 +446,10 @@ public class TestContainerAllocatorWithHostAffinity {
   @Test
   public void testExpiredRequestAllocationOnSurplusAnyHostWithRunStreamProcessor() throws Exception {
     // Add Extra Resources
-    MockClusterResourceManager spyClusterResourceManager = spy(new MockClusterResourceManager(callback, state));
-    ContainerManager spyContainerManager = spy(new ContainerManager(state, spyClusterResourceManager, false));
-
     spyAllocator = Mockito.spy(
-        new ContainerAllocator(spyClusterResourceManager, config, state, true, spyContainerManager));
+        new ContainerAllocator(new MockClusterResourceManager(callback, state), config, state, true,
+            Optional.empty()));
+
     spyAllocator.addResource(new SamzaResource(1, 1000, "xyz", "id1"));
     spyAllocator.addResource(new SamzaResource(1, 1000, "zzz", "id2"));
 
@@ -473,11 +469,11 @@ public class TestContainerAllocatorWithHostAffinity {
     Thread.sleep(100);
 
     // Verify that all the request that were created as preferred host requests expired
-    assertEquals(state.expiredPreferredHostRequests.get(), 2);
-    verify(spyContainerManager, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("hostname-0"),
-        any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
-    verify(spyContainerManager, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("hostname-1"),
-        any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
+    assertTrue(state.expiredPreferredHostRequests.get() == 2);
+    verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("hostname-0"),
+        any(SamzaResourceRequest.class));
+    verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("hostname-1"),
+        any(SamzaResourceRequest.class));
 
     // Verify that runStreamProcessor was invoked with already available ANY_HOST requests
     ArgumentCaptor<SamzaResourceRequest> resourceRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index 9b0d1b7..16eac0b 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -23,6 +23,7 @@ import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.JobModelManager;
@@ -66,8 +67,7 @@ public class TestContainerAllocatorWithoutHostAffinity {
 
   @Before
   public void setup() throws Exception {
-    containerAllocator = new ContainerAllocator(manager, config, state, false,
-        new ContainerManager(state, manager, false));
+    containerAllocator = new ContainerAllocator(manager, config, state, false, Optional.empty());
     requestState = new MockContainerRequestState(manager, false);
     Field requestStateField = containerAllocator.getClass().getDeclaredField("resourceRequestState");
     requestStateField.setAccessible(true);
@@ -261,10 +261,9 @@ public class TestContainerAllocatorWithoutHostAffinity {
     };
 
     ClusterResourceManager.Callback mockCPM = mock(ClusterResourceManager.Callback.class);
-    ClusterResourceManager mockManager = new MockClusterResourceManager(mockCPM, state);
-    ContainerManager spyContainerManager = spy(new ContainerManager(state, mockManager, false));
     spyAllocator = Mockito.spy(
-        new ContainerAllocator(mockManager, config, state, false, spyContainerManager));
+        new ContainerAllocator(new MockClusterResourceManager(mockCPM, state), config, state, false,
+            Optional.empty()));
     // Mock the callback from ClusterManager to add resources to the allocator
     doAnswer((InvocationOnMock invocation) -> {
         SamzaResource resource = (SamzaResource) invocation.getArgumentAt(0, List.class).get(0);
@@ -283,9 +282,9 @@ public class TestContainerAllocatorWithoutHostAffinity {
     resourceRequestCaptor.getAllValues()
         .forEach(resourceRequest -> assertEquals(resourceRequest.getPreferredHost(), ResourceRequestState.ANY_HOST));
     assertTrue(state.anyHostRequests.get() == containersToHostMapping.size());
-    // Expiry currently should not be invoked for host affinity enabled cases only
-    verify(spyContainerManager, never()).handleExpiredRequestWithHostAffinityEnabled(anyString(), anyString(),
-        any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
+    // Expiry currently is only handled for host affinity enabled cases
+    verify(spyAllocator, never()).handleExpiredRequestWithHostAffinityEnabled(anyString(), anyString(),
+        any(SamzaResourceRequest.class));
     // Only updated when host affinity is enabled
     assertTrue(state.matchedResourceRequests.get() == 0);
     assertTrue(state.preferredHostRequests.get() == 0);
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 639b247..f100393 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -141,7 +141,7 @@ public class TestContainerProcessManager {
     SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
+
     ContainerProcessManager cpm =
         buildContainerProcessManager(new ClusterManagerConfig(new MapConfig(conf)), state, clusterResourceManager, Optional.empty());
 
@@ -165,8 +165,7 @@ public class TestContainerProcessManager {
         state,
         new MetricsRegistryMap(),
         clusterResourceManager,
-        Optional.empty(),
-        containerManager
+        Optional.empty()
     );
 
     allocator =
@@ -184,7 +183,6 @@ public class TestContainerProcessManager {
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     ClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
 
     ContainerProcessManager cpm =
         buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.empty());
@@ -192,8 +190,7 @@ public class TestContainerProcessManager {
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
         conf,
-        state,
-        containerManager);
+        state);
 
     getPrivateFieldFromCpm("containerAllocator", cpm).set(cpm, allocator);
     CountDownLatch latch = new CountDownLatch(1);
@@ -250,13 +247,11 @@ public class TestContainerProcessManager {
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
         conf,
-        state,
-        containerManager);
+        state);
 
     ContainerProcessManager cpm =
         spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
@@ -296,13 +291,11 @@ public class TestContainerProcessManager {
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
         conf,
-        state,
-        containerManager);
+        state);
 
     ContainerProcessManager cpm = spy(
         buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
@@ -393,13 +386,11 @@ public class TestContainerProcessManager {
     SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
         clusterManagerConfig,
-        state,
-        containerManager);
+        state);
 
     ContainerProcessManager cpm =
         buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator));
@@ -477,13 +468,11 @@ public class TestContainerProcessManager {
     SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
         clusterManagerConfig,
-        state,
-        containerManager);
+        state);
 
     ContainerProcessManager cpm =
         buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator));
@@ -585,13 +574,11 @@ public class TestContainerProcessManager {
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
         conf,
-        state,
-        containerManager);
+        state);
 
     ContainerProcessManager cpm =
         spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
@@ -625,17 +612,15 @@ public class TestContainerProcessManager {
     configMap.putAll(getConfig());
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
         new MapConfig(config),
-        state,
-        containerManager);
+        state);
 
     ContainerProcessManager manager =
-        new ContainerProcessManager(new ClusterManagerConfig(config), state, new MetricsRegistryMap(), clusterResourceManager,
-            Optional.of(allocator), containerManager);
+        new ContainerProcessManager(new ClusterManagerConfig(config), state, new MetricsRegistryMap(),
+            clusterResourceManager, Optional.of(allocator));
 
     manager.start();
     SamzaResource resource = new SamzaResource(1, 1024, "host1", "resource-1");
@@ -659,13 +644,11 @@ public class TestContainerProcessManager {
         "1", "host2")));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
 
     MockContainerAllocatorWithHostAffinity allocator = new MockContainerAllocatorWithHostAffinity(
         clusterResourceManager,
         cfg,
-        state,
-        containerManager);
+        state);
 
     ContainerProcessManager cpm =
         spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, clusterResourceManager, Optional.of(allocator)));
@@ -722,13 +705,11 @@ public class TestContainerProcessManager {
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(new MapConfig(conf)));
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
         conf,
-        state,
-        containerManager);
+        state);
 
     ContainerProcessManager cpm =
         spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
@@ -795,13 +776,11 @@ public class TestContainerProcessManager {
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(new MapConfig(config)));
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
         conf,
-        state,
-        containerManager);
+        state);
 
     ContainerProcessManager cpm =
         spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
@@ -887,7 +866,7 @@ public class TestContainerProcessManager {
 
   private ContainerProcessManager buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, SamzaApplicationState state,
       ClusterResourceManager clusterResourceManager, Optional<ContainerAllocator> allocator) {
-    return new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(), clusterResourceManager, allocator,
-         new ContainerManager(state, clusterResourceManager, false));
+    return new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(), clusterResourceManager,
+        allocator);
   }
 }