You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/28 04:26:08 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1099] Handle orphaned Yarn containers in Gobblin-on-Yarn clus…
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new cf6af89 [GOBBLIN-1099] Handle orphaned Yarn containers in Gobblin-on-Yarn clus…
cf6af89 is described below
commit cf6af89995369e80afbd661ef0d7d2852d1ca4c9
Author: sv2000 <su...@gmail.com>
AuthorDate: Fri Mar 27 21:26:00 2020 -0700
[GOBBLIN-1099] Handle orphaned Yarn containers in Gobblin-on-Yarn clus…
Closes #2940 from sv2000/yarnOrphans
---
.../gobblin/cluster/GobblinClusterManager.java | 8 +-
.../apache/gobblin/cluster/GobblinTaskRunner.java | 93 ++++++++++++++++++----
.../org/apache/gobblin/cluster/HelixUtils.java | 32 ++++++++
.../gobblin/cluster/GobblinTaskRunnerTest.java | 47 ++++++++++-
.../gobblin/yarn/GobblinApplicationMaster.java | 3 +-
.../gobblin/yarn/GobblinYarnAppLauncher.java | 43 +++++++---
.../java/org/apache/gobblin/yarn/YarnService.java | 45 ++++++++---
.../gobblin/yarn/GobblinYarnAppLauncherTest.java | 2 +-
.../org/apache/gobblin/yarn/YarnServiceTest.java | 20 ++---
9 files changed, 237 insertions(+), 56 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 8eabad1..2c3f49f 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -56,6 +56,7 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
+import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@@ -108,6 +109,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
protected ServiceBasedAppLauncher applicationLauncher;
// An EventBus used for communications between services running in the ApplicationMaster
+ @Getter(AccessLevel.PUBLIC)
protected final EventBus eventBus = new EventBus(GobblinClusterManager.class.getSimpleName());
protected final Path appWorkDir;
@@ -400,12 +402,6 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
stop();
}
- @VisibleForTesting
- EventBus getEventBus() {
- return this.eventBus;
- }
-
-
/**
* Creates and returns a {@link MessageHandlerFactory} for handling of Helix
* {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}s.
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index b9f1c96..2d35dea 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -28,6 +28,8 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -42,12 +44,15 @@ import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
@@ -57,6 +62,10 @@ import org.apache.helix.task.TaskStateModelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@@ -121,6 +130,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
private final String clusterName;
+ @Getter
private HelixManager jobHelixManager;
private Optional<HelixManager> taskDriverHelixManager = Optional.absent();
@@ -292,7 +302,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
// Add a shutdown hook so the task scheduler gets properly shutdown
addShutdownHook();
- connectHelixManager();
+ connectHelixManagerWithRetry();
addInstanceTags();
@@ -366,21 +376,74 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
}
@VisibleForTesting
- void connectHelixManager() {
- try {
- this.jobHelixManager.connect();
- this.jobHelixManager.getMessagingService()
- .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
- new ParticipantShutdownMessageHandlerFactory());
- this.jobHelixManager.getMessagingService()
- .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
- getUserDefinedMessageHandlerFactory());
- if (this.taskDriverHelixManager.isPresent()) {
- this.taskDriverHelixManager.get().connect();
+ void connectHelixManager() throws Exception {
+ // Joins the job Helix cluster and, when a task driver HelixManager is present, the task driver cluster too.
+ // There is no retry or cleanup here: any failure (e.g. a HelixException) propagates to the caller.
+ // connectHelixManagerWithRetry() wraps this method with cleanup and retries.
+ this.jobHelixManager.connect();
+ // Ensure the instance is enabled; presumably an instance with this name may have been disabled (fenced off) earlier.
+ this.jobHelixManager.getClusterManagmentTool().enableInstance(clusterName, helixInstanceName, true);
+ // Register handlers for Gobblin shutdown messages and Helix user-defined messages.
+ this.jobHelixManager.getMessagingService()
+ .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
+ new ParticipantShutdownMessageHandlerFactory());
+ this.jobHelixManager.getMessagingService()
+ .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
+ getUserDefinedMessageHandlerFactory());
+ if (this.taskDriverHelixManager.isPresent()) {
+ this.taskDriverHelixManager.get().connect();
+ // Ensure the instance is also enabled in the task driver cluster (named by the task driver manager itself).
+ this.taskDriverHelixManager.get().getClusterManagmentTool().enableInstance(this.taskDriverHelixManager.get().getClusterName(), helixInstanceName, true);
+ }
+ }
+
+  /**
+   * Handles a failure to join the Helix cluster. Before the next attempt to re-join, it performs these steps:
+   * <ul>
+   *   <li>Disconnect from the Helix cluster(s), closing any open clients.</li>
+   *   <li>Drop this instance from the job cluster and, if a task driver HelixManager is present, from the task
+   *   driver cluster, to remove any partial instance structure left in Helix.</li>
+   *   <li>Re-construct the Helix manager instances used to re-join the cluster.</li>
+   * </ul>
+   */
+  private void onClusterJoinFailure() {
+    logger.warn("Disconnecting Helix manager..");
+    disconnectHelixManager();
+
+    // NOTE(review): a single ZKHelixAdmin (job-cluster ZK connection string) is used for both clusters; this assumes
+    // the task driver cluster lives on the same ZooKeeper ensemble -- confirm.
+    HelixAdmin admin = new ZKHelixAdmin(clusterConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY));
+    try {
+      //Drop the helix Instance
+      logger.warn("Dropping instance: {} from cluster: {}", helixInstanceName, clusterName);
+      HelixUtils.dropInstanceIfExists(admin, clusterName, helixInstanceName);
+
+      if (this.taskDriverHelixManager.isPresent()) {
+        String taskDriverCluster = clusterConfig.getString(GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY);
+        logger.warn("Dropping instance: {} from task driver cluster: {}", helixInstanceName, taskDriverCluster);
+        // Drop from the task driver cluster; the job cluster was already handled above.
+        HelixUtils.dropInstanceIfExists(admin, taskDriverCluster, helixInstanceName);
+      }
+    } finally {
+      // Always release the admin's ZK connection, even if a drop throws something other than HelixException.
+      admin.close();
+    }
+
+    logger.warn("Reinitializing Helix manager..");
+    initHelixManager();
+  }
+
+  /**
+   * Joins the Helix cluster(s) via {@link #connectHelixManager()}, making at most 5 attempts. When an attempt fails
+   * with a {@link HelixException}, {@link #onClusterJoinFailure()} disconnects, drops the partially-registered
+   * instance and rebuilds the Helix managers before the exception is rethrown. Other exceptions are retried
+   * without that cleanup.
+   *
+   * @throws RuntimeException wrapping the {@link RetryException} or {@link ExecutionException} raised by the retryer
+   *         when the cluster could not be joined.
+   */
+  @VisibleForTesting
+  void connectHelixManagerWithRetry() {
+    Callable<Void> connectHelixManagerCallable = () -> {
+      try {
+        logger.info("Instance: {} attempting to join cluster: {}", helixInstanceName, clusterName);
+        connectHelixManager();
+      } catch (HelixException e) {
+        logger.error("Exception encountered when joining cluster", e);
+        // Clean up partial instance state so the next attempt starts from a fresh HelixManager.
+        onClusterJoinFailure();
+        throw e;
       }
-    } catch (Exception e) {
-      logger.error("HelixManager failed to connect", e);
-      throw Throwables.propagate(e);
+      return null;
+    };
+
+    Retryer<Void> retryer = RetryerBuilder.<Void>newBuilder()
+        .retryIfException()
+        .withStopStrategy(StopStrategies.stopAfterAttempt(5)).build();
+
+    try {
+      retryer.call(connectHelixManagerCallable);
+    } catch (ExecutionException | RetryException e) {
+      // Throwables.propagate always throws; 'throw' makes the exit explicit to readers and the compiler.
+      throw Throwables.propagate(e);
     }
   }
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index f6f4730..f70a741 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.cluster;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -27,9 +28,14 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.InstanceConfig;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.TargetState;
import org.apache.helix.task.TaskConfig;
@@ -331,4 +337,30 @@ public class HelixUtils {
System.setProperty(entry.getKey().toString(), entry.getValue().toString());
}
}
+
+  /**
+   * Lists the names of the instances currently registered as live in the cluster that {@code helixManager}
+   * belongs to. The manager is expected to be connected already.
+   *
+   * @param helixManager a connected {@link HelixManager} for the cluster to inspect
+   * @return the live instance names reported by the manager's {@link HelixDataAccessor}
+   */
+  public static List<String> getLiveInstances(HelixManager helixManager) {
+    HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
+    return dataAccessor.getChildNames(dataAccessor.keyBuilder().liveInstances());
+  }
+
+  /**
+   * Reports whether {@code instanceName} currently has a live-instance entry in the cluster of {@code helixManager}.
+   *
+   * @param helixManager a connected {@link HelixManager}
+   * @param instanceName the Helix instance name to look up
+   * @return {@code true} if the data accessor returns a non-null live-instance property for the name
+   */
+  public static boolean isInstanceLive(HelixManager helixManager, String instanceName) {
+    HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
+    boolean hasLiveEntry = dataAccessor.getProperty(dataAccessor.keyBuilder().liveInstance(instanceName)) != null;
+    return hasLiveEntry;
+  }
+
+  /**
+   * Best-effort removal of {@code helixInstanceName} from {@code clusterName}. A {@link HelixException} thrown by
+   * {@link HelixAdmin#dropInstance} (presumably including the case where the instance does not exist) is logged
+   * and swallowed rather than propagated.
+   *
+   * @param admin the {@link HelixAdmin} used to perform the drop
+   * @param clusterName the Helix cluster to drop the instance from
+   * @param helixInstanceName the instance to drop
+   */
+  public static void dropInstanceIfExists(HelixAdmin admin, String clusterName, String helixInstanceName) {
+    try {
+      admin.dropInstance(clusterName, new InstanceConfig(helixInstanceName));
+    } catch (HelixException e) {
+      // The exception is the trailing argument with no placeholder, so SLF4J logs its stack trace.
+      log.error("Could not drop instance: {} from cluster: {}", helixInstanceName, clusterName, e);
+    }
+  }
}
\ No newline at end of file
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
index b115607..0e48661 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
@@ -23,6 +23,9 @@ import java.net.URL;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.helix.HelixException;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.manager.zk.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -60,6 +63,9 @@ public class GobblinTaskRunnerTest {
private GobblinTaskRunner gobblinTaskRunner;
private GobblinClusterManager gobblinClusterManager;
+ private GobblinTaskRunner corruptGobblinTaskRunner;
+ private String clusterName;
+ private String corruptHelixInstance;
@BeforeClass
public void setUp() throws Exception {
@@ -80,8 +86,8 @@ public class GobblinTaskRunnerTest {
.resolve();
String zkConnectionString = config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
- HelixUtils.createGobblinHelixCluster(zkConnectionString,
- config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY));
+ this.clusterName = config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+ HelixUtils.createGobblinHelixCluster(zkConnectionString, this.clusterName);
// Participant
this.gobblinTaskRunner =
@@ -89,12 +95,17 @@ public class GobblinTaskRunnerTest {
TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID, config, Optional.<Path>absent());
this.gobblinTaskRunner.connectHelixManager();
+ // Participant with a partial Instance set up on Helix/ZK
+ this.corruptHelixInstance = HelixUtils.getHelixInstanceName("CorruptHelixInstance", 0);
+ this.corruptGobblinTaskRunner =
+ new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, corruptHelixInstance,
+ TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID, config, Optional.<Path>absent());
+
// Controller
this.gobblinClusterManager =
new GobblinClusterManager(TestHelper.TEST_APPLICATION_NAME, TestHelper.TEST_APPLICATION_ID, config,
Optional.<Path>absent());
this.gobblinClusterManager.connectHelixManager();
-
}
@Test
@@ -117,6 +128,36 @@ public class GobblinTaskRunnerTest {
Assert.assertEquals(fileSystem.getConf().get(HADOOP_OVERRIDE_PROPERTY_NAME), "value");
}
+  @Test
+  public void testConnectHelixManagerWithRetry() {
+    // Connect and disconnect the corrupt task runner so that its Helix instance structure exists in ZK.
+    try {
+      this.corruptGobblinTaskRunner.connectHelixManager();
+      this.corruptGobblinTaskRunner.disconnectHelixManager();
+    } catch (Exception e) {
+      Assert.fail("Failed to connect to ZK", e);
+    }
+
+    // Delete the ERRORS/HISTORY/STATUSUPDATES znodes under INSTANCES to simulate a partial instance set up.
+    ZkClient zkClient = new ZkClient(testingZKServer.getConnectString());
+    try {
+      zkClient.delete(PropertyPathBuilder.instanceError(clusterName, corruptHelixInstance));
+      zkClient.delete(PropertyPathBuilder.instanceHistory(clusterName, corruptHelixInstance));
+      zkClient.delete(PropertyPathBuilder.instanceStatusUpdate(clusterName, corruptHelixInstance));
+    } finally {
+      // Release the ZK session opened only for this set up.
+      zkClient.close();
+    }
+
+    // Connecting to Helix without retry must fail with a HelixException.
+    try {
+      corruptGobblinTaskRunner.connectHelixManager();
+      Assert.fail("Unexpected success in connecting to HelixManager");
+    } catch (Exception e) {
+      Assert.assertEquals(e.getClass(), HelixException.class);
+    }
+
+    // Connecting with retry must recover and leave the (re-initialized) job HelixManager connected.
+    try {
+      corruptGobblinTaskRunner.connectHelixManagerWithRetry();
+      Assert.assertTrue(corruptGobblinTaskRunner.getJobHelixManager().isConnected());
+    } finally {
+      // Do not leave the extra participant joined to the shared test cluster.
+      corruptGobblinTaskRunner.disconnectHelixManager();
+    }
+  }
+
@AfterClass
public void tearDown() throws IOException {
try {
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index 14488d5..b1095bc 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -119,7 +119,8 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
protected YarnService buildYarnService(Config config, String applicationName, String applicationId,
YarnConfiguration yarnConfiguration, FileSystem fs)
throws Exception {
- return new YarnService(config, applicationName, applicationId, yarnConfiguration, fs, this.eventBus);
+ // Hand YarnService the job-cluster HelixManager: it checks Helix instance liveness when naming newly allocated
+ // containers and disables (fences off) instances whose containers were aborted without a release request.
+ return new YarnService(config, applicationName, applicationId, yarnConfiguration, fs, this.eventBus,
+ this.getMultiManager().getJobClusterHelixManager());
}
/**
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index a6d7a88..82842f2 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
import org.apache.helix.Criteria;
+import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
@@ -319,7 +320,12 @@ public class GobblinYarnAppLauncher {
this.securityManager.get().loginAndScheduleTokenRenewal();
}
- this.applicationId = getApplicationId();
+ Optional<ApplicationId> reconnectableApplicationId = getReconnectableApplicationId();
+ if (!reconnectableApplicationId.isPresent()) {
+ disableLiveHelixInstances();
+ LOGGER.info("No reconnectable application found so submitting a new application");
+ this.applicationId = Optional.of(setupAndSubmitApplication());
+ }
this.applicationStatusMonitor.scheduleAtFixedRate(new Runnable() {
@Override
@@ -396,6 +402,9 @@ public class GobblinYarnAppLauncher {
stopYarnClient();
+ LOGGER.info("Disabling all live Helix instances..");
+ disableLiveHelixInstances();
+
disconnectHelixManager();
} finally {
try {
@@ -479,6 +488,26 @@ public class GobblinYarnAppLauncher {
}
}
+  /**
+   * Fences off every instance currently live in the launcher's Helix cluster by disabling it through the cluster
+   * management tool. Such instances can linger when a previous Yarn application leaves orphaned worker processes
+   * running. Helix offers no API to drop a live instance, so disabling keeps these orphans from participating in
+   * the new cluster.
+   *
+   * NOTE: this is a workaround for an existing YARN bug. Once YARN guarantees that containers are killed on
+   * application completion, this method should be removed.
+   */
+  void disableLiveHelixInstances() {
+    final String cluster = this.helixManager.getClusterName();
+    final HelixAdmin clusterAdmin = this.helixManager.getClusterManagmentTool();
+    final List<String> live = HelixUtils.getLiveInstances(this.helixManager);
+    LOGGER.warn("Found {} live instances in the cluster.", live.size());
+    live.forEach(instance -> {
+      LOGGER.warn("Disabling instance: {}", instance);
+      clusterAdmin.enableInstance(cluster, instance, false);
+    });
+  }
+
@VisibleForTesting
void disconnectHelixManager() {
if (this.helixManager.isConnected()) {
@@ -496,17 +525,6 @@ public class GobblinYarnAppLauncher {
this.yarnClient.stop();
}
- private Optional<ApplicationId> getApplicationId() throws YarnException, IOException {
- Optional<ApplicationId> reconnectableApplicationId = getReconnectableApplicationId();
- if (reconnectableApplicationId.isPresent()) {
- LOGGER.info("Found reconnectable application with application ID: " + reconnectableApplicationId.get());
- return reconnectableApplicationId;
- }
-
- LOGGER.info("No reconnectable application found so submitting a new application");
- return Optional.of(setupAndSubmitApplication());
- }
-
@VisibleForTesting
Optional<ApplicationId> getReconnectableApplicationId() throws YarnException, IOException {
List<ApplicationReport> applicationReports =
@@ -518,6 +536,7 @@ public class GobblinYarnAppLauncher {
// Try to find an application with a matching application name
for (ApplicationReport applicationReport : applicationReports) {
if (this.applicationName.equals(applicationReport.getName())) {
+ LOGGER.info("Found reconnectable application with application ID: " + applicationReport.getApplicationId());
return Optional.of(applicationReport.getApplicationId());
}
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 4586571..5f6da10 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.Records;
+import org.apache.helix.HelixManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,6 +83,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.io.Closer;
@@ -123,6 +125,8 @@ public class YarnService extends AbstractIdleService {
private static final Splitter SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();
+ private static final String UNKNOWN_HELIX_INSTANCE = "UNKNOWN";
+
private final String applicationName;
private final String applicationId;
private final String appViewAcl;
@@ -156,6 +160,7 @@ public class YarnService extends AbstractIdleService {
private final Optional<String> containerJvmArgs;
private final String containerTimezone;
+ private final HelixManager helixManager;
private volatile Optional<Resource> maxResourceCapacity = Optional.absent();
@@ -199,9 +204,10 @@ public class YarnService extends AbstractIdleService {
@VisibleForTesting
@Getter(AccessLevel.PROTECTED)
private int numRequestedContainers = 0;
+ private final Set<String> blacklistedInstances = Sets.newHashSet();
public YarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
- FileSystem fs, EventBus eventBus) throws Exception {
+ FileSystem fs, EventBus eventBus, HelixManager helixManager) throws Exception {
this.applicationName = applicationName;
this.applicationId = applicationId;
@@ -209,6 +215,8 @@ public class YarnService extends AbstractIdleService {
this.eventBus = eventBus;
+ this.helixManager = helixManager;
+
this.gobblinMetrics = config.getBoolean(ConfigurationKeys.METRICS_ENABLED_KEY) ?
Optional.of(buildGobblinMetrics()) : Optional.<GobblinMetrics>absent();
@@ -625,7 +633,10 @@ public class YarnService extends AbstractIdleService {
*/
protected void handleContainerCompletion(ContainerStatus containerStatus) {
Map.Entry<Container, String> completedContainerEntry = this.containerMap.remove(containerStatus.getContainerId());
- String completedInstanceName = completedContainerEntry == null? "unknown" : completedContainerEntry.getValue();
+ //Get the Helix instance name for the completed container. Because callbacks are processed asynchronously, we might
+ //encounter situations where handleContainerCompletion() is called before onContainersAllocated(), resulting in the
+ //containerId missing from the containersMap.
+ String completedInstanceName = completedContainerEntry == null? UNKNOWN_HELIX_INSTANCE : completedContainerEntry.getValue();
LOGGER.info(String.format("Container %s running Helix instance %s has completed with exit status %d",
containerStatus.getContainerId(), completedInstanceName, containerStatus.getExitStatus()));
@@ -635,10 +646,26 @@ public class YarnService extends AbstractIdleService {
containerStatus.getContainerId(), containerStatus.getDiagnostics()));
}
- if (this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) != null) {
- LOGGER.info("Container release requested, so not spawning a replacement for containerId {}",
- containerStatus.getContainerId());
- return;
+ if (containerStatus.getExitStatus() == ContainerExitStatus.ABORTED) {
+ if (this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) != null) {
+ LOGGER.info("Container release requested, so not spawning a replacement for containerId {}", containerStatus.getContainerId());
+ return;
+ } else {
+ LOGGER.info("Container {} aborted due to lost NM", containerStatus.getContainerId());
+ // Container release was not requested. Likely, the container was running on a node on which the NM died.
+ // In this case, RM assumes that the containers are "lost", even though the container process may still be
+ // running on the node. We need to ensure that the Helix instances running on the orphaned containers
+ // are fenced off from the Helix cluster to avoid double publishing and state being committed by the
+ // instances.
+ if (!UNKNOWN_HELIX_INSTANCE.equals(completedInstanceName)) {
+ String clusterName = this.helixManager.getClusterName();
+ //Disable the orphaned instance.
+ if (HelixUtils.isInstanceLive(helixManager, completedInstanceName)) {
+ LOGGER.info("Disabling the Helix instance {}", completedInstanceName);
+ this.helixManager.getClusterManagmentTool().enableInstance(clusterName, completedInstanceName, false);
+ }
+ }
+ }
}
if (this.shutdownInProgress) {
@@ -723,10 +750,10 @@ public class YarnService extends AbstractIdleService {
LOGGER.info(String.format("Container %s has been allocated", container.getId()));
String instanceName = unusedHelixInstanceNames.poll();
- if (Strings.isNullOrEmpty(instanceName)) {
+ while (Strings.isNullOrEmpty(instanceName) || HelixUtils.isInstanceLive(helixManager, instanceName)) {
// No unused instance name, so generating a new one.
- instanceName = HelixUtils.getHelixInstanceName(HELIX_YARN_INSTANCE_NAME_PREFIX,
- helixInstanceIdGenerator.incrementAndGet());
+ instanceName = HelixUtils
+ .getHelixInstanceName(HELIX_YARN_INSTANCE_NAME_PREFIX, helixInstanceIdGenerator.incrementAndGet());
}
final String finalInstanceName = instanceName;
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
index adc8bd0..c1c7142 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
@@ -480,7 +480,7 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
private static class TestYarnService extends YarnService {
public TestYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
FileSystem fs, EventBus eventBus) throws Exception {
- super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus);
+ // No HelixManager is supplied. NOTE(review): YarnService dereferences its HelixManager on the container-allocation
+ // path (HelixUtils.isInstanceLive) and in handleContainerCompletion() for ABORTED containers with a known instance;
+ // confirm this test never reaches those paths, or pass a mock HelixManager instead of null.
+ super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus, null);
}
@Override
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
index 2eb032f..160ad39 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
@@ -17,13 +17,6 @@
package org.apache.gobblin.yarn;
-import com.google.common.base.Predicate;
-import com.google.common.eventbus.EventBus;
-import com.google.common.io.Closer;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
-
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URL;
@@ -35,7 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
-import org.apache.gobblin.testing.AssertWithBackoff;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;
@@ -64,6 +57,15 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import com.google.common.base.Predicate;
+import com.google.common.eventbus.EventBus;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.testing.AssertWithBackoff;
+
/**
* Tests for {@link YarnService}.
@@ -282,7 +284,7 @@ public class YarnServiceTest {
static class TestYarnService extends YarnService {
public TestYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
FileSystem fs, EventBus eventBus) throws Exception {
- super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus);
+ // No HelixManager is supplied. NOTE(review): YarnService dereferences its HelixManager on the container-allocation
+ // path (HelixUtils.isInstanceLive) and in handleContainerCompletion() for ABORTED containers with a known instance;
+ // confirm this test never reaches those paths, or pass a mock HelixManager instead of null.
+ super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus, null);
}
protected ContainerLaunchContext newContainerLaunchContext(Container container, String helixInstanceName)