You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2017/08/05 04:16:09 UTC

[1/5] twill git commit: (TWILL-241) Added support for per Runnable configuration

Repository: twill
Updated Branches:
  refs/heads/feature/TWILL-241-per-runnable-opts [created] 6ccc7158e


(TWILL-241) Added support for per Runnable configuration


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/f1931deb
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/f1931deb
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/f1931deb

Branch: refs/heads/feature/TWILL-241-per-runnable-opts
Commit: f1931deb128dda8b3c4da76486ab1b443318496e
Parents: dbbc2a3
Author: Terence Yim <ch...@apache.org>
Authored: Fri Aug 4 15:29:05 2017 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Aug 4 18:06:26 2017 -0700

----------------------------------------------------------------------
 .../org/apache/twill/api/TwillPreparer.java     | 11 +++
 .../org/apache/twill/api/TwillRunResources.java |  5 ++
 .../internal/DefaultTwillRunResources.java      | 13 +++-
 .../twill/internal/TwillContainerLauncher.java  | 19 ++++-
 .../internal/TwillRuntimeSpecification.java     | 77 ++++++++++++++++----
 .../internal/json/TwillRunResourcesCodec.java   | 20 +++--
 .../json/TwillRuntimeSpecificationCodec.java    | 29 +++++---
 .../appmaster/ApplicationMasterService.java     | 17 +++--
 .../internal/appmaster/RunningContainers.java   |  6 +-
 .../apache/twill/yarn/YarnTwillPreparer.java    | 57 +++++++--------
 .../apache/twill/yarn/ContainerSizeTestRun.java | 27 ++++++-
 11 files changed, 199 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/f1931deb/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
