You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2016/10/14 11:06:06 UTC
apex-core git commit: APEXCORE-552 #resolve added support for application tags
Repository: apex-core
Updated Branches:
refs/heads/master 59bdc81f8 -> 81b8c922c
APEXCORE-552 #resolve added support for application tags
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/81b8c922
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/81b8c922
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/81b8c922
Branch: refs/heads/master
Commit: 81b8c922c6442cbf30104eba34ede826615ac7d4
Parents: 59bdc81
Author: David Yan <da...@datatorrent.com>
Authored: Thu Oct 6 14:23:10 2016 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Tue Oct 11 11:29:00 2016 -0700
----------------------------------------------------------------------
.../java/com/datatorrent/stram/StramClient.java | 11 +++++++++++
.../java/com/datatorrent/stram/cli/ApexCli.java | 20 ++++++++++++++++++++
.../stram/client/StramAppLauncher.java | 17 ++++++++++++-----
3 files changed, 43 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/81b8c922/engine/src/main/java/com/datatorrent/stram/StramClient.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index 89bca14..45e3fbd 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -27,9 +27,11 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -121,6 +123,7 @@ public class StramClient
private String archives;
private String files;
private LinkedHashSet<String> resources;
+ private Set<String> tags = new HashSet<>();
// platform dependencies that are not part of Hadoop and need to be deployed,
// entry below will cause containing jar file from client to be copied to cluster
@@ -605,6 +608,9 @@ public class StramClient
// Set the queue to which this application is to be submitted in the RM
appContext.setQueue(queueName);
+ // set the application tags
+ appContext.setApplicationTags(tags);
+
// Submit the application to the applications manager
// SubmitApplicationResponse submitResp = rmClient.submitApplication(appRequest);
// Ignore the response as either a valid response object is returned on success
@@ -686,6 +692,11 @@ public class StramClient
this.queueName = queueName;
}
+ public void addTag(String tag)
+ {
+ this.tags.add(tag);
+ }
+
public void setResources(LinkedHashSet<String> resources)
{
this.resources = resources;
http://git-wip-us.apache.org/repos/asf/apex-core/blob/81b8c922/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
index 69af2e3..5cfde36 100644
--- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
+++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
@@ -1854,6 +1854,9 @@ public class ApexCli
config.set(StramAppLauncher.ORIGINAL_APP_ID, commandLineInfo.origAppId);
}
config.set(StramAppLauncher.QUEUE_NAME, commandLineInfo.queue != null ? commandLineInfo.queue : "default");
+ if (commandLineInfo.tags != null) {
+ config.set(StramAppLauncher.TAGS, commandLineInfo.tags);
+ }
} catch (Exception ex) {
throw new CliException("Error opening the config XML file: " + configFile, ex);
}
@@ -2184,6 +2187,11 @@ public class ApexCli
jsonObj.put("state", ar.getYarnApplicationState().name());
jsonObj.put("trackingUrl", ar.getTrackingUrl());
jsonObj.put("finalStatus", ar.getFinalApplicationStatus());
+ JSONArray tags = new JSONArray();
+ for (String tag : ar.getApplicationTags()) {
+ tags.put(tag);
+ }
+ jsonObj.put("tags", tags);
totalCnt++;
if (ar.getYarnApplicationState() == YarnApplicationState.RUNNING) {
@@ -3371,6 +3379,11 @@ public class ApexCli
response.put("state", appReport.getYarnApplicationState().name());
response.put("trackingUrl", appReport.getTrackingUrl());
response.put("finalStatus", appReport.getFinalApplicationStatus());
+ JSONArray tags = new JSONArray();
+ for (String tag : appReport.getApplicationTags()) {
+ tags.put(tag);
+ }
+ response.put("tags", tags);
printJson(response);
}
@@ -3631,6 +3644,10 @@ public class ApexCli
launchArgs.add("-queue");
launchArgs.add(commandLineInfo.queue);
}
+ if (commandLineInfo.tags != null) {
+ launchArgs.add("-tags");
+ launchArgs.add(commandLineInfo.tags);
+ }
launchArgs.add(appFile);
if (!appFile.endsWith(".json") && !appFile.endsWith(".properties")) {
launchArgs.add(selectedApp.name);
@@ -3902,6 +3919,7 @@ public class ApexCli
final Option originalAppID = add(OptionBuilder.withArgName("application id").hasArg().withDescription("Specify original application identifier for restart.").create("originalAppId"));
final Option exactMatch = add(new Option("exactMatch", "Only consider applications with exact app name"));
final Option queue = add(OptionBuilder.withArgName("queue name").hasArg().withDescription("Specify the queue to launch the application").create("queue"));
+ final Option tags = add(OptionBuilder.withArgName("comma separated tags").hasArg().withDescription("Specify the tags for the application").create("tags"));
final Option force = add(new Option("force", "Force launch the application. Do not check for compatibility"));
final Option useConfigApps = add(OptionBuilder.withArgName("inclusive or exclusive").hasArg().withDescription("\"inclusive\" - merge the apps in config and app package. \"exclusive\" - only show config package apps.").create("useConfigApps"));
@@ -3940,6 +3958,7 @@ public class ApexCli
result.archives = line.getOptionValue(LAUNCH_OPTIONS.archives.getOpt());
result.files = line.getOptionValue(LAUNCH_OPTIONS.files.getOpt());
result.queue = line.getOptionValue(LAUNCH_OPTIONS.queue.getOpt());
+ result.tags = line.getOptionValue(LAUNCH_OPTIONS.tags.getOpt());
result.args = line.getArgs();
result.origAppId = line.getOptionValue(LAUNCH_OPTIONS.originalAppID.getOpt());
result.exactMatch = line.hasOption("exactMatch");
@@ -3959,6 +3978,7 @@ public class ApexCli
String libjars;
String files;
String queue;
+ String tags;
String archives;
String origAppId;
boolean exactMatch;
http://git-wip-us.apache.org/repos/asf/apex-core/blob/81b8c922/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
index 619252f..961a97b 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
@@ -88,11 +88,12 @@ import com.datatorrent.stram.security.StramUserLogin;
public class StramAppLauncher
{
public static final String CLASSPATH_RESOLVERS_KEY_NAME = StreamingApplication.DT_PREFIX + "classpath.resolvers";
- public static final String LIBJARS_CONF_KEY_NAME = "tmplibjars";
- public static final String FILES_CONF_KEY_NAME = "tmpfiles";
- public static final String ARCHIVES_CONF_KEY_NAME = "tmparchives";
- public static final String ORIGINAL_APP_ID = "tmpOriginalAppId";
- public static final String QUEUE_NAME = "queueName";
+ public static final String LIBJARS_CONF_KEY_NAME = "_apex.libjars";
+ public static final String FILES_CONF_KEY_NAME = "_apex.files";
+ public static final String ARCHIVES_CONF_KEY_NAME = "_apex.archives";
+ public static final String ORIGINAL_APP_ID = "_apex.originalAppId";
+ public static final String QUEUE_NAME = "_apex.queueName";
+ public static final String TAGS = "_apex.tags";
private static final Logger LOG = LoggerFactory.getLogger(StramAppLauncher.class);
private File jarFile;
@@ -630,6 +631,12 @@ public class StramAppLauncher
client.setArchives(conf.get(ARCHIVES_CONF_KEY_NAME));
client.setOriginalAppId(conf.get(ORIGINAL_APP_ID));
client.setQueueName(conf.get(QUEUE_NAME));
+ String tags = conf.get(TAGS);
+ if (tags != null) {
+ for (String tag : tags.split(",")) {
+ client.addTag(tag.trim());
+ }
+ }
client.startApplication();
return client.getApplicationReport().getApplicationId();
} finally {