You are viewing a plain-text version of this content; the hyperlink to its canonical version ("here" in the original page) was not preserved in the plain-text conversion.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2023/03/08 17:53:06 UTC
[gobblin] branch master updated: [GOBBLIN-1796] Log startup command when container fails to startup (#3655)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ab62e7283 [GOBBLIN-1796] Log startup command when container fails to startup (#3655)
ab62e7283 is described below
commit ab62e72839bb4824b98a442de4dd1647284e880e
Author: Matthew Ho <ho...@gmail.com>
AuthorDate: Wed Mar 8 09:52:58 2023 -0800
[GOBBLIN-1796] Log startup command when container fails to startup (#3655)
---
.../java/org/apache/gobblin/yarn/YarnService.java | 111 ++++++++++++++-------
.../org/apache/gobblin/yarn/YarnServiceIT.java | 23 -----
.../org/apache/gobblin/yarn/YarnServiceTest.java | 55 ++++++++--
3 files changed, 124 insertions(+), 65 deletions(-)
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 86a377f44..7955c791a 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -37,7 +38,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
-import lombok.AllArgsConstructor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -82,7 +82,6 @@ import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
@@ -605,7 +604,7 @@ public class YarnService extends AbstractIdleService {
ContainerLaunchContext containerLaunchContext = Records.newRecord(ContainerLaunchContext.class);
containerLaunchContext.setLocalResources(resourceMap);
containerLaunchContext.setEnvironment(YarnHelixUtils.getEnvironmentVariables(this.yarnConfiguration));
- containerLaunchContext.setCommands(Lists.newArrayList(buildContainerCommand(containerInfo)));
+ containerLaunchContext.setCommands(Arrays.asList(containerInfo.getStartupCommand()));
Map<ApplicationAccessType, String> acls = new HashMap<>(1);
acls.put(ApplicationAccessType.VIEW_APP, this.appViewAcl);
@@ -658,11 +657,11 @@ public class YarnService extends AbstractIdleService {
}
@VisibleForTesting
- protected String buildContainerCommand(ContainerInfo containerInfo) {
+ protected String buildContainerCommand(Container container, String helixParticipantId, String helixInstanceTag) {
String containerProcessName = GobblinYarnTaskRunner.class.getSimpleName();
StringBuilder containerCommand = new StringBuilder()
.append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java")
- .append(" -Xmx").append((int) (containerInfo.getContainer().getResource().getMemory() * this.jvmMemoryXmxRatio) -
+ .append(" -Xmx").append((int) (container.getResource().getMemory() * this.jvmMemoryXmxRatio) -
this.jvmMemoryOverheadMbs).append("M")
.append(" -D").append(GobblinYarnConfigurationKeys.JVM_USER_TIMEZONE_CONFIG).append("=").append(this.containerTimezone)
.append(" -D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME).append("=").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR)
@@ -674,11 +673,11 @@ public class YarnService extends AbstractIdleService {
.append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME)
.append(" ").append(this.applicationId)
.append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME)
- .append(" ").append(containerInfo.getHelixParticipantId());
+ .append(" ").append(helixParticipantId);
- if (!Strings.isNullOrEmpty(containerInfo.getHelixTag())) {
+ if (!Strings.isNullOrEmpty(helixInstanceTag)) {
containerCommand.append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
- .append(" ").append(containerInfo.getHelixTag());
+ .append(" ").append(helixInstanceTag);
}
return containerCommand.append(" 1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
containerProcessName).append(".").append(ApplicationConstants.STDOUT)
@@ -734,30 +733,17 @@ public class YarnService extends AbstractIdleService {
containerStatus.getContainerId(), containerStatus.getDiagnostics()));
}
- if (containerStatus.getExitStatus() == ContainerExitStatus.ABORTED) {
- if (this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) != null) {
- LOGGER.info("Container release requested, so not spawning a replacement for containerId {}", containerStatus.getContainerId());
- if (completedContainerInfo != null) {
- LOGGER.info("Adding instance {} to the pool of unused instances", completedInstanceName);
- this.unusedHelixInstanceNames.add(completedInstanceName);
+ switch(containerStatus.getExitStatus()) {
+ case(ContainerExitStatus.ABORTED):
+ if (handleAbortedContainer(containerStatus, completedContainerInfo, completedInstanceName)) {
+ return;
}
- return;
- } else {
- LOGGER.info("Container {} aborted due to lost NM", containerStatus.getContainerId());
- // Container release was not requested. Likely, the container was running on a node on which the NM died.
- // In this case, RM assumes that the containers are "lost", even though the container process may still be
- // running on the node. We need to ensure that the Helix instances running on the orphaned containers
- // are fenced off from the Helix cluster to avoid double publishing and state being committed by the
- // instances.
- if (!UNKNOWN_HELIX_INSTANCE.equals(completedInstanceName)) {
- String clusterName = this.helixManager.getClusterName();
- //Disable the orphaned instance.
- if (HelixUtils.isInstanceLive(helixManager, completedInstanceName)) {
- LOGGER.info("Disabling the Helix instance {}", completedInstanceName);
- this.helixManager.getClusterManagmentTool().enableInstance(clusterName, completedInstanceName, false);
- }
- }
- }
+ break;
+ case(1): // Same as linux exit status 1 Often occurs when launch_container.sh failed
+ LOGGER.info("Exit status 1. CompletedContainerInfo={}", completedContainerInfo);
+ break;
+ default:
+ break;
}
if (this.shutdownInProgress) {
@@ -804,6 +790,55 @@ public class YarnService extends AbstractIdleService {
Optional.of(completedContainerInfo.getContainer()) : Optional.absent(), newContainerResource));
}
+ /**
+ * Handles containers aborted. This method handles 2 cases:
+ * <ol>
+ * <li>
+ * Case 1: Gobblin AM intentionally requested container to be released (often because the number of helix tasks
+ * has decreased due to decreased traffic)
+ * </li>
+ * <li>
+ * Case 2: Unexpected hardware fault and the node is lost. Need to do specific Helix logic to ensure 2 helix tasks
+ * are not being run by the multiple containers
+ * </li>
+ * </ol>
+ * @param containerStatus
+ * @param completedContainerInfo
+ * @param completedInstanceName
+ * @return if release request was intentionally released (Case 1)
+ */
+ private boolean handleAbortedContainer(ContainerStatus containerStatus, ContainerInfo completedContainerInfo,
+ String completedInstanceName) {
+
+ // Case 1: Container intentionally released
+ if (this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) != null) {
+ LOGGER.info("Container release requested, so not spawning a replacement for containerId {}", containerStatus.getContainerId());
+ if (completedContainerInfo != null) {
+ LOGGER.info("Adding instance {} to the pool of unused instances", completedInstanceName);
+ this.unusedHelixInstanceNames.add(completedInstanceName);
+ }
+
+ return true;
+ }
+
+ // Case 2: Container release was not requested. Likely, the container was running on a node on which the NM died.
+ // In this case, RM assumes that the containers are "lost", even though the container process may still be
+ // running on the node. We need to ensure that the Helix instances running on the orphaned containers
+ // are fenced off from the Helix cluster to avoid double publishing and state being committed by the
+ // instances.
+ LOGGER.info("Container {} aborted due to lost NM", containerStatus.getContainerId());
+ if (!UNKNOWN_HELIX_INSTANCE.equals(completedInstanceName)) {
+ String clusterName = this.helixManager.getClusterName();
+ //Disable the orphaned instance.
+ if (HelixUtils.isInstanceLive(helixManager, completedInstanceName)) {
+ LOGGER.info("Disabling the Helix instance {}", completedInstanceName);
+ this.helixManager.getClusterManagmentTool().enableInstance(clusterName, completedInstanceName, false);
+ }
+ }
+
+ return false;
+ }
+
private ImmutableMap.Builder<String, String> buildContainerStatusEventMetadata(ContainerStatus containerStatus) {
ImmutableMap.Builder<String, String> eventMetadataBuilder = new ImmutableMap.Builder<>();
eventMetadataBuilder.put(GobblinYarnMetricTagNames.CONTAINER_ID, containerStatus.getContainerId().toString());
@@ -1059,12 +1094,20 @@ public class YarnService extends AbstractIdleService {
}
}
- //A class encapsulates Container instances, Helix participant IDs of the containers and Helix Tag
- @AllArgsConstructor
+ // Class encapsulates Container instances, Helix participant IDs of the containers, Helix Tag, and
+ // initial startup command
@Getter
- static class ContainerInfo {
+ class ContainerInfo {
private final Container container;
private final String helixParticipantId;
private final String helixTag;
+ private final String startupCommand;
+
+ public ContainerInfo(Container container, String helixParticipantId, String helixTag) {
+ this.container = container;
+ this.helixParticipantId = helixParticipantId;
+ this.helixTag = helixTag;
+ this.startupCommand = YarnService.this.buildContainerCommand(container, helixParticipantId, helixTag);
+ }
}
}
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceIT.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceIT.java
index 2f4443053..751f4fd68 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceIT.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceIT.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -268,28 +267,6 @@ public class YarnServiceIT {
Assert.assertTrue(yarnService.getReleasedContainerCache().getIfPresent(containerId1) == null);
}
- @Test(groups = {"gobblin.yarn", "disabledOnCI"}, dependsOnMethods = "testReleasedContainerCache")
- public void testBuildContainerCommand() throws Exception {
- Config modifiedConfig = this.config
- .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, ConfigValueFactory.fromAnyRef("10"))
- .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, ConfigValueFactory.fromAnyRef("0.8"));
- TestYarnService yarnService =
- new TestYarnService(modifiedConfig, "testApp2", "appId2",
- this.clusterConf, FileSystem.getLocal(new Configuration()), this.eventBus);
-
- ContainerId containerId = ContainerId.newInstance(ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 0),
- 0), 0);
- Resource resource = Resource.newInstance(2048, 1);
- Container container = Container.newInstance(containerId, null, null, resource, null, null);
- YarnService.ContainerInfo
- containerInfo = new YarnService.ContainerInfo(container, "helixInstance1", "helixTag");
-
- String command = yarnService.buildContainerCommand(containerInfo);
-
- // 1628 is from 2048 * 0.8 - 10
- Assert.assertTrue(command.contains("-Xmx1628"));
- }
-
/**
* Test if requested resource exceed the resource limit, yarnService should fail.
*/
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
index 99cf8c096..7115b2f07 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
@@ -23,8 +23,13 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
@@ -43,9 +48,12 @@ import org.testng.annotations.Test;
import com.google.common.eventbus.EventBus;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME;
+import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME;
import static org.mockito.Mockito.*;
@@ -54,7 +62,6 @@ import static org.mockito.Mockito.*;
*/
public class YarnServiceTest {
final Logger LOG = LoggerFactory.getLogger(YarnServiceTest.class);
- private TestYarnService yarnService;
private Config config;
private YarnConfiguration clusterConf = new YarnConfiguration();
private final EventBus eventBus = new EventBus("YarnServiceTest");
@@ -88,10 +95,6 @@ public class YarnServiceTest {
.thenReturn(mockRegisterApplicationMasterResponse);
when(mockRegisterApplicationMasterResponse.getMaximumResourceCapability())
.thenReturn(mockResource);
-
- // Create the test yarn service, but don't start yet
- this.yarnService = new TestYarnService(this.config, "testApp", "appId",
- this.clusterConf, mockFs, this.eventBus);
}
/**
@@ -100,14 +103,50 @@ public class YarnServiceTest {
*/
@Test(groups = {"gobblin.yarn"})
public void testYarnStartUpFirst() throws Exception{
+ // Create the test yarn service, but don't start yet
+ YarnService yarnService = new TestYarnService(this.config, "testApp", "appId",
+ this.clusterConf, mockFs, this.eventBus);
+
// Not allowed to request target number of containers since yarnService hasn't started up yet.
- Assert.assertFalse(this.yarnService.requestTargetNumberOfContainers(new YarnContainerRequestBundle(), Collections.EMPTY_SET));
+ Assert.assertFalse(yarnService.requestTargetNumberOfContainers(new YarnContainerRequestBundle(), Collections.EMPTY_SET));
// Start the yarn service
- this.yarnService.startUp();
+ yarnService.startUp();
// Allowed to request target number of containers after yarnService is started up.
- Assert.assertTrue(this.yarnService.requestTargetNumberOfContainers(new YarnContainerRequestBundle(), Collections.EMPTY_SET));
+ Assert.assertTrue(yarnService.requestTargetNumberOfContainers(new YarnContainerRequestBundle(), Collections.EMPTY_SET));
+ }
+
+ @Test(groups = {"gobblin.yarn"})
+ public void testYarnContainerStartupCommand() throws Exception{
+ final int resourceMemoryMB = 2048;
+ final int jvmMemoryOverheadMB = 10;
+ final double jvmXmxRatio = 0.8;
+ final int expectedJavaHeapSizeMB = (int)(resourceMemoryMB * jvmXmxRatio) - jvmMemoryOverheadMB;
+
+ final String helixInstance = "helixInstance1";
+ final String helixTag = "helixTag";
+
+ Config modifiedConfig = this.config
+ .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, ConfigValueFactory.fromAnyRef("10"))
+ .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, ConfigValueFactory.fromAnyRef("0.8"));
+ TestYarnService yarnService = new TestYarnService(modifiedConfig, "testApp2", "appId2",
+ this.clusterConf, FileSystem.getLocal(new Configuration()), this.eventBus);
+
+ ContainerId containerId = ContainerId.newInstance(ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 0),
+ 0), 0);
+ Resource resource = Resource.newInstance(resourceMemoryMB, 1);
+ Container container = Container.newInstance(containerId, null, null, resource, null, null);
+ YarnService.ContainerInfo containerInfo =
+ yarnService.new ContainerInfo(container, helixInstance, helixTag);
+
+ String command = containerInfo.getStartupCommand();
+
+ LOG.info(command);
+ Assert.assertTrue(command.contains("-Xmx" + expectedJavaHeapSizeMB +"M"));
+ Assert.assertTrue(command.contains(String.format("--%s %s", HELIX_INSTANCE_NAME_OPTION_NAME, helixInstance)));
+ Assert.assertTrue(command.contains(String.format("--%s %s", HELIX_INSTANCE_TAGS_OPTION_NAME, helixTag)));
+ Assert.assertTrue(command.endsWith("1><LOG_DIR>/GobblinYarnTaskRunner.stdout 2><LOG_DIR>/GobblinYarnTaskRunner.stderr"));
}
static class TestYarnService extends YarnService {