You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/28 04:26:08 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1099] Handle orphaned Yarn containers in Gobblin-on-Yarn clusters

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new cf6af89  [GOBBLIN-1099] Handle orphaned Yarn containers in Gobblin-on-Yarn clusters
cf6af89 is described below

commit cf6af89995369e80afbd661ef0d7d2852d1ca4c9
Author: sv2000 <su...@gmail.com>
AuthorDate: Fri Mar 27 21:26:00 2020 -0700

    [GOBBLIN-1099] Handle orphaned Yarn containers in Gobblin-on-Yarn clusters
    
    Closes #2940 from sv2000/yarnOrphans
---
 .../gobblin/cluster/GobblinClusterManager.java     |  8 +-
 .../apache/gobblin/cluster/GobblinTaskRunner.java  | 93 ++++++++++++++++++----
 .../org/apache/gobblin/cluster/HelixUtils.java     | 32 ++++++++
 .../gobblin/cluster/GobblinTaskRunnerTest.java     | 47 ++++++++++-
 .../gobblin/yarn/GobblinApplicationMaster.java     |  3 +-
 .../gobblin/yarn/GobblinYarnAppLauncher.java       | 43 +++++++---
 .../java/org/apache/gobblin/yarn/YarnService.java  | 45 ++++++++---
 .../gobblin/yarn/GobblinYarnAppLauncherTest.java   |  2 +-
 .../org/apache/gobblin/yarn/YarnServiceTest.java   | 20 ++---
 9 files changed, 237 insertions(+), 56 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 8eabad1..2c3f49f 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -56,6 +56,7 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
+import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
@@ -108,6 +109,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
   protected ServiceBasedAppLauncher applicationLauncher;

   // An EventBus used for communications between services running in the ApplicationMaster
+  @Getter(AccessLevel.PUBLIC) // public getter replaces the removed package-private getEventBus()
   protected final EventBus eventBus = new EventBus(GobblinClusterManager.class.getSimpleName());

   protected final Path appWorkDir;
@@ -400,12 +402,6 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
     stop();
   }

