You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this plain-text rendering.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/06/12 21:24:40 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1191] Reuse Helix instance names when containers are released…
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 93ee795 [GOBBLIN-1191] Reuse Helix instance names when containers are released…
93ee795 is described below
commit 93ee79546759d73310fb6f949fcbfc57e232ccd8
Author: sv2000 <su...@gmail.com>
AuthorDate: Fri Jun 12 14:24:34 2020 -0700
[GOBBLIN-1191] Reuse Helix instance names when containers are released…
Closes #3039 from sv2000/helixInstanceNameHoles
---
.../java/org/apache/gobblin/yarn/YarnService.java | 36 +++++++++++++++-------
1 file changed, 25 insertions(+), 11 deletions(-)
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 3f127eb..f9c6cc6 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -29,7 +29,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -82,8 +82,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.io.Closer;
@@ -189,10 +187,10 @@ public class YarnService extends AbstractIdleService {
// A map from Helix instance names to the number times the instances are retried to be started
private final ConcurrentMap<String, AtomicInteger> helixInstanceRetryCount = Maps.newConcurrentMap();
- // A queue of unused Helix instance names. An unused Helix instance name gets put
- // into the queue if the container running the instance completes. Unused Helix
+ // A concurrent HashSet of unused Helix instance names. An unused Helix instance name gets put
+ // into the set if the container running the instance completes. Unused Helix
// instance names get picked up when replacement containers get allocated.
- private final ConcurrentLinkedQueue<String> unusedHelixInstanceNames = Queues.newConcurrentLinkedQueue();
+ private final Set<String> unusedHelixInstanceNames = ConcurrentHashMap.newKeySet();
private volatile boolean shutdownInProgress = false;
@@ -204,7 +202,6 @@ public class YarnService extends AbstractIdleService {
@VisibleForTesting
@Getter(AccessLevel.PROTECTED)
private int numRequestedContainers = 0;
- private final Set<String> blacklistedInstances = Sets.newHashSet();
public YarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
FileSystem fs, EventBus eventBus, HelixManager helixManager) throws Exception {
@@ -653,6 +650,10 @@ public class YarnService extends AbstractIdleService {
if (containerStatus.getExitStatus() == ContainerExitStatus.ABORTED) {
if (this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) != null) {
LOGGER.info("Container release requested, so not spawning a replacement for containerId {}", containerStatus.getContainerId());
+ if (completedContainerEntry != null) {
+ LOGGER.info("Adding instance {} to the pool of unused instances", completedInstanceName);
+ this.unusedHelixInstanceNames.add(completedInstanceName);
+ }
return;
} else {
LOGGER.info("Container {} aborted due to lost NM", containerStatus.getContainerId());
@@ -697,9 +698,10 @@ public class YarnService extends AbstractIdleService {
return;
}
- // Add the Helix instance name of the completed container to the queue of unused
+ // Add the Helix instance name of the completed container to the set of unused
// instance names so they can be reused by a replacement container.
- this.unusedHelixInstanceNames.offer(completedInstanceName);
+ LOGGER.info("Adding instance {} to the pool of unused instances", completedInstanceName);
+ this.unusedHelixInstanceNames.add(completedInstanceName);
if (this.eventSubmitter.isPresent()) {
this.eventSubmitter.get()
@@ -753,8 +755,20 @@ public class YarnService extends AbstractIdleService {
LOGGER.info(String.format("Container %s has been allocated", container.getId()));
- String instanceName = unusedHelixInstanceNames.poll();
- while (Strings.isNullOrEmpty(instanceName) || HelixUtils.isInstanceLive(helixManager, instanceName)) {
+ //Iterate over the (thread-safe) set of unused instances to find the first instance that is not currently live.
+ //Once we find a candidate instance, it is removed from the set.
+ String instanceName = null;
+ Iterator<String> iterator = unusedHelixInstanceNames.iterator();
+ while (iterator.hasNext()) {
+ instanceName = iterator.next();
+ if (!HelixUtils.isInstanceLive(helixManager, instanceName)) {
+ iterator.remove();
+ LOGGER.info("Found an unused instance {}", instanceName);
+ break;
+ }
+ }
+
+ if (Strings.isNullOrEmpty(instanceName)) {
// No unused instance name, so generating a new one.
instanceName = HelixUtils
.getHelixInstanceName(HELIX_YARN_INSTANCE_NAME_PREFIX, helixInstanceIdGenerator.incrementAndGet());