You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2016/10/14 11:06:06 UTC

apex-core git commit: APEXCORE-552 #resolve added support for application tags

Repository: apex-core
Updated Branches:
  refs/heads/master 59bdc81f8 -> 81b8c922c


APEXCORE-552 #resolve added support for application tags


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/81b8c922
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/81b8c922
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/81b8c922

Branch: refs/heads/master
Commit: 81b8c922c6442cbf30104eba34ede826615ac7d4
Parents: 59bdc81
Author: David Yan <da...@datatorrent.com>
Authored: Thu Oct 6 14:23:10 2016 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Tue Oct 11 11:29:00 2016 -0700

----------------------------------------------------------------------
 .../java/com/datatorrent/stram/StramClient.java | 11 +++++++++++
 .../java/com/datatorrent/stram/cli/ApexCli.java | 20 ++++++++++++++++++++
 .../stram/client/StramAppLauncher.java          | 17 ++++++++++++-----
 3 files changed, 43 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/81b8c922/engine/src/main/java/com/datatorrent/stram/StramClient.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index 89bca14..45e3fbd 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -27,9 +27,11 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -121,6 +123,7 @@ public class StramClient
   private String archives;
   private String files;
   private LinkedHashSet<String> resources;
+  private Set<String> tags = new HashSet<>();
 
   // platform dependencies that are not part of Hadoop and need to be deployed,
   // entry below will cause containing jar file from client to be copied to cluster
@@ -605,6 +608,9 @@ public class StramClient
       // Set the queue to which this application is to be submitted in the RM
       appContext.setQueue(queueName);
 
+      // set the application tags
+      appContext.setApplicationTags(tags);
+
       // Submit the application to the applications manager
       // SubmitApplicationResponse submitResp = rmClient.submitApplication(appRequest);
       // Ignore the response as either a valid response object is returned on success
@@ -686,6 +692,11 @@ public class StramClient
     this.queueName = queueName;
   }
 
+  public void addTag(String tag)
+  {
+    this.tags.add(tag);
+  }
+
   public void setResources(LinkedHashSet<String> resources)
   {
     this.resources = resources;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/81b8c922/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
index 69af2e3..5cfde36 100644
--- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
+++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
@@ -1854,6 +1854,9 @@ public class ApexCli
             config.set(StramAppLauncher.ORIGINAL_APP_ID, commandLineInfo.origAppId);
           }
           config.set(StramAppLauncher.QUEUE_NAME, commandLineInfo.queue != null ? commandLineInfo.queue : "default");
+          if (commandLineInfo.tags != null) {
+            config.set(StramAppLauncher.TAGS, commandLineInfo.tags);
+          }
         } catch (Exception ex) {
           throw new CliException("Error opening the config XML file: " + configFile, ex);
         }
@@ -2184,6 +2187,11 @@ public class ApexCli
           jsonObj.put("state", ar.getYarnApplicationState().name());
           jsonObj.put("trackingUrl", ar.getTrackingUrl());
           jsonObj.put("finalStatus", ar.getFinalApplicationStatus());
+          JSONArray tags = new JSONArray();
+          for (String tag : ar.getApplicationTags()) {
+            tags.put(tag);
+          }
+          jsonObj.put("tags", tags);
 
           totalCnt++;
           if (ar.getYarnApplicationState() == YarnApplicationState.RUNNING) {
@@ -3371,6 +3379,11 @@ public class ApexCli
       response.put("state", appReport.getYarnApplicationState().name());
       response.put("trackingUrl", appReport.getTrackingUrl());
       response.put("finalStatus", appReport.getFinalApplicationStatus());
+      JSONArray tags = new JSONArray();
+      for (String tag : appReport.getApplicationTags()) {
+        tags.put(tag);
+      }
+      response.put("tags", tags);
       printJson(response);
     }
 
@@ -3631,6 +3644,10 @@ public class ApexCli
       launchArgs.add("-queue");
       launchArgs.add(commandLineInfo.queue);
     }