-  @VisibleForTesting
-  EventBus getEventBus() {
-    return this.eventBus;
-  }
-
-
   /**
    * Creates and returns a {@link MessageHandlerFactory} for handling of Helix
    * {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}s.
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index b9f1c96..2d35dea 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -28,6 +28,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -42,12 +44,15 @@ import org.apache.commons.cli.ParseException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
@@ -57,6 +62,10 @@ import org.apache.helix.task.TaskStateModelFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -121,6 +130,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {

   private final String clusterName;

+  @Getter
   private HelixManager jobHelixManager;

   private Optional<HelixManager> taskDriverHelixManager = Optional.absent();
@@ -292,7 +302,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
     // Add a shutdown hook so the task scheduler gets properly shutdown
     addShutdownHook();

-    connectHelixManager();
+    connectHelixManagerWithRetry(); // on HelixException, cleans up partial instance state and retries

     addInstanceTags();

@@ -366,21 +376,81 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
   }

   @VisibleForTesting
-  void connectHelixManager() {
-    try {
-      this.jobHelixManager.connect();
-      this.jobHelixManager.getMessagingService()
-          .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
-              new ParticipantShutdownMessageHandlerFactory());
-      this.jobHelixManager.getMessagingService()
-          .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
-              getUserDefinedMessageHandlerFactory());
-      if (this.taskDriverHelixManager.isPresent()) {
-        this.taskDriverHelixManager.get().connect();
+  void connectHelixManager() throws Exception {
+    this.jobHelixManager.connect();
+    //Ensure the instance is enabled; it may have been disabled earlier to fence off an orphaned worker of the same name.
+    this.jobHelixManager.getClusterManagmentTool().enableInstance(clusterName, helixInstanceName, true);
+    this.jobHelixManager.getMessagingService()
+        .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
+            new ParticipantShutdownMessageHandlerFactory());
+    this.jobHelixManager.getMessagingService()
+        .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
+            getUserDefinedMessageHandlerFactory());
+    if (this.taskDriverHelixManager.isPresent()) {
+      this.taskDriverHelixManager.get().connect();
+      //Ensure the instance is also enabled in the task driver cluster.
+      this.taskDriverHelixManager.get().getClusterManagmentTool().enableInstance(this.taskDriverHelixManager.get().getClusterName(), helixInstanceName, true);
+    }
+  }
+
+  /**
+   * A method to handle failures joining Helix cluster. The method will perform the following steps before attempting
+   * to re-join the cluster:
+   * <ul>
+   *   <li>Disconnect from Helix cluster, which would close any open clients</li>
+   *   <li>Drop instance from the Helix cluster (and the task driver cluster, if any), to remove partial instance state</li>
+   *   <li>Re-construct helix manager instances, used to re-join the cluster</li>
+   * </ul>
+   */
+  private void onClusterJoinFailure() {
+    logger.warn("Disconnecting Helix manager..");
+    disconnectHelixManager();
+
+    HelixAdmin admin = new ZKHelixAdmin(clusterConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY));
+    try {
+      //Drop the helix Instance
+      logger.warn("Dropping instance: {} from cluster: {}", helixInstanceName, clusterName);
+      HelixUtils.dropInstanceIfExists(admin, clusterName, helixInstanceName);
+
+      if (this.taskDriverHelixManager.isPresent()) {
+        String taskDriverCluster = clusterConfig.getString(GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY);
+        logger.warn("Dropping instance: {} from task driver cluster: {}", helixInstanceName, taskDriverCluster);
+        HelixUtils.dropInstanceIfExists(admin, taskDriverCluster, helixInstanceName);
+      }
+    } finally {
+      admin.close();
+    }
+
+    logger.warn("Reinitializing Helix manager..");
+    initHelixManager();
+  }
+
+  /**
+   * Joins the Helix cluster(s) via {@link #connectHelixManager()}, making up to 5 attempts. When an attempt fails with
+   * a {@link HelixException}, {@link #onClusterJoinFailure()} removes partial instance state before the next attempt.
+   */
+  @VisibleForTesting
+  void connectHelixManagerWithRetry() {
+    Callable<Void> connectHelixManagerCallable = () -> {
+      try {
+        logger.info("Instance: {} attempting to join cluster: {}", helixInstanceName, clusterName);
+        connectHelixManager();
+      } catch (HelixException e) {
+        logger.error("Exception encountered when joining cluster", e);
+        onClusterJoinFailure();
+        throw e;
       }
-    } catch (Exception e) {
-      logger.error("HelixManager failed to connect", e);
-      throw Throwables.propagate(e);
+      return null;
+    };
+
+    Retryer<Void> retryer = RetryerBuilder.<Void>newBuilder()
+        .retryIfException()
+        .withStopStrategy(StopStrategies.stopAfterAttempt(5)).build();
+
+    try {
+      retryer.call(connectHelixManagerCallable);
+    } catch (ExecutionException | RetryException e) {
+      throw Throwables.propagate(e);
     }
   }

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index f6f4730..f70a741 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.cluster;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
@@ -27,9 +28,14 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.TargetState;
 import org.apache.helix.task.TaskConfig;
@@ -331,4 +337,44 @@ public class HelixUtils {
       System.setProperty(entry.getKey().toString(), entry.getValue().toString());
     }
   }
