You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/03/04 20:35:09 UTC
[samza] branch master updated: SEP-19: Allocator changes for
standby-aware container allocation, and active container failover
This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new e2b9a76 SEP-19: Allocator changes for standby-aware container allocation, and active container failover
e2b9a76 is described below
commit e2b9a76fcba7488a4457907d3915c9a3cbf7cf51
Author: Ray Matharu <rm...@linkedin.com>
AuthorDate: Mon Mar 4 12:35:05 2019 -0800
SEP-19: Allocator changes for standby-aware container allocation, and active container failover
This PR makes the following changes:
* Adds a map called standbyContainerConstraints, which stores the standby constraints for each container in the job model. The logic for populating the map from the job model is added as a util class (StandbyTaskUtil).
* Adds a check before runStreamProcessor: the standbyContainerConstraints are checked before launching, and the container is launched only if the check succeeds. If standby tasks are not enabled in the config, the existing behaviour is retained.
* Adds logic to handle standbyContainerConstraints check failures, which entails releasing the resource and making an any-host request for the container.
* Adds logic in SamzaResourceRequest to order container requests such that active containers take precedence over standby containers; requests of the same type are ordered by request timestamp.
* Adds logic in HostAwareContainerAllocator to intercept resourceRequests issued by the CPM and the HACA, and:
a. if it is an any-host request for an active container, translate it into a stop of a standby container;
b. if it is for a standby container stopped by us, translate it into a resource request for the active container on the standby's host, and a request for the standby container on any host;
c. in all other cases, proceed as is.
* Adds metrics to capture FailedStandbyAllocations (due to standby constraints), successful standby allocations, number of failovers of active to a valid standby, number of failovers of active to anyhost, in case no standby was found, and number of standby-container-stops completed.
Tested on a dev setup, a VPC, and a test cluster.
Author: Ray Matharu <rm...@linkedin.com>
Author: rmatharu <40...@users.noreply.github.com>
Reviewers: Jagadish Venkatraman <vj...@gmail.com>
Closes #903 from rmatharu/test-basicStandbyFailover
---
.../clustermanager/AbstractContainerAllocator.java | 12 +-
.../clustermanager/ClusterResourceManager.java | 8 +
.../clustermanager/ContainerProcessManager.java | 47 +-
.../HostAwareContainerAllocator.java | 33 +-
.../samza/clustermanager/ResourceRequestState.java | 24 +-
.../clustermanager/SamzaApplicationState.java | 23 +-
.../samza/clustermanager/SamzaResourceRequest.java | 12 +-
.../clustermanager/StandbyContainerManager.java | 516 +++++++++++++++++++++
.../samza/clustermanager/StandbyTaskUtil.java | 121 +++++
.../grouper/task/TaskNameGrouperProxy.java | 21 +-
.../apache/samza/storage/StorageManagerUtil.java | 9 +-
.../metrics/ContainerProcessManagerMetrics.scala | 4 +
.../clustermanager/MockClusterResourceManager.java | 5 +
.../clustermanager/MockContainerRequestState.java | 4 +-
.../MockHostAwareContainerAllocator.java | 3 +-
.../TestContainerProcessManager.java | 3 +-
.../TestHostAwareContainerAllocator.java | 3 +-
.../samza/clustermanager/TestStandbyAllocator.java | 126 +++++
.../samza/job/yarn/YarnClusterResourceManager.java | 26 +-
19 files changed, 943 insertions(+), 57 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
index 5547a32..7adb1cc 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
@@ -194,11 +194,19 @@ public abstract class AbstractContainerAllocator implements Runnable {
* @param preferredHost Name of the host that you prefer to run the container on
*/
public final void requestResource(String containerID, String preferredHost) {
- SamzaResourceRequest request = new SamzaResourceRequest(this.containerNumCpuCores, this.containerMemoryMb,
+ SamzaResourceRequest request = getResourceRequest(containerID, preferredHost);
+ issueResourceRequest(request);
+ }
+
+ public final SamzaResourceRequest getResourceRequest(String containerID, String preferredHost) {
+ return new SamzaResourceRequest(this.containerNumCpuCores, this.containerMemoryMb,
preferredHost, containerID);
+ }
+
+ public final void issueResourceRequest(SamzaResourceRequest request) {
resourceRequestState.addResourceRequest(request);
state.containerRequests.incrementAndGet();
- if (ResourceRequestState.ANY_HOST.equals(preferredHost)) {
+ if (ResourceRequestState.ANY_HOST.equals(request.getPreferredHost())) {
state.anyHostRequests.incrementAndGet();
} else {
state.preferredHostRequests.incrementAndGet();
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
index f8a8c8b..19f3ef0 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
@@ -121,6 +121,14 @@ public abstract class ClusterResourceManager {
*/
public abstract void launchStreamProcessor(SamzaResource resource, CommandBuilder builder);
+ /**
+ * Requests the stopping of a StreamProcessor, identified by the given resource.
+ * {@link Callback#onResourcesCompleted(List)} will be invoked to indicate the completion of this operation.
+ *
+ * @param resource the resource being used for the StreamProcessor.
+ */
+ public abstract void stopStreamProcessor(SamzaResource resource);
+
public abstract void stop(SamzaApplicationState.SamzaAppStatus status);
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index a089ed9..e63b425 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.clustermanager;
+import java.util.Optional;
import org.apache.samza.SamzaException;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
@@ -77,6 +78,9 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
private final AbstractContainerAllocator containerAllocator;
private final Thread allocatorThread;
+ // The StandbyContainerManager manages standby-aware allocation and failover of containers
+ private final Optional<StandbyContainerManager> standbyContainerManager;
+
/**
* A standard interface to request resources.
*/
@@ -115,6 +119,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
this.metrics = new ContainerProcessManagerMetrics(config, state, registry);
this.containerAllocator = allocator;
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
+ this.standbyContainerManager = Optional.empty();
}
public ContainerProcessManager(Config config,
@@ -130,8 +135,14 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
this.clusterResourceManager = checkNotNull(factory.getClusterResourceManager(this, state));
this.metrics = new ContainerProcessManagerMetrics(config, state, registry);
+ if (jobConfig.getStandbyTasksEnabled()) {
+ this.standbyContainerManager = Optional.of(new StandbyContainerManager(state, clusterResourceManager));
+ } else {
+ this.standbyContainerManager = Optional.empty();
+ }
+
if (this.hostAffinityEnabled) {
- this.containerAllocator = new HostAwareContainerAllocator(clusterResourceManager, clusterManagerConfig.getContainerRequestTimeout(), config, state);
+ this.containerAllocator = new HostAwareContainerAllocator(clusterResourceManager, clusterManagerConfig.getContainerRequestTimeout(), config, standbyContainerManager, state);
} else {
this.containerAllocator = new ContainerAllocator(clusterResourceManager, config, state);
}
@@ -155,10 +166,10 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
this.clusterResourceManager = resourceManager;
this.metrics = new ContainerProcessManagerMetrics(config, state, registry);
-
+ this.standbyContainerManager = Optional.empty();
if (this.hostAffinityEnabled) {
- this.containerAllocator = new HostAwareContainerAllocator(clusterResourceManager, clusterManagerConfig.getContainerRequestTimeout(), config, state);
+ this.containerAllocator = new HostAwareContainerAllocator(clusterResourceManager, clusterManagerConfig.getContainerRequestTimeout(), config, this.standbyContainerManager, state);
} else {
this.containerAllocator = new ContainerAllocator(clusterResourceManager, config, state);
}
@@ -185,14 +196,12 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
clusterResourceManager.start();
log.info("Starting the Samza task manager");
- final int containerCount = jobConfig.getContainerCount();
- state.containerCount.set(containerCount);
- state.neededContainers.set(containerCount);
+ state.containerCount.set(state.jobModelManager.jobModel().getContainers().size());
+ state.neededContainers.set(state.jobModelManager.jobModel().getContainers().size());
// Request initial set of containers
Map<String, String> containerToHostMapping = state.jobModelManager.jobModel().getAllContainerLocality();
-
containerAllocator.requestResources(containerToHostMapping);
// Start container allocator thread
@@ -297,8 +306,8 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
state.neededContainers.incrementAndGet();
state.jobHealthy.set(false);
- // request a container on new host
- containerAllocator.requestResource(containerId, ResourceRequestState.ANY_HOST);
+ // handle container stop due to node fail
+ this.handleContainerStop(containerId, containerStatus.getResourceID(), ResourceRequestState.ANY_HOST, exitStatus);
break;
default:
@@ -371,9 +380,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
}
if (!tooManyFailedContainers) {
- log.info("Requesting a new container ");
- // Request a new container
- containerAllocator.requestResource(containerId, lastSeenOn);
+ handleContainerStop(containerId, containerStatus.getResourceID(), lastSeenOn, exitStatus);
}
}
@@ -428,8 +435,11 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
String containerId = getPendingContainerId(resource.getResourceID());
log.info("Failed container ID: {} for resourceId: {}", containerId, resource.getResourceID());
- // 3. Re-request resources on ANY_HOST in case of launch failures on the preferred host.
- if (containerId != null) {
+ // 3. Re-request resources on ANY_HOST in case of launch failures on the preferred host, if standby are not enabled
+ // otherwise calling standbyContainerManager
+ if (containerId != null && standbyContainerManager.isPresent()) {
+ this.standbyContainerManager.get().handleContainerLaunchFail(containerId, resource.getResourceID(), containerAllocator);
+ } else if (containerId != null) {
log.info("Launch of container ID: {} failed on host: {}. Falling back to ANY_HOST", containerId, resource.getHost());
containerAllocator.requestResource(containerId, ResourceRequestState.ANY_HOST);
} else {
@@ -484,5 +494,12 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
return null;
}
-
+ private void handleContainerStop(String containerID, String resourceID, String preferredHost, int exitStatus) {
+ if (standbyContainerManager.isPresent()) {
+ standbyContainerManager.get().handleContainerStop(containerID, resourceID, preferredHost, exitStatus, containerAllocator);
+ } else {
+ // If StandbyTasks are not enabled, we simply make a request for the preferredHost
+ containerAllocator.requestResource(containerID, preferredHost);
+ }
+ }
}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
index 27d1caa..6bfa1a6 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
@@ -18,11 +18,14 @@
*/
package org.apache.samza.clustermanager;
+
+import java.util.Optional;
import java.util.Map;
import org.apache.samza.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* This is the allocator thread that will be used by ContainerProcessManager when host-affinity is enabled for a job. It is similar
* to {@link ContainerAllocator}, except that it considers locality for allocation.
@@ -45,11 +48,13 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
* Tracks the expiration of a request for resources.
*/
private final int requestTimeout;
+ private final Optional<StandbyContainerManager> standbyContainerManager;
public HostAwareContainerAllocator(ClusterResourceManager manager,
- int timeout, Config config, SamzaApplicationState state) {
+ int timeout, Config config, Optional<StandbyContainerManager> standbyContainerManager, SamzaApplicationState state) {
super(manager, new ResourceRequestState(true, manager), config, state);
this.requestTimeout = timeout;
+ this.standbyContainerManager = standbyContainerManager;
}
/**
@@ -70,7 +75,8 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
if (hasAllocatedResource(preferredHost)) {
// Found allocated container at preferredHost
log.info("Found a matched-container {} on the preferred host. Running on {}", containerID, preferredHost);
- runStreamProcessor(request, preferredHost);
+ // Try to launch streamProcessor on this preferredHost if it all standby constraints are met
+ checkStandbyConstraintsAndRunStreamProcessor(request, preferredHost, peekAllocatedResource(preferredHost));
state.matchedResourceRequests.incrementAndGet();
} else {
log.info("Did not find any allocated resources on preferred host {} for running container id {}",
@@ -81,14 +87,21 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
if (expired) {
updateExpiryMetrics(request);
- if (resourceAvailableOnAnyHost) {
+
+ if (standbyContainerManager.isPresent()) {
+ standbyContainerManager.get().handleExpiredResourceRequest(containerID, request,
+ Optional.ofNullable(peekAllocatedResource(ResourceRequestState.ANY_HOST)), this, resourceRequestState);
+
+ } else if (resourceAvailableOnAnyHost) {
log.info("Request for container: {} on {} has expired. Running on ANY_HOST", request.getContainerID(), request.getPreferredHost());
runStreamProcessor(request, ResourceRequestState.ANY_HOST);
+
} else {
log.info("Request for container: {} on {} has expired. Requesting additional resources on ANY_HOST.", request.getContainerID(), request.getPreferredHost());
resourceRequestState.cancelResourceRequest(request);
requestResource(containerID, ResourceRequestState.ANY_HOST);
}
+
} else {
log.info("Request for container: {} on {} has not yet expired. Request creation time: {}. Request timeout: {}",
new Object[]{request.getContainerID(), request.getPreferredHost(), request.getRequestTimestampMs(), requestTimeout});
@@ -98,6 +111,7 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
}
}
+
/**
* Since host-affinity is enabled, all container processes will be requested on their preferred host. If the job is
* run for the first time, it will get matched to any available host.
@@ -142,4 +156,15 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
state.expiredPreferredHostRequests.incrementAndGet();
}
}
-}
+
+ private void checkStandbyConstraintsAndRunStreamProcessor(SamzaResourceRequest request, String preferredHost, SamzaResource samzaResource) {
+ // If standby tasks are not enabled run streamprocessor on the given host
+ if (!this.standbyContainerManager.isPresent()) {
+ runStreamProcessor(request, preferredHost);
+ return;
+ }
+
+ this.standbyContainerManager.get().checkStandbyConstraintsAndRunStreamProcessor(request, preferredHost,
+ samzaResource, this, resourceRequestState);
+ }
+}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
index aa7a509..0296d4c 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
@@ -152,8 +152,8 @@ public class ResourceRequestState {
* assigned to ANY_HOST
*/
log.info("The number of containers already allocated on {} is greater than what was " +
- "requested, which is {}. Hence, saving the samzaResource {} in the buffer for ANY_HOST",
- new Object[]{hostName, requestCountOnThisHost, samzaResource.getResourceID()});
+ "requested, which is {}. Hence, saving the samzaResource {} in the buffer for ANY_HOST",
+ new Object[]{hostName, requestCountOnThisHost, samzaResource.getResourceID()});
addToAllocatedResourceList(ANY_HOST, samzaResource);
}
}
@@ -238,9 +238,21 @@ public class ResourceRequestState {
*
* @param resource the {@link SamzaResource} to release.
*/
- public void releaseUnstartableContainer(SamzaResource resource) {
- log.info("Releasing unstartable container {}", resource.getResourceID());
- manager.releaseResources(resource);
+ public void releaseUnstartableContainer(SamzaResource resource, String preferredHost) {
+ synchronized (lock) {
+ log.info("Releasing unstartable container {} on host {}", resource.getResourceID(), resource.getHost());
+ manager.releaseResources(resource);
+
+ // A reference for the resource could either be held in the preferred host buffer or in the ANY_HOST buffer.
+ if (allocatedResources.get(preferredHost) != null) {
+ allocatedResources.get(preferredHost).remove(resource);
+ log.info("Resource {} removed from allocated resource buffer for host {}", resource.getResourceID(), preferredHost);
+ }
+ if (allocatedResources.get(ANY_HOST) != null) {
+ allocatedResources.get(ANY_HOST).remove(resource);
+ log.info("Resource {} removed from allocated resource buffer for host {}", resource.getResourceID(), ANY_HOST);
+ }
+ }
}
@@ -366,4 +378,4 @@ public class ResourceRequestState {
}
-}
+}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
index 4e6fc33..2a7b6a1 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
@@ -78,7 +78,7 @@ public class SamzaApplicationState {
public final AtomicInteger releasedContainers = new AtomicInteger(0);
/**
- * ContainerStatus of failed containers.
+ * ContainerStatuses of failed containers.
*/
public final ConcurrentMap<String, SamzaResourceStatus> failedContainersStatus = new ConcurrentHashMap<String, SamzaResourceStatus>();
@@ -110,7 +110,7 @@ public class SamzaApplicationState {
*/
public final ConcurrentMap<String, SamzaResource> runningContainers = new ConcurrentHashMap<String, SamzaResource>(0);
- /**
+ /**
* Final status of the application. Made to be volatile s.t. changes will be visible in multiple threads.
*/
public volatile SamzaAppStatus status = SamzaAppStatus.UNDEFINED;
@@ -141,6 +141,25 @@ public class SamzaApplicationState {
*/
public final AtomicInteger redundantNotifications = new AtomicInteger(0);
+ /**
+ * Number of container allocations from the RM, that did not meet standby container constraints, in which case the
+ * existing resource was given back to the RM, and a new ANY-HOST request had to be made.
+ */
+ public final AtomicInteger failedStandbyAllocations = new AtomicInteger(0);
+
+ /**
+ * Number of occurrences in which a failover of an active container was initiated (due to a node failure), in which a
+ * running standby container was available for the failover.
+ * If two standby containers were used for one failing active, it counts as two.
+ */
+ public final AtomicInteger failoversToStandby = new AtomicInteger(0);
+
+ /**
+ * Number of occurrences in which a failover of an active container was initiated (due to a node failure), in which no
+ * running standby container was available for the failover.
+ */
+ public final AtomicInteger failoversToAnyHost = new AtomicInteger(0);
+
public SamzaApplicationState(JobModelManager jobModelManager) {
this.jobModelManager = jobModelManager;
}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
index 4159ff2..df70459 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
@@ -109,11 +109,21 @@ public class SamzaResourceRequest implements Comparable<SamzaResourceRequest> {
}
/**
- * Requests are ordered by the time at which they were created.
+ * Requests are ordered by the container type and the time at which they were created.
+ * Active containers take precedence over standby containers, regardless of timestamp.
* @param o the other
*/
@Override
public int compareTo(SamzaResourceRequest o) {
+
+ if (!StandbyTaskUtil.isStandbyContainer(this.containerID) && StandbyTaskUtil.isStandbyContainer(o.containerID)) {
+ return -1;
+ }
+
+ if (StandbyTaskUtil.isStandbyContainer(this.containerID) && !StandbyTaskUtil.isStandbyContainer(o.containerID)) {
+ return 1;
+ }
+
if (this.requestTimestampMs < o.requestTimestampMs)
return -1;
if (this.requestTimestampMs > o.requestTimestampMs)
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
new file mode 100644
index 0000000..db27238
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.storage.kv.Entry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Encapsulates logic and state concerning standby-containers.
+ */
+public class StandbyContainerManager {
+
+ private static final Logger log = LoggerFactory.getLogger(StandbyContainerManager.class);
+
+ private final SamzaApplicationState samzaApplicationState;
+
+ // Map of samza containerIDs to their corresponding active and standby containers, e.g., 0 -> {0-0, 0-1}, 0-0 -> {0, 0-1}
+ // This is used for checking no two standbys or active-standby-pair are started on the same host
+ private final Map<String, List<String>> standbyContainerConstraints;
+
+ // Map of active containers that are in failover, indexed by the active container's resourceID (at the time of failure)
+ private final Map<String, FailoverMetadata> failovers;
+
+ // Resource-manager, used to stop containers
+ private ClusterResourceManager clusterResourceManager;
+
+ public StandbyContainerManager(SamzaApplicationState samzaApplicationState,
+ ClusterResourceManager clusterResourceManager) {
+ this.failovers = new ConcurrentHashMap<>();
+ this.standbyContainerConstraints = new HashMap<>();
+ this.samzaApplicationState = samzaApplicationState;
+ JobModel jobModel = samzaApplicationState.jobModelManager.jobModel();
+
+ // populate the standbyContainerConstraints map by iterating over all containers
+ jobModel.getContainers()
+ .keySet()
+ .forEach(containerId -> standbyContainerConstraints.put(containerId,
+ StandbyTaskUtil.getStandbyContainerConstraints(containerId, jobModel)));
+ this.clusterResourceManager = clusterResourceManager;
+
+ log.info("Populated standbyContainerConstraints map {}", standbyContainerConstraints);
+ }
+
+ /**
+ * We handle the stopping of a container depending on the case which is decided using the exit-status:
+ * Case 1. an Active-Container which has stopped for an "unknown" reason, then we start it on the given preferredHost (but we record the resource-request)
+ * Case 2. Active container has stopped because of node failure, thene we initiate a failover
+ * Case 3. StandbyContainer has stopped after it was chosen for failover, see {@link StandbyContainerManager#handleStandbyContainerStop}
+ * Case 4. StandbyContainer has stopped but not because of a failover, see {@link StandbyContainerManager#handleStandbyContainerStop}
+ *
+ * @param containerID containerID of the stopped container
+ * @param resourceID last resourceID of the stopped container
+ * @param preferredHost the host on which the container was running
+ * @param exitStatus the exit status of the failed container
+ * @param containerAllocator the container allocator
+ */
+ public void handleContainerStop(String containerID, String resourceID, String preferredHost, int exitStatus,
+ AbstractContainerAllocator containerAllocator) {
+
+ if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+ handleStandbyContainerStop(containerID, resourceID, preferredHost, containerAllocator);
+ } else {
+ // initiate failover for the active container based on the exitStatus
+ switch (exitStatus) {
+ case SamzaResourceStatus.DISK_FAIL:
+ case SamzaResourceStatus.ABORTED:
+ case SamzaResourceStatus.PREEMPTED:
+ initiateActiveContainerFailover(containerID, resourceID, containerAllocator);
+ break;
+ // in all other cases, request-resource for the failed container, but record the resource-request, so that
+ // if this request expires, we can do a failover -- select a standby to stop & start the active on standby's host
+ default:
+ log.info("Requesting resource for active-container {} on host {}", containerID, preferredHost);
+ SamzaResourceRequest resourceRequest = containerAllocator.getResourceRequest(containerID, preferredHost);
+ FailoverMetadata failoverMetadata = registerActiveContainerFailure(containerID, resourceID);
+ failoverMetadata.recordResourceRequest(resourceRequest);
+ containerAllocator.issueResourceRequest(resourceRequest);
+ break;
+ }
+ }
+ }
+
+ /**
+ * Handle the failed launch of a container, based on
+ * Case 1. If it is an active container, then initiate a failover for it.
+ * Case 2. If it is standby container, request a new resource on AnyHost.
+ * @param containerID the ID of the container that has failed
+ */
+ public void handleContainerLaunchFail(String containerID, String resourceID,
+ AbstractContainerAllocator containerAllocator) {
+
+ if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+ log.info("Handling launch fail for standby-container {}, requesting resource on any host {}", containerID);
+ containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+ } else {
+ initiateActiveContainerFailover(containerID, resourceID, containerAllocator);
+ }
+ }
+
+ /**
+ * If a standby container has stopped, then there are two possible cases
+ * Case 1. during a failover, the standby container was stopped for an active's start, then we
+ * 1. request a resource on the standby's host to place the activeContainer, and
+ * 2. request anyhost to place this standby
+ *
+ * Case 2. independent of a failover, the standby container stopped, in which proceed with its resource-request
+ * @param standbyContainerID SamzaContainerID of the standby container
+ * @param preferredHost Preferred host of the standby container
+ */
+ private void handleStandbyContainerStop(String standbyContainerID, String resourceID, String preferredHost,
+ AbstractContainerAllocator containerAllocator) {
+
+ // if this standbyContainerResource was stopped for a failover, we will find a metadata entry
+ Optional<StandbyContainerManager.FailoverMetadata> failoverMetadata = this.checkIfUsedForFailover(resourceID);
+
+ if (failoverMetadata.isPresent()) {
+ String activeContainerID = failoverMetadata.get().activeContainerID;
+ String standbyContainerHostname = failoverMetadata.get().getStandbyContainerHostname(resourceID);
+
+ log.info("Requesting resource for active container {} on host {}, and backup container {} on any host",
+ activeContainerID, standbyContainerHostname, standbyContainerID);
+
+ // request standbycontainer's host for active-container
+ SamzaResourceRequest resourceRequestForActive = containerAllocator.getResourceRequest(activeContainerID, standbyContainerHostname);
+ // record the resource request, before issuing it to avoid race with allocation-thread
+ failoverMetadata.get().recordResourceRequest(resourceRequestForActive);
+ containerAllocator.issueResourceRequest(resourceRequestForActive);
+
+ // request any-host for standby container
+ containerAllocator.requestResource(standbyContainerID, ResourceRequestState.ANY_HOST);
+ } else {
+ log.info("Issuing request for standby container {} on host {}, since this is not for a failover",
+ standbyContainerID, preferredHost);
+ containerAllocator.requestResource(standbyContainerID, preferredHost);
+ }
+ }
+
+ /**
+ * Handles the failover of an active container.
+ * If a running standby of the active container can be selected, the failure is registered in the failover state
+ * together with the chosen standby resource, and that standby is stopped. If no standby can be selected, an
+ * ANY_HOST resource request is issued for the active container instead.
+ *
+ * @param containerID the samzaContainerID of the active-container
+ * @param resourceID the samza-resource-ID of the container when it failed (used to index failover-state)
+ * @param containerAllocator the allocator used to issue the ANY_HOST request when no standby is available
+ */
+ private void initiateActiveContainerFailover(String containerID, String resourceID,
+ AbstractContainerAllocator containerAllocator) {
+
+ Optional<Entry<String, SamzaResource>> candidate = this.selectStandby(containerID, resourceID);
+
+ if (!candidate.isPresent()) {
+ // No usable standby: fall back to placing the active container on any host
+ log.info("No standby container found for active container {}, making a request for {}", containerID,
+ ResourceRequestState.ANY_HOST);
+ samzaApplicationState.failoversToAnyHost.incrementAndGet();
+ containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+ return;
+ }
+
+ Entry<String, SamzaResource> standby = candidate.get();
+ SamzaResource standbyResource = standby.getValue();
+ String standbyResourceID = standbyResource.getResourceID();
+
+ // Record the failover and the standby chosen for it before the standby is stopped
+ this.registerActiveContainerFailure(containerID, resourceID)
+ .updateStandbyContainer(standbyResourceID, standbyResource.getHost());
+
+ log.info("Initiating failover and stopping standby container, found standbyContainer {} = resource {}, "
+ + "for active container {}", standby.getKey(), standbyResourceID, containerID);
+ samzaApplicationState.failoversToStandby.incrementAndGet();
+ this.clusterResourceManager.stopStreamProcessor(standbyResource);
+ }
+
+ /**
+ * Selects a running standby container for a given active container that has stopped.
+ * A standby is eligible if it is currently running and its resource has not already been used in a previous
+ * failover of the same active resource.
+ * TODO: enrich this method to select standbys intelligently based on lag, timestamp, load-balancing, etc.
+ *
+ * @param activeContainerID Samza containerID of the active container
+ * @param activeContainerResourceID ResourceID of the active container at the time of its last failure (may be null)
+ * @return the selected standby's containerID and resource, or empty if no eligible standby is running
+ */
+ private Optional<Entry<String, SamzaResource>> selectStandby(String activeContainerID,
+ String activeContainerResourceID) {
+
+ List<String> standbyContainerIDs = this.standbyContainerConstraints.get(activeContainerID);
+ log.info("Standby containers {} for active container {}", standbyContainerIDs, activeContainerID);
+
+ // An active container with no entry in the constraints map has no standbys to choose from
+ if (standbyContainerIDs == null) {
+ log.info("Did not find any running standby container for active container {}", activeContainerID);
+ return Optional.empty();
+ }
+
+ // obtain any existing failover metadata
+ Optional<StandbyContainerManager.FailoverMetadata> failoverMetadata =
+ activeContainerResourceID == null ? Optional.empty() : this.getFailoverMetadata(activeContainerResourceID);
+
+ // Iterate over the running standby containers, to find a standby resource that we have not already
+ // used for a failover of this active resource
+ for (String standbyContainerID : standbyContainerIDs) {
+
+ // A single lookup (rather than containsKey followed by get) cannot observe the container being removed
+ // in between, should runningContainers be updated concurrently
+ SamzaResource standbyContainerResource = samzaApplicationState.runningContainers.get(standbyContainerID);
+ if (standbyContainerResource == null) {
+ continue;
+ }
+
+ // use this standby if there was no previous failover for which this standbyResource was used
+ if (!(failoverMetadata.isPresent() && failoverMetadata.get().isStandbyResourceUsed(standbyContainerResource.getResourceID()))) {
+
+ log.info("Returning standby container {} in running state for active container {}", standbyContainerID,
+ activeContainerID);
+ return Optional.of(new Entry<>(standbyContainerID, standbyContainerResource));
+ }
+ }
+
+ log.info("Did not find any running standby container for active container {}", activeContainerID);
+ return Optional.empty();
+ }
+
+ /**
+ * Registers the failure of an active container, keyed by the resource ID it was running on.
+ * If failover metadata already exists for that resource ID it is reused; otherwise new metadata is created.
+ *
+ * @param activeContainerID the Samza containerID of the failed active container
+ * @param activeContainerResourceID the resource ID of the active container at the time of its failure
+ * @return the (new or existing) failover metadata for this resource ID
+ */
+ private FailoverMetadata registerActiveContainerFailure(String activeContainerID, String activeContainerResourceID) {
+ return this.failovers.computeIfAbsent(activeContainerResourceID,
+ failedResourceID -> new FailoverMetadata(activeContainerID, failedResourceID));
+ }
+
+ /**
+ * Checks whether the given standby container resource was selected for the failover of some active container.
+ * This is used to determine whether a stop of that container was requested by us.
+ *
+ * @param standbyContainerResourceId resource ID of the standby container; may be null
+ * @return the failover metadata that used this standby resource, or empty if there is none or the ID is null
+ */
+ private Optional<FailoverMetadata> checkIfUsedForFailover(String standbyContainerResourceId) {
+ if (standbyContainerResourceId == null) {
+ return Optional.empty();
+ }
+
+ Optional<FailoverMetadata> match = this.failovers.values()
+ .stream()
+ .filter(metadata -> metadata.isStandbyResourceUsed(standbyContainerResourceId))
+ .findFirst();
+
+ match.ifPresent(metadata -> log.info("Standby container with resource id {} was selected for failover of active container {}",
+ standbyContainerResourceId, metadata.activeContainerID));
+ return match;
+ }
+
+ /**
+ * Checks whether matching this SamzaResourceRequest to the given resource meets all standby-container constraints.
+ * The constraints are met if none of the containers constrained against the requested container is pending
+ * launch or running on the resource's host.
+ *
+ * @param request The resource request to match.
+ * @param samzaResource The samzaResource to potentially match the request to.
+ * @return true if the requested container may be started on the resource's host, false otherwise
+ */
+ private boolean checkStandbyConstraints(SamzaResourceRequest request, SamzaResource samzaResource) {
+ String containerIDToStart = request.getContainerID();
+ String host = samzaResource.getHost();
+ List<String> containerIDsForStandbyConstraints = this.standbyContainerConstraints.get(containerIDToStart);
+
+ // A container without a constraint entry has nothing it must be kept apart from (and would NPE below)
+ if (containerIDsForStandbyConstraints == null) {
+ log.warn("No standby constraints found for container {}, treating it as unconstrained", containerIDToStart);
+ return true;
+ }
+
+ // Check if any of these conflicting containers are running/launching on host
+ for (String containerID : containerIDsForStandbyConstraints) {
+ SamzaResource resource = samzaApplicationState.pendingContainers.get(containerID);
+
+ // return false if a conflicting container is pending for launch on the host
+ if (resource != null && resource.getHost().equals(host)) {
+ log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
+ containerIDToStart, host, containerID);
+ return false;
+ }
+
+ // return false if a conflicting container is running on the host
+ resource = samzaApplicationState.runningContainers.get(containerID);
+ if (resource != null && resource.getHost().equals(host)) {
+ log.info("Container {} cannot be started on host {} because container {} is already running on this host",
+ containerIDToStart, host, containerID);
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Attempts to run a container on the given candidate resource, if doing so meets the standby container constraints.
+ * If the constraints are violated, the resource is released and the request is cancelled. Then a standby container
+ * gets a new ANY_HOST request. An active container gets a new ANY_HOST request if the request was not issued as part
+ * of a failover; otherwise a new failover is initiated. In both violation cases the failedStandbyAllocations
+ * counter is incremented.
+ *
+ * @param request The Samza container request
+ * @param preferredHost the preferred host associated with the container
+ * @param samzaResource the resource candidate
+ * @param containerAllocator used to run the container, or to issue follow-up resource requests
+ * @param resourceRequestState used to release the resource and cancel the request when constraints are violated
+ */
+ public void checkStandbyConstraintsAndRunStreamProcessor(SamzaResourceRequest request, String preferredHost,
+ SamzaResource samzaResource, AbstractContainerAllocator containerAllocator,
+ ResourceRequestState resourceRequestState) {
+ String containerID = request.getContainerID();
+
+ if (checkStandbyConstraints(request, samzaResource)) {
+ // This resource can be used to launch this container
+ log.info("Running container {} on {} meets standby constraints, preferredHost = {}", containerID, samzaResource.getHost(), preferredHost);
+ containerAllocator.runStreamProcessor(request, preferredHost);
+ } else if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+ // This resource cannot be used to launch this standby container, so we make a new anyhost request
+ log.info("Running standby container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource, and making a new ANY_HOST request",
+ containerID, samzaResource.getHost());
+ resourceRequestState.releaseUnstartableContainer(samzaResource, preferredHost);
+ resourceRequestState.cancelResourceRequest(request);
+ containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+ samzaApplicationState.failedStandbyAllocations.incrementAndGet();
+ } else {
+ // This resource cannot be used to launch this active container, so we either request a new resource
+ // or initiate a failover (see below)
+ log.warn("Running active container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource",
+ containerID, samzaResource.getHost());
+ resourceRequestState.releaseUnstartableContainer(samzaResource, preferredHost);
+ resourceRequestState.cancelResourceRequest(request);
+
+ Optional<FailoverMetadata> failoverMetadata = getFailoverMetadata(request);
+
+ // if this request was not issued as part of a failover of this active-container, then simply request anyhost
+ if (!failoverMetadata.isPresent()) {
+ log.info("Requesting ANY_HOST for active container {}", containerID);
+ containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+ } else {
+ log.info("Initiating failover for active container {}", containerID);
+ // we use the activeContainer's last resourceID to initiate the failover
+ String lastKnownResourceID = failoverMetadata.get().activeContainerResourceID;
+ initiateActiveContainerFailover(containerID, lastKnownResourceID, containerAllocator);
+ }
+
+ samzaApplicationState.failedStandbyAllocations.incrementAndGet();
+ }
+ }
+
+ /**
+ * Handles a resource request that has expired, delegating to the standby- or active-specific handler depending
+ * on the kind of container the request was made for.
+ *
+ * @param containerID the containerID for which the resource request was made
+ * @param request the expired resource request
+ * @param alternativeResource an already-allocated alternative resource, or empty if none is available
+ * @param containerAllocator the allocator used to issue any follow-up resource requests
+ * @param resourceRequestState used to cancel resource requests where needed
+ */
+ public void handleExpiredResourceRequest(String containerID, SamzaResourceRequest request,
+ Optional<SamzaResource> alternativeResource, AbstractContainerAllocator containerAllocator,
+ ResourceRequestState resourceRequestState) {
+
+ boolean isActiveContainerRequest = !StandbyTaskUtil.isStandbyContainer(containerID);
+ if (isActiveContainerRequest) {
+ handleExpiredRequestForActiveContainer(containerID, request, alternativeResource, containerAllocator, resourceRequestState);
+ } else {
+ handleExpiredRequestForStandbyContainer(containerID, request, alternativeResource, containerAllocator, resourceRequestState);
+ }
+ }
+
+ // Handle an expired resource request that was made for placing a standby container.
+ // Without an alternative resource, the request is cancelled and a new ANY_HOST request is made; otherwise the
+ // standby is tried on the alternative resource right away, subject to the standby constraints.
+ private void handleExpiredRequestForStandbyContainer(String containerID, SamzaResourceRequest request,
+ Optional<SamzaResource> alternativeResource, AbstractContainerAllocator containerAllocator,
+ ResourceRequestState resourceRequestState) {
+
+ if (!alternativeResource.isPresent()) {
+ log.info("Handling expired request, requesting anyHost resource for standby container {}", containerID);
+ resourceRequestState.cancelResourceRequest(request);
+ containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+ return;
+ }
+
+ SamzaResource candidateResource = alternativeResource.get();
+ log.info("Handling expired request, standby container {} can be started on alternative resource {}", containerID, candidateResource);
+ checkStandbyConstraintsAndRunStreamProcessor(request, ResourceRequestState.ANY_HOST, candidateResource,
+ containerAllocator, resourceRequestState);
+ }
+
+ // Handle an expired resource request that was made for placing an active container.
+ // The three branches below are exhaustive:
+ // - the request was issued during a failover: cancel it and initiate a new failover (to a new standby-candidate);
+ // - otherwise, if an alternative any-host resource exists: try to run the container on it right away;
+ // - otherwise: cancel the request and make a new ANY_HOST request.
+ private void handleExpiredRequestForActiveContainer(String containerID, SamzaResourceRequest request,
+ Optional<SamzaResource> alternativeResource, AbstractContainerAllocator containerAllocator,
+ ResourceRequestState resourceRequestState) {
+
+ Optional<FailoverMetadata> failoverMetadata = getFailoverMetadata(request);
+
+ if (failoverMetadata.isPresent()) {
+ log.info("Handling expired request, initiating failover for active container {}", containerID);
+
+ resourceRequestState.cancelResourceRequest(request);
+
+ // we use the activeContainer's resourceID to initiate the failover
+ String lastKnownResourceID = failoverMetadata.get().activeContainerResourceID;
+ initiateActiveContainerFailover(containerID, lastKnownResourceID, containerAllocator);
+
+ } else if (alternativeResource.isPresent()) {
+ log.info("Handling expired request, trying to run active container {} on alternative resource {}", containerID, alternativeResource.get());
+
+ checkStandbyConstraintsAndRunStreamProcessor(request, ResourceRequestState.ANY_HOST, alternativeResource.get(),
+ containerAllocator, resourceRequestState);
+
+ } else {
+ log.info("Handling expired request, requesting anyHost resource for active container {} because this active container has never failed", containerID);
+
+ resourceRequestState.cancelResourceRequest(request);
+ containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+ }
+ }
+
+
+ /**
+ * Looks up the failover metadata registered for an active container's resource ID.
+ *
+ * @param activeContainerResourceID resource ID of the active container at the time of its failure
+ * @return the registered failover metadata, or empty if none was registered for this resource ID
+ */
+ private Optional<FailoverMetadata> getFailoverMetadata(String activeContainerResourceID) {
+ return Optional.ofNullable(this.failovers.get(activeContainerResourceID));
+ }
+
+ /**
+ * Finds the failover, if any, during which the given resource request was issued.
+ *
+ * @param resourceRequest the resource request to look up
+ * @return the first failover metadata that recorded this request, or empty if none did
+ */
+ private Optional<FailoverMetadata> getFailoverMetadata(SamzaResourceRequest resourceRequest) {
+ return this.failovers.values()
+ .stream()
+ .filter(metadata -> metadata.containsResourceRequest(resourceRequest))
+ .findFirst();
+ }
+
+
+ @Override
+ public String toString() {
+ return this.failovers.toString();
+ }
+
+ /**
+ * Encapsulates metadata concerning the failover of an active container: the standby resources selected for it
+ * and the resource requests issued on its behalf. All methods that touch the mutable state synchronize on this
+ * instance.
+ */
+ public class FailoverMetadata {
+ public final String activeContainerID;
+ public final String activeContainerResourceID;
+
+ // Map of samza-container-resource ID to host, for each standby container selected for failover of the activeContainer
+ private final Map<String, String> selectedStandbyContainers;
+
+ // Resource requests issued during this failover
+ private final Set<SamzaResourceRequest> resourceRequests;
+
+ public FailoverMetadata(String activeContainerID, String activeContainerResourceID) {
+ this.activeContainerID = activeContainerID;
+ this.activeContainerResourceID = activeContainerResourceID;
+ this.selectedStandbyContainers = new HashMap<>();
+ this.resourceRequests = new HashSet<>();
+ }
+
+ // Check if this standbyContainerResourceID was used in this failover
+ public synchronized boolean isStandbyResourceUsed(String standbyContainerResourceID) {
+ return this.selectedStandbyContainers.containsKey(standbyContainerResourceID);
+ }
+
+ // Get the hostname corresponding to the standby resourceID
+ public synchronized String getStandbyContainerHostname(String standbyContainerResourceID) {
+ return selectedStandbyContainers.get(standbyContainerResourceID);
+ }
+
+ // Add the standbyContainer resource to the list of standbyContainers used in this failover
+ public synchronized void updateStandbyContainer(String standbyContainerResourceID, String standbyContainerHost) {
+ this.selectedStandbyContainers.put(standbyContainerResourceID, standbyContainerHost);
+ }
+
+ // Add the samzaResourceRequest to the list of resource requests associated with this failover
+ public synchronized void recordResourceRequest(SamzaResourceRequest samzaResourceRequest) {
+ this.resourceRequests.add(samzaResourceRequest);
+ }
+
+ // Check if this resource request is used for this failover
+ public synchronized boolean containsResourceRequest(SamzaResourceRequest samzaResourceRequest) {
+ return this.resourceRequests.contains(samzaResourceRequest);
+ }
+
+ // Synchronized like the other accessors: iterating the HashMap/HashSet while another thread updates them
+ // is a data race and may throw ConcurrentModificationException
+ @Override
+ public synchronized String toString() {
+ return "[activeContainerID: " + this.activeContainerID + " activeContainerResourceID: "
+ + this.activeContainerResourceID + " selectedStandbyContainers:" + selectedStandbyContainers
+ + " resourceRequests: " + resourceRequests + "]";
+ }
+ }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyTaskUtil.java b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyTaskUtil.java
new file mode 100644
index 0000000..c993445
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyTaskUtil.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskMode;
+
+
+/**
+ * Collection of util methods used for performing Standby-aware Container allocation in YARN.
+ */
+public class StandbyTaskUtil {
+ private static final String STANDBY_CONTAINER_ID_SEPARATOR = "-";
+ private static final String TASKNAME_SEPARATOR = "-";
+ private static final String STANDBY_TASKNAME_PREFIX = "Standby";
+
+ /**
+ * Returns true if the containerName implies a standby container, false otherwise.
+ * @param containerID The desired containerID
+ * @return
+ */
+ public static boolean isStandbyContainer(String containerID) {
+ return containerID.contains(STANDBY_CONTAINER_ID_SEPARATOR);
+ }
+
+ // Helper method to generate buddy containerIDs by appending the replica-number to the active-container's id.
+ public final static String getStandbyContainerId(String activeContainerId, int replicaNumber) {
+ return activeContainerId.concat(STANDBY_CONTAINER_ID_SEPARATOR).concat(String.valueOf(replicaNumber));
+ }
+
+ // Helper method to generate active container's ID by removing the replica-number from the standby container's id.
+ public final static String getActiveContainerId(String standbyContainerID) {
+ return standbyContainerID.split(STANDBY_CONTAINER_ID_SEPARATOR)[0];
+ }
+
+ // Helper method to get the standby task name by prefixing "Standby" to the corresponding active task's name.
+ public final static TaskName getStandbyTaskName(TaskName activeTaskName, int replicaNum) {
+ return new TaskName(STANDBY_TASKNAME_PREFIX.concat(TASKNAME_SEPARATOR)
+ .concat(activeTaskName.getTaskName())
+ .concat(TASKNAME_SEPARATOR)
+ .concat(String.valueOf(replicaNum)));
+ }
+
+ // Helper method to get the active task name by stripping the prefix "Standby" from the standby task name.
+ public final static TaskName getActiveTaskName(TaskName standbyTaskName) {
+ return new TaskName(standbyTaskName.getTaskName().split(TASKNAME_SEPARATOR)[1]);
+ }
+
+ /**
+ * Given a containerID and job model, it returns the containerids of all containers that either have
+ * a. standby tasks corresponding to active tasks on the given container, or
+ * b. have active tasks corresponding to standby tasks on the given container.
+ * This is used to ensure that an active task and all its corresponding standby tasks are on separate hosts, and
+ * standby tasks corresponding to the same active task are on separate hosts.
+ */
+ public static List<String> getStandbyContainerConstraints(String containerID, JobModel jobModel) {
+
+ ContainerModel givenContainerModel = jobModel.getContainers().get(containerID);
+ List<String> containerIDsWithStandbyConstraints = new ArrayList<>();
+
+ // iterate over all containerModels in the jobModel
+ for (ContainerModel containerModel : jobModel.getContainers().values()) {
+
+ // add to list if active and standby tasks on the two containerModels overlap
+ if (!givenContainerModel.equals(containerModel) && checkTaskOverlap(givenContainerModel, containerModel)) {
+ containerIDsWithStandbyConstraints.add(containerModel.getId());
+ }
+ }
+ return containerIDsWithStandbyConstraints;
+ }
+
+ // Helper method that checks if tasks on the two containerModels overlap
+ private static boolean checkTaskOverlap(ContainerModel containerModel1, ContainerModel containerModel2) {
+ Set<TaskName> activeTasksOnContainer1 = getCorrespondingActiveTasks(containerModel1);
+ Set<TaskName> activeTasksOnContainer2 = getCorrespondingActiveTasks(containerModel2);
+ return !Collections.disjoint(activeTasksOnContainer1, activeTasksOnContainer2);
+ }
+
+ // Helper method that returns the active tasks corresponding to all standby tasks on a container, including any already-active tasks on the container
+ private static Set<TaskName> getCorrespondingActiveTasks(ContainerModel containerModel) {
+ Set<TaskName> tasksInActiveMode = getAllTasks(containerModel, TaskMode.Active);
+ tasksInActiveMode.addAll(getAllTasks(containerModel, TaskMode.Standby).stream()
+ .map(taskName -> getActiveTaskName(taskName))
+ .collect(Collectors.toSet()));
+ return tasksInActiveMode;
+ }
+
+ // Helper method to getAllTaskModels of this container in the given taskMode
+ private static Set<TaskName> getAllTasks(ContainerModel containerModel, TaskMode taskMode) {
+ return containerModel.getTasks()
+ .values()
+ .stream()
+ .filter(e -> e.getTaskMode().equals(taskMode))
+ .map(taskModel -> taskModel.getTaskName())
+ .collect(Collectors.toSet());
+ }
+
+}
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperProxy.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperProxy.java
index c7d556a..5fd9ceb 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperProxy.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperProxy.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.samza.clustermanager.StandbyTaskUtil;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskMode;
@@ -60,9 +61,6 @@ import org.slf4j.LoggerFactory;
public class TaskNameGrouperProxy {
private static final Logger LOG = LoggerFactory.getLogger(TaskNameGrouperProxy.class);
- private static final String CONTAINER_ID_SEPARATOR = "-";
- private static final String TASKNAME_SEPARATOR = "-";
- private static final String STANDBY_TASKNAME_PREFIX = "Standby";
private final TaskNameGrouper taskNameGrouper;
private final boolean standbyTasksEnabled;
private final int replicationFactor;
@@ -104,7 +102,7 @@ public class TaskNameGrouperProxy {
for (ContainerModel activeContainer : containerModels) {
for (int replicaNum = 0; replicaNum < replicationFactor - 1; replicaNum++) {
- String buddyContainerId = getBuddyContainerId(activeContainer.getId(), replicaNum);
+ String buddyContainerId = StandbyTaskUtil.getStandbyContainerId(activeContainer.getId(), replicaNum);
ContainerModel buddyContainerModel =
new ContainerModel(buddyContainerId, getTaskModelForBuddyContainer(activeContainer.getTasks(), replicaNum));
@@ -124,7 +122,7 @@ public class TaskNameGrouperProxy {
Map<TaskName, TaskModel> standbyTaskModels = new HashMap<>();
for (TaskName taskName : activeContainerTaskModel.keySet()) {
- TaskName standbyTaskName = getStandbyTaskName(taskName, replicaNum);
+ TaskName standbyTaskName = StandbyTaskUtil.getStandbyTaskName(taskName, replicaNum);
TaskModel standbyTaskModel =
new TaskModel(standbyTaskName, activeContainerTaskModel.get(taskName).getSystemStreamPartitions(),
activeContainerTaskModel.get(taskName).getChangelogPartition(), TaskMode.Standby);
@@ -135,17 +133,4 @@ public class TaskNameGrouperProxy {
activeContainerTaskModel);
return standbyTaskModels;
}
-
- // Helper method to generate buddy containerIDs by appending the replica-number to the active-container's id.
- private final static String getBuddyContainerId(String activeContainerId, int replicaNumber) {
- return activeContainerId.concat(CONTAINER_ID_SEPARATOR).concat(String.valueOf(replicaNumber));
- }
-
- // Helper method to get the standby task name by prefixing "Standby" to the corresponding active task's name.
- private final static TaskName getStandbyTaskName(TaskName activeTaskName, int replicaNum) {
- return new TaskName(STANDBY_TASKNAME_PREFIX.concat(TASKNAME_SEPARATOR)
- .concat(activeTaskName.getTaskName())
- .concat(TASKNAME_SEPARATOR)
- .concat(String.valueOf(replicaNum)));
- }
}
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
index 42f7f4b..e662fa1 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.samza.clustermanager.StandbyTaskUtil;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.serializers.model.SamzaObjectMapper;
@@ -204,6 +205,7 @@ public class StorageManagerUtil {
/**
* Creates and returns a File pointing to the directory for the given store and task, given a particular base directory.
+ * In case of a standby task (TaskMode.Standby), the storeDirectory is the same as it would be for an active task.
*
* @param storeBaseDir the base directory to use
* @param storeName the store name to use
@@ -212,7 +214,10 @@ public class StorageManagerUtil {
* @return the partition directory for the store
*/
public static File getStorePartitionDir(File storeBaseDir, String storeName, TaskName taskName, TaskMode taskMode) {
- // TODO: use task-Mode to decide the storePartitionDir -- standby's dir should be the same as active
- return new File(storeBaseDir, (storeName + File.separator + taskName.toString()).replace(' ', '_'));
+ TaskName taskNameForDirName = taskName;
+ if (taskMode.equals(TaskMode.Standby)) {
+ taskNameForDirName = StandbyTaskUtil.getActiveTaskName(taskName);
+ }
+ return new File(storeBaseDir, (storeName + File.separator + taskNameForDirName.toString()).replace(' ', '_'));
}
}
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
index 15cb18f..265e56f 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
@@ -69,6 +69,10 @@ class ContainerProcessManagerMetrics(
}
})
+ val mFailedStandbyAllocations = newGauge("failed-standby-allocations", () => state.failedStandbyAllocations.get())
+ val mFailoversToAnyHost = newGauge("failovers-to-any-host", () => state.failoversToAnyHost.get())
+ val mFailoversToStandby = newGauge("failovers-to-standby", () => state.failoversToStandby.get())
+
jvm.start
reporters.values.foreach(_.start)
}
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
index 471c7fe..4545a75 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
@@ -93,6 +93,11 @@ public class MockClusterResourceManager extends ClusterResourceManager {
launchCountSemaphore.release();
}
+ @Override
+ public void stopStreamProcessor(SamzaResource resource) {
+ // no op
+ }
+
public void registerContainerListener(MockContainerListener listener) {
mockContainerListeners.add(listener);
}
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerRequestState.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerRequestState.java
index 3aa58b2..676018b 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerRequestState.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerRequestState.java
@@ -70,8 +70,8 @@ public class MockContainerRequestState extends ResourceRequestState {
}
@Override
- public void releaseUnstartableContainer(SamzaResource container) {
- super.releaseUnstartableContainer(container);
+ public void releaseUnstartableContainer(SamzaResource container, String preferredHost) {
+ super.releaseUnstartableContainer(container, preferredHost);
numReleasedContainers += 1;
for (MockContainerListener listener : mockContainerListeners) {
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockHostAwareContainerAllocator.java
index ea05fff..1285c5f 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockHostAwareContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockHostAwareContainerAllocator.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.clustermanager;
+import java.util.Optional;
import org.apache.samza.config.Config;
import java.lang.reflect.Field;
@@ -31,7 +32,7 @@ public class MockHostAwareContainerAllocator extends HostAwareContainerAllocator
public MockHostAwareContainerAllocator(ClusterResourceManager manager,
Config config, SamzaApplicationState state) {
- super(manager, ALLOCATOR_TIMEOUT_MS, config, state);
+ super(manager, ALLOCATOR_TIMEOUT_MS, config, Optional.empty(), state);
}
/**
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index d648a80..841e5ba 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -418,7 +418,7 @@ public class TestContainerProcessManager {
public void testAllBufferedResourcesAreUtilized() throws Exception {
Map<String, String> config = new HashMap<>();
config.putAll(getConfigWithHostAffinity());
- config.put("cluster-manager.container.count", "2");
+ config.put("job.container.count", "2");
Config cfg = new MapConfig(config);
// 1. Request two containers on hosts - host1 and host2
state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host1",
@@ -475,6 +475,7 @@ public class TestContainerProcessManager {
if (!allocator.awaitContainersStart(2, 2, TimeUnit.SECONDS)) {
fail("timed out waiting for the containers to start");
}
+ taskManager.onStreamProcessorLaunchSuccess(resource2);
taskManager.onStreamProcessorLaunchSuccess(resource3);
assertTrue(state.jobHealthy.get());
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
index 3e5e785..fd3b452 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
@@ -21,6 +21,7 @@ package org.apache.samza.clustermanager;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -73,7 +74,7 @@ public class TestHostAwareContainerAllocator {
@Before
public void setup() throws Exception {
- containerAllocator = new HostAwareContainerAllocator(clusterResourceManager, timeoutMillis, config, state);
+ containerAllocator = new HostAwareContainerAllocator(clusterResourceManager, timeoutMillis, config, Optional.empty(), state);
requestState = new MockContainerRequestState(clusterResourceManager, true);
Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("resourceRequestState");
requestStateField.setAccessible(true);
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java
new file mode 100644
index 0000000..c075f14
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.Partition;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Tests for {@code StandbyTaskUtil.getStandbyContainerConstraints}: for every container of a job model built with
+ * standby replicas, the returned constraints must be (replicationFactor - 1) other containers from the same
+ * replica group, and a job without standbys must yield no constraints.
+ */
+public class TestStandbyAllocator {
+
+  @Test
+  public void testWithNoStandby() {
+    JobModel jobModel = getJobModelWithStandby(1, 1, 1);
+    List<String> containerConstraints = StandbyTaskUtil.getStandbyContainerConstraints("0", jobModel);
+    Assert.assertEquals("Constrained container count should be 0", 0, containerConstraints.size());
+  }
+
+  @Test
+  public void testWithStandby() {
+    verifyStandbyConstraints(2, 1, 2);
+    verifyStandbyConstraints(10, 1, 2);
+
+    verifyStandbyConstraints(2, 10, 2);
+    verifyStandbyConstraints(2, 10, 4);
+
+    verifyStandbyConstraints(10, 1, 4);
+    verifyStandbyConstraints(10, 10, 4);
+  }
+
+  /**
+   * Builds a job model of the given shape and checks the standby constraints of every container in it.
+   * Private and not named like a test so that it neither overloads nor is mistaken for {@link #testWithStandby()}.
+   */
+  private static void verifyStandbyConstraints(int nContainers, int nTasks, int replicationFactor) {
+    JobModel jobModel = getJobModelWithStandby(nContainers, nTasks, replicationFactor);
+
+    for (String containerID : jobModel.getContainers().keySet()) {
+      List<String> containerConstraints = StandbyTaskUtil.getStandbyContainerConstraints(containerID, jobModel);
+
+      Assert.assertTrue("Constrained container should be valid containers",
+          jobModel.getContainers().keySet().containsAll(containerConstraints));
+
+      Assert.assertEquals("Constrained container count should be (replication factor-1)", replicationFactor - 1,
+          containerConstraints.size());
+
+      Assert.assertFalse("Constrained containers list should not have the container itself",
+          containerConstraints.contains(containerID));
+
+      // Assumes StandbyTaskUtil derives standby container IDs as "<activeId>-<replica>", so the prefix before the
+      // first '-' identifies the replica group an ID belongs to.
+      String replicaGroup = containerID.split("-")[0];
+      for (String containerConstraintID : containerConstraints) {
+        Assert.assertTrue("Constrained containers IDs should correspond to the active container",
+            replicaGroup.equals(containerConstraintID.split("-")[0]));
+      }
+    }
+  }
+
+  // Helper method to create a jobmodel with given number of containers, tasks and replication factor:
+  // nContainers active containers with nTasks tasks each, plus (replicationFactor - 1) standbys per active one.
+  private static JobModel getJobModelWithStandby(int nContainers, int nTasks, int replicationFactor) {
+    Map<String, ContainerModel> containerModels = new HashMap<>();
+    int taskID = 0;
+
+    for (int j = 0; j < nContainers; j++) {
+      Map<TaskName, TaskModel> tasks = new HashMap<>();
+      for (int i = 0; i < nTasks; i++) {
+        TaskModel taskModel = getTaskModel(taskID++);
+        tasks.put(taskModel.getTaskName(), taskModel);
+      }
+      containerModels.put(String.valueOf(j), new ContainerModel(String.valueOf(j), tasks));
+    }
+
+    Map<String, ContainerModel> standbyContainerModels = new HashMap<>();
+    for (int i = 0; i < replicationFactor - 1; i++) {
+      for (String containerID : containerModels.keySet()) {
+        String standbyContainerId = StandbyTaskUtil.getStandbyContainerId(containerID, i);
+        Map<TaskName, TaskModel> standbyTasks = getStandbyTasks(containerModels.get(containerID).getTasks(), i);
+        standbyContainerModels.put(standbyContainerId, new ContainerModel(standbyContainerId, standbyTasks));
+      }
+    }
+
+    containerModels.putAll(standbyContainerModels);
+    return new JobModel(new MapConfig(), containerModels);
+  }
+
+  // Helper method that creates a taskmodel with one input ssp
+  private static TaskModel getTaskModel(int partitionNum) {
+    return new TaskModel(new TaskName("Partition " + partitionNum),
+        Collections.singleton(new SystemStreamPartition("test-system", "test-stream", new Partition(partitionNum))),
+        new Partition(partitionNum), TaskMode.Active);
+  }
+
+  // Helper method to create standby-taskModels from active-taskModels
+  private static Map<TaskName, TaskModel> getStandbyTasks(Map<TaskName, TaskModel> tasks, int replicaNum) {
+    Map<TaskName, TaskModel> standbyTasks = new HashMap<>();
+    tasks.forEach((taskName, taskModel) -> {
+      TaskName standbyTaskName = StandbyTaskUtil.getStandbyTaskName(taskName, replicaNum);
+      standbyTasks.put(standbyTaskName,
+          new TaskModel(standbyTaskName, taskModel.getSystemStreamPartitions(), taskModel.getChangelogPartition(),
+              TaskMode.Standby));
+    });
+    return standbyTasks;
+  }
+}
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 53b61d9..6d0fdb1 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -227,7 +227,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
*/
@Override
public void requestResources(SamzaResourceRequest resourceRequest) {
- log.info("Requesting resources on " + resourceRequest.getPreferredHost() + " for container " + resourceRequest.getContainerID());
+ log.info("Requesting resources on " + resourceRequest.getPreferredHost() + " for container " + resourceRequest.getContainerID());
int memoryMb = resourceRequest.getMemoryMB();
int cpuCores = resourceRequest.getNumCores();
@@ -314,6 +314,14 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
}
}
+  /**
+   * Asks {@code nmClientAsync} to stop, asynchronously, the YARN container recorded for the given resource in
+   * {@code allocatedResources}. If the resource has no entry there, a warning is logged and nothing is stopped.
+   *
+   * @param resource the resource whose container should be stopped
+   */
+  public void stopStreamProcessor(SamzaResource resource) {
+    synchronized (lock) {
+      if (!allocatedResources.containsKey(resource)) {
+        // Without this guard the lookups below throw a NullPointerException, e.g. when a stop is retried from
+        // onStopContainerError with a resource that is not a key of allocatedResources.
+        log.warn("Cannot stop resource {}: it is not an allocated resource", resource);
+        return;
+      }
+      log.info("Stopping resource {}", resource);
+      this.nmClientAsync.stopContainerAsync(allocatedResources.get(resource).getId(),
+          allocatedResources.get(resource).getNodeId());
+    }
+  }
+
/**
* Given a lookupContainerId from Yarn (for example: containerId_app_12345, this method returns the SamzaContainer ID
* in the range [0,N-1] that maps to it.
@@ -520,7 +528,8 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
+  /**
+   * Logs the NodeManager notification that a YARN container stopped, together with the Samza container ID
+   * that {@code getIDForContainer} maps it to (if any).
+   */
@Override
public void onContainerStopped(ContainerId containerId) {
- log.info("Got a notification from the NodeManager for a stopped container. ContainerId: {}", containerId);
+    String samzaContainerId = getIDForContainer(containerId.toString());
+    log.info("Got a notification from the NodeManager for a stopped container. ContainerId: {} samzaContainerId {}",
+        containerId, samzaContainerId);
}
@Override
@@ -549,6 +558,19 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
+  /**
+   * Callback for a failed asynchronous stop request. Re-issues the stop for the corresponding Samza container,
+   * unless the YARN container cannot be mapped to a Samza container ID or is no longer among the running containers.
+   */
@Override
public void onStopContainerError(ContainerId containerId, Throwable t) {
log.info("Got an error when stopping container from the NodeManager. ContainerId: {}. Error: {}", containerId, t);
+    String samzaContainerId = getIDForContainer(containerId.toString());
+
+    if (samzaContainerId == null) {
+      log.info("Got an invalid notification for container: {}", containerId.toString());
+      return;
+    }
+
+    YarnContainer container = state.runningYarnContainers.get(samzaContainerId);
+    log.info("Failed Stop on Yarn Container: {} had Samza ContainerId: {} ", containerId.toString(), samzaContainerId);
+    if (container == null) {
+      // The container is no longer tracked as running (presumably it already exited), so there is nothing to
+      // retry; dereferencing it below would otherwise throw a NullPointerException.
+      log.warn("Samza ContainerId: {} is not a running container; not retrying stop of Yarn Container: {}",
+          samzaContainerId, containerId);
+      return;
+    }
+
+    SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(),
+        container.resource().getMemory(), container.nodeId().getHost(), containerId.toString());
+
+    log.info("Re-invoking stop stream processor for container: {}", containerId);
+    // For now, we retry the stopping of the container.
+    // NOTE(review): the retry is unbounded; a container that keeps failing to stop is retried indefinitely.
+    this.stopStreamProcessor(resource);
}
/**