You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/20 16:34:21 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1091] Pass Yarn
application id as part of AppMaster and YarnTaskRunner's start up command[]
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a634612 [GOBBLIN-1091] Pass Yarn application id as part of AppMaster and YarnTaskRunner's start up command[]
a634612 is described below
commit a63461257c3fcea8f4ff67087f8cb29be25d6baf
Author: sv2000 <su...@gmail.com>
AuthorDate: Fri Mar 20 09:34:13 2020 -0700
[GOBBLIN-1091] Pass Yarn application id as part of AppMaster and YarnTaskRunner's start up command[]
Closes #2933 from sv2000/yarnApplicationId
---
.../gobblin/cluster/GobblinClusterConfigurationKeys.java | 1 +
.../org/apache/gobblin/cluster/GobblinTaskRunner.java | 2 ++
.../main/java/org/apache/gobblin/cluster/SingleTask.java | 14 ++++++++++----
.../org/apache/gobblin/yarn/GobblinApplicationMaster.java | 15 +++++++++------
.../org/apache/gobblin/yarn/GobblinYarnAppLauncher.java | 6 ++++--
.../org/apache/gobblin/yarn/GobblinYarnTaskRunner.java | 10 ++++++----
.../main/java/org/apache/gobblin/yarn/YarnService.java | 2 ++
.../apache/gobblin/yarn/GobblinYarnAppLauncherTest.java | 4 ++--
8 files changed, 36 insertions(+), 18 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 7dbf7b1..128a5d6 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -42,6 +42,7 @@ public class GobblinClusterConfigurationKeys {
// General Gobblin Cluster application configuration properties.
public static final String APPLICATION_NAME_OPTION_NAME = "app_name";
+ public static final String APPLICATION_ID_OPTION_NAME = "app_id";
public static final String STANDALONE_CLUSTER_MODE = "standalone_cluster";
public static final String STANDALONE_CLUSTER_MODE_KEY = GOBBLIN_CLUSTER_PREFIX + "standaloneMode";
public static final boolean DEFAULT_STANDALONE_CLUSTER_MODE = false;
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 6b3fff7..b9f1c96 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -611,6 +611,8 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
Options options = new Options();
options.addOption("a", GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME, true,
"Application name");
+ options.addOption("d", GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME, true,
+ "Application id");
options.addOption("i", GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME, true,
"Helix instance name");
options.addOption(Option.builder("t").longOpt(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
index 4778371..9dd0a1c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
@@ -138,10 +138,16 @@ public class SingleTask {
String storeName = _workUnitFilePath.getParent().getName();
WorkUnit workUnit;
- if (_workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION)) {
- workUnit = _stateStores.getMwuStateStore().getAll(storeName, fileName).get(0);
- } else {
- workUnit = _stateStores.getWuStateStore().getAll(storeName, fileName).get(0);
+ try {
+ if (_workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION)) {
+ workUnit = _stateStores.getMwuStateStore().getAll(storeName, fileName).get(0);
+ } else {
+ workUnit = _stateStores.getWuStateStore().getAll(storeName, fileName).get(0);
+ }
+ } catch (IOException e) {
+ //Add workunitFilePath to the IOException message to aid debugging
+ throw new IOException("Exception retrieving state from state store for workunit: " + _workUnitFilePath.toString(),
+ e);
}
// The list of individual WorkUnits (flattened) to run
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index 282156d..14488d5 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -80,11 +80,11 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
@Getter
private final YarnService yarnService;
- public GobblinApplicationMaster(String applicationName, ContainerId containerId, Config config,
+ public GobblinApplicationMaster(String applicationName, String applicationId, ContainerId containerId, Config config,
YarnConfiguration yarnConfiguration) throws Exception {
- super(applicationName, containerId.getApplicationAttemptId().getApplicationId().toString(),
- GobblinClusterUtils.addDynamicConfig(config.withValue(GobblinYarnConfigurationKeys.CONTAINER_NUM_KEY,
- ConfigValueFactory.fromAnyRef(YarnHelixUtils.getContainerNum(containerId.toString())))),
+ super(applicationName, applicationId, GobblinClusterUtils.addDynamicConfig(config
+ .withValue(GobblinYarnConfigurationKeys.CONTAINER_NUM_KEY,
+ ConfigValueFactory.fromAnyRef(YarnHelixUtils.getContainerNum(containerId.toString())))),
Optional.<Path>absent());
String containerLogDir = config.getString(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY);
@@ -204,6 +204,7 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
private static Options buildOptions() {
Options options = new Options();
options.addOption("a", GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME, true, "Yarn application name");
+ options.addOption("d", GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME, true, "Yarn application id");
return options;
}
@@ -216,7 +217,8 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
Options options = buildOptions();
try {
CommandLine cmd = new DefaultParser().parse(options, args);
- if (!cmd.hasOption(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)) {
+ if (!cmd.hasOption(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME) ||
+ (!cmd.hasOption(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME))) {
printUsage(options);
System.exit(1);
}
@@ -231,7 +233,8 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
ConverterUtils.toContainerId(System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.key()));
try (GobblinApplicationMaster applicationMaster = new GobblinApplicationMaster(
- cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME), containerId,
+ cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME),
+ cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME), containerId,
ConfigFactory.load(), new YarnConfiguration())) {
applicationMaster.start();
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 3367842..a6d7a88 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -548,7 +548,7 @@ public class GobblinYarnAppLauncher {
ContainerLaunchContext amContainerLaunchContext = Records.newRecord(ContainerLaunchContext.class);
amContainerLaunchContext.setLocalResources(appMasterLocalResources);
amContainerLaunchContext.setEnvironment(YarnHelixUtils.getEnvironmentVariables(this.yarnConfiguration));
- amContainerLaunchContext.setCommands(Lists.newArrayList(buildApplicationMasterCommand(resource.getMemory())));
+ amContainerLaunchContext.setCommands(Lists.newArrayList(buildApplicationMasterCommand(applicationId.toString(), resource.getMemory())));
Map<ApplicationAccessType, String> acls = new HashMap<>(1);
acls.put(ApplicationAccessType.VIEW_APP, this.appViewAcl);
@@ -729,7 +729,7 @@ public class GobblinYarnAppLauncher {
}
@VisibleForTesting
- protected String buildApplicationMasterCommand(int memoryMbs) {
+ protected String buildApplicationMasterCommand(String applicationId, int memoryMbs) {
String appMasterClassName = GobblinApplicationMaster.class.getSimpleName();
return new StringBuilder()
.append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java")
@@ -741,6 +741,8 @@ public class GobblinYarnAppLauncher {
.append(" ").append(GobblinApplicationMaster.class.getName())
.append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
.append(" ").append(this.applicationName)
+ .append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME)
+ .append(" ").append(applicationId)
.append(" 1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
appMasterClassName).append(".").append(ApplicationConstants.STDOUT)
.append(" 2>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index dc68162..d59a429 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -59,9 +59,9 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
public static final String HELIX_YARN_INSTANCE_NAME_PREFIX = GobblinYarnTaskRunner.class.getSimpleName();
- public GobblinYarnTaskRunner(String applicationName, String helixInstanceName, ContainerId containerId, Config config,
+ public GobblinYarnTaskRunner(String applicationName, String applicationId, String helixInstanceName, ContainerId containerId, Config config,
Optional<Path> appWorkDirOptional) throws Exception {
- super(applicationName, helixInstanceName, getApplicationId(containerId), getTaskRunnerId(containerId),
+ super(applicationName, helixInstanceName, applicationId, getTaskRunnerId(containerId),
GobblinClusterUtils.addDynamicConfig(config.withValue(GobblinYarnConfigurationKeys.CONTAINER_NUM_KEY,
ConfigValueFactory.fromAnyRef(YarnHelixUtils.getContainerNum(containerId.toString())))), appWorkDirOptional);
}
@@ -176,7 +176,8 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
try {
CommandLine cmd = new DefaultParser().parse(options, args);
if (!cmd.hasOption(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME) || !cmd
- .hasOption(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME)) {
+ .hasOption(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME) || !cmd
+ .hasOption(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME)) {
printUsage(options);
System.exit(1);
}
@@ -190,6 +191,7 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
ContainerId containerId =
ConverterUtils.toContainerId(System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.key()));
String applicationName = cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME);
+ String applicationId = cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME);
String helixInstanceName = cmd.getOptionValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME);
String helixInstanceTags = cmd.getOptionValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME);
@@ -199,7 +201,7 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
}
GobblinTaskRunner gobblinTaskRunner =
- new GobblinYarnTaskRunner(applicationName, helixInstanceName, containerId, config,
+ new GobblinYarnTaskRunner(applicationName, applicationId, helixInstanceName, containerId, config,
Optional.<Path>absent());
gobblinTaskRunner.start();
} catch (ParseException pe) {
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 027be31..4586571 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -577,6 +577,8 @@ public class YarnService extends AbstractIdleService {
.append(" ").append(GobblinYarnTaskRunner.class.getName())
.append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
.append(" ").append(this.applicationName)
+ .append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME)
+ .append(" ").append(this.applicationId)
.append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME)
.append(" ").append(helixInstanceName);
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
index 0d2f3a5..adc8bd0 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
@@ -209,7 +209,7 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
@Test
public void testBuildApplicationMasterCommand() {
- String command = this.gobblinYarnAppLauncher.buildApplicationMasterCommand(64);
+ String command = this.gobblinYarnAppLauncher.buildApplicationMasterCommand("application_1234_3456", 64);
// 41 is from 64 * 0.8 - 10
Assert.assertTrue(command.contains("-Xmx41"));
@@ -434,7 +434,7 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
public TestApplicationMaster(String applicationName, ContainerId containerId, Config config,
YarnConfiguration yarnConfiguration)
throws Exception {
- super(applicationName, containerId, config, yarnConfiguration);
+ super(applicationName, containerId.getApplicationAttemptId().getApplicationId().toString(), containerId, config, yarnConfiguration);
}
@Override