You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ra...@apache.org on 2019/11/06 00:08:09 UTC
[samza] 01/01: Revert "SAMZA-2340: [Container Placements] Introduce
ContainerManager for handling validation for failures & starts of active &
standby containers"
This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch revert-1180-container-placement-service
in repository https://gitbox.apache.org/repos/asf/samza.git
commit badf8144bdf6d78d232d58e0c333db8cdc22d98f
Author: rmatharu <40...@users.noreply.github.com>
AuthorDate: Tue Nov 5 16:08:02 2019 -0800
Revert "SAMZA-2340: [Container Placements] Introduce ContainerManager for handling validation for failures & starts of active & standby containers"
---
.../samza/clustermanager/ContainerAllocator.java | 50 ++++++-
.../samza/clustermanager/ContainerManager.java | 164 ---------------------
.../clustermanager/ContainerProcessManager.java | 55 ++++---
.../MockContainerAllocatorWithHostAffinity.java | 5 +-
.../MockContainerAllocatorWithoutHostAffinity.java | 9 +-
.../TestContainerAllocatorWithHostAffinity.java | 40 +++--
.../TestContainerAllocatorWithoutHostAffinity.java | 15 +-
.../TestContainerProcessManager.java | 53 ++-----
8 files changed, 129 insertions(+), 262 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index 38659f3..89855dc 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.clustermanager;
+import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
@@ -114,13 +115,13 @@ public class ContainerAllocator implements Runnable {
*/
private final int requestExpiryTimeout;
- private final ContainerManager containerManager;
+ private final Optional<StandbyContainerManager> standbyContainerManager;
public ContainerAllocator(ClusterResourceManager clusterResourceManager,
Config config,
SamzaApplicationState state,
boolean hostAffinityEnabled,
- ContainerManager containerManager) {
+ Optional<StandbyContainerManager> standbyContainerManager) {
ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
this.clusterResourceManager = clusterResourceManager;
this.allocatorSleepIntervalMs = clusterManagerConfig.getAllocatorSleepTime();
@@ -131,7 +132,7 @@ public class ContainerAllocator implements Runnable {
this.state = state;
this.config = config;
this.hostAffinityEnabled = hostAffinityEnabled;
- this.containerManager = containerManager;
+ this.standbyContainerManager = standbyContainerManager;
this.requestExpiryTimeout = clusterManagerConfig.getContainerRequestTimeout();
}
@@ -203,7 +204,12 @@ public class ContainerAllocator implements Runnable {
state.matchedResourceRequests.incrementAndGet();
}
- containerManager.handleContainerLaunch(request, preferredHost, peekAllocatedResource(preferredHost), resourceRequestState, this);
+ // If hot-standby is enabled, check standby constraints are met before launching a processor
+ if (this.standbyContainerManager.isPresent()) {
+ checkStandByContrainsAndRunStreamProcessor(request, preferredHost);
+ } else {
+ runStreamProcessor(request, preferredHost);
+ }
} else {
@@ -214,7 +220,7 @@ public class ContainerAllocator implements Runnable {
if (expired) {
updateExpiryMetrics(request);
if (hostAffinityEnabled) {
- containerManager.handleExpiredRequestWithHostAffinityEnabled(processorId, preferredHost, request, this, resourceRequestState);
+ handleExpiredRequestWithHostAffinityEnabled(processorId, preferredHost, request);
}
} else {
LOG.info("Request for Processor ID: {} on preferred host {} has not expired yet."
@@ -227,6 +233,30 @@ public class ContainerAllocator implements Runnable {
}
/**
+ * Handles an expired resource request for both active and standby containers. Since a preferred host cannot be obtained
+ * this method checks the availability of surplus ANY_HOST resources and launches the container if available. Otherwise
+ * issues an ANY_HOST request.
+ */
+ @VisibleForTesting
+ void handleExpiredRequestWithHostAffinityEnabled(String processorId, String preferredHost,
+ SamzaResourceRequest request) {
+ boolean resourceAvailableOnAnyHost = hasAllocatedResource(ResourceRequestState.ANY_HOST);
+ if (standbyContainerManager.isPresent()) {
+ standbyContainerManager.get()
+ .handleExpiredResourceRequest(processorId, request,
+ Optional.ofNullable(peekAllocatedResource(ResourceRequestState.ANY_HOST)), this, resourceRequestState);
+ } else if (resourceAvailableOnAnyHost) {
+ LOG.info("Request for Processor ID: {} on host: {} has expired. Running on ANY_HOST", processorId, preferredHost);
+ runStreamProcessor(request, ResourceRequestState.ANY_HOST);
+ } else {
+ LOG.info("Request for Processor ID: {} on host: {} has expired. Requesting additional resources on ANY_HOST.",
+ processorId, preferredHost);
+ resourceRequestState.cancelResourceRequest(request);
+ requestResource(processorId, ResourceRequestState.ANY_HOST);
+ }
+ }
+
+ /**
* Updates the request state and runs a processor on the specified host. Assumes a resource
* is available on the preferred host, so the caller must verify that before invoking this method.
*
@@ -261,6 +291,16 @@ public class ContainerAllocator implements Runnable {
}
/**
+ * If {@code StandbyContainerManager} is present check standBy constraints are met before attempting to launch
+ * @param request outstanding request which has an allocated resource
+ * @param preferredHost to run the request
+ */
+ private void checkStandByContrainsAndRunStreamProcessor(SamzaResourceRequest request, String preferredHost) {
+ standbyContainerManager.get().checkStandbyConstraintsAndRunStreamProcessor(request, preferredHost,
+ peekAllocatedResource(preferredHost), this, resourceRequestState);
+ }
+
+ /**
* Called during initial request for resources
*
* @param processorToHostMapping A Map of [processorId, hostName], where processorId is the ID of the Samza processor
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
deleted file mode 100644
index 6b29bc8..0000000
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.clustermanager;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.time.Duration;
-import java.util.Optional;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * ContainerManager is a centralized entity that manages control actions like start, stop for both active and standby containers
- * ContainerManager acts as a brain for validating and issuing any actions on containers in the Job Coordinator.
- *
- * The requests to allocate resources resources made by {@link ContainerAllocator} can either expire or succeed.
- * When the requests succeeds the ContainerManager validates those requests before starting the container
- * When the requests expires the ContainerManager decides the next set of actions for the pending request.
- *
- * Callbacks issued from {@link ClusterResourceManager} aka {@link ContainerProcessManager} are intercepted
- * by ContainerManager to handle container failure and completions for both active and standby containers
- */
-public class ContainerManager {
-
- private static final Logger LOG = LoggerFactory.getLogger(ContainerManager.class);
-
- /**
- * Resource-manager, used to stop containers
- */
- private final ClusterResourceManager clusterResourceManager;
- private final SamzaApplicationState samzaApplicationState;
-
- private final Optional<StandbyContainerManager> standbyContainerManager;
-
- public ContainerManager(SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
- Boolean standByEnabled) {
- this.samzaApplicationState = samzaApplicationState;
- this.clusterResourceManager = clusterResourceManager;
- // Enable standby container manager if required
- if (standByEnabled) {
- this.standbyContainerManager =
- Optional.of(new StandbyContainerManager(samzaApplicationState, clusterResourceManager));
- } else {
- this.standbyContainerManager = Optional.empty();
- }
- }
-
- /**
- * Handles the container start action for both active & standby containers.
- *
- * @param request pending request for the preferred host
- * @param preferredHost preferred host to start the container
- * @param allocatedResource resource allocated from {@link ClusterResourceManager}
- * @param resourceRequestState state of request in {@link ContainerAllocator}
- * @param allocator to request resources from @{@link ClusterResourceManager}
- */
- void handleContainerLaunch(SamzaResourceRequest request, String preferredHost, SamzaResource allocatedResource,
- ResourceRequestState resourceRequestState, ContainerAllocator allocator) {
- if (this.standbyContainerManager.isPresent()) {
- standbyContainerManager.get()
- .checkStandbyConstraintsAndRunStreamProcessor(request, preferredHost, allocatedResource, allocator,
- resourceRequestState);
- } else {
- allocator.runStreamProcessor(request, preferredHost);
- }
- }
-
- /**
- * Handles the action to be taken after the container has been stopped.
- * Case 1. When standby is enabled, refer to {@link StandbyContainerManager#handleContainerStop} to check constraints.
- * Case 2. When standby is disabled there are two cases according to host-affinity being enabled
- * Case 2.1. When host-affinity is enabled resources are requested on host where container was last seen
- * Case 2.2. When host-affinity is disabled resources are requested for ANY_HOST
- *
- * @param processorId logical id of the container
- * @param containerId last known id of the container deployed
- * @param preferredHost host on which container was last deployed
- * @param exitStatus exit code returned by the container
- * @param preferredHostRetryDelay delay to be incurred before requesting resources
- * @param containerAllocator allocator for requesting resources
- */
- void handleContainerStop(String processorId, String containerId, String preferredHost, int exitStatus,
- Duration preferredHostRetryDelay, ContainerAllocator containerAllocator) {
- if (standbyContainerManager.isPresent()) {
- standbyContainerManager.get()
- .handleContainerStop(processorId, containerId, preferredHost, exitStatus, containerAllocator,
- preferredHostRetryDelay);
- } else {
- // If StandbyTasks are not enabled, we simply make a request for the preferredHost
- containerAllocator.requestResourceWithDelay(processorId, preferredHost, preferredHostRetryDelay);
- }
- }
-
- /**
- * Handle the container launch failure for active containers and standby (if enabled).
- * Case 1. When standby is enabled, refer to {@link StandbyContainerManager#handleContainerLaunchFail} to check behavior
- * Case 2. When standby is disabled the allocator issues a request for ANY_HOST resources
- *
- * @param processorId logical id of the container
- * @param containerId last known id of the container deployed
- * @param preferredHost host on which container is requested to be deployed
- * @param containerAllocator allocator for requesting resources
- */
- void handleContainerLaunchFail(String processorId, String containerId, String preferredHost,
- ContainerAllocator containerAllocator) {
- if (processorId != null && standbyContainerManager.isPresent()) {
- standbyContainerManager.get().handleContainerLaunchFail(processorId, containerId, containerAllocator);
- } else if (processorId != null) {
- LOG.info("Falling back to ANY_HOST for Processor ID: {} since launch failed for Container ID: {} on host: {}",
- processorId, containerId, preferredHost);
- containerAllocator.requestResource(processorId, ResourceRequestState.ANY_HOST);
- } else {
- LOG.warn("Did not find a pending Processor ID for Container ID: {} on host: {}. "
- + "Ignoring invalid/redundant notification.", containerId, preferredHost);
- }
- }
-
- /**
- * Handles an expired resource request for both active and standby containers. Since a preferred host cannot be obtained
- * this method checks the availability of surplus ANY_HOST resources and launches the container if available. Otherwise
- * issues an ANY_HOST request. Only applies to HOST_AFFINITY enabled cases
- *
- * @param processorId logical id of the container
- * @param preferredHost host on which container is requested to be deployed
- * @param request pending request for the preferred host
- * @param allocator allocator for requesting resources
- * @param resourceRequestState state of request in {@link ContainerAllocator}
- */
- @VisibleForTesting
- void handleExpiredRequestWithHostAffinityEnabled(String processorId, String preferredHost,
- SamzaResourceRequest request, ContainerAllocator allocator, ResourceRequestState resourceRequestState) {
- boolean resourceAvailableOnAnyHost = allocator.hasAllocatedResource(ResourceRequestState.ANY_HOST);
- if (standbyContainerManager.isPresent()) {
- standbyContainerManager.get()
- .handleExpiredResourceRequest(processorId, request,
- Optional.ofNullable(allocator.peekAllocatedResource(ResourceRequestState.ANY_HOST)), allocator,
- resourceRequestState);
- } else if (resourceAvailableOnAnyHost) {
- LOG.info("Request for Processor ID: {} on host: {} has expired. Running on ANY_HOST", processorId, preferredHost);
- allocator.runStreamProcessor(request, ResourceRequestState.ANY_HOST);
- } else {
- LOG.info("Request for Processor ID: {} on host: {} has expired. Requesting additional resources on ANY_HOST.",
- processorId, preferredHost);
- resourceRequestState.cancelResourceRequest(request);
- allocator.requestResource(processorId, ResourceRequestState.ANY_HOST);
- }
- }
-}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 398ed6d..cb0e537 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -95,8 +95,8 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
private final ContainerAllocator containerAllocator;
private final Thread allocatorThread;
- // The ContainerManager manages control actions for both active & standby containers
- private final ContainerManager containerManager;
+ // The StandbyContainerManager manages standby-aware allocation and failover of containers
+ private final Optional<StandbyContainerManager> standbyContainerManager;
private final Option<DiagnosticsManager> diagnosticsManager;
@@ -158,9 +158,14 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
// Wire all metrics to all reporters
this.metricsReporters.values().forEach(reporter -> reporter.register(METRICS_SOURCE_NAME, registry));
- this.containerManager = new ContainerManager(state, clusterResourceManager, jobConfig.getStandbyTasksEnabled());
+ // Enable standby container manager if required
+ if (jobConfig.getStandbyTasksEnabled()) {
+ this.standbyContainerManager = Optional.of(new StandbyContainerManager(state, clusterResourceManager));
+ } else {
+ this.standbyContainerManager = Optional.empty();
+ }
- this.containerAllocator = new ContainerAllocator(this.clusterResourceManager, config, state, hostAffinityEnabled, this.containerManager);
+ this.containerAllocator = new ContainerAllocator(this.clusterResourceManager, config, state, hostAffinityEnabled, this.standbyContainerManager);
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
LOG.info("Finished container process manager initialization.");
}
@@ -170,8 +175,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
SamzaApplicationState state,
MetricsRegistryMap registry,
ClusterResourceManager resourceManager,
- Optional<ContainerAllocator> allocator,
- ContainerManager containerManager) {
+ Optional<ContainerAllocator> allocator) {
this.state = state;
this.clusterManagerConfig = clusterManagerConfig;
this.jobConfig = new JobConfig(clusterManagerConfig);
@@ -179,11 +183,11 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled();
this.clusterResourceManager = resourceManager;
- this.containerManager = containerManager;
+ this.standbyContainerManager = Optional.empty();
this.diagnosticsManager = Option.empty();
this.containerAllocator = allocator.orElseGet(
() -> new ContainerAllocator(this.clusterResourceManager, clusterManagerConfig, state,
- hostAffinityEnabled, this.containerManager));
+ hostAffinityEnabled, this.standbyContainerManager));
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
LOG.info("Finished container process manager initialization");
}
@@ -423,7 +427,16 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
// 3. Re-request resources on ANY_HOST in case of launch failures on the preferred host, if standby are not enabled
// otherwise calling standbyContainerManager
- containerManager.handleContainerLaunchFail(processorId, containerId, containerHost, containerAllocator);
+ if (processorId != null && standbyContainerManager.isPresent()) {
+ standbyContainerManager.get().handleContainerLaunchFail(processorId, containerId, containerAllocator);
+ } else if (processorId != null) {
+ LOG.info("Falling back to ANY_HOST for Processor ID: {} since launch failed for Container ID: {} on host: {}",
+ processorId, containerId, containerHost);
+ containerAllocator.requestResource(processorId, ResourceRequestState.ANY_HOST);
+ } else {
+ LOG.warn("Did not find a pending Processor ID for Container ID: {} on host: {}. " +
+ "Ignoring invalid/redundant notification.", containerId, containerHost);
+ }
}
/**
@@ -591,25 +604,25 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
/**
* Obtains the ID of the Samza processor pending launch on the provided resource (container).
*
- * ContainerProcessManager [INFO] Container ID: container_e66_1569376389369_0221_01_000049 matched pending Processor ID: 0 on host: ltx1-app0772.stg.linkedin.com
- *
- * @param containerId last known id of the container deployed
- * @return the logical processorId of the processor (e.g., 0, 1, 2 ...)
+ * @param resourceId the ID of the resource (container)
+ * @return the ID of the Samza processor on this resource
*/
- private String getPendingProcessorId(String containerId) {
+ private String getPendingProcessorId(String resourceId) {
for (Map.Entry<String, SamzaResource> entry: state.pendingProcessors.entrySet()) {
- if (entry.getValue().getContainerId().equals(containerId)) {
- LOG.info("Container ID: {} matched pending Processor ID: {} on host: {}", containerId, entry.getKey(), entry.getValue().getHost());
+ if (entry.getValue().getContainerId().equals(resourceId)) {
+ LOG.info("Container ID: {} matched pending Processor ID: {} on host: {}", resourceId, entry.getKey(), entry.getValue().getHost());
return entry.getKey();
}
}
return null;
}
- /**
- * Request {@link ContainerManager#handleContainerStop} to determine next step of actions for the stopped container
- */
- private void handleContainerStop(String processorId, String containerId, String preferredHost, int exitStatus, Duration preferredHostRetryDelay) {
- containerManager.handleContainerStop(processorId, containerId, preferredHost, exitStatus, preferredHostRetryDelay, containerAllocator);
+ private void handleContainerStop(String processorId, String resourceID, String preferredHost, int exitStatus, Duration preferredHostRetryDelay) {
+ if (standbyContainerManager.isPresent()) {
+ standbyContainerManager.get().handleContainerStop(processorId, resourceID, preferredHost, exitStatus, containerAllocator, preferredHostRetryDelay);
+ } else {
+ // If StandbyTasks are not enabled, we simply make a request for the preferredHost
+ containerAllocator.requestResourceWithDelay(processorId, preferredHost, preferredHostRetryDelay);
+ }
}
}
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithHostAffinity.java
index 649435a..99ef433 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithHostAffinity.java
@@ -22,6 +22,7 @@ import org.apache.samza.config.Config;
import java.lang.reflect.Field;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -29,8 +30,8 @@ public class MockContainerAllocatorWithHostAffinity extends ContainerAllocator {
private Semaphore semaphore = new Semaphore(0);
public MockContainerAllocatorWithHostAffinity(ClusterResourceManager manager,
- Config config, SamzaApplicationState state, ContainerManager containerManager) {
- super(manager, config, state, true, containerManager);
+ Config config, SamzaApplicationState state) {
+ super(manager, config, state, true, Optional.empty());
}
/**
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
index 7448e57..0b3ff80 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
@@ -18,9 +18,11 @@
*/
package org.apache.samza.clustermanager;
+import java.util.Optional;
import org.apache.samza.config.Config;
import java.lang.reflect.Field;
+
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -29,9 +31,10 @@ public class MockContainerAllocatorWithoutHostAffinity extends ContainerAllocato
public int requestedContainers = 0;
private Semaphore semaphore = new Semaphore(0);
- public MockContainerAllocatorWithoutHostAffinity(ClusterResourceManager resourceManager,
- Config config, SamzaApplicationState state, ContainerManager containerManager) {
- super(resourceManager, config, state, false, containerManager);
+ public MockContainerAllocatorWithoutHostAffinity(ClusterResourceManager manager,
+ Config config,
+ SamzaApplicationState state) {
+ super(manager, config, state, false, Optional.empty());
}
/**
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index 3451c4c..927df89 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -24,6 +24,7 @@ import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -60,7 +61,6 @@ public class TestContainerAllocatorWithHostAffinity {
private final SamzaApplicationState state = new SamzaApplicationState(jobModelManager);
private final MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
- private final ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
private JobModelManager initializeJobModelManager(Config config, int containerCount) {
Map<String, Map<String, String>> localityMap = new HashMap<>();
@@ -83,8 +83,7 @@ public class TestContainerAllocatorWithHostAffinity {
@Before
public void setup() throws Exception {
- containerAllocator =
- new ContainerAllocator(clusterResourceManager, config, state, true, containerManager);
+ containerAllocator = new ContainerAllocator(clusterResourceManager, config, state, true, Optional.empty());
requestState = new MockContainerRequestState(clusterResourceManager, true);
Field requestStateField = containerAllocator.getClass().getDeclaredField("resourceRequestState");
requestStateField.setAccessible(true);
@@ -360,8 +359,6 @@ public class TestContainerAllocatorWithHostAffinity {
@Test
public void testRequestAllocationOnPreferredHostWithRunStreamProcessor() throws Exception {
ClusterResourceManager.Callback mockCPM = mock(MockClusterResourceManagerCallback.class);
- ClusterResourceManager mockClusterResourceManager = new MockClusterResourceManager(mockCPM, state);
- ContainerManager containerManager = new ContainerManager(state, mockClusterResourceManager, false);
// Mock the callback from ClusterManager to add resources to the allocator
doAnswer((InvocationOnMock invocation) -> {
SamzaResource resource = (SamzaResource) invocation.getArgumentAt(0, List.class).get(0);
@@ -370,7 +367,8 @@ public class TestContainerAllocatorWithHostAffinity {
}).when(mockCPM).onResourcesAvailable(anyList());
spyAllocator = Mockito.spy(
- new ContainerAllocator(mockClusterResourceManager, config, state, true, containerManager));
+ new ContainerAllocator(new MockClusterResourceManager(mockCPM, state), config, state, true,
+ Optional.empty()));
// Request Resources
spyAllocator.requestResources(new HashMap<String, String>() {
@@ -407,9 +405,8 @@ public class TestContainerAllocatorWithHostAffinity {
@Test
public void testExpiredRequestAllocationOnAnyHost() throws Exception {
MockClusterResourceManager spyManager = spy(new MockClusterResourceManager(callback, state));
- ContainerManager spyContainerManager = spy(new ContainerManager(state, spyManager, false));
- spyAllocator = Mockito.spy(
- new ContainerAllocator(spyManager, config, state, true, spyContainerManager));
+ spyAllocator = Mockito.spy(new ContainerAllocator(spyManager, config, state, true, Optional.empty()));
+
// Request Preferred Resources
spyAllocator.requestResources(new HashMap<String, String>() {
{
@@ -428,10 +425,10 @@ public class TestContainerAllocatorWithHostAffinity {
// Verify that all the request that were created as preferred host requests expired
assertTrue(state.preferredHostRequests.get() == 2);
assertTrue(state.expiredPreferredHostRequests.get() == 2);
- verify(spyContainerManager, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("hostname-0"),
- any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
- verify(spyContainerManager, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("hostname-1"),
- any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
+ verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("hostname-0"),
+ any(SamzaResourceRequest.class));
+ verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("hostname-1"),
+ any(SamzaResourceRequest.class));
// Verify that preferred host request were cancelled and since no surplus resources were available
// requestResource was invoked with ANY_HOST requests
@@ -449,11 +446,10 @@ public class TestContainerAllocatorWithHostAffinity {
@Test
public void testExpiredRequestAllocationOnSurplusAnyHostWithRunStreamProcessor() throws Exception {
// Add Extra Resources
- MockClusterResourceManager spyClusterResourceManager = spy(new MockClusterResourceManager(callback, state));
- ContainerManager spyContainerManager = spy(new ContainerManager(state, spyClusterResourceManager, false));
-
spyAllocator = Mockito.spy(
- new ContainerAllocator(spyClusterResourceManager, config, state, true, spyContainerManager));
+ new ContainerAllocator(new MockClusterResourceManager(callback, state), config, state, true,
+ Optional.empty()));
+
spyAllocator.addResource(new SamzaResource(1, 1000, "xyz", "id1"));
spyAllocator.addResource(new SamzaResource(1, 1000, "zzz", "id2"));
@@ -473,11 +469,11 @@ public class TestContainerAllocatorWithHostAffinity {
Thread.sleep(100);
// Verify that all the request that were created as preferred host requests expired
- assertEquals(state.expiredPreferredHostRequests.get(), 2);
- verify(spyContainerManager, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("hostname-0"),
- any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
- verify(spyContainerManager, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("hostname-1"),
- any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
+ assertTrue(state.expiredPreferredHostRequests.get() == 2);
+ verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("hostname-0"),
+ any(SamzaResourceRequest.class));
+ verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("hostname-1"),
+ any(SamzaResourceRequest.class));
// Verify that runStreamProcessor was invoked with already available ANY_HOST requests
ArgumentCaptor<SamzaResourceRequest> resourceRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index 9b0d1b7..16eac0b 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -23,6 +23,7 @@ import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.coordinator.JobModelManager;
@@ -66,8 +67,7 @@ public class TestContainerAllocatorWithoutHostAffinity {
@Before
public void setup() throws Exception {
- containerAllocator = new ContainerAllocator(manager, config, state, false,
- new ContainerManager(state, manager, false));
+ containerAllocator = new ContainerAllocator(manager, config, state, false, Optional.empty());
requestState = new MockContainerRequestState(manager, false);
Field requestStateField = containerAllocator.getClass().getDeclaredField("resourceRequestState");
requestStateField.setAccessible(true);
@@ -261,10 +261,9 @@ public class TestContainerAllocatorWithoutHostAffinity {
};
ClusterResourceManager.Callback mockCPM = mock(ClusterResourceManager.Callback.class);
- ClusterResourceManager mockManager = new MockClusterResourceManager(mockCPM, state);
- ContainerManager spyContainerManager = spy(new ContainerManager(state, mockManager, false));
spyAllocator = Mockito.spy(
- new ContainerAllocator(mockManager, config, state, false, spyContainerManager));
+ new ContainerAllocator(new MockClusterResourceManager(mockCPM, state), config, state, false,
+ Optional.empty()));
// Mock the callback from ClusterManager to add resources to the allocator
doAnswer((InvocationOnMock invocation) -> {
SamzaResource resource = (SamzaResource) invocation.getArgumentAt(0, List.class).get(0);
@@ -283,9 +282,9 @@ public class TestContainerAllocatorWithoutHostAffinity {
resourceRequestCaptor.getAllValues()
.forEach(resourceRequest -> assertEquals(resourceRequest.getPreferredHost(), ResourceRequestState.ANY_HOST));
assertTrue(state.anyHostRequests.get() == containersToHostMapping.size());
- // Expiry currently should not be invoked for host affinity enabled cases only
- verify(spyContainerManager, never()).handleExpiredRequestWithHostAffinityEnabled(anyString(), anyString(),
- any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
+ // Expiry currently is only handled for host affinity enabled cases
+ verify(spyAllocator, never()).handleExpiredRequestWithHostAffinityEnabled(anyString(), anyString(),
+ any(SamzaResourceRequest.class));
// Only updated when host affinity is enabled
assertTrue(state.matchedResourceRequests.get() == 0);
assertTrue(state.preferredHostRequests.get() == 0);
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 639b247..f100393 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -141,7 +141,7 @@ public class TestContainerProcessManager {
SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
- ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
+
ContainerProcessManager cpm =
buildContainerProcessManager(new ClusterManagerConfig(new MapConfig(conf)), state, clusterResourceManager, Optional.empty());
@@ -165,8 +165,7 @@ public class TestContainerProcessManager {
state,
new MetricsRegistryMap(),
clusterResourceManager,
- Optional.empty(),
- containerManager
+ Optional.empty()
);
allocator =
@@ -184,7 +183,6 @@ public class TestContainerProcessManager {
MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
ClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
- ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
ContainerProcessManager cpm =
buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.empty());
@@ -192,8 +190,7 @@ public class TestContainerProcessManager {
MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
clusterResourceManager,
conf,
- state,
- containerManager);
+ state);
getPrivateFieldFromCpm("containerAllocator", cpm).set(cpm, allocator);
CountDownLatch latch = new CountDownLatch(1);
@@ -250,13 +247,11 @@ public class TestContainerProcessManager {
MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
- ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
clusterResourceManager,
conf,
- state,
- containerManager);
+ state);
ContainerProcessManager cpm =
spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
@@ -296,13 +291,11 @@ public class TestContainerProcessManager {
MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
- ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
clusterResourceManager,
conf,
- state,
- containerManager);
+ state);
ContainerProcessManager cpm = spy(
buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
@@ -393,13 +386,11 @@ public class TestContainerProcessManager {
SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
- ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
clusterResourceManager,
clusterManagerConfig,
- state,
- containerManager);
+ state);
ContainerProcessManager cpm =
buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator));
@@ -477,13 +468,11 @@ public class TestContainerProcessManager {
SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
- ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
clusterResourceManager,
clusterManagerConfig,
- state,
- containerManager);
+ state);
ContainerProcessManager cpm =
buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator));
@@ -585,13 +574,11 @@ public class TestContainerProcessManager {
MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
- ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
clusterResourceManager,
conf,
- state,
- containerManager);
+ state);
ContainerProcessManager cpm =
spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
@@ -625,17 +612,15 @@ public class TestContainerProcessManager {
configMap.putAll(getConfig());
MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
- ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
clusterResourceManager,
new MapConfig(config),
- state,
- containerManager);
+ state);
ContainerProcessManager manager =
- new ContainerProcessManager(new ClusterManagerConfig(config), state, new MetricsRegistryMap(), clusterResourceManager,
- Optional.of(allocator), containerManager);
+ new ContainerProcessManager(new ClusterManagerConfig(config), state, new MetricsRegistryMap(),
+ clusterResourceManager, Optional.of(allocator));
manager.start();
SamzaResource resource = new SamzaResource(1, 1024, "host1", "resource-1");
@@ -659,13 +644,11 @@ public class TestContainerProcessManager {
"1", "host2")));
MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
- ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
MockContainerAllocatorWithHostAffinity allocator = new MockContainerAllocatorWithHostAffinity(
clusterResourceManager,
cfg,
- state,
- containerManager);
+ state);
ContainerProcessManager cpm =
spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, clusterResourceManager, Optional.of(allocator)));
@@ -722,13 +705,11 @@ public class TestContainerProcessManager {
MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(new MapConfig(conf)));
- ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
clusterResourceManager,
conf,
- state,
- containerManager);
+ state);
ContainerProcessManager cpm =
spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
@@ -795,13 +776,11 @@ public class TestContainerProcessManager {
MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(new MapConfig(config)));
- ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, false);
MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
clusterResourceManager,
conf,
- state,
- containerManager);
+ state);
ContainerProcessManager cpm =
spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
@@ -887,7 +866,7 @@ public class TestContainerProcessManager {
private ContainerProcessManager buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, SamzaApplicationState state,
ClusterResourceManager clusterResourceManager, Optional<ContainerAllocator> allocator) {
- return new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(), clusterResourceManager, allocator,
- new ContainerManager(state, clusterResourceManager, false));
+ return new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(), clusterResourceManager,
+ allocator);
}
}