You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2023/03/08 17:53:06 UTC

[gobblin] branch master updated: [GOBBLIN-1796] Log startup command when container fails to startup (#3655)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ab62e7283 [GOBBLIN-1796] Log startup command when container fails to startup (#3655)
ab62e7283 is described below

commit ab62e72839bb4824b98a442de4dd1647284e880e
Author: Matthew Ho <ho...@gmail.com>
AuthorDate: Wed Mar 8 09:52:58 2023 -0800

    [GOBBLIN-1796] Log startup command when container fails to startup (#3655)
---
 .../java/org/apache/gobblin/yarn/YarnService.java  | 111 ++++++++++++++-------
 .../org/apache/gobblin/yarn/YarnServiceIT.java     |  23 -----
 .../org/apache/gobblin/yarn/YarnServiceTest.java   |  55 ++++++++--
 3 files changed, 124 insertions(+), 65 deletions(-)

diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 86a377f44..7955c791a 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -37,7 +38,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
-import lombok.AllArgsConstructor;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -82,7 +82,6 @@ import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
@@ -605,7 +604,7 @@ public class YarnService extends AbstractIdleService {
     ContainerLaunchContext containerLaunchContext = Records.newRecord(ContainerLaunchContext.class);
     containerLaunchContext.setLocalResources(resourceMap);
     containerLaunchContext.setEnvironment(YarnHelixUtils.getEnvironmentVariables(this.yarnConfiguration));
-    containerLaunchContext.setCommands(Lists.newArrayList(buildContainerCommand(containerInfo)));
+    containerLaunchContext.setCommands(Arrays.asList(containerInfo.getStartupCommand()));
 
     Map<ApplicationAccessType, String> acls = new HashMap<>(1);
     acls.put(ApplicationAccessType.VIEW_APP, this.appViewAcl);
@@ -658,11 +657,11 @@ public class YarnService extends AbstractIdleService {
   }
 
   @VisibleForTesting
-  protected String buildContainerCommand(ContainerInfo containerInfo) {
+  protected String buildContainerCommand(Container container, String helixParticipantId, String helixInstanceTag) {
     String containerProcessName = GobblinYarnTaskRunner.class.getSimpleName();
     StringBuilder containerCommand = new StringBuilder()
         .append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java")
-        .append(" -Xmx").append((int) (containerInfo.getContainer().getResource().getMemory() * this.jvmMemoryXmxRatio) -
+        .append(" -Xmx").append((int) (container.getResource().getMemory() * this.jvmMemoryXmxRatio) -
             this.jvmMemoryOverheadMbs).append("M")
         .append(" -D").append(GobblinYarnConfigurationKeys.JVM_USER_TIMEZONE_CONFIG).append("=").append(this.containerTimezone)
         .append(" -D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME).append("=").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR)
@@ -674,11 +673,11 @@ public class YarnService extends AbstractIdleService {
         .append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME)
         .append(" ").append(this.applicationId)
         .append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME)
-        .append(" ").append(containerInfo.getHelixParticipantId());
+        .append(" ").append(helixParticipantId);
 
-    if (!Strings.isNullOrEmpty(containerInfo.getHelixTag())) {
+    if (!Strings.isNullOrEmpty(helixInstanceTag)) {
       containerCommand.append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
-          .append(" ").append(containerInfo.getHelixTag());
+          .append(" ").append(helixInstanceTag);
     }
     return containerCommand.append(" 1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
         containerProcessName).append(".").append(ApplicationConstants.STDOUT)
@@ -734,30 +733,17 @@ public class YarnService extends AbstractIdleService {
           containerStatus.getContainerId(), containerStatus.getDiagnostics()));
     }
 
-    if (containerStatus.getExitStatus() == ContainerExitStatus.ABORTED) {
-      if (this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) != null) {
-        LOGGER.info("Container release requested, so not spawning a replacement for containerId {}", containerStatus.getContainerId());
-        if (completedContainerInfo != null) {
-          LOGGER.info("Adding instance {} to the pool of unused instances", completedInstanceName);
-          this.unusedHelixInstanceNames.add(completedInstanceName);
+    switch(containerStatus.getExitStatus()) {
+      case(ContainerExitStatus.ABORTED):
+        if (handleAbortedContainer(containerStatus, completedContainerInfo, completedInstanceName)) {
+          return;
         }
-        return;
-      } else {
-        LOGGER.info("Container {} aborted due to lost NM", containerStatus.getContainerId());
-        // Container release was not requested. Likely, the container was running on a node on which the NM died.
-        // In this case, RM assumes that the containers are "lost", even though the container process may still be
-        // running on the node. We need to ensure that the Helix instances running on the orphaned containers
-        // are fenced off from the Helix cluster to avoid double publishing and state being committed by the
-        // instances.
-        if (!UNKNOWN_HELIX_INSTANCE.equals(completedInstanceName)) {
-          String clusterName = this.helixManager.getClusterName();
-          //Disable the orphaned instance.
-          if (HelixUtils.isInstanceLive(helixManager, completedInstanceName)) {
-            LOGGER.info("Disabling the Helix instance {}", completedInstanceName);
-            this.helixManager.getClusterManagmentTool().enableInstance(clusterName, completedInstanceName, false);
-          }
-        }
-      }
+        break;
+      case(1): // Same as linux exit status 1 Often occurs when launch_container.sh failed
+        LOGGER.info("Exit status 1. CompletedContainerInfo={}", completedContainerInfo);
+        break;
+      default:
+        break;
     }
 
     if (this.shutdownInProgress) {
@@ -804,6 +790,55 @@ public class YarnService extends AbstractIdleService {
             Optional.of(completedContainerInfo.getContainer()) : Optional.absent(), newContainerResource));
   }
 
+  /**
+   * Handles a container that completed with exit status {@link ContainerExitStatus#ABORTED}. Two cases are handled:
+   * <ol>
+   *   <li>
+   *     Case 1: The Gobblin AM intentionally requested that the container be released (often because the number of
+   *     Helix tasks has decreased due to lower traffic).
+   *   </li>
+   *   <li>
+   *     Case 2: The container was lost unexpectedly (e.g. its node or NodeManager died). If its Helix instance is still
+   *     live, it is disabled so that the same Helix task is not run by multiple containers.
+   *   </li>
+   * </ol>
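+   * @param containerStatus the YARN status of the completed container
+   * @param completedContainerInfo the tracked info for the completed container; may be {@code null}
+   * @param completedInstanceName the Helix instance name associated with the completed container
+   * @return {@code true} if the release was intentionally requested (Case 1), {@code false} otherwise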
+   */
+  private boolean handleAbortedContainer(ContainerStatus containerStatus, ContainerInfo completedContainerInfo,
+      String completedInstanceName) {
+
+    // Case 1: Container intentionally released
+    if (this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) != null) {
+      LOGGER.info("Container release requested, so not spawning a replacement for containerId {}", containerStatus.getContainerId());
+      if (completedContainerInfo != null) {
+        LOGGER.info("Adding instance {} to the pool of unused instances", completedInstanceName);
+        this.unusedHelixInstanceNames.add(completedInstanceName);
+      }
+
+      return true;
+    }
+
+    // Case 2: Container release was not requested. Likely, the container was running on a node on which the NM died.
+    // In this case, RM assumes that the containers are "lost", even though the container process may still be
+    // running on the node. We need to ensure that the Helix instances running on the orphaned containers
+    // are fenced off from the Helix cluster to avoid double publishing and state being committed by the
+    // instances.
+    LOGGER.info("Container {} aborted due to lost NM", containerStatus.getContainerId());
+    if (!UNKNOWN_HELIX_INSTANCE.equals(completedInstanceName)) {
+      String clusterName = this.helixManager.getClusterName();
+      //Disable the orphaned instance.
+      if (HelixUtils.isInstanceLive(helixManager, completedInstanceName)) {
+        LOGGER.info("Disabling the Helix instance {}", completedInstanceName);
+        this.helixManager.getClusterManagmentTool().enableInstance(clusterName, completedInstanceName, false);
+      }
+    }
+
+    return false;
+  }
+
   private ImmutableMap.Builder<String, String> buildContainerStatusEventMetadata(ContainerStatus containerStatus) {
     ImmutableMap.Builder<String, String> eventMetadataBuilder = new ImmutableMap.Builder<>();
     eventMetadataBuilder.put(GobblinYarnMetricTagNames.CONTAINER_ID, containerStatus.getContainerId().toString());
@@ -1059,12 +1094,20 @@ public class YarnService extends AbstractIdleService {
     }
   }
 
-  //A class encapsulates Container instances, Helix participant IDs of the containers and Helix Tag
-  @AllArgsConstructor
+  // Encapsulates a Container, the Helix participant ID assigned to it, its Helix tag, and the
+  // startup command built for it when this object is constructed
   @Getter
-  static class ContainerInfo {
+  class ContainerInfo {
     private final Container container;
     private final String helixParticipantId;
     private final String helixTag;
+    private final String startupCommand;
+
+    public ContainerInfo(Container container, String helixParticipantId, String helixTag) {
+      this.container = container;
+      this.helixParticipantId = helixParticipantId;
+      this.helixTag = helixTag;
+      this.startupCommand = YarnService.this.buildContainerCommand(container, helixParticipantId, helixTag);
+    }
   }
 }
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceIT.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceIT.java
index 2f4443053..751f4fd68 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceIT.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceIT.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -268,28 +267,6 @@ public class YarnServiceIT {
     Assert.assertTrue(yarnService.getReleasedContainerCache().getIfPresent(containerId1) == null);
   }
 
-  @Test(groups = {"gobblin.yarn", "disabledOnCI"}, dependsOnMethods = "testReleasedContainerCache")
-  public void testBuildContainerCommand() throws Exception {
-    Config modifiedConfig = this.config
-        .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, ConfigValueFactory.fromAnyRef("10"))
-        .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, ConfigValueFactory.fromAnyRef("0.8"));
-    TestYarnService yarnService =
-        new TestYarnService(modifiedConfig, "testApp2", "appId2",
-            this.clusterConf, FileSystem.getLocal(new Configuration()), this.eventBus);
-
-    ContainerId containerId = ContainerId.newInstance(ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 0),
-        0), 0);
-    Resource resource = Resource.newInstance(2048, 1);
-    Container container = Container.newInstance(containerId, null, null, resource, null, null);
-    YarnService.ContainerInfo
-        containerInfo = new YarnService.ContainerInfo(container, "helixInstance1", "helixTag");
-
-    String command = yarnService.buildContainerCommand(containerInfo);
-
-    // 1628 is from 2048 * 0.8 - 10
-    Assert.assertTrue(command.contains("-Xmx1628"));
-  }
-
   /**
    * Test if requested resource exceed the resource limit, yarnService should fail.
    */
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
index 99cf8c096..7115b2f07 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
@@ -23,8 +23,13 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
@@ -43,9 +48,12 @@ import org.testng.annotations.Test;
 import com.google.common.eventbus.EventBus;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
 
+import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME;
+import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME;
 import static org.mockito.Mockito.*;
 
 
@@ -54,7 +62,6 @@ import static org.mockito.Mockito.*;
  */
 public class YarnServiceTest {
   final Logger LOG = LoggerFactory.getLogger(YarnServiceTest.class);
-  private TestYarnService yarnService;
   private Config config;
   private YarnConfiguration clusterConf = new YarnConfiguration();
   private final EventBus eventBus = new EventBus("YarnServiceTest");
@@ -88,10 +95,6 @@ public class YarnServiceTest {
         .thenReturn(mockRegisterApplicationMasterResponse);
     when(mockRegisterApplicationMasterResponse.getMaximumResourceCapability())
         .thenReturn(mockResource);
-
-    // Create the test yarn service, but don't start yet
-    this.yarnService = new TestYarnService(this.config, "testApp", "appId",
-        this.clusterConf, mockFs, this.eventBus);
   }
 
   /**
@@ -100,14 +103,50 @@ public class YarnServiceTest {
    */
   @Test(groups = {"gobblin.yarn"})
   public void testYarnStartUpFirst() throws Exception{
+    // Create the test yarn service, but don't start yet
+    YarnService yarnService = new TestYarnService(this.config, "testApp", "appId",
+        this.clusterConf, mockFs, this.eventBus);
+
     // Not allowed to request target number of containers since yarnService hasn't started up yet.
-    Assert.assertFalse(this.yarnService.requestTargetNumberOfContainers(new YarnContainerRequestBundle(), Collections.EMPTY_SET));
+    Assert.assertFalse(yarnService.requestTargetNumberOfContainers(new YarnContainerRequestBundle(), Collections.EMPTY_SET));
 
     // Start the yarn service
-    this.yarnService.startUp();
+    yarnService.startUp();
 
     // Allowed to request target number of containers after yarnService is started up.
-    Assert.assertTrue(this.yarnService.requestTargetNumberOfContainers(new YarnContainerRequestBundle(), Collections.EMPTY_SET));
+    Assert.assertTrue(yarnService.requestTargetNumberOfContainers(new YarnContainerRequestBundle(), Collections.EMPTY_SET));
+  }
+
+  @Test(groups = {"gobblin.yarn"})
+  public void testYarnContainerStartupCommand() throws Exception{
+    final int resourceMemoryMB = 2048;
+    final int jvmMemoryOverheadMB = 10;
+    final double jvmXmxRatio = 0.8;
+    final int expectedJavaHeapSizeMB = (int)(resourceMemoryMB * jvmXmxRatio) - jvmMemoryOverheadMB;
+
+    final String helixInstance = "helixInstance1";
+    final String helixTag = "helixTag";
+
+    Config modifiedConfig = this.config
+        .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, ConfigValueFactory.fromAnyRef("10"))
+        .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, ConfigValueFactory.fromAnyRef("0.8"));
+    TestYarnService yarnService = new TestYarnService(modifiedConfig, "testApp2", "appId2",
+            this.clusterConf, FileSystem.getLocal(new Configuration()), this.eventBus);
+
+    ContainerId containerId = ContainerId.newInstance(ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 0),
+        0), 0);
+    Resource resource = Resource.newInstance(resourceMemoryMB, 1);
+    Container container = Container.newInstance(containerId, null, null, resource, null, null);
+    YarnService.ContainerInfo containerInfo =
+        yarnService.new ContainerInfo(container, helixInstance, helixTag);
+
+    String command = containerInfo.getStartupCommand();
+
+    LOG.info(command);
+    Assert.assertTrue(command.contains("-Xmx" + expectedJavaHeapSizeMB +"M"));
+    Assert.assertTrue(command.contains(String.format("--%s %s", HELIX_INSTANCE_NAME_OPTION_NAME, helixInstance)));
+    Assert.assertTrue(command.contains(String.format("--%s %s", HELIX_INSTANCE_TAGS_OPTION_NAME, helixTag)));
+    Assert.assertTrue(command.endsWith("1><LOG_DIR>/GobblinYarnTaskRunner.stdout 2><LOG_DIR>/GobblinYarnTaskRunner.stderr"));
   }
 
   static class TestYarnService extends YarnService {