+    if (commandLineInfo.tags != null) {
+      launchArgs.add("-tags");
+      launchArgs.add(commandLineInfo.tags);
+    }
     launchArgs.add(appFile);
     if (!appFile.endsWith(".json") && !appFile.endsWith(".properties")) {
       launchArgs.add(selectedApp.name);
@@ -3902,6 +3919,7 @@ public class ApexCli
     final Option originalAppID = add(OptionBuilder.withArgName("application id").hasArg().withDescription("Specify original application identifier for restart.").create("originalAppId"));
     final Option exactMatch = add(new Option("exactMatch", "Only consider applications with exact app name"));
     final Option queue = add(OptionBuilder.withArgName("queue name").hasArg().withDescription("Specify the queue to launch the application").create("queue"));
+    final Option tags = add(OptionBuilder.withArgName("comma separated tags").hasArg().withDescription("Specify the tags for the application").create("tags"));
     final Option force = add(new Option("force", "Force launch the application. Do not check for compatibility"));
     final Option useConfigApps = add(OptionBuilder.withArgName("inclusive or exclusive").hasArg().withDescription("\"inclusive\" - merge the apps in config and app package. \"exclusive\" - only show config package apps.").create("useConfigApps"));
 
@@ -3940,6 +3958,7 @@ public class ApexCli
     result.archives = line.getOptionValue(LAUNCH_OPTIONS.archives.getOpt());
     result.files = line.getOptionValue(LAUNCH_OPTIONS.files.getOpt());
     result.queue = line.getOptionValue(LAUNCH_OPTIONS.queue.getOpt());
+    result.tags = line.getOptionValue(LAUNCH_OPTIONS.tags.getOpt());
     result.args = line.getArgs();
     result.origAppId = line.getOptionValue(LAUNCH_OPTIONS.originalAppID.getOpt());
     result.exactMatch = line.hasOption("exactMatch");
@@ -3959,6 +3978,7 @@ public class ApexCli
     String libjars;
     String files;
     String queue;
+    String tags;
     String archives;
     String origAppId;
     boolean exactMatch;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/81b8c922/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
index 619252f..961a97b 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
@@ -88,11 +88,12 @@ import com.datatorrent.stram.security.StramUserLogin;
 public class StramAppLauncher
 {
   public static final String CLASSPATH_RESOLVERS_KEY_NAME = StreamingApplication.DT_PREFIX + "classpath.resolvers";
-  public static final String LIBJARS_CONF_KEY_NAME = "tmplibjars";
-  public static final String FILES_CONF_KEY_NAME = "tmpfiles";
-  public static final String ARCHIVES_CONF_KEY_NAME = "tmparchives";
-  public static final String ORIGINAL_APP_ID = "tmpOriginalAppId";
-  public static final String QUEUE_NAME = "queueName";
+  public static final String LIBJARS_CONF_KEY_NAME = "_apex.libjars";
+  public static final String FILES_CONF_KEY_NAME = "_apex.files";
+  public static final String ARCHIVES_CONF_KEY_NAME = "_apex.archives";
+  public static final String ORIGINAL_APP_ID = "_apex.originalAppId";
+  public static final String QUEUE_NAME = "_apex.queueName";
+  public static final String TAGS = "_apex.tags";
 
   private static final Logger LOG = LoggerFactory.getLogger(StramAppLauncher.class);
   private File jarFile;
@@ -630,6 +631,12 @@ public class StramAppLauncher
       client.setArchives(conf.get(ARCHIVES_CONF_KEY_NAME));
       client.setOriginalAppId(conf.get(ORIGINAL_APP_ID));
       client.setQueueName(conf.get(QUEUE_NAME));
+      String tags = conf.get(TAGS);
+      if (tags != null) {
+        for (String tag : tags.split(",")) {
+          client.addTag(tag.trim());
+        }
+      }
       client.startApplication();
       return client.getApplicationReport().getApplicationId();
     } finally {