+
+  /**
+   * A utility method that returns all current live instances in a given Helix cluster. This method assumes that
+   * the passed {@link HelixManager} instance is already connected.
+   * @param helixManager a connected {@link HelixManager} for the cluster to inspect
+   * @return the names of all live instances in the Helix cluster.
+   */
+  public static List<String> getLiveInstances(HelixManager helixManager) {
+    HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
+    PropertyKey liveInstancesKey = accessor.keyBuilder().liveInstances();
+    return accessor.getChildNames(liveInstancesKey);
+  }
+
+  /**
+   * Returns whether the given instance currently has a live-instance entry in the Helix cluster. This method
+   * assumes that the passed {@link HelixManager} instance is already connected.
+   * @param helixManager a connected {@link HelixManager} for the cluster to inspect
+   * @param instanceName the Helix instance name to look up
+   * @return true if a live-instance entry exists for {@code instanceName}.
+   */
+  public static boolean isInstanceLive(HelixManager helixManager, String instanceName) {
+    HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
+    PropertyKey liveInstanceKey = accessor.keyBuilder().liveInstance(instanceName);
+    return accessor.getProperty(liveInstanceKey) != null;
+  }
+
+  /**
+   * Best-effort removal of an instance from a Helix cluster: a {@link HelixException} raised by the drop is logged
+   * (with its stack trace) and not rethrown. Other runtime exceptions propagate to the caller.
+   * @param admin the {@link HelixAdmin} used to drop the instance
+   * @param clusterName the Helix cluster to drop the instance from
+   * @param helixInstanceName the name of the instance to drop
+   */
+  public static void dropInstanceIfExists(HelixAdmin admin, String clusterName, String helixInstanceName) {
+    try {
+      admin.dropInstance(clusterName, new InstanceConfig(helixInstanceName));
+    } catch (HelixException e) {
+      log.error("Could not drop instance: {}", helixInstanceName, e);
+    }
+  }
 }
\ No newline at end of file
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
index b115607..0e48661 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
@@ -23,6 +23,9 @@ import java.net.URL;
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.helix.HelixException;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.manager.zk.ZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -60,6 +63,9 @@ public class GobblinTaskRunnerTest {
   private GobblinTaskRunner gobblinTaskRunner;

   private GobblinClusterManager gobblinClusterManager;
+  private GobblinTaskRunner corruptGobblinTaskRunner;
+  private String clusterName;
+  private String corruptHelixInstance;

   @BeforeClass
   public void setUp() throws Exception {
@@ -80,8 +86,8 @@ public class GobblinTaskRunnerTest {
         .resolve();

     String zkConnectionString = config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
-    HelixUtils.createGobblinHelixCluster(zkConnectionString,
-        config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY));
+    this.clusterName = config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+    HelixUtils.createGobblinHelixCluster(zkConnectionString, this.clusterName);

     // Participant
     this.gobblinTaskRunner =
@@ -89,12 +95,17 @@ public class GobblinTaskRunnerTest {
             TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID, config, Optional.<Path>absent());
     this.gobblinTaskRunner.connectHelixManager();

+    // Participant (not connected here) used by testConnectHelixManagerWithRetry() to simulate a partial instance set up
+    this.corruptHelixInstance = HelixUtils.getHelixInstanceName("CorruptHelixInstance", 0);
+    this.corruptGobblinTaskRunner =
+        new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, corruptHelixInstance,
+            TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID, config, Optional.<Path>absent());
+
     // Controller
     this.gobblinClusterManager =
         new GobblinClusterManager(TestHelper.TEST_APPLICATION_NAME, TestHelper.TEST_APPLICATION_ID, config,
             Optional.<Path>absent());
     this.gobblinClusterManager.connectHelixManager();
-
   }

   @Test
@@ -117,6 +128,42 @@ public class GobblinTaskRunnerTest {
     Assert.assertEquals(fileSystem.getConf().get(HADOOP_OVERRIDE_PROPERTY_NAME), "value");
   }

