You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2023/05/22 18:58:11 UTC

[gobblin] branch master updated: [GOBBLIN-1823] Improving Container Calculation and Allocation Methodology (#3692)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b43980014 [GOBBLIN-1823] Improving Container Calculation and Allocation Methodology (#3692)
b43980014 is described below

commit b4398001431050df51c593e88b0ccb45c7dd8fa4
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Mon May 22 11:58:03 2023 -0700

    [GOBBLIN-1823] Improving Container Calculation and Allocation Methodology (#3692)
    
    * address comments
    
    * use ConnectionManager when HttpClient is not closeable
    
    * [GOBBLIN-1823] Improving Container Calculation and Allocation Methodology
    
    * improve code style
    
    * add more non-retriable statuses that we want to log
    
    * address comments
    
    * add log when mismatch happens for debuggability
    
    ---------
    
    Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
---
 .../gobblin/yarn/YarnAutoScalingManager.java       |  6 +-
 .../java/org/apache/gobblin/yarn/YarnService.java  | 66 +++++++++++++++++++---
 2 files changed, 62 insertions(+), 10 deletions(-)

diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index 5f5c872c6..e6683cfd3 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -88,7 +88,7 @@ public class YarnAutoScalingManager extends AbstractIdleService {
 
   private final String AUTO_SCALING_WINDOW_SIZE = AUTO_SCALING_PREFIX + "windowSize";
 
-  private final static int DEFAULT_MAX_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES = 10;
+  public final static int DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES = 10;
 
   private final Config config;
   private final HelixManager helixManager;
@@ -97,9 +97,9 @@ public class YarnAutoScalingManager extends AbstractIdleService {
   private final int partitionsPerContainer;
   private final double overProvisionFactor;
   private final SlidingWindowReservoir slidingFixedSizeWindow;
-  private static int maxIdleTimeInMinutesBeforeScalingDown = DEFAULT_MAX_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES;
+  private static int maxIdleTimeInMinutesBeforeScalingDown = DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES;
   private static final HashSet<TaskPartitionState>
-      UNUSUAL_HELIX_TASK_STATES = Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.DROPPED);
+      UNUSUAL_HELIX_TASK_STATES = Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.DROPPED, TaskPartitionState.COMPLETED, TaskPartitionState.TIMED_OUT);
 
   public YarnAutoScalingManager(GobblinApplicationMaster appMaster) {
     this.config = appMaster.getConfig();
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index fe0d93d2d..e1da50d94 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -200,6 +201,8 @@ public class YarnService extends AbstractIdleService {
   private final boolean isPurgingOfflineHelixInstancesEnabled;
   private final long helixPurgeLaggingThresholdMs;
   private final long helixPurgeStatusPollingRateMs;
+  private final ConcurrentMap<ContainerId, Long> containerIdleSince = Maps.newConcurrentMap();
+  private final ConcurrentMap<ContainerId, String> removedContainerID = Maps.newConcurrentMap();
 
   private volatile YarnContainerRequestBundle yarnContainerRequest;
   private final AtomicInteger priorityNumGenerator = new AtomicInteger(0);
@@ -473,11 +476,30 @@ public class YarnService extends AbstractIdleService {
       return false;
     }
 
+    //Correct the containerMap first as there is cases that handleContainerCompletion() is called before onContainersAllocated()
+    for (ContainerId removedId : this.removedContainerID.keySet()) {
+      ContainerInfo containerInfo = this.containerMap.remove(removedId);
+      if (containerInfo != null) {
+        String helixTag = containerInfo.getHelixTag();
+        allocatedContainerCountMap.putIfAbsent(helixTag, new AtomicInteger(0));
+        this.allocatedContainerCountMap.get(helixTag).decrementAndGet();
+        this.removedContainerID.remove(removedId);
+      }
+    }
+
     int numTargetContainers = yarnContainerRequestBundle.getTotalContainers();
     // YARN can allocate more than the requested number of containers, compute additional allocations and deallocations
     // based on the max of the requested and actual allocated counts
     // Represents the number of containers allocated for across all helix tags
     int totalAllocatedContainers = this.containerMap.size();
+    int totalContainersInContainerCountMap = 0;
+    for (AtomicInteger count: allocatedContainerCountMap.values()) {
+      totalContainersInContainerCountMap += count.get();
+    }
+    if (totalContainersInContainerCountMap != totalAllocatedContainers) {
+      LOGGER.warn(String.format("Container number mismatch in containerMap and allocatedContainerCountMap, "
+          + "we have %s containers in containerMap while %s in allocatedContainerCountMap", totalAllocatedContainers, totalContainersInContainerCountMap));
+    }
 
     // Request additional containers if the desired count is higher than the max of the current allocation or previously
     // requested amount. Note that there may be in-flight or additional allocations after numContainers has been computed
@@ -501,31 +523,54 @@ public class YarnService extends AbstractIdleService {
       }
     }
 
+    //Iterate through all containers allocated and check whether the corresponding helix instance is still LIVE within the helix cluster.
+    // A container that has a bad connection to zookeeper will be dropped from the Helix cluster if the disconnection is greater than the specified timeout.
+    // In these cases, we want to release the container to get a new container because these containers won't be assigned tasks by Helix
+
+    List<Container> containersToRelease = new ArrayList<>();
+    HashSet<ContainerId> idleContainerIdsToRelease = new HashSet<>();
+    for (Map.Entry<ContainerId, ContainerInfo> entry : this.containerMap.entrySet()) {
+      ContainerInfo containerInfo = entry.getValue();
+      if (!HelixUtils.isInstanceLive(helixManager, containerInfo.getHelixParticipantId())) {
+        containerIdleSince.putIfAbsent(entry.getKey(), System.currentTimeMillis());
+        if (System.currentTimeMillis() - containerIdleSince.get(entry.getKey())
+            >= TimeUnit.MINUTES.toMillis(YarnAutoScalingManager.DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES)) {
+          LOGGER.info("Releasing Container {} because the assigned participant {} has been in-active for more than {} minutes",
+              entry.getKey(), containerInfo.getHelixParticipantId(), YarnAutoScalingManager.DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES);
+          containersToRelease.add(containerInfo.getContainer());
+          idleContainerIdsToRelease.add(entry.getKey());
+        }
+      } else {
+        containerIdleSince.remove(entry.getKey());
+      }
+    }
+
     // If the total desired is lower than the currently allocated amount then release free containers.
     // This is based on the currently allocated amount since containers may still be in the process of being allocated
     // and assigned work. Resizing based on numRequestedContainers at this point may release a container right before
     // or soon after it is assigned work.
-    if (numTargetContainers < totalAllocatedContainers) {
-      List<Container> containersToRelease = new ArrayList<>();
+    if (numTargetContainers < totalAllocatedContainers - idleContainerIdsToRelease.size()) {
       int numToShutdown = totalAllocatedContainers - numTargetContainers;
 
-      LOGGER.info("Shrinking number of containers by {} because numTargetContainers < totalAllocatedContainers ({} < {})",
-          numToShutdown, numTargetContainers, totalAllocatedContainers);
+      LOGGER.info("Shrinking number of containers by {} because numTargetContainers < totalAllocatedContainers - idleContainersToRelease ({} < {} - {})",
+          totalAllocatedContainers - numTargetContainers - idleContainerIdsToRelease.size(), numTargetContainers, totalAllocatedContainers, idleContainerIdsToRelease.size());
 
       // Look for eligible containers to release. If a container is in use then it is not released.
       for (Map.Entry<ContainerId, ContainerInfo> entry : this.containerMap.entrySet()) {
         ContainerInfo containerInfo = entry.getValue();
-        if (!inUseInstances.contains(containerInfo.getHelixParticipantId())) {
+        if (!inUseInstances.contains(containerInfo.getHelixParticipantId()) && !idleContainerIdsToRelease.contains(entry.getKey())) {
           containersToRelease.add(containerInfo.getContainer());
         }
 
-        if (containersToRelease.size() == numToShutdown) {
+        if (containersToRelease.size() >= numToShutdown) {
           break;
         }
       }
 
       LOGGER.info("Shutting down {} containers. containersToRelease={}", containersToRelease.size(), containersToRelease);
+    }
 
+    if (!containersToRelease.isEmpty()) {
       this.eventBus.post(new ContainerReleaseRequest(containersToRelease));
     }
     this.yarnContainerRequest = yarnContainerRequestBundle;
@@ -721,9 +766,16 @@ public class YarnService extends AbstractIdleService {
     //Get the Helix instance name for the completed container. Because callbacks are processed asynchronously, we might
     //encounter situations where handleContainerCompletion() is called before onContainersAllocated(), resulting in the
     //containerId missing from the containersMap.
+    // We use removedContainerID to remember these containers and remove them from containerMap later when we call requestTargetNumberOfContainers method
+    if (completedContainerInfo == null) {
+      removedContainerID.putIfAbsent(containerStatus.getContainerId(), "");
+    }
     String completedInstanceName = completedContainerInfo == null?  UNKNOWN_HELIX_INSTANCE : completedContainerInfo.getHelixParticipantId();
+
     String helixTag = completedContainerInfo == null ? helixInstanceTags : completedContainerInfo.getHelixTag();
-    allocatedContainerCountMap.get(helixTag).decrementAndGet();
+    if (completedContainerInfo != null) {
+      allocatedContainerCountMap.get(helixTag).decrementAndGet();
+    }
 
     LOGGER.info(String.format("Container %s running Helix instance %s with tag %s has completed with exit status %d",
         containerStatus.getContainerId(), completedInstanceName, helixTag, containerStatus.getExitStatus()));