index 1f50972..35930d2 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
@@ -39,6 +39,17 @@ public interface TwillPreparer {
   TwillPreparer withConfiguration(Map<String, String> config);
 
   /**
+   * Overrides the default configuration with the given set of configurations for the given runnable only.
+   * This is useful to override configurations that affect runnables, such as
+   * {@link Configs.Keys#JAVA_RESERVED_MEMORY_MB} and {@link Configs.Keys#HEAP_RESERVED_MIN_RATIO}.
+   *
+   * @param runnableName Name of the {@link TwillRunnable}.
+   * @param config set of configurations to override
+   * @return This {@link TwillPreparer}
+   */
+  TwillPreparer withConfiguration(String runnableName, Map<String, String> config);
+
+  /**
    * Adds a {@link LogHandler} for receiving an application log.
    * @param handler The {@link LogHandler}.
    * @return This {@link TwillPreparer}.

http://git-wip-us.apache.org/repos/asf/twill/blob/f1931deb/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java b/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java
index f721f47..287b901 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java
@@ -44,6 +44,11 @@ public interface TwillRunResources {
   int getMemoryMB();
 
   /**
+   * @return the maximum heap memory size in MB of the Java process.
+   */
+  int getMaxHeapMemoryMB();
+
+  /**
    * @return the host the runnable is running on.
    */
   String getHost();

http://git-wip-us.apache.org/repos/asf/twill/blob/f1931deb/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
index 83b973a..f05074e 100644
--- a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
+++ b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
@@ -35,6 +35,7 @@ public class DefaultTwillRunResources implements TwillRunResources {
   private final int instanceId;
   private final int virtualCores;
   private final int memoryMB;
+  private final int maxHeapMemoryMB;
   private final String host;
   private final Integer debugPort;
   private final Map<String, LogEntry.Level> logLevels;
@@ -42,17 +43,18 @@ public class DefaultTwillRunResources implements TwillRunResources {
   /**
    * Constructor to create an instance of {@link DefaultTwillRunResources} with empty log levels.
    */
-  public DefaultTwillRunResources(int instanceId, String containerId, int cores, int memoryMB,
+  public DefaultTwillRunResources(int instanceId, String containerId, int cores, int memoryMB, int maxHeapMemoryMB,
                                   String host, Integer debugPort) {
-    this(instanceId, containerId, cores, memoryMB, host, debugPort, Collections.<String, Level>emptyMap());
+    this(instanceId, containerId, cores, memoryMB, maxHeapMemoryMB, host, debugPort, Collections.<String, Level>emptyMap());
   }
 
-  public DefaultTwillRunResources(int instanceId, String containerId, int cores, int memoryMB,
+  public DefaultTwillRunResources(int instanceId, String containerId, int cores, int memoryMB, int maxHeapMemoryMB,
                                   String host, Integer debugPort, Map<String, LogEntry.Level> logLevels) {
     this.instanceId = instanceId;
     this.containerId = containerId;
     this.virtualCores = cores;
     this.memoryMB = memoryMB;
+    this.maxHeapMemoryMB = maxHeapMemoryMB;
     this.host = host;
     this.debugPort = debugPort;
     this.logLevels = new HashMap<>(logLevels);
@@ -91,6 +93,11 @@ public class DefaultTwillRunResources implements TwillRunResources {
     return memoryMB;
   }
 
+  @Override
+  public int getMaxHeapMemoryMB() {
+    return maxHeapMemoryMB;
+  }
+
   /**
    * @return the host the runnable is running on.
    */

http://git-wip-us.apache.org/repos/asf/twill/blob/f1931deb/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
index 0f8674b..700c0f1 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
@@ -61,6 +61,7 @@ public final class TwillContainerLauncher {
   private final int reservedMemory;
   private final double minHeapRatio;
   private final Location secureStoreLocation;
+  private int maxHeapSizeMB;
 
   public TwillContainerLauncher(RuntimeSpecification runtimeSpec, ContainerInfo containerInfo,
                                 ProcessLauncher.PrepareLaunchContext launchContext,
@@ -144,13 +145,12 @@ public final class TwillContainerLauncher {
       firstCommand = "$JAVA_HOME/bin/java";
     }
 
-    int memory = Resources.computeMaxHeapSize(containerInfo.getMemoryMB(), reservedMemory,
-            minHeapRatio);
+    maxHeapSizeMB = Resources.computeMaxHeapSize(containerInfo.getMemoryMB(), reservedMemory, minHeapRatio);
     commandBuilder.add("-Djava.io.tmpdir=tmp",
                        "-Dyarn.container=$" + EnvKeys.YARN_CONTAINER_ID,
                        "-Dtwill.runnable=$" + Constants.TWILL_APP_NAME + ".$" + EnvKeys.TWILL_RUNNABLE_NAME,
                        "-cp", Constants.Files.LAUNCHER_JAR + ":" + classPath,
-                       "-Xmx" + memory + "m");
+                       "-Xmx" + maxHeapSizeMB + "m");
     if (jvmOpts.getExtraOptions() != null) {
       commandBuilder.add(jvmOpts.getExtraOptions());
     }
@@ -169,6 +169,19 @@ public final class TwillContainerLauncher {
     return controller;
   }
 
+  /**
+   * Returns the maximum heap memory size in MB of the Java process launched in the container.
+   * This method can only be called after the {@link #start(RunId, int, Class, String, Location)} method.
+   *
+   * @throws IllegalStateException if the {@link #start(RunId, int, Class, String, Location)} was not called yet.
+   */
+  public int getMaxHeapMemoryMB() {
+    if (maxHeapSizeMB <= 0) {
+      throw new IllegalStateException("Unknown maximum heap memory size. Please make sure the container is started");
+    }
+    return maxHeapSizeMB;
+  }
+
   private static final class TwillContainerControllerImpl extends AbstractZKServiceController
                                                           implements TwillContainerController {
 

http://git-wip-us.apache.org/repos/asf/twill/blob/f1931deb/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
index 831c831..636d94d 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
@@ -17,6 +17,7 @@
  */
 package org.apache.twill.internal;
 
+import org.apache.twill.api.Configs;
 import org.apache.twill.api.RunId;
 import org.apache.twill.api.TwillApplication;
 import org.apache.twill.api.TwillSpecification;
@@ -37,30 +38,28 @@ public class TwillRuntimeSpecification {
   private final String zkConnectStr;
   private final RunId twillRunId;
   private final String twillAppName;
-  private final int reservedMemory;
   private final String rmSchedulerAddr;
   private final Map<String, Map<String, String>> logLevels;
   private final Map<String, Integer> maxRetries;
-  private final double minHeapRatio;
-  private final boolean logCollectionEnabled;
+  private final Map<String, String> config;
+  private final Map<String, Map<String, String>> runnableConfigs;
 
   public TwillRuntimeSpecification(TwillSpecification twillSpecification, String fsUser, URI twillAppDir,
                                    String zkConnectStr, RunId twillRunId, String twillAppName,
-                                   int reservedMemory, @Nullable String rmSchedulerAddr,
-                                   Map<String, Map<String, String>> logLevels, Map<String, Integer> maxRetries,
-                                   double minHeapRatio, boolean logCollectionEnabled) {
+                                   @Nullable String rmSchedulerAddr, Map<String, Map<String, String>> logLevels,
+                                   Map<String, Integer> maxRetries, Map<String, String> config,
+                                   Map<String, Map<String, String>> runnableConfigs) {
     this.twillSpecification = twillSpecification;
     this.fsUser = fsUser;
     this.twillAppDir = twillAppDir;
     this.zkConnectStr = zkConnectStr;
     this.twillRunId = twillRunId;
     this.twillAppName = twillAppName;
-    this.reservedMemory = reservedMemory;
     this.rmSchedulerAddr = rmSchedulerAddr;
     this.logLevels = logLevels;
     this.maxRetries = maxRetries;
-    this.minHeapRatio = minHeapRatio;
-    this.logCollectionEnabled = logCollectionEnabled;
+    this.config = config;
+    this.runnableConfigs = runnableConfigs;
   }
 
   public TwillSpecification getTwillSpecification() {
@@ -87,19 +86,46 @@ public class TwillRuntimeSpecification {
     return twillAppName;
   }
 
-  public int getReservedMemory() {
-    return reservedMemory;
+  /**
+   * Returns the minimum heap ratio for the application master.
+   */
+  public double getAMMinHeapRatio() {
+    return getMinHeapRatio(config);
+  }
+
+  /**
+   * Returns the minimum heap ratio for the given runnable.
+   */
+  public double getMinHeapRatio(String runnableName) {
+    return getMinHeapRatio(runnableConfigs.containsKey(runnableName) ? runnableConfigs.get(runnableName) : config);
   }
 
-  public double getMinHeapRatio() {
-    return minHeapRatio;
+  /**
+   * Returns the reserved non-heap memory size in MB for the application master.
+   */
+  public int getAMReservedMemory() {
+    return config.containsKey(Configs.Keys.YARN_AM_RESERVED_MEMORY_MB) ?
+      Integer.parseInt(config.get(Configs.Keys.YARN_AM_RESERVED_MEMORY_MB)) :
+      Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB;
+  }
+
+  /**
+   * Returns the reserved non-heap memory size in MB for the given runnable.
+   */
+  public int getReservedMemory(String runnableName) {
+    Map<String, String> conf = runnableConfigs.containsKey(runnableName) ? runnableConfigs.get(runnableName) : config;
+    return conf.containsKey(Configs.Keys.JAVA_RESERVED_MEMORY_MB) ?
+      Integer.parseInt(conf.get(Configs.Keys.JAVA_RESERVED_MEMORY_MB)) :
+      Configs.Defaults.JAVA_RESERVED_MEMORY_MB;
   }
 
   /**
    * Returns whether log collection is enabled.
    */
   public boolean isLogCollectionEnabled() {
-    return logCollectionEnabled;
+    return config.containsKey(Configs.Keys.LOG_COLLECTION_ENABLED) ?
+      Boolean.parseBoolean(config.get(Configs.Keys.LOG_COLLECTION_ENABLED)) :
+      Configs.Defaults.LOG_COLLECTION_ENABLED;
   }
 
   @Nullable
@@ -116,6 +142,20 @@ public class TwillRuntimeSpecification {
   }
 
   /**
+   * Returns the configuration for the application.
+   */
+  public Map<String, String> getConfig() {
+    return config;
+  }
+
+  /**
+   * Returns the configurations for each runnable.
+   */
+  public Map<String, Map<String, String>> getRunnableConfigs() {
+    return runnableConfigs;
+  }
+
+  /**
    * Returns the ZK connection string for the Kafka used for log collections,
    * or {@code null} if log collection is disabled.
    */
@@ -127,4 +167,13 @@ public class TwillRuntimeSpecification {
     // When addressing TWILL-147, a field can be introduced to carry this value.
     return String.format("%s/%s/%s/kafka", getZkConnectStr(), getTwillAppName(), getTwillAppRunId());
   }
+
+  /**
+   * Returns the minimum heap ratio ({@link Configs.Keys#HEAP_RESERVED_MIN_RATIO}) based on the given configuration.
+   */
+  private double getMinHeapRatio(Map<String, String> config) {
+    return config.containsKey(Configs.Keys.HEAP_RESERVED_MIN_RATIO) ?
+      Double.parseDouble(config.get(Configs.Keys.HEAP_RESERVED_MIN_RATIO)) :
+      Configs.Defaults.HEAP_RESERVED_MIN_RATIO;
+  }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/f1931deb/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
index bb4d435..c9196c4 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
@@ -41,6 +41,7 @@ public final class TwillRunResourcesCodec implements JsonSerializer<TwillRunReso
   private static final String INSTANCE_ID = "instanceId";
   private static final String HOST = "host";
   private static final String MEMORY_MB = "memoryMB";
+  private static final String MAX_HEAP_MEMORY_MB = "maxHeapMemoryMB";
   private static final String VIRTUAL_CORES = "virtualCores";
   private static final String DEBUG_PORT = "debugPort";
   private static final String LOG_LEVELS = "logLevels";
@@ -53,6 +54,7 @@ public final class TwillRunResourcesCodec implements JsonSerializer<TwillRunReso
     json.addProperty(INSTANCE_ID, src.getInstanceId());
     json.addProperty(HOST, src.getHost());
     json.addProperty(MEMORY_MB, src.getMemoryMB());
+    json.addProperty(MAX_HEAP_MEMORY_MB, src.getMaxHeapMemoryMB());
     json.addProperty(VIRTUAL_CORES, src.getVirtualCores());
     if (src.getDebugPort() != null) {
       json.addProperty(DEBUG_PORT, src.getDebugPort());
@@ -69,12 +71,16 @@ public final class TwillRunResourcesCodec implements JsonSerializer<TwillRunReso
     JsonObject jsonObj = json.getAsJsonObject();
     Map<String, LogEntry.Level> logLevels =
       context.deserialize(jsonObj.get(LOG_LEVELS), new TypeToken<Map<String, LogEntry.Level>>() { }.getType());
-    return new DefaultTwillRunResources(jsonObj.get(INSTANCE_ID).getAsInt(),
-                                        jsonObj.get(CONTAINER_ID).getAsString(),
-                                        jsonObj.get(VIRTUAL_CORES).getAsInt(),
-                                        jsonObj.get(MEMORY_MB).getAsInt(),
-                                        jsonObj.get(HOST).getAsString(),
-                                        jsonObj.has(DEBUG_PORT) ? jsonObj.get(DEBUG_PORT).getAsInt() : null,
-                                        logLevels);
+    int memoryMB = jsonObj.get(MEMORY_MB).getAsInt();
+    return new DefaultTwillRunResources(
+      jsonObj.get(INSTANCE_ID).getAsInt(),
+      jsonObj.get(CONTAINER_ID).getAsString(),
+      jsonObj.get(VIRTUAL_CORES).getAsInt(),
+      memoryMB,
+      // Backward compatibility: a newer Twill client may re-attach to a running app started with an older version.
+      jsonObj.has(MAX_HEAP_MEMORY_MB) ? jsonObj.get(MAX_HEAP_MEMORY_MB).getAsInt() : memoryMB,
+      jsonObj.get(HOST).getAsString(),
+      jsonObj.has(DEBUG_PORT) ? jsonObj.get(DEBUG_PORT).getAsInt() : null,
+      logLevels);
   }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/f1931deb/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
index 5ff05e8..710a9f7 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
@@ -39,18 +39,20 @@ import java.util.Map;
 final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntimeSpecification>,
                                                          JsonDeserializer<TwillRuntimeSpecification> {
 
+  private static final Type MAP_STRING_MAP_STRING_STRING_TYPE =
+    new TypeToken<Map<String, Map<String, String>>>() { }.getType();
+
   private static final String FS_USER = "fsUser";
   private static final String TWILL_APP_DIR = "twillAppDir";
   private static final String ZK_CONNECT_STR = "zkConnectStr";
   private static final String TWILL_RUNID = "twillRunId";
   private static final String TWILL_APP_NAME = "twillAppName";
-  private static final String RESERVED_MEMORY = "reservedMemory";
-  private static final String HEAP_RESERVED_MIN_RATIO = "minHeapRatio";
   private static final String RM_SCHEDULER_ADDR = "rmSchedulerAddr";
   private static final String TWILL_SPEC = "twillSpecification";
   private static final String LOG_LEVELS = "logLevels";
   private static final String MAX_RETRIES = "maxRetries";
-  private static final String LOG_COLLECTION_ENABLED = "logCollectionEnabled";
+  private static final String CONFIG = "config";
+  private static final String RUNNABLE_CONFIGS = "runnableConfigs";
 
   @Override
   public JsonElement serialize(TwillRuntimeSpecification src, Type typeOfSrc, JsonSerializationContext context) {
@@ -60,19 +62,19 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntim
     json.addProperty(ZK_CONNECT_STR, src.getZkConnectStr());
     json.addProperty(TWILL_RUNID, src.getTwillAppRunId().getId());
     json.addProperty(TWILL_APP_NAME, src.getTwillAppName());
-    json.addProperty(RESERVED_MEMORY, src.getReservedMemory());
-    json.addProperty(HEAP_RESERVED_MIN_RATIO, src.getMinHeapRatio());
     if (src.getRmSchedulerAddr() != null) {
       json.addProperty(RM_SCHEDULER_ADDR, src.getRmSchedulerAddr());
     }
     json.add(TWILL_SPEC,
              context.serialize(src.getTwillSpecification(), new TypeToken<TwillSpecification>() { }.getType()));
     json.add(LOG_LEVELS,
-             context.serialize(src.getLogLevels(), new TypeToken<Map<String, Map<String, String>>>() { }.getType()));
+             context.serialize(src.getLogLevels(), MAP_STRING_MAP_STRING_STRING_TYPE));
     json.add(MAX_RETRIES,
              context.serialize(src.getMaxRetries(), new TypeToken<Map<String, Integer>>() { }.getType()));
-    json.addProperty(LOG_COLLECTION_ENABLED, src.isLogCollectionEnabled());
-
+    json.add(CONFIG,
+             context.serialize(src.getConfig(), new TypeToken<Map<String, String>>() { }.getType()));
+    json.add(RUNNABLE_CONFIGS,
+             context.serialize(src.getRunnableConfigs(), MAP_STRING_MAP_STRING_STRING_TYPE));
     return json;
   }
 
@@ -84,9 +86,13 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntim
     TwillSpecification twillSpecification = context.deserialize(
       jsonObj.get(TWILL_SPEC), new TypeToken<TwillSpecification>() { }.getType());
     Map<String, Map<String, String>> logLevels =
-      context.deserialize(jsonObj.get(LOG_LEVELS), new TypeToken<Map<String, Map<String, String>>>() { }.getType());
+      context.deserialize(jsonObj.get(LOG_LEVELS), MAP_STRING_MAP_STRING_STRING_TYPE);
     Map<String, Integer> maxRetries = 
       context.deserialize(jsonObj.get(MAX_RETRIES), new TypeToken<Map<String, Integer>>() { }.getType());
+    Map<String, String> config =
+      context.deserialize(jsonObj.get(CONFIG), new TypeToken<Map<String, String>>() { }.getType());
+    Map<String, Map<String, String>> runnableConfigs =
+      context.deserialize(jsonObj.get(RUNNABLE_CONFIGS), MAP_STRING_MAP_STRING_STRING_TYPE);
     
     return new TwillRuntimeSpecification(twillSpecification,
                                          jsonObj.get(FS_USER).getAsString(),
@@ -94,12 +100,11 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntim
                                          jsonObj.get(ZK_CONNECT_STR).getAsString(),
                                          RunIds.fromString(jsonObj.get(TWILL_RUNID).getAsString()),
                                          jsonObj.get(TWILL_APP_NAME).getAsString(),
-                                         jsonObj.get(RESERVED_MEMORY).getAsInt(),
                                          jsonObj.has(RM_SCHEDULER_ADDR) ?
                                          jsonObj.get(RM_SCHEDULER_ADDR).getAsString() : null,
                                          logLevels,
                                          maxRetries,
-                                         jsonObj.get(HEAP_RESERVED_MIN_RATIO).getAsDouble(),
-                                         jsonObj.get(LOG_COLLECTION_ENABLED).getAsBoolean());
+                                         config,
+                                         runnableConfigs);
   }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/f1931deb/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 523ffce..a2ebf7b 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -71,6 +71,7 @@ import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter;
 import org.apache.twill.internal.state.Message;
 import org.apache.twill.internal.state.SystemMessages;
 import org.apache.twill.internal.utils.Instances;
+import org.apache.twill.internal.utils.Resources;
 import org.apache.twill.internal.yarn.AbstractYarnTwillService;
 import org.apache.twill.internal.yarn.YarnAMClient;
 import org.apache.twill.internal.yarn.YarnContainerInfo;
@@ -127,8 +128,6 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
   private final ExpectedContainers expectedContainers;
   private final YarnAMClient amClient;
   private final JvmOptions jvmOpts;
-  private final int reservedMemory;
-  private final double minHeapRatio;
   private final EventHandler eventHandler;
   private final Location applicationLocation;
   private final PlacementPolicyManager placementPolicyManager;
@@ -151,8 +150,6 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
     this.amClient = amClient;
     this.credentials = createCredentials();
     this.jvmOpts = loadJvmOptions();
-    this.reservedMemory = twillRuntimeSpec.getReservedMemory();
-    this.minHeapRatio = twillRuntimeSpec.getMinHeapRatio();
     this.twillSpec = twillRuntimeSpec.getTwillSpecification();
     this.placementPolicyManager = new PlacementPolicyManager(twillSpec.getPlacementPolicies());
     this.environments = getEnvironments();
@@ -198,11 +195,18 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
 
   private RunningContainers createRunningContainers(ContainerId appMasterContainerId,
                                                     String appMasterHost) throws Exception {
+    int containerMemoryMB = Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_MEMORY_MB));
+
+    // We can't get the -Xmx easily, so just recompute the -Xmx in the same way that the client does
+    int maxHeapMemoryMB = Resources.computeMaxHeapSize(containerMemoryMB,
+                                                       twillRuntimeSpec.getAMReservedMemory(),
+                                                       twillRuntimeSpec.getAMMinHeapRatio());
     TwillRunResources appMasterResources = new DefaultTwillRunResources(
       0,
       appMasterContainerId.toString(),
       Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES)),
-      Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_MEMORY_MB)),
+      containerMemoryMB,
+      maxHeapMemoryMB,
       appMasterHost, null);
     String appId = appMasterContainerId.getApplicationAttemptId().getApplicationId().toString();
     return new RunningContainers(appId, appMasterResources, zkClient, applicationLocation,
@@ -667,7 +671,8 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
       TwillContainerLauncher launcher = new TwillContainerLauncher(
         twillSpec.getRunnables().get(runnableName), processLauncher.getContainerInfo(), launchContext,
         ZKClients.namespace(zkClient, getZKNamespace(runnableName)),
-        containerCount, jvmOpts, reservedMemory, getSecureStoreLocation(), minHeapRatio);
+        containerCount, jvmOpts, twillRuntimeSpec.getReservedMemory(runnableName), getSecureStoreLocation(),
+        twillRuntimeSpec.getMinHeapRatio(runnableName));
 
       runningContainers.start(runnableName, processLauncher.getContainerInfo(), launcher);
 

http://git-wip-us.apache.org/repos/asf/twill/blob/f1931deb/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
index a950c46..c85c372 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
@@ -163,9 +163,9 @@ final class RunningContainers {
                                                                  containerInfo.getId(),
                                                                  containerInfo.getVirtualCores(),
                                                                  containerInfo.getMemoryMB(),
+                                                                 launcher.getMaxHeapMemoryMB(),
                                                                  containerInfo.getHost().getHostName(),
                                                                  controller);
-
       resourceReport.addRunResources(runnableName, resources);
       containerStats.put(runnableName, containerInfo);
 
@@ -734,9 +734,9 @@ final class RunningContainers {
     private Integer dynamicDebugPort = null;
 
     private DynamicTwillRunResources(int instanceId, String containerId,
-                                     int cores, int memoryMB, String host,
+                                     int cores, int memoryMB, int maxHeapMemoryMB, String host,
                                      TwillContainerController controller) {
-      super(instanceId, containerId, cores, memoryMB, host, null);
+      super(instanceId, containerId, cores, memoryMB, maxHeapMemoryMB, host, null);
       this.controller = controller;
     }
 

http://git-wip-us.apache.org/repos/asf/twill/blob/f1931deb/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index 52e18eb..5442fa0 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -147,6 +147,7 @@ final class YarnTwillPreparer implements TwillPreparer {
   private final Map<String, Map<String, String>> logLevels = Maps.newHashMap();
   private final LocationCache locationCache;
   private final Map<String, Integer> maxRetries = Maps.newHashMap();
+  private final Map<String, Map<String, String>> runnableConfigs = Maps.newHashMap();
   private String schedulerQueue;
   private String extraOptions;
   private JvmOptions.DebugOptions debugOptions = JvmOptions.DebugOptions.NO_DEBUG;
@@ -183,6 +184,13 @@ final class YarnTwillPreparer implements TwillPreparer {
   }
 
   @Override
+  public TwillPreparer withConfiguration(String runnableName, Map<String, String> config) {
+    confirmRunnableName(runnableName);
+    runnableConfigs.put(runnableName, Maps.newHashMap(config));
+    return this;
+  }
+
+  @Override
   public TwillPreparer addLogHandler(LogHandler handler) {
     logHandlers.add(handler);
     return this;
@@ -382,10 +390,11 @@ final class YarnTwillPreparer implements TwillPreparer {
             createApplicationJar(createBundler(classAcceptor), localFiles);
             createResourcesJar(createBundler(classAcceptor), localFiles);
 
+            TwillRuntimeSpecification twillRuntimeSpec;
             Path runtimeConfigDir = Files.createTempDirectory(getLocalStagingDir().toPath(),
                                                               Constants.Files.RUNTIME_CONFIG_JAR);
             try {
-              saveSpecification(twillSpec, runtimeConfigDir.resolve(Constants.Files.TWILL_SPEC));
+              twillRuntimeSpec = saveSpecification(twillSpec, runtimeConfigDir.resolve(Constants.Files.TWILL_SPEC));
               saveLogback(runtimeConfigDir.resolve(Constants.Files.LOGBACK_TEMPLATE));
               saveClassPaths(runtimeConfigDir);
               saveJvmOptions(extraOptions, debugOptions, runtimeConfigDir.resolve(Constants.Files.JVM_OPTIONS));
@@ -405,10 +414,9 @@ final class YarnTwillPreparer implements TwillPreparer {
             //     appMaster.jar
             //     org.apache.twill.internal.appmaster.ApplicationMasterMain
             //     false
-
-            int reservedMemoryMB = config.getInt(Configs.Keys.YARN_AM_RESERVED_MEMORY_MB,
-                                                 Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB);
-            int memory = Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(), reservedMemoryMB, getMinHeapRatio());
+            int memory = Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(),
+                                                      twillRuntimeSpec.getAMReservedMemory(),
+                                                      twillRuntimeSpec.getAMMinHeapRatio());
             return launcher.prepareLaunch(ImmutableMap.<String, String>of(), localFiles.values(),
                                           createSubmissionCredentials())
               .addCommand(
@@ -439,22 +447,6 @@ final class YarnTwillPreparer implements TwillPreparer {
   }
 
   /**
-   * Returns the minimum heap ratio based on the configuration.
-   */
-  private double getMinHeapRatio() {
-    // doing this way to support hadoop-2.0 profile
-    String minHeapRatioStr = config.get(Configs.Keys.HEAP_RESERVED_MIN_RATIO);
-    return (minHeapRatioStr == null) ? Configs.Defaults.HEAP_RESERVED_MIN_RATIO : Double.parseDouble(minHeapRatioStr);
-  }
-
-  /**
-   * Returns the reserved memory size in MB based on the configuration.
-   */
-  private int getReservedMemory() {
-    return config.getInt(Configs.Keys.JAVA_RESERVED_MEMORY_MB, Configs.Defaults.JAVA_RESERVED_MEMORY_MB);
-  }
-
-  /**
    * Returns the local staging directory based on the configuration.
    */
   private File getLocalStagingDir() {
@@ -670,7 +662,7 @@ final class YarnTwillPreparer implements TwillPreparer {
     return localFiles;
   }
 
-  private void saveSpecification(TwillSpecification spec, Path targetFile) throws IOException {
+  private TwillRuntimeSpecification saveSpecification(TwillSpecification spec, Path targetFile) throws IOException {
     final Multimap<String, LocalFile> runnableLocalFiles = populateRunnableLocalFiles(spec);
 
     // Rewrite LocalFiles inside twillSpec
@@ -692,15 +684,20 @@ final class YarnTwillPreparer implements TwillPreparer {
       }
       TwillSpecification newTwillSpec = new DefaultTwillSpecification(spec.getName(), runtimeSpec, spec.getOrders(),
                                                                       spec.getPlacementPolicies(), eventHandler);
-      boolean logCollectionEnabled = config.getBoolean(Configs.Keys.LOG_COLLECTION_ENABLED,
-                                                       Configs.Defaults.LOG_COLLECTION_ENABLED);
-      TwillRuntimeSpecificationAdapter.create().toJson(
-        new TwillRuntimeSpecification(newTwillSpec, appLocation.getLocationFactory().getHomeLocation().getName(),
-                                      appLocation.toURI(), zkConnectString, runId, twillSpec.getName(),
-                                      getReservedMemory(), config.get(YarnConfiguration.RM_SCHEDULER_ADDRESS),
-                                      logLevels, maxRetries, getMinHeapRatio(), logCollectionEnabled), writer);
+      Map<String, String> configMap = Maps.newHashMap();
+      for (Map.Entry<String, String> entry : config) {
+        configMap.put(entry.getKey(), entry.getValue());
+      }
+
+      TwillRuntimeSpecification twillRuntimeSpec = new TwillRuntimeSpecification(
+        newTwillSpec, appLocation.getLocationFactory().getHomeLocation().getName(),
+        appLocation.toURI(), zkConnectString, runId, twillSpec.getName(),
+        config.get(YarnConfiguration.RM_SCHEDULER_ADDRESS),
+        logLevels, maxRetries, configMap, runnableConfigs);
+      TwillRuntimeSpecificationAdapter.create().toJson(twillRuntimeSpec, writer);
+      LOG.debug("Done {}", targetFile);
+      return twillRuntimeSpec;
     }
-    LOG.debug("Done {}", targetFile);
   }
 
   private void saveLogback(Path targetFile) throws IOException {

http://git-wip-us.apache.org/repos/asf/twill/blob/f1931deb/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
index f5143ce..73f1476 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
@@ -25,17 +25,19 @@ import org.apache.twill.api.ResourceReport;
 import org.apache.twill.api.ResourceSpecification;
 import org.apache.twill.api.TwillApplication;
 import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunResources;
 import org.apache.twill.api.TwillRunner;
 import org.apache.twill.api.TwillSpecification;
 import org.apache.twill.api.logging.PrinterLogHandler;
 import org.apache.twill.discovery.ServiceDiscovered;
+import org.apache.twill.internal.utils.Resources;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.PrintWriter;
-import java.util.Collections;
+import java.util.Collection;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -67,9 +69,16 @@ public class ContainerSizeTestRun extends BaseYarnTest {
   @Test
   public void testMaxHeapSize() throws InterruptedException, TimeoutException, ExecutionException {
     TwillRunner runner = getTwillRunner();
+    String runnableName = "sleep";
+
     TwillController controller = runner.prepare(new MaxHeapApp())
-      // Alter the AM container size
-      .withConfiguration(Collections.singletonMap(Configs.Keys.YARN_AM_MEMORY_MB, "256"))
+      // Alter the AM container size and heap ratio
+      .withConfiguration(ImmutableMap.of(Configs.Keys.YARN_AM_MEMORY_MB, "256",
+                                         Configs.Keys.HEAP_RESERVED_MIN_RATIO, "0.65"))
+      // Use a different heap ratio and reserved memory size for the runnable
+      .withConfiguration(runnableName,
+                         ImmutableMap.of(Configs.Keys.HEAP_RESERVED_MIN_RATIO, "0.8",
+                                         Configs.Keys.JAVA_RESERVED_MEMORY_MB, "1024"))
       .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
       .start();
 
@@ -77,10 +86,20 @@ public class ContainerSizeTestRun extends BaseYarnTest {
       ServiceDiscovered discovered = controller.discoverService("sleep");
       Assert.assertTrue(waitForSize(discovered, 1, 120));
 
-      // Verify the AM container size
+      // Verify the AM container size and heap size
       ResourceReport resourceReport = controller.getResourceReport();
       Assert.assertNotNull(resourceReport);
       Assert.assertEquals(256, resourceReport.getAppMasterResources().getMemoryMB());
+      Assert.assertEquals(Resources.computeMaxHeapSize(256, Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB, 0.65d),
+                          resourceReport.getAppMasterResources().getMaxHeapMemoryMB());
+
+      // Verify the runnable container heap size
+      Collection<TwillRunResources> runnableResources = resourceReport.getRunnableResources(runnableName);
+      Assert.assertFalse(runnableResources.isEmpty());
+      TwillRunResources resources = runnableResources.iterator().next();
+      Assert.assertEquals(Resources.computeMaxHeapSize(resources.getMemoryMB(), 1024, 0.8d),
+                          resources.getMaxHeapMemoryMB());
+
     } finally {
       controller.terminate().get(120, TimeUnit.SECONDS);
     }


[5/5] twill git commit: Only copy twill configurations into the runtime spec.

Posted by ch...@apache.org.
Only copy twill configurations into the runtime spec.

Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/6ccc7158
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/6ccc7158
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/6ccc7158

Branch: refs/heads/feature/TWILL-241-per-runnable-opts
Commit: 6ccc7158e13a0b7182197cc35c91d96fc9947541
Parents: 577e6c7
Author: Terence Yim <ch...@apache.org>
Authored: Fri Aug 4 21:16:00 2017 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Aug 4 21:16:00 2017 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java   | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/6ccc7158/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index c2b0ee5..0eba62b 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -698,7 +698,9 @@ final class YarnTwillPreparer implements TwillPreparer {
                                                                       spec.getPlacementPolicies(), eventHandler);
       Map<String, String> configMap = Maps.newHashMap();
       for (Map.Entry<String, String> entry : config) {
-        configMap.put(entry.getKey(), entry.getValue());
+        if (entry.getKey().startsWith("twill.")) {
+          configMap.put(entry.getKey(), entry.getValue());
+        }
       }
 
       TwillRuntimeSpecification twillRuntimeSpec = new TwillRuntimeSpecification(


[4/5] twill git commit: Fix check style issue.

Posted by ch...@apache.org.
Fix check style issue.

Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/577e6c7c
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/577e6c7c
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/577e6c7c

Branch: refs/heads/feature/TWILL-241-per-runnable-opts
Commit: 577e6c7cf86abd576a3ac676faba96165c44d146
Parents: f9d5a59
Author: Terence Yim <ch...@apache.org>
Authored: Fri Aug 4 19:01:33 2017 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Aug 4 19:01:33 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/twill/internal/DefaultTwillRunResources.java  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/577e6c7c/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
index f05074e..6f8a052 100644
--- a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
+++ b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
@@ -45,7 +45,8 @@ public class DefaultTwillRunResources implements TwillRunResources {
    */
   public DefaultTwillRunResources(int instanceId, String containerId, int cores, int memoryMB, int maxHeapMemoryMB,
                                   String host, Integer debugPort) {
-    this(instanceId, containerId, cores, memoryMB, maxHeapMemoryMB, host, debugPort, Collections.<String, Level>emptyMap());
+    this(instanceId, containerId, cores, memoryMB, maxHeapMemoryMB, host, debugPort,
+         Collections.<String, Level>emptyMap());
   }
 
   public DefaultTwillRunResources(int instanceId, String containerId, int cores, int memoryMB, int maxHeapMemoryMB,


[2/5] twill git commit: (TWILL-241) Added support for per runnable JVM options - Also removed JvmOptionsCodec since JvmOptions only uses simple types

Posted by ch...@apache.org.
(TWILL-241) Added support for per runnable JVM options
- Also removed JvmOptionsCodec since JvmOptions only uses simple types


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/29a7999f
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/29a7999f
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/29a7999f

Branch: refs/heads/feature/TWILL-241-per-runnable-opts
Commit: 29a7999f45859996595287fb1c28b225a2564ed9
Parents: f1931de
Author: Terence Yim <ch...@apache.org>
Authored: Fri Aug 4 16:19:32 2017 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Aug 4 18:06:33 2017 -0700

----------------------------------------------------------------------
 .../org/apache/twill/api/TwillPreparer.java     |  13 +++
 .../org/apache/twill/internal/JvmOptions.java   |  17 ++-
 .../twill/internal/TwillContainerLauncher.java  |  10 +-
 .../twill/internal/json/JvmOptionsCodec.java    | 111 -------------------
 .../internal/json/JvmOptionsCodecTest.java      | 107 ------------------
 .../appmaster/ApplicationMasterService.java     |  18 +--
 .../apache/twill/yarn/YarnTwillPreparer.java    |  73 +++++++-----
 .../twill/yarn/CustomClassLoaderRunnable.java   |   3 +-
 .../twill/yarn/CustomClassLoaderTestRun.java    |   3 +-
 .../apache/twill/yarn/JvmOptionsTestRun.java    | 103 +++++++++++++++++
 .../org/apache/twill/yarn/YarnTestSuite.java    |   1 +
 11 files changed, 195 insertions(+), 264 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/29a7999f/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
index 35930d2..812a086 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
@@ -87,6 +87,19 @@ public interface TwillPreparer {
   TwillPreparer setJVMOptions(String options);
 
   /**
+   * This method sets the extra JVM options that will be passed to the java command line for the given runnable
+   * of the application started through this {@link org.apache.twill.api.TwillPreparer} instance.
+   * The options set for the given runnable will be appended to any global options set through the
+   * {@link #setJVMOptions(String)} or {@link #addJVMOptions(String)} method.
+   *
+   * This is intended for advanced usage. All options will be passed unchanged to the java command line. Invalid
+   * options could prevent the application from starting.
+   *
+   * @param options extra JVM options.
+   */
+  TwillPreparer setJVMOptions(String runnableName, String options);
+
+  /**
    * This methods adds extra JVM options that will be passed to the java command line for every runnable
    * of the application started through this {@link org.apache.twill.api.TwillPreparer} instance.
    *

http://git-wip-us.apache.org/repos/asf/twill/blob/29a7999f/twill-core/src/main/java/org/apache/twill/internal/JvmOptions.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/JvmOptions.java b/twill-core/src/main/java/org/apache/twill/internal/JvmOptions.java
index 945561b..6e35c6c 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/JvmOptions.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/JvmOptions.java
@@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.primitives.Booleans;
 
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
@@ -30,17 +31,29 @@ import java.util.Set;
 public final class JvmOptions {
 
   private final String extraOptions;
+  private final Map<String, String> runnableExtraOptions;
   private final DebugOptions debugOptions;
 
-  public JvmOptions(String extraOptions, DebugOptions debugOptions) {
+  public JvmOptions(String extraOptions, Map<String, String> runnableExtraOptions, DebugOptions debugOptions) {
     this.extraOptions = extraOptions;
+    this.runnableExtraOptions = runnableExtraOptions;
     this.debugOptions = debugOptions;
   }
 
-  public String getExtraOptions() {
+  /**
+   * Returns the extra options for the application master.
+   */
+  public String getAMExtraOptions() {
     return extraOptions;
   }
 
+  /**
+   * Returns the extra options for the given runnable.
+   */
+  public String getRunnableExtraOptions(String runnableName) {
+    return runnableExtraOptions.containsKey(runnableName) ? runnableExtraOptions.get(runnableName) : extraOptions;
+  }
+
   public DebugOptions getDebugOptions() {
     return debugOptions;
   }

http://git-wip-us.apache.org/repos/asf/twill/blob/29a7999f/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
index 700c0f1..0b98ba6 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
@@ -65,8 +65,9 @@ public final class TwillContainerLauncher {
 
   public TwillContainerLauncher(RuntimeSpecification runtimeSpec, ContainerInfo containerInfo,
                                 ProcessLauncher.PrepareLaunchContext launchContext,
-                                ZKClient zkClient, int instanceCount, JvmOptions jvmOpts, int reservedMemory,
-                                Location secureStoreLocation, double minHeapRatio) {
+                                ZKClient zkClient, int instanceCount, JvmOptions jvmOpts,
+                                int reservedMemory, double minHeapRatio,
+                                Location secureStoreLocation) {
     this.runtimeSpec = runtimeSpec;
     this.containerInfo = containerInfo;
     this.launchContext = launchContext;
@@ -151,8 +152,9 @@ public final class TwillContainerLauncher {
                        "-Dtwill.runnable=$" + Constants.TWILL_APP_NAME + ".$" + EnvKeys.TWILL_RUNNABLE_NAME,
                        "-cp", Constants.Files.LAUNCHER_JAR + ":" + classPath,
                        "-Xmx" + maxHeapSizeMB + "m");
-    if (jvmOpts.getExtraOptions() != null) {
-      commandBuilder.add(jvmOpts.getExtraOptions());
+    String extraOptions = jvmOpts.getRunnableExtraOptions(runtimeSpec.getName());
+    if (!extraOptions.isEmpty()) {
+      commandBuilder.add(extraOptions);
     }
     commandBuilder.add(TwillLauncher.class.getName(),
                        mainClass.getName(),

http://git-wip-us.apache.org/repos/asf/twill/blob/29a7999f/twill-core/src/main/java/org/apache/twill/internal/json/JvmOptionsCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/JvmOptionsCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/JvmOptionsCodec.java
deleted file mode 100644
index 807840f..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/json/JvmOptionsCodec.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.io.InputSupplier;
-import com.google.common.io.OutputSupplier;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-import org.apache.twill.internal.JvmOptions;
-
-import java.io.IOException;
-import java.io.Reader;
-import java.io.Writer;
-import java.lang.reflect.Type;
-import java.util.Set;
-
-/**
- * Gson codec for {@link JvmOptions}.
- */
-public class JvmOptionsCodec implements JsonSerializer<JvmOptions>, JsonDeserializer<JvmOptions> {
-
-  private static final Gson GSON = new GsonBuilder().registerTypeAdapter(JvmOptions.class, new JvmOptionsCodec())
-                                                    .registerTypeAdapter(JvmOptions.DebugOptions.class,
-                                                                         new DebugOptionsCodec())
-                                                    .create();
-
-  public static void encode(JvmOptions jvmOptions, OutputSupplier<? extends Writer> writerSupplier) throws IOException {
-    try (Writer writer = writerSupplier.getOutput()) {
-      GSON.toJson(jvmOptions, writer);
-    }
-  }
-
-  public static JvmOptions decode(InputSupplier<? extends Reader> readerSupplier) throws IOException {
-    try (Reader reader = readerSupplier.getInput()) {
-      return GSON.fromJson(reader, JvmOptions.class);
-    }
-  }
-
-  @Override
-  public JvmOptions deserialize(JsonElement json, Type type, JsonDeserializationContext context)
-    throws JsonParseException {
-    JsonObject jsonObj = json.getAsJsonObject();
-    String extraOptions = context.deserialize(jsonObj.get("extraOptions"), String.class);
-    JvmOptions.DebugOptions debugOptions = context.deserialize(jsonObj.get("debugOptions"),
-                                                               JvmOptions.DebugOptions.class);
-    return new JvmOptions(extraOptions, debugOptions);
-  }
-
-  @Override
-  public JsonElement serialize(JvmOptions jvmOptions, Type type, JsonSerializationContext context) {
-    JsonObject json = new JsonObject();
-    json.add("extraOptions", context.serialize(jvmOptions.getExtraOptions()));
-    json.add("debugOptions", context.serialize(jvmOptions.getDebugOptions()));
-    return json;
-  }
-
-  private static class DebugOptionsCodec
-    implements JsonSerializer<JvmOptions.DebugOptions>, JsonDeserializer<JvmOptions.DebugOptions> {
-
-    @Override
-    public JvmOptions.DebugOptions deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context)
-      throws JsonParseException {
-      JsonObject jsonObj = json.getAsJsonObject();
-      Boolean doDebug = context.deserialize(jsonObj.get("doDebug"), Boolean.class);
-      if (!doDebug) {
-        return JvmOptions.DebugOptions.NO_DEBUG;
-      }
-      Boolean doSuspend = context.deserialize(jsonObj.get("doSuspend"), Boolean.class);
-      Set<String> runnables = context.deserialize(jsonObj.get("runnables"),
-                                                  new TypeToken<Set<String>>() { }.getType());
-      return new JvmOptions.DebugOptions(true, doSuspend, runnables == null ? null : ImmutableSet.copyOf(runnables));
-    }
-
-    @Override
-    public JsonElement serialize(JvmOptions.DebugOptions src, Type typeOfSrc, JsonSerializationContext context) {
-      JsonObject json = new JsonObject();
-      json.add("doDebug", context.serialize(src.doDebug()));
-      json.add("doSuspend", context.serialize(src.doSuspend()));
-      if (src.getRunnables() != null) {
-        json.add("runnables", context.serialize(src.getRunnables()));
-      }
-      return json;
-    }
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/twill/blob/29a7999f/twill-core/src/test/java/org/apache/twill/internal/json/JvmOptionsCodecTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/json/JvmOptionsCodecTest.java b/twill-core/src/test/java/org/apache/twill/internal/json/JvmOptionsCodecTest.java
deleted file mode 100644
index 2791e72..0000000
--- a/twill-core/src/test/java/org/apache/twill/internal/json/JvmOptionsCodecTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.io.InputSupplier;
-import com.google.common.io.OutputSupplier;
-import org.apache.twill.internal.JvmOptions;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.Reader;
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.io.Writer;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Tests the JvmOptions Codec.
- */
-public class JvmOptionsCodecTest {
-
-  @Test
-  public void testNoNulls() throws Exception {
-    JvmOptions options = new JvmOptions("-version",
-                                        new JvmOptions.DebugOptions(true, false, ImmutableSet.of("one", "two")));
-    final StringWriter writer = new StringWriter();
-    JvmOptionsCodec.encode(options, new OutputSupplier<Writer>() {
-      @Override
-      public Writer getOutput() throws IOException {
-        return writer;
-      }
-    });
-    JvmOptions options1 = JvmOptionsCodec.decode(new InputSupplier<Reader>() {
-      @Override
-      public Reader getInput() throws IOException {
-        return new StringReader(writer.toString());
-      }
-    });
-    Assert.assertEquals(options.getExtraOptions(), options1.getExtraOptions());
-    Assert.assertEquals(options.getDebugOptions().doDebug(), options1.getDebugOptions().doDebug());
-    Assert.assertEquals(options.getDebugOptions().doSuspend(), options1.getDebugOptions().doSuspend());
-    Assert.assertEquals(options.getDebugOptions().getRunnables(), options1.getDebugOptions().getRunnables());
-  }
-
-  @Test
-  public void testSomeNulls() throws Exception {
-    JvmOptions options = new JvmOptions(null, new JvmOptions.DebugOptions(false, false, null));
-    final StringWriter writer = new StringWriter();
-    JvmOptionsCodec.encode(options, new OutputSupplier<Writer>() {
-      @Override
-      public Writer getOutput() throws IOException {
-        return writer;
-      }
-    });
-    JvmOptions options1 = JvmOptionsCodec.decode(new InputSupplier<Reader>() {
-      @Override
-      public Reader getInput() throws IOException {
-        return new StringReader(writer.toString());
-      }
-    });
-    Assert.assertEquals(options.getExtraOptions(), options1.getExtraOptions());
-    Assert.assertEquals(options.getDebugOptions().doDebug(), options1.getDebugOptions().doDebug());
-    Assert.assertEquals(options.getDebugOptions().doSuspend(), options1.getDebugOptions().doSuspend());
-    Assert.assertEquals(options.getDebugOptions().getRunnables(), options1.getDebugOptions().getRunnables());
-  }
-
-  @Test
-  public void testNoRunnables() throws Exception {
-    List<String> noRunnables = Collections.emptyList();
-    JvmOptions options = new JvmOptions(null, new JvmOptions.DebugOptions(true, false, noRunnables));
-    final StringWriter writer = new StringWriter();
-    JvmOptionsCodec.encode(options, new OutputSupplier<Writer>() {
-      @Override
-      public Writer getOutput() throws IOException {
-        return writer;
-      }
-    });
-    JvmOptions options1 = JvmOptionsCodec.decode(new InputSupplier<Reader>() {
-      @Override
-      public Reader getInput() throws IOException {
-        return new StringReader(writer.toString());
-      }
-    });
-    Assert.assertEquals(options.getExtraOptions(), options1.getExtraOptions());
-    Assert.assertEquals(options.getDebugOptions().doDebug(), options1.getDebugOptions().doDebug());
-    Assert.assertEquals(options.getDebugOptions().doSuspend(), options1.getDebugOptions().doSuspend());
-    Assert.assertEquals(options.getDebugOptions().getRunnables(), options1.getDebugOptions().getRunnables());
-  }
-}

http://git-wip-us.apache.org/repos/asf/twill/blob/29a7999f/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index a2ebf7b..4917f4d 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -30,7 +30,6 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Multiset;
 import com.google.common.collect.Ranges;
 import com.google.common.collect.Sets;
-import com.google.common.io.InputSupplier;
 import com.google.common.reflect.TypeToken;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -65,7 +64,6 @@ import org.apache.twill.internal.JvmOptions;
 import org.apache.twill.internal.ProcessLauncher;
 import org.apache.twill.internal.TwillContainerLauncher;
 import org.apache.twill.internal.TwillRuntimeSpecification;
-import org.apache.twill.internal.json.JvmOptionsCodec;
 import org.apache.twill.internal.json.LocalFileCodec;
 import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter;
 import org.apache.twill.internal.state.Message;
@@ -84,7 +82,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileReader;
 import java.io.IOException;
 import java.io.Reader;
 import java.nio.charset.StandardCharsets;
@@ -167,14 +164,11 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
   private JvmOptions loadJvmOptions() throws IOException {
     final File jvmOptsFile = new File(Constants.Files.RUNTIME_CONFIG_JAR, Constants.Files.JVM_OPTIONS);
     if (!jvmOptsFile.exists()) {
-      return new JvmOptions(null, JvmOptions.DebugOptions.NO_DEBUG);
+      return new JvmOptions("", Collections.<String, String>emptyMap(), JvmOptions.DebugOptions.NO_DEBUG);
+    }
+    try (Reader reader = Files.newBufferedReader(jvmOptsFile.toPath(), StandardCharsets.UTF_8)) {
+      return GSON.fromJson(reader, JvmOptions.class);
     }
-    return JvmOptionsCodec.decode(new InputSupplier<Reader>() {
-      @Override
-      public Reader getInput() throws IOException {
-        return new FileReader(jvmOptsFile);
-      }
-    });
   }
 
   @SuppressWarnings("unchecked")
@@ -671,8 +665,8 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
       TwillContainerLauncher launcher = new TwillContainerLauncher(
         twillSpec.getRunnables().get(runnableName), processLauncher.getContainerInfo(), launchContext,
         ZKClients.namespace(zkClient, getZKNamespace(runnableName)),
-        containerCount, jvmOpts, twillRuntimeSpec.getReservedMemory(runnableName), getSecureStoreLocation(),
-        twillRuntimeSpec.getMinHeapRatio(runnableName));
+        containerCount, jvmOpts, twillRuntimeSpec.getReservedMemory(runnableName),
+        twillRuntimeSpec.getMinHeapRatio(runnableName), getSecureStoreLocation());
 
       runningContainers.start(runnableName, processLauncher.getContainerInfo(), launcher);
 

http://git-wip-us.apache.org/repos/asf/twill/blob/29a7999f/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index 5442fa0..c2b0ee5 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -75,7 +75,6 @@ import org.apache.twill.internal.appmaster.ApplicationMasterMain;
 import org.apache.twill.internal.container.TwillContainerMain;
 import org.apache.twill.internal.io.LocationCache;
 import org.apache.twill.internal.json.ArgumentsCodec;
-import org.apache.twill.internal.json.JvmOptionsCodec;
 import org.apache.twill.internal.json.LocalFileCodec;
 import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter;
 import org.apache.twill.internal.utils.Dependencies;
@@ -114,6 +113,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.jar.JarEntry;
 import java.util.jar.JarOutputStream;
+import javax.annotation.Nullable;
 
 /**
  * Implementation for {@link TwillPreparer} to prepare and launch distributed application on Hadoop YARN.
@@ -148,14 +148,15 @@ final class YarnTwillPreparer implements TwillPreparer {
   private final LocationCache locationCache;
   private final Map<String, Integer> maxRetries = Maps.newHashMap();
   private final Map<String, Map<String, String>> runnableConfigs = Maps.newHashMap();
-  private String schedulerQueue;
+  private final Map<String, String> runnableExtraOptions = Maps.newHashMap();
   private String extraOptions;
   private JvmOptions.DebugOptions debugOptions = JvmOptions.DebugOptions.NO_DEBUG;
+  private String schedulerQueue;
   private ClassAcceptor classAcceptor;
   private String classLoaderClassName;
 
   YarnTwillPreparer(Configuration config, TwillSpecification twillSpec, RunId runId,
-                    String zkConnectString, Location appLocation, String extraOptions,
+                    String zkConnectString, Location appLocation, @Nullable String extraOptions,
                     LocationCache locationCache, YarnTwillControllerFactory controllerFactory) {
     this.config = config;
     this.twillSpec = twillSpec;
@@ -164,7 +165,7 @@ final class YarnTwillPreparer implements TwillPreparer {
     this.appLocation = appLocation;
     this.controllerFactory = controllerFactory;
     this.credentials = createCredentials();
-    this.extraOptions = extraOptions;
+    this.extraOptions = extraOptions == null ? "" : extraOptions;
     this.classAcceptor = new ClassAcceptor();
     this.locationCache = locationCache;
   }
@@ -209,13 +210,23 @@ final class YarnTwillPreparer implements TwillPreparer {
 
   @Override
   public TwillPreparer setJVMOptions(String options) {
+    Preconditions.checkArgument(options != null, "JVM options cannot be null.");
     this.extraOptions = options;
     return this;
   }
 
   @Override
+  public TwillPreparer setJVMOptions(String runnableName, String options) {
+    confirmRunnableName(runnableName);
+    Preconditions.checkArgument(options != null, "JVM options cannot be null.");
+    runnableExtraOptions.put(runnableName, options);
+    return this;
+  }
+
+  @Override
   public TwillPreparer addJVMOptions(String options) {
-    this.extraOptions = extraOptions == null ? options : extraOptions + " " + options;
+    Preconditions.checkArgument(options != null, "JVM options cannot be null.");
+    this.extraOptions = extraOptions.isEmpty() ? options : extraOptions + " " + options;
     return this;
   }
 
@@ -226,6 +237,9 @@ final class YarnTwillPreparer implements TwillPreparer {
 
   @Override
   public TwillPreparer enableDebugging(boolean doSuspend, String... runnables) {
+    for (String runnableName : runnables) {
+      confirmRunnableName(runnableName);
+    }
     this.debugOptions = new JvmOptions.DebugOptions(true, doSuspend, ImmutableSet.copyOf(runnables));
     return this;
   }
@@ -379,9 +393,6 @@ final class YarnTwillPreparer implements TwillPreparer {
         new Callable<ProcessController<YarnApplicationReport>>() {
           @Override
           public ProcessController<YarnApplicationReport> call() throws Exception {
-
-            String extraOptions = getExtraOptions();
-
             // Local files needed by AM
             Map<String, LocalFile> localFiles = Maps.newHashMap();
 
@@ -391,13 +402,14 @@ final class YarnTwillPreparer implements TwillPreparer {
             createResourcesJar(createBundler(classAcceptor), localFiles);
 
             TwillRuntimeSpecification twillRuntimeSpec;
+            JvmOptions jvmOptions;
             Path runtimeConfigDir = Files.createTempDirectory(getLocalStagingDir().toPath(),
                                                               Constants.Files.RUNTIME_CONFIG_JAR);
             try {
               twillRuntimeSpec = saveSpecification(twillSpec, runtimeConfigDir.resolve(Constants.Files.TWILL_SPEC));
               saveLogback(runtimeConfigDir.resolve(Constants.Files.LOGBACK_TEMPLATE));
               saveClassPaths(runtimeConfigDir);
-              saveJvmOptions(extraOptions, debugOptions, runtimeConfigDir.resolve(Constants.Files.JVM_OPTIONS));
+              jvmOptions = saveJvmOptions(runtimeConfigDir.resolve(Constants.Files.JVM_OPTIONS));
               saveArguments(new Arguments(arguments, runnableArgs),
                             runtimeConfigDir.resolve(Constants.Files.ARGUMENTS));
               saveEnvironments(runtimeConfigDir.resolve(Constants.Files.ENVIRONMENTS));
@@ -426,7 +438,7 @@ final class YarnTwillPreparer implements TwillPreparer {
                 "-Dtwill.app=$" + Constants.TWILL_APP_NAME,
                 "-cp", Constants.Files.LAUNCHER_JAR + ":$HADOOP_CONF_DIR",
                 "-Xmx" + memory + "m",
-                extraOptions,
+                jvmOptions.getAMExtraOptions(),
                 TwillLauncher.class.getName(),
                 ApplicationMasterMain.class.getName(),
                 Boolean.FALSE.toString())
@@ -456,12 +468,12 @@ final class YarnTwillPreparer implements TwillPreparer {
   /**
    * Returns the extra options for the container JVM.
    */
-  private String getExtraOptions() {
-    String extraOptions = this.extraOptions == null ? "" : this.extraOptions;
-    if (classLoaderClassName != null) {
-      extraOptions += " -D" + Constants.TWILL_CONTAINER_CLASSLOADER + "=" + classLoaderClassName;
+  private String addClassLoaderClassName(String extraOptions) {
+    if (classLoaderClassName == null) {
+      return extraOptions;
     }
-    return extraOptions;
+    String classLoaderProperty = "-D" + Constants.TWILL_CONTAINER_CLASSLOADER + "=" + classLoaderClassName;
+    return extraOptions.isEmpty() ? classLoaderProperty : extraOptions + " " + classLoaderProperty;
   }
 
   private void setEnv(String runnableName, Map<String, String> env, boolean overwrite) {
@@ -759,20 +771,31 @@ final class YarnTwillPreparer implements TwillPreparer {
                 Joiner.on(':').join(classPaths).getBytes(StandardCharsets.UTF_8));
   }
 
-  private void saveJvmOptions(String extraOptions,
-                              JvmOptions.DebugOptions debugOptions, final Path targetPath) throws IOException {
-    if (extraOptions.isEmpty() && JvmOptions.DebugOptions.NO_DEBUG.equals(debugOptions)) {
+  private JvmOptions saveJvmOptions(final Path targetPath) throws IOException {
+    // Updates the extra options with the classloader name if necessary
+    final String globalOptions = addClassLoaderClassName(extraOptions);
+    // Append runnable specific extra options.
+    Map<String, String> runnableExtraOptions = Maps.newHashMap(
+      Maps.transformValues(this.runnableExtraOptions, new Function<String, String>() {
+        @Override
+        public String apply(String extraOptions) {
+          return globalOptions.isEmpty() ? extraOptions : globalOptions + " " + extraOptions;
+        }
+      }));
+
+    JvmOptions jvmOptions = new JvmOptions(globalOptions, runnableExtraOptions, debugOptions);
+    if (globalOptions.isEmpty() && runnableExtraOptions.isEmpty()
+      && JvmOptions.DebugOptions.NO_DEBUG.equals(debugOptions)) {
       // If no vm options, no need to localize the file.
-      return;
+      return jvmOptions;
     }
+
     LOG.debug("Creating {}", targetPath);
-    JvmOptionsCodec.encode(new JvmOptions(extraOptions, debugOptions), new OutputSupplier<Writer>() {
-      @Override
-      public Writer getOutput() throws IOException {
-        return Files.newBufferedWriter(targetPath, StandardCharsets.UTF_8);
-      }
-    });
+    try (Writer writer = Files.newBufferedWriter(targetPath, StandardCharsets.UTF_8)) {
+      new Gson().toJson(new JvmOptions(globalOptions, runnableExtraOptions, debugOptions), writer);
+    }
     LOG.debug("Done {}", targetPath);
+    return jvmOptions;
   }
 
   private void saveArguments(Arguments arguments, final Path targetPath) throws IOException {

http://git-wip-us.apache.org/repos/asf/twill/blob/29a7999f/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderRunnable.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderRunnable.java b/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderRunnable.java
index 66bcd42..591f931 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderRunnable.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderRunnable.java
@@ -30,7 +30,6 @@ import java.util.concurrent.CountDownLatch;
  */
 public final class CustomClassLoaderRunnable extends AbstractTwillRunnable {
 
-  static final String SERVICE_NAME = "custom.service";
   static final String GENERATED_CLASS_NAME = "org.apache.twill.test.Generated";
 
   private static final Logger LOG = LoggerFactory.getLogger(CustomClassLoaderRunnable.class);
@@ -42,7 +41,7 @@ public final class CustomClassLoaderRunnable extends AbstractTwillRunnable {
     try {
       Class<?> cls = Class.forName(GENERATED_CLASS_NAME);
       java.lang.reflect.Method announce = cls.getMethod("announce", ServiceAnnouncer.class, String.class, int.class);
-      announce.invoke(cls.newInstance(), getContext(), SERVICE_NAME, 54321);
+      announce.invoke(cls.newInstance(), getContext(), System.getProperty("service.name"), 54321);
       Uninterruptibles.awaitUninterruptibly(stopLatch);
     } catch (Exception e) {
       LOG.error("Failed to call announce on " + GENERATED_CLASS_NAME, e);

http://git-wip-us.apache.org/repos/asf/twill/blob/29a7999f/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderTestRun.java
index 0ac43a6..f0a75b2 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderTestRun.java
@@ -34,9 +34,10 @@ public class CustomClassLoaderTestRun extends BaseYarnTest {
     TwillController controller = getTwillRunner().prepare(new CustomClassLoaderRunnable())
       .setClassLoader(CustomClassLoader.class.getName())
       .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+      .setJVMOptions(CustomClassLoaderRunnable.class.getSimpleName(), "-Dservice.name=custom")
       .start();
 
-    Assert.assertTrue(waitForSize(controller.discoverService(CustomClassLoaderRunnable.SERVICE_NAME), 1, 120));
+    Assert.assertTrue(waitForSize(controller.discoverService("custom"), 1, 120));
     controller.terminate().get();
   }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/29a7999f/twill-yarn/src/test/java/org/apache/twill/yarn/JvmOptionsTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/JvmOptionsTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/JvmOptionsTestRun.java
new file mode 100644
index 0000000..3ec49e2
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/JvmOptionsTestRun.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Unit test for testing extra JVM options setting for runnables.
+ */
+public class JvmOptionsTestRun extends BaseYarnTest {
+
+  @Test
+  public void testExtraOptions() throws InterruptedException, ExecutionException {
+    // Start the testing app with jvm options at both global level as well as for the specific runnables.
+    TwillController controller = getTwillRunner()
+      .prepare(new JvmOptionsApplication())
+      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out)))
+      .setJVMOptions("-Dservice.name=default")
+      .setJVMOptions("r2", "-Dservice.name=r2")
+      .start();
+
+    // r1 and r3 will be using "default" as the service name.
+    waitForSize(controller.discoverService("default"), 2, 120);
+    // r2 will be using "r2" as the service name.
+    waitForSize(controller.discoverService("r2"), 1, 120);
+
+    controller.terminate().get();
+  }
+
+  /**
+   * Application for testing extra jvm options
+   */
+  public static final class JvmOptionsApplication implements TwillApplication {
+
+    @Override
+    public TwillSpecification configure() {
+      return TwillSpecification.Builder.with()
+        .setName(JvmOptionsApplication.class.getSimpleName())
+        .withRunnable()
+          .add("r1", new SimpleRunnable()).noLocalFiles()
+          .add("r2", new SimpleRunnable()).noLocalFiles()
+          .add("r3", new SimpleRunnable()).noLocalFiles()
+        .anyOrder()
+        .build();
+    }
+  }
+
+  /**
+   * A runnable that simply announces itself under a name taken from a system property and waits for the stop signal.
+   */
+  public static final class SimpleRunnable extends AbstractTwillRunnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleRunnable.class);
+
+    private final CountDownLatch stopLatch = new CountDownLatch(1);
+
+    @Override
+    public void run() {
+      String runnableName = getContext().getSpecification().getName();
+      String serviceName = System.getProperty("service.name");
+      LOG.info("Announcing with name {} for runnable {}", serviceName, runnableName);
+
+      // Compute a unique port number based on the runnable name (runnable names are r[0-9]+)
+      getContext().announce(serviceName, 12345 + Integer.parseInt(runnableName.substring(1)));
+      try {
+        stopLatch.await();
+      } catch (InterruptedException e) {
+        LOG.warn("Run thread interrupted", e);
+      }
+    }
+
+    @Override
+    public void stop() {
+      stopLatch.countDown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/twill/blob/29a7999f/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
index 0911a3d..0bb7fce 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
@@ -33,6 +33,7 @@ import org.junit.runners.Suite;
   EnvironmentTestRun.class,
   FailureRestartTestRun.class,
   InitializeFailTestRun.class,
+  JvmOptionsTestRun.class,
   LocalFileTestRun.class,
   LogHandlerTestRun.class,
   LogLevelChangeTestRun.class,


[3/5] twill git commit: Updated the config reading logic.

Posted by ch...@apache.org.
Updated the config reading logic.

Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/f9d5a596
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/f9d5a596
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/f9d5a596

Branch: refs/heads/feature/TWILL-241-per-runnable-opts
Commit: f9d5a596b0d99c0ca1830113d25d30f520f85bc9
Parents: 29a7999
Author: Terence Yim <ch...@apache.org>
Authored: Fri Aug 4 19:00:00 2017 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Aug 4 19:00:00 2017 -0700

----------------------------------------------------------------------
 .../internal/TwillRuntimeSpecification.java     | 41 +++++++++++++-------
 .../apache/twill/yarn/ContainerSizeTestRun.java | 14 +++++--
 2 files changed, 39 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/f9d5a596/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
index 636d94d..2974870 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
@@ -90,33 +90,30 @@ public class TwillRuntimeSpecification {
    * Returns the minimum heap ratio for the application master.
    */
   public double getAMMinHeapRatio() {
-    return getMinHeapRatio(config);
+    return getMinHeapRatio(config, Configs.Defaults.HEAP_RESERVED_MIN_RATIO);
   }
 
   /**
    * Returns the minimum heap ratio for the given runnable.
    */
   public double getMinHeapRatio(String runnableName) {
-    return getMinHeapRatio(runnableConfigs.containsKey(runnableName) ? runnableConfigs.get(runnableName) : config);
+    double ratio = getMinHeapRatio(runnableConfigs.get(runnableName), 0d);
+    return ratio <= 0d ? getMinHeapRatio(config, Configs.Defaults.HEAP_RESERVED_MIN_RATIO) : ratio;
   }
 
   /**
    * Returns the reserved non-heap memory size in MB for the application master.
    */
   public int getAMReservedMemory() {
-    return config.containsKey(Configs.Keys.YARN_AM_RESERVED_MEMORY_MB) ?
-      Integer.parseInt(config.get(Configs.Keys.YARN_AM_RESERVED_MEMORY_MB)) :
-      Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB;
+    return getReservedMemory(config, Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB);
   }
 
   /**
    * Returns the reserved non-heap memory size in MB for the given runnable.
    */
   public int getReservedMemory(String runnableName) {
-    Map<String, String> conf = runnableConfigs.containsKey(runnableName) ? runnableConfigs.get(runnableName) : config;
-    return conf.containsKey(Configs.Keys.JAVA_RESERVED_MEMORY_MB) ?
-      Integer.parseInt(conf.get(Configs.Keys.JAVA_RESERVED_MEMORY_MB)) :
-      Configs.Defaults.JAVA_RESERVED_MEMORY_MB;
+    int memory = getReservedMemory(runnableConfigs.get(runnableName), -1);
+    return memory < 0 ? getReservedMemory(config, Configs.Defaults.JAVA_RESERVED_MEMORY_MB) : memory;
   }
 
   /**
@@ -171,9 +168,27 @@ public class TwillRuntimeSpecification {
   /**
    * Returns the minimum heap ratio ({@link Configs.Keys#HEAP_RESERVED_MIN_RATIO}) based on the given configuration.
    */
-  private double getMinHeapRatio(Map<String, String> config) {
-    return config.containsKey(Configs.Keys.HEAP_RESERVED_MIN_RATIO) ?
-      Double.parseDouble(config.get(Configs.Keys.HEAP_RESERVED_MIN_RATIO)) :
-      Configs.Defaults.HEAP_RESERVED_MIN_RATIO;
+  private double getMinHeapRatio(@Nullable Map<String, String> config, double defaultValue) {
+    if (config == null) {
+      return defaultValue;
+    }
+
+    double ratio = config.containsKey(Configs.Keys.HEAP_RESERVED_MIN_RATIO) ?
+      Double.parseDouble(config.get(Configs.Keys.HEAP_RESERVED_MIN_RATIO)) : defaultValue;
+    // Ratio can't be <= 0
+    return ratio <= 0d ? defaultValue : ratio;
+  }
+
+  /**
+   * Returns the reserved memory size ({@link Configs.Keys#JAVA_RESERVED_MEMORY_MB}) based on the given configuration.
+   */
+  private int getReservedMemory(@Nullable Map<String, String> config, int defaultValue) {
+    if (config == null) {
+      return defaultValue;
+    }
+    int memory = config.containsKey(Configs.Keys.JAVA_RESERVED_MEMORY_MB) ?
+      Integer.parseInt(config.get(Configs.Keys.JAVA_RESERVED_MEMORY_MB)) : defaultValue;
+    // memory size can't be -ve
+    return memory < 0 ? defaultValue : memory;
   }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/f9d5a596/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
index 73f1476..10dcaf5 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
@@ -69,14 +69,13 @@ public class ContainerSizeTestRun extends BaseYarnTest {
   @Test
   public void testMaxHeapSize() throws InterruptedException, TimeoutException, ExecutionException {
     TwillRunner runner = getTwillRunner();
-    String runnableName = "sleep";
 
     TwillController controller = runner.prepare(new MaxHeapApp())
       // Alter the AM container size and heap ratio
       .withConfiguration(ImmutableMap.of(Configs.Keys.YARN_AM_MEMORY_MB, "256",
                                          Configs.Keys.HEAP_RESERVED_MIN_RATIO, "0.65"))
       // Use a different heap ratio and reserved memory size for the runnable
-      .withConfiguration(runnableName,
+      .withConfiguration("sleep",
                          ImmutableMap.of(Configs.Keys.HEAP_RESERVED_MIN_RATIO, "0.8",
                                          Configs.Keys.JAVA_RESERVED_MEMORY_MB, "1024"))
       .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
@@ -94,12 +93,20 @@ public class ContainerSizeTestRun extends BaseYarnTest {
                           resourceReport.getAppMasterResources().getMaxHeapMemoryMB());
 
       // Verify the runnable container heap size
-      Collection<TwillRunResources> runnableResources = resourceReport.getRunnableResources(runnableName);
+      Collection<TwillRunResources> runnableResources = resourceReport.getRunnableResources("sleep");
       Assert.assertFalse(runnableResources.isEmpty());
       TwillRunResources resources = runnableResources.iterator().next();
       Assert.assertEquals(Resources.computeMaxHeapSize(resources.getMemoryMB(), 1024, 0.8d),
                           resources.getMaxHeapMemoryMB());
 
+      // For the sleep2 runnable, we don't set any ratio and reserved memory.
+      // The ratio should default to 0.65 (the app-level setting) and the reserved memory to 200
+      runnableResources = resourceReport.getRunnableResources("sleep2");
+      Assert.assertFalse(runnableResources.isEmpty());
+      resources = runnableResources.iterator().next();
+      Assert.assertEquals(
+        Resources.computeMaxHeapSize(resources.getMemoryMB(), Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB, 0.65d),
+        resources.getMaxHeapMemoryMB());
     } finally {
       controller.terminate().get(120, TimeUnit.SECONDS);
     }
@@ -181,6 +188,7 @@ public class ContainerSizeTestRun extends BaseYarnTest {
         .setName("MaxHeapApp")
         .withRunnable()
         .add("sleep", new MaxHeapRunnable(12345), res).noLocalFiles()
+        .add("sleep2", new MaxHeapRunnable(23456), res).noLocalFiles()
         .anyOrder()
         .build();
     }