+  /**
+   * Verifies that {@link GobblinTaskRunner#connectHelixManagerWithRetry()} recovers from a partially set-up Helix
+   * instance (missing ERRORS/HISTORY/STATUSUPDATES znodes), on which a plain connect fails with a HelixException.
+   */
+  @Test
+  public void testConnectHelixManagerWithRetry() {
+    //Connect and disconnect the corrupt task runner to create a Helix Instance set up.
+    try {
+      this.corruptGobblinTaskRunner.connectHelixManager();
+      this.corruptGobblinTaskRunner.disconnectHelixManager();
+    } catch (Exception e) {
+      Assert.fail("Failed to connect to ZK", e);
+    }
+
+    //Delete ERRORS/HISTORY/STATUSUPDATES znodes under INSTANCES to simulate partial instance set up.
+    ZkClient zkClient = new ZkClient(testingZKServer.getConnectString());
+    zkClient.delete(PropertyPathBuilder.instanceError(clusterName, corruptHelixInstance));
+    zkClient.delete(PropertyPathBuilder.instanceHistory(clusterName, corruptHelixInstance));
+    zkClient.delete(PropertyPathBuilder.instanceStatusUpdate(clusterName, corruptHelixInstance));
+    zkClient.close();
+
+    //Ensure that the connecting to Helix without retry will throw a HelixException
+    try {
+      corruptGobblinTaskRunner.connectHelixManager();
+      Assert.fail("Unexpected success in connecting to HelixManager");
+    } catch (Exception e) {
+      //Assert that a HelixException is thrown.
+      Assert.assertEquals(e.getClass(), HelixException.class);
+    }
+
+    //Ensure that connect with retry succeeds and leaves the (re-initialized) job HelixManager connected
+    corruptGobblinTaskRunner.connectHelixManagerWithRetry();
+    Assert.assertTrue(corruptGobblinTaskRunner.getJobHelixManager().isConnected());
+    corruptGobblinTaskRunner.disconnectHelixManager();
+  }
+
   @AfterClass
   public void tearDown() throws IOException {
     try {
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index 14488d5..b1095bc 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -119,7 +119,8 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
   protected YarnService buildYarnService(Config config, String applicationName, String applicationId,
       YarnConfiguration yarnConfiguration, FileSystem fs)
       throws Exception {
-    return new YarnService(config, applicationName, applicationId, yarnConfiguration, fs, this.eventBus);
+    return new YarnService(config, applicationName, applicationId, yarnConfiguration, fs, this.eventBus,
+        this.getMultiManager().getJobClusterHelixManager()); // lets YarnService fence off orphaned Helix instances
   }

   /**
   /**
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index a6d7a88..82842f2 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.helix.Criteria;
+import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
@@ -319,7 +320,12 @@ public class GobblinYarnAppLauncher {
       this.securityManager.get().loginAndScheduleTokenRenewal();
     }

-    this.applicationId = getApplicationId();
+    this.applicationId = getReconnectableApplicationId(); // reconnect to a running application of the same name, if any
+    if (!this.applicationId.isPresent()) {
+      disableLiveHelixInstances(); // fence off orphaned workers that a previous application may have left behind
+      LOGGER.info("No reconnectable application found so submitting a new application");
+      this.applicationId = Optional.of(setupAndSubmitApplication());
+    }

     this.applicationStatusMonitor.scheduleAtFixedRate(new Runnable() {
       @Override
@@ -396,6 +402,9 @@ public class GobblinYarnAppLauncher {

       stopYarnClient();

+      LOGGER.info("Disabling all live Helix instances..");
+      disableLiveHelixInstances();
+
       disconnectHelixManager();
     } finally {
       try {
@@ -479,6 +488,26 @@ public class GobblinYarnAppLauncher {
     }
   }

+  /**
+   * Disables all live instances in the Helix cluster. Live instances can pre-exist when a previous Yarn application
+   * leaves behind orphaned Yarn worker processes. Since Helix does not provide an API to drop a live instance, we use
+   * the disable instance API to fence off these orphaned instances and prevent them from becoming participants in the
+   * new cluster. Assumes {@code helixManager} is already connected, as HelixUtils.getLiveInstances() requires.
+   *
+   * NOTE: this is a workaround for an existing YARN bug. Once YARN has a fix to guarantee container kills on application
+   * completion, this method should be removed.
+   */
+  void disableLiveHelixInstances() {
+    String clusterName = this.helixManager.getClusterName();
+    HelixAdmin helixAdmin = this.helixManager.getClusterManagmentTool();
+    List<String> liveInstances = HelixUtils.getLiveInstances(this.helixManager);
+    LOGGER.warn("Found {} live instances in the cluster.", liveInstances.size());
+    for (String instanceName: liveInstances) {
+      LOGGER.warn("Disabling instance: {}", instanceName);
+      helixAdmin.enableInstance(clusterName, instanceName, false);
+    }
+  }
+
   @VisibleForTesting
   void disconnectHelixManager() {
     if (this.helixManager.isConnected()) {
@@ -496,17 +525,6 @@ public class GobblinYarnAppLauncher {
     this.yarnClient.stop();
   }

-  private Optional<ApplicationId> getApplicationId() throws YarnException, IOException {
-    Optional<ApplicationId> reconnectableApplicationId = getReconnectableApplicationId();
-    if (reconnectableApplicationId.isPresent()) {
-      LOGGER.info("Found reconnectable application with application ID: " + reconnectableApplicationId.get());
-      return reconnectableApplicationId;
-    }
-
-    LOGGER.info("No reconnectable application found so submitting a new application");
-    return Optional.of(setupAndSubmitApplication());
-  }
-
   @VisibleForTesting
   Optional<ApplicationId> getReconnectableApplicationId() throws YarnException, IOException {
     List<ApplicationReport> applicationReports =
@@ -518,6 +536,7 @@ public class GobblinYarnAppLauncher {
     // Try to find an application with a matching application name
     for (ApplicationReport applicationReport : applicationReports) {
       if (this.applicationName.equals(applicationReport.getName())) {
+        LOGGER.info("Found reconnectable application with application ID: " + applicationReport.getApplicationId());
         return Optional.of(applicationReport.getApplicationId());
       }
     }
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 4586571..5f6da10 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.helix.HelixManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,6 +83,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
 import com.google.common.io.Closer;
@@ -123,6 +125,8 @@ public class YarnService extends AbstractIdleService {

   private static final Splitter SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();

+  private static final String UNKNOWN_HELIX_INSTANCE = "UNKNOWN"; // name used when a completed container is not in containerMap
+
   private final String applicationName;
   private final String applicationId;
   private final String appViewAcl;
@@ -156,6 +160,7 @@ public class YarnService extends AbstractIdleService {

   private final Optional<String> containerJvmArgs;
   private final String containerTimezone;
+  private final HelixManager helixManager; // used to check liveness of / disable Helix instances; tests pass null

   private volatile Optional<Resource> maxResourceCapacity = Optional.absent();

@@ -199,9 +204,10 @@ public class YarnService extends AbstractIdleService {
   @VisibleForTesting
   @Getter(AccessLevel.PROTECTED)
   private int numRequestedContainers = 0;
+  private final Set<String> blacklistedInstances = Sets.newHashSet(); // NOTE(review): not referenced in the code shown; confirm it is used

   public YarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
-      FileSystem fs, EventBus eventBus) throws Exception {
+      FileSystem fs, EventBus eventBus, HelixManager helixManager) throws Exception {
     this.applicationName = applicationName;
     this.applicationId = applicationId;

@@ -209,6 +215,8 @@ public class YarnService extends AbstractIdleService {

     this.eventBus = eventBus;

+    this.helixManager = helixManager;
+
     this.gobblinMetrics = config.getBoolean(ConfigurationKeys.METRICS_ENABLED_KEY) ?
         Optional.of(buildGobblinMetrics()) : Optional.<GobblinMetrics>absent();

@@ -625,7 +633,10 @@ public class YarnService extends AbstractIdleService {
    */
   protected void handleContainerCompletion(ContainerStatus containerStatus) {
     Map.Entry<Container, String> completedContainerEntry = this.containerMap.remove(containerStatus.getContainerId());
-    String completedInstanceName = completedContainerEntry == null? "unknown" : completedContainerEntry.getValue();
+    //Get the Helix instance name for the completed container. Because callbacks are processed asynchronously, we might
+    //encounter situations where handleContainerCompletion() is called before onContainersAllocated(), resulting in the
+    //containerId missing from the containerMap.
+    String completedInstanceName = completedContainerEntry == null ? UNKNOWN_HELIX_INSTANCE : completedContainerEntry.getValue();

     LOGGER.info(String.format("Container %s running Helix instance %s has completed with exit status %d",
         containerStatus.getContainerId(), completedInstanceName, containerStatus.getExitStatus()));
@@ -635,10 +646,27 @@ public class YarnService extends AbstractIdleService {
           containerStatus.getContainerId(), containerStatus.getDiagnostics()));
     }

     if (this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) != null) {
       LOGGER.info("Container release requested, so not spawning a replacement for containerId {}",
           containerStatus.getContainerId());
       return;
     }
+
+    if (containerStatus.getExitStatus() == ContainerExitStatus.ABORTED) {
+      LOGGER.info("Container {} aborted due to lost NM", containerStatus.getContainerId());
+      // Container release was not requested (released containers returned above). Likely, the container was running
+      // on a node on which the NM died. In this case, RM assumes that the containers are "lost", even though the
+      // container process may still be running on the node. We need to ensure that the Helix instances running on
+      // the orphaned containers are fenced off from the Helix cluster to avoid double publishing and state being
+      // committed by the instances. The null check covers YarnService instances built without a HelixManager.
+      if (this.helixManager != null && !UNKNOWN_HELIX_INSTANCE.equals(completedInstanceName)) {
+        String clusterName = this.helixManager.getClusterName();
+        //Disable the orphaned instance.
+        if (HelixUtils.isInstanceLive(helixManager, completedInstanceName)) {
+          LOGGER.info("Disabling the Helix instance {}", completedInstanceName);
+          this.helixManager.getClusterManagmentTool().enableInstance(clusterName, completedInstanceName, false);
+        }
+      }
+    }

     if (this.shutdownInProgress) {
@@ -723,10 +751,12 @@ public class YarnService extends AbstractIdleService {
         LOGGER.info(String.format("Container %s has been allocated", container.getId()));

         String instanceName = unusedHelixInstanceNames.poll();
-        if (Strings.isNullOrEmpty(instanceName)) {
+        // Skip names that are empty or still held by a live (possibly orphaned) Helix instance.
+        while (Strings.isNullOrEmpty(instanceName)
+            || (helixManager != null && HelixUtils.isInstanceLive(helixManager, instanceName))) {
           // No unused instance name, so generating a new one.
-          instanceName = HelixUtils.getHelixInstanceName(HELIX_YARN_INSTANCE_NAME_PREFIX,
-              helixInstanceIdGenerator.incrementAndGet());
+          instanceName = HelixUtils
+              .getHelixInstanceName(HELIX_YARN_INSTANCE_NAME_PREFIX, helixInstanceIdGenerator.incrementAndGet());
         }

         final String finalInstanceName = instanceName;
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
index adc8bd0..c1c7142 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
@@ -480,7 +480,7 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
   private static class TestYarnService extends YarnService {
     public TestYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
         FileSystem fs, EventBus eventBus) throws Exception {
-      super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus);
+      super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus, null); // no HelixManager in this test
     }

     @Override
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
index 2eb032f..160ad39 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
@@ -17,13 +17,6 @@
 
 package org.apache.gobblin.yarn;
 
-import com.google.common.base.Predicate;
-import com.google.common.eventbus.EventBus;
-import com.google.common.io.Closer;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
-
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.URL;
@@ -35,7 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
-import org.apache.gobblin.testing.AssertWithBackoff;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -64,6 +57,15 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Predicate;
+import com.google.common.eventbus.EventBus;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.testing.AssertWithBackoff;
+
 
 /**
  * Tests for {@link YarnService}.
@@ -282,7 +284,7 @@ public class YarnServiceTest {
    static class TestYarnService extends YarnService {
     public TestYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
         FileSystem fs, EventBus eventBus) throws Exception {
-      super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus);
+      super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus, null); // no HelixManager in this test
     }

     protected ContainerLaunchContext newContainerLaunchContext(Container container, String helixInstanceName)