You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/20 16:34:21 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1091] Pass Yarn application id as part of AppMaster and YarnTaskRunner's start up command[]

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a634612  [GOBBLIN-1091] Pass Yarn application id as part of AppMaster and YarnTaskRunner's start up command[]
a634612 is described below

commit a63461257c3fcea8f4ff67087f8cb29be25d6baf
Author: sv2000 <su...@gmail.com>
AuthorDate: Fri Mar 20 09:34:13 2020 -0700

    [GOBBLIN-1091] Pass Yarn application id as part of AppMaster and YarnTaskRunner's start up command[]
    
    Closes #2933 from sv2000/yarnApplicationId
---
 .../gobblin/cluster/GobblinClusterConfigurationKeys.java  |  1 +
 .../org/apache/gobblin/cluster/GobblinTaskRunner.java     |  2 ++
 .../main/java/org/apache/gobblin/cluster/SingleTask.java  | 14 ++++++++++----
 .../org/apache/gobblin/yarn/GobblinApplicationMaster.java | 15 +++++++++------
 .../org/apache/gobblin/yarn/GobblinYarnAppLauncher.java   |  6 ++++--
 .../org/apache/gobblin/yarn/GobblinYarnTaskRunner.java    | 10 ++++++----
 .../main/java/org/apache/gobblin/yarn/YarnService.java    |  2 ++
 .../apache/gobblin/yarn/GobblinYarnAppLauncherTest.java   |  4 ++--
 8 files changed, 36 insertions(+), 18 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 7dbf7b1..128a5d6 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -42,6 +42,7 @@ public class GobblinClusterConfigurationKeys {
 
   // General Gobblin Cluster application configuration properties.
   public static final String APPLICATION_NAME_OPTION_NAME = "app_name";
+  public static final String APPLICATION_ID_OPTION_NAME = "app_id";
   public static final String STANDALONE_CLUSTER_MODE = "standalone_cluster";
   public static final String STANDALONE_CLUSTER_MODE_KEY = GOBBLIN_CLUSTER_PREFIX + "standaloneMode";
   public static final boolean DEFAULT_STANDALONE_CLUSTER_MODE = false;
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 6b3fff7..b9f1c96 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -611,6 +611,8 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
     Options options = new Options();
     options.addOption("a", GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME, true,
         "Application name");
+    options.addOption("d", GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME, true,
+        "Application id");
     options.addOption("i", GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME, true,
         "Helix instance name");
     options.addOption(Option.builder("t").longOpt(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
index 4778371..9dd0a1c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
@@ -138,10 +138,16 @@ public class SingleTask {
     String storeName = _workUnitFilePath.getParent().getName();
     WorkUnit workUnit;
 
-    if (_workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION)) {
-      workUnit = _stateStores.getMwuStateStore().getAll(storeName, fileName).get(0);
-    } else {
-      workUnit = _stateStores.getWuStateStore().getAll(storeName, fileName).get(0);
+    try {
+      if (_workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION)) {
+        workUnit = _stateStores.getMwuStateStore().getAll(storeName, fileName).get(0);
+      } else {
+        workUnit = _stateStores.getWuStateStore().getAll(storeName, fileName).get(0);
+      }
+    } catch (IOException e) {
+      //Add workunitFilePath to the IOException message to aid debugging
+      throw new IOException("Exception retrieving state from state store for workunit: " + _workUnitFilePath.toString(),
+          e);
     }
 
     // The list of individual WorkUnits (flattened) to run
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index 282156d..14488d5 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -80,11 +80,11 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
   @Getter
   private final YarnService yarnService;
 
-  public GobblinApplicationMaster(String applicationName, ContainerId containerId, Config config,
+  public GobblinApplicationMaster(String applicationName, String applicationId, ContainerId containerId, Config config,
       YarnConfiguration yarnConfiguration) throws Exception {
-    super(applicationName, containerId.getApplicationAttemptId().getApplicationId().toString(),
-        GobblinClusterUtils.addDynamicConfig(config.withValue(GobblinYarnConfigurationKeys.CONTAINER_NUM_KEY,
-            ConfigValueFactory.fromAnyRef(YarnHelixUtils.getContainerNum(containerId.toString())))),
+    super(applicationName, applicationId, GobblinClusterUtils.addDynamicConfig(config
+            .withValue(GobblinYarnConfigurationKeys.CONTAINER_NUM_KEY,
+                ConfigValueFactory.fromAnyRef(YarnHelixUtils.getContainerNum(containerId.toString())))),
         Optional.<Path>absent());
 
     String containerLogDir = config.getString(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY);
@@ -204,6 +204,7 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
   private static Options buildOptions() {
     Options options = new Options();
     options.addOption("a", GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME, true, "Yarn application name");
+    options.addOption("d", GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME, true, "Yarn application id");
     return options;
   }
 
@@ -216,7 +217,8 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
     Options options = buildOptions();
     try {
       CommandLine cmd = new DefaultParser().parse(options, args);
-      if (!cmd.hasOption(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)) {
+      if (!cmd.hasOption(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME) ||
+          (!cmd.hasOption(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME))) {
         printUsage(options);
         System.exit(1);
       }
@@ -231,7 +233,8 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
           ConverterUtils.toContainerId(System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.key()));
 
       try (GobblinApplicationMaster applicationMaster = new GobblinApplicationMaster(
-          cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME), containerId,
+          cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME),
+          cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME), containerId,
           ConfigFactory.load(), new YarnConfiguration())) {
 
         applicationMaster.start();
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 3367842..a6d7a88 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -548,7 +548,7 @@ public class GobblinYarnAppLauncher {
     ContainerLaunchContext amContainerLaunchContext = Records.newRecord(ContainerLaunchContext.class);
     amContainerLaunchContext.setLocalResources(appMasterLocalResources);
     amContainerLaunchContext.setEnvironment(YarnHelixUtils.getEnvironmentVariables(this.yarnConfiguration));
-    amContainerLaunchContext.setCommands(Lists.newArrayList(buildApplicationMasterCommand(resource.getMemory())));
+    amContainerLaunchContext.setCommands(Lists.newArrayList(buildApplicationMasterCommand(applicationId.toString(), resource.getMemory())));
 
     Map<ApplicationAccessType, String> acls = new HashMap<>(1);
     acls.put(ApplicationAccessType.VIEW_APP, this.appViewAcl);
@@ -729,7 +729,7 @@ public class GobblinYarnAppLauncher {
   }
 
   @VisibleForTesting
-  protected String buildApplicationMasterCommand(int memoryMbs) {
+  protected String buildApplicationMasterCommand(String applicationId, int memoryMbs) {
     String appMasterClassName = GobblinApplicationMaster.class.getSimpleName();
     return new StringBuilder()
         .append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java")
@@ -741,6 +741,8 @@ public class GobblinYarnAppLauncher {
         .append(" ").append(GobblinApplicationMaster.class.getName())
         .append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
         .append(" ").append(this.applicationName)
+        .append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME)
+        .append(" ").append(applicationId)
         .append(" 1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
             appMasterClassName).append(".").append(ApplicationConstants.STDOUT)
         .append(" 2>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index dc68162..d59a429 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -59,9 +59,9 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
 
   public static final String HELIX_YARN_INSTANCE_NAME_PREFIX = GobblinYarnTaskRunner.class.getSimpleName();
 
-  public GobblinYarnTaskRunner(String applicationName, String helixInstanceName, ContainerId containerId, Config config,
+  public GobblinYarnTaskRunner(String applicationName, String applicationId, String helixInstanceName, ContainerId containerId, Config config,
       Optional<Path> appWorkDirOptional) throws Exception {
-    super(applicationName, helixInstanceName, getApplicationId(containerId), getTaskRunnerId(containerId),
+    super(applicationName, helixInstanceName, applicationId, getTaskRunnerId(containerId),
         GobblinClusterUtils.addDynamicConfig(config.withValue(GobblinYarnConfigurationKeys.CONTAINER_NUM_KEY,
             ConfigValueFactory.fromAnyRef(YarnHelixUtils.getContainerNum(containerId.toString())))), appWorkDirOptional);
   }
@@ -176,7 +176,8 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
     try {
       CommandLine cmd = new DefaultParser().parse(options, args);
       if (!cmd.hasOption(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME) || !cmd
-          .hasOption(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME)) {
+          .hasOption(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME) || !cmd
+    .hasOption(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME)) {
         printUsage(options);
         System.exit(1);
       }
@@ -190,6 +191,7 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
       ContainerId containerId =
           ConverterUtils.toContainerId(System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.key()));
       String applicationName = cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME);
+      String applicationId = cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME);
       String helixInstanceName = cmd.getOptionValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME);
       String helixInstanceTags = cmd.getOptionValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME);
 
@@ -199,7 +201,7 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
       }
 
       GobblinTaskRunner gobblinTaskRunner =
-          new GobblinYarnTaskRunner(applicationName, helixInstanceName, containerId, config,
+          new GobblinYarnTaskRunner(applicationName, applicationId, helixInstanceName, containerId, config,
               Optional.<Path>absent());
       gobblinTaskRunner.start();
     } catch (ParseException pe) {
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 027be31..4586571 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -577,6 +577,8 @@ public class YarnService extends AbstractIdleService {
         .append(" ").append(GobblinYarnTaskRunner.class.getName())
         .append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
         .append(" ").append(this.applicationName)
+        .append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME)
+        .append(" ").append(this.applicationId)
         .append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME)
         .append(" ").append(helixInstanceName);
 
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
index 0d2f3a5..adc8bd0 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
@@ -209,7 +209,7 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
 
   @Test
   public void testBuildApplicationMasterCommand() {
-    String command = this.gobblinYarnAppLauncher.buildApplicationMasterCommand(64);
+    String command = this.gobblinYarnAppLauncher.buildApplicationMasterCommand("application_1234_3456", 64);
 
     // 41 is from 64 * 0.8 - 10
     Assert.assertTrue(command.contains("-Xmx41"));
@@ -434,7 +434,7 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
     public TestApplicationMaster(String applicationName, ContainerId containerId, Config config,
         YarnConfiguration yarnConfiguration)
         throws Exception {
-      super(applicationName, containerId, config, yarnConfiguration);
+      super(applicationName, containerId.getApplicationAttemptId().getApplicationId().toString(), containerId, config, yarnConfiguration);
     }
 
     @Override