You are viewing a plain text version of this content. The link to the canonical version was a hyperlink in the original archive page and is not preserved in this copy.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/06/12 21:24:40 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1191] Reuse Helix instance names when containers are released…

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 93ee795  [GOBBLIN-1191] Reuse Helix instance names when containers are released…
93ee795 is described below

commit 93ee79546759d73310fb6f949fcbfc57e232ccd8
Author: sv2000 <su...@gmail.com>
AuthorDate: Fri Jun 12 14:24:34 2020 -0700

    [GOBBLIN-1191] Reuse Helix instance names when containers are released…
    
    Closes #3039 from sv2000/helixInstanceNameHoles
---
 .../java/org/apache/gobblin/yarn/YarnService.java  | 36 +++++++++++++++-------
 1 file changed, 25 insertions(+), 11 deletions(-)

diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 3f127eb..f9c6cc6 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -29,7 +29,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -82,8 +82,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
 import com.google.common.io.Closer;
@@ -189,10 +187,10 @@ public class YarnService extends AbstractIdleService {
   // A map from Helix instance names to the number times the instances are retried to be started
   private final ConcurrentMap<String, AtomicInteger> helixInstanceRetryCount = Maps.newConcurrentMap();
 
-  // A queue of unused Helix instance names. An unused Helix instance name gets put
-  // into the queue if the container running the instance completes. Unused Helix
+  // A concurrent HashSet of unused Helix instance names. An unused Helix instance name gets put
+  // into the set if the container running the instance completes. Unused Helix
   // instance names get picked up when replacement containers get allocated.
-  private final ConcurrentLinkedQueue<String> unusedHelixInstanceNames = Queues.newConcurrentLinkedQueue();
+  private final Set<String> unusedHelixInstanceNames = ConcurrentHashMap.newKeySet();
 
   private volatile boolean shutdownInProgress = false;
 
@@ -204,7 +202,6 @@ public class YarnService extends AbstractIdleService {
   @VisibleForTesting
   @Getter(AccessLevel.PROTECTED)
   private int numRequestedContainers = 0;
-  private final Set<String> blacklistedInstances = Sets.newHashSet();
 
   public YarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
       FileSystem fs, EventBus eventBus, HelixManager helixManager) throws Exception {
@@ -653,6 +650,10 @@ public class YarnService extends AbstractIdleService {
     if (containerStatus.getExitStatus() == ContainerExitStatus.ABORTED) {
       if (this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) != null) {
         LOGGER.info("Container release requested, so not spawning a replacement for containerId {}", containerStatus.getContainerId());
+        if (completedContainerEntry != null) {
+          LOGGER.info("Adding instance {} to the pool of unused instances", completedInstanceName);
+          this.unusedHelixInstanceNames.add(completedInstanceName);
+        }
         return;
       } else {
         LOGGER.info("Container {} aborted due to lost NM", containerStatus.getContainerId());
@@ -697,9 +698,10 @@ public class YarnService extends AbstractIdleService {
         return;
       }
 
-      // Add the Helix instance name of the completed container to the queue of unused
+      // Add the Helix instance name of the completed container to the set of unused
       // instance names so they can be reused by a replacement container.
-      this.unusedHelixInstanceNames.offer(completedInstanceName);
+      LOGGER.info("Adding instance {} to the pool of unused instances", completedInstanceName);
+      this.unusedHelixInstanceNames.add(completedInstanceName);
 
       if (this.eventSubmitter.isPresent()) {
         this.eventSubmitter.get()
@@ -753,8 +755,20 @@ public class YarnService extends AbstractIdleService {
 
         LOGGER.info(String.format("Container %s has been allocated", container.getId()));
 
-        String instanceName = unusedHelixInstanceNames.poll();
-        while (Strings.isNullOrEmpty(instanceName) || HelixUtils.isInstanceLive(helixManager, instanceName)) {
+        //Iterate over the (thread-safe) set of unused instances to find the first instance that is not currently live.
+        //Once we find a candidate instance, it is removed from the set.
+        String instanceName = null;
+        Iterator<String> iterator = unusedHelixInstanceNames.iterator();
+        while (iterator.hasNext()) {
+          instanceName = iterator.next();
+          if (!HelixUtils.isInstanceLive(helixManager, instanceName)) {
+            iterator.remove();
+            LOGGER.info("Found an unused instance {}", instanceName);
+            break;
+          }
+        }
+
+        if (Strings.isNullOrEmpty(instanceName)) {
           // No unused instance name, so generating a new one.
           instanceName = HelixUtils
               .getHelixInstanceName(HELIX_YARN_INSTANCE_NAME_PREFIX, helixInstanceIdGenerator.incrementAndGet());