You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2023/05/22 18:58:11 UTC
[gobblin] branch master updated: [GOBBLIN-1823] Improving Container Calculation and Allocation Methodology (#3692)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b43980014 [GOBBLIN-1823] Improving Container Calculation and Allocation Methodology (#3692)
b43980014 is described below
commit b4398001431050df51c593e88b0ccb45c7dd8fa4
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Mon May 22 11:58:03 2023 -0700
[GOBBLIN-1823] Improving Container Calculation and Allocation Methodology (#3692)
* address comments
* use ConnectionManager when HttpClient is not closeable
* [GOBBLIN-1823] Improving Container Calculation and Allocation Methodology
* improve code style
* add more non-retriable statuses that we want to log
* address comments
* add log when mismatch happens for debuggability
---------
Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
---
.../gobblin/yarn/YarnAutoScalingManager.java | 6 +-
.../java/org/apache/gobblin/yarn/YarnService.java | 66 +++++++++++++++++++---
2 files changed, 62 insertions(+), 10 deletions(-)
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index 5f5c872c6..e6683cfd3 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -88,7 +88,7 @@ public class YarnAutoScalingManager extends AbstractIdleService {
private final String AUTO_SCALING_WINDOW_SIZE = AUTO_SCALING_PREFIX + "windowSize";
- private final static int DEFAULT_MAX_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES = 10;
+ public final static int DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES = 10;
private final Config config;
private final HelixManager helixManager;
@@ -97,9 +97,9 @@ public class YarnAutoScalingManager extends AbstractIdleService {
private final int partitionsPerContainer;
private final double overProvisionFactor;
private final SlidingWindowReservoir slidingFixedSizeWindow;
- private static int maxIdleTimeInMinutesBeforeScalingDown = DEFAULT_MAX_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES;
+ private static int maxIdleTimeInMinutesBeforeScalingDown = DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES;
private static final HashSet<TaskPartitionState>
- UNUSUAL_HELIX_TASK_STATES = Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.DROPPED);
+ UNUSUAL_HELIX_TASK_STATES = Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.DROPPED, TaskPartitionState.COMPLETED, TaskPartitionState.TIMED_OUT);
public YarnAutoScalingManager(GobblinApplicationMaster appMaster) {
this.config = appMaster.getConfig();
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index fe0d93d2d..e1da50d94 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -200,6 +201,8 @@ public class YarnService extends AbstractIdleService {
private final boolean isPurgingOfflineHelixInstancesEnabled;
private final long helixPurgeLaggingThresholdMs;
private final long helixPurgeStatusPollingRateMs;
+ private final ConcurrentMap<ContainerId, Long> containerIdleSince = Maps.newConcurrentMap();
+ private final ConcurrentMap<ContainerId, String> removedContainerID = Maps.newConcurrentMap();
private volatile YarnContainerRequestBundle yarnContainerRequest;
private final AtomicInteger priorityNumGenerator = new AtomicInteger(0);
@@ -473,11 +476,30 @@ public class YarnService extends AbstractIdleService {
return false;
}
+ //Correct the containerMap first as there is cases that handleContainerCompletion() is called before onContainersAllocated()
+ for (ContainerId removedId : this.removedContainerID.keySet()) {
+ ContainerInfo containerInfo = this.containerMap.remove(removedId);
+ if (containerInfo != null) {
+ String helixTag = containerInfo.getHelixTag();
+ allocatedContainerCountMap.putIfAbsent(helixTag, new AtomicInteger(0));
+ this.allocatedContainerCountMap.get(helixTag).decrementAndGet();
+ this.removedContainerID.remove(removedId);
+ }
+ }
+
int numTargetContainers = yarnContainerRequestBundle.getTotalContainers();
// YARN can allocate more than the requested number of containers, compute additional allocations and deallocations
// based on the max of the requested and actual allocated counts
// Represents the number of containers allocated for across all helix tags
int totalAllocatedContainers = this.containerMap.size();
+ int totalContainersInContainerCountMap = 0;
+ for (AtomicInteger count: allocatedContainerCountMap.values()) {
+ totalContainersInContainerCountMap += count.get();
+ }
+ if (totalContainersInContainerCountMap != totalAllocatedContainers) {
+ LOGGER.warn(String.format("Container number mismatch in containerMap and allocatedContainerCountMap, "
+ + "we have %s containers in containerMap while %s in allocatedContainerCountMap", totalAllocatedContainers, totalContainersInContainerCountMap));
+ }
// Request additional containers if the desired count is higher than the max of the current allocation or previously
// requested amount. Note that there may be in-flight or additional allocations after numContainers has been computed
@@ -501,31 +523,54 @@ public class YarnService extends AbstractIdleService {
}
}
+ //Iterate through all containers allocated and check whether the corresponding helix instance is still LIVE within the helix cluster.
+ // A container that has a bad connection to zookeeper will be dropped from the Helix cluster if the disconnection is greater than the specified timeout.
+ // In these cases, we want to release the container to get a new container because these containers won't be assigned tasks by Helix
+
+ List<Container> containersToRelease = new ArrayList<>();
+ HashSet<ContainerId> idleContainerIdsToRelease = new HashSet<>();
+ for (Map.Entry<ContainerId, ContainerInfo> entry : this.containerMap.entrySet()) {
+ ContainerInfo containerInfo = entry.getValue();
+ if (!HelixUtils.isInstanceLive(helixManager, containerInfo.getHelixParticipantId())) {
+ containerIdleSince.putIfAbsent(entry.getKey(), System.currentTimeMillis());
+ if (System.currentTimeMillis() - containerIdleSince.get(entry.getKey())
+ >= TimeUnit.MINUTES.toMillis(YarnAutoScalingManager.DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES)) {
+ LOGGER.info("Releasing Container {} because the assigned participant {} has been in-active for more than {} minutes",
+ entry.getKey(), containerInfo.getHelixParticipantId(), YarnAutoScalingManager.DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES);
+ containersToRelease.add(containerInfo.getContainer());
+ idleContainerIdsToRelease.add(entry.getKey());
+ }
+ } else {
+ containerIdleSince.remove(entry.getKey());
+ }
+ }
+
// If the total desired is lower than the currently allocated amount then release free containers.
// This is based on the currently allocated amount since containers may still be in the process of being allocated
// and assigned work. Resizing based on numRequestedContainers at this point may release a container right before
// or soon after it is assigned work.
- if (numTargetContainers < totalAllocatedContainers) {
- List<Container> containersToRelease = new ArrayList<>();
+ if (numTargetContainers < totalAllocatedContainers - idleContainerIdsToRelease.size()) {
int numToShutdown = totalAllocatedContainers - numTargetContainers;
- LOGGER.info("Shrinking number of containers by {} because numTargetContainers < totalAllocatedContainers ({} < {})",
- numToShutdown, numTargetContainers, totalAllocatedContainers);
+ LOGGER.info("Shrinking number of containers by {} because numTargetContainers < totalAllocatedContainers - idleContainersToRelease ({} < {} - {})",
+ totalAllocatedContainers - numTargetContainers - idleContainerIdsToRelease.size(), numTargetContainers, totalAllocatedContainers, idleContainerIdsToRelease.size());
// Look for eligible containers to release. If a container is in use then it is not released.
for (Map.Entry<ContainerId, ContainerInfo> entry : this.containerMap.entrySet()) {
ContainerInfo containerInfo = entry.getValue();
- if (!inUseInstances.contains(containerInfo.getHelixParticipantId())) {
+ if (!inUseInstances.contains(containerInfo.getHelixParticipantId()) && !idleContainerIdsToRelease.contains(entry.getKey())) {
containersToRelease.add(containerInfo.getContainer());
}
- if (containersToRelease.size() == numToShutdown) {
+ if (containersToRelease.size() >= numToShutdown) {
break;
}
}
LOGGER.info("Shutting down {} containers. containersToRelease={}", containersToRelease.size(), containersToRelease);
+ }
+ if (!containersToRelease.isEmpty()) {
this.eventBus.post(new ContainerReleaseRequest(containersToRelease));
}
this.yarnContainerRequest = yarnContainerRequestBundle;
@@ -721,9 +766,16 @@ public class YarnService extends AbstractIdleService {
//Get the Helix instance name for the completed container. Because callbacks are processed asynchronously, we might
//encounter situations where handleContainerCompletion() is called before onContainersAllocated(), resulting in the
//containerId missing from the containersMap.
+ // We use removedContainerID to remember these containers and remove them from containerMap later when we call requestTargetNumberOfContainers method
+ if (completedContainerInfo == null) {
+ removedContainerID.putIfAbsent(containerStatus.getContainerId(), "");
+ }
String completedInstanceName = completedContainerInfo == null? UNKNOWN_HELIX_INSTANCE : completedContainerInfo.getHelixParticipantId();
+
String helixTag = completedContainerInfo == null ? helixInstanceTags : completedContainerInfo.getHelixTag();
- allocatedContainerCountMap.get(helixTag).decrementAndGet();
+ if (completedContainerInfo != null) {
+ allocatedContainerCountMap.get(helixTag).decrementAndGet();
+ }
LOGGER.info(String.format("Container %s running Helix instance %s with tag %s has completed with exit status %d",
containerStatus.getContainerId(), completedInstanceName, helixTag, containerStatus.getExitStatus()));