You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2017/08/05 04:16:09 UTC
[1/5] twill git commit: (TWILL-241) Added support for per Runnable
configuration
Repository: twill
Updated Branches:
refs/heads/feature/TWILL-241-per-runnable-opts [created] 6ccc7158e
(TWILL-241) Added support for per Runnable configuration
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/f1931deb
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/f1931deb
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/f1931deb
Branch: refs/heads/feature/TWILL-241-per-runnable-opts
Commit: f1931deb128dda8b3c4da76486ab1b443318496e
Parents: dbbc2a3
Author: Terence Yim <ch...@apache.org>
Authored: Fri Aug 4 15:29:05 2017 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Aug 4 18:06:26 2017 -0700
----------------------------------------------------------------------
.../org/apache/twill/api/TwillPreparer.java | 11 +++
.../org/apache/twill/api/TwillRunResources.java | 5 ++
.../internal/DefaultTwillRunResources.java | 13 +++-
.../twill/internal/TwillContainerLauncher.java | 19 ++++-
.../internal/TwillRuntimeSpecification.java | 77 ++++++++++++++++----
.../internal/json/TwillRunResourcesCodec.java | 20 +++--
.../json/TwillRuntimeSpecificationCodec.java | 29 +++++---
.../appmaster/ApplicationMasterService.java | 17 +++--
.../internal/appmaster/RunningContainers.java | 6 +-
.../apache/twill/yarn/YarnTwillPreparer.java | 57 +++++++--------
.../apache/twill/yarn/ContainerSizeTestRun.java | 27 ++++++-
11 files changed, 199 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/f1931deb/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
index 1f50972..35930d2 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
@@ -39,6 +39,17 @@ public interface TwillPreparer {
TwillPreparer withConfiguration(Map<String, String> config);
/**
+ * Overrides the default configuration with the given set of configurations for the given runnable only.
+ * This is useful to override configurations that affect runnables, such as
+ * {@link Configs.Keys#JAVA_RESERVED_MEMORY_MB} and {@link Configs.Keys#HEAP_RESERVED_MIN_RATIO}.
+ *
+ * @param runnableName Name of the {@link TwillRunnable}.
+ * @param config set of configurations to override for that runnable
+ * @return This {@link TwillPreparer}
+ */
+ TwillPreparer withConfiguration(String runnableName, Map<String, String> config);
+
+ /**
* Adds a {@link LogHandler} for receiving an application log.
* @param handler The {@link LogHandler}.
* @return This {@link TwillPreparer}.
http://git-wip-us.apache.org/repos/asf/twill/blob/f1931deb/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java b/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java
index f721f47..287b901 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java
@@ -44,6 +44,11 @@ public interface TwillRunResources {
int getMemoryMB();
/**
+ * @return the maximum heap memory size, in MB, of the Java process.
+ */
+ int getMaxHeapMemoryMB();
+
+ /**
* @return the host the runnable is running on.
*/
String getHost();
http://git-wip-us.apache.org/repos/asf/twill/blob/f1931deb/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
index 83b973a..f05074e 100644
--- a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
+++ b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
@@ -35,6 +35,7 @@ public class DefaultTwillRunResources implements TwillRunResources {
private final int instanceId;
private final int virtualCores;
private final int memoryMB;
+ private final int maxHeapMemoryMB;
private final String host;
private final Integer debugPort;
private final Map<String, LogEntry.Level> logLevels;
@@ -42,17 +43,18 @@ public class DefaultTwillRunResources implements TwillRunResources {
/**
* Constructor to create an instance of {@link DefaultTwillRunResources} with empty log levels.
*/
- public DefaultTwillRunResources(int instanceId, String containerId, int cores, int memoryMB,
+ public DefaultTwillRunResources(int instanceId, String containerId, int cores, int memoryMB, int maxHeapMemoryMB,
String host, Integer debugPort) {
- this(instanceId, containerId, cores, memoryMB, host, debugPort, Collections.<String, Level>emptyMap());
+ this(instanceId, containerId, cores, memoryMB, maxHeapMemoryMB, host, debugPort, Collections.<String, Level>emptyMap());
}
- public DefaultTwillRunResources(int instanceId, String containerId, int cores, int memoryMB,
+ public DefaultTwillRunResources(int instanceId, String containerId, int cores, int memoryMB, int maxHeapMemoryMB,
String host, Integer debugPort, Map<String, LogEntry.Level> logLevels) {
this.instanceId = instanceId;
this.containerId = containerId;
this.virtualCores = cores;
this.memoryMB = memoryMB;
+ this.maxHeapMemoryMB = maxHeapMemoryMB;
this.host = host;
this.debugPort = debugPort;
this.logLevels = new HashMap<>(logLevels);
@@ -91,6 +93,11 @@ public class DefaultTwillRunResources implements TwillRunResources {
return memoryMB;
}
+ @Override
+ public int getMaxHeapMemoryMB() {
+ return maxHeapMemoryMB;
+ }
+
/**
* @return the host the runnable is running on.
*/
http://git-wip-us.apache.org/repos/asf/twill/blob/f1931deb/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
index 0f8674b..700c0f1 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
@@ -61,6 +61,7 @@ public final class TwillContainerLauncher {
private final int reservedMemory;
private final double minHeapRatio;
private final Location secureStoreLocation;
+ private int maxHeapSizeMB;
public TwillContainerLauncher(RuntimeSpecification runtimeSpec, ContainerInfo containerInfo,
ProcessLauncher.PrepareLaunchContext launchContext,
@@ -144,13 +145,12 @@ public final class TwillContainerLauncher {
firstCommand = "$JAVA_HOME/bin/java";
}
- int memory = Resources.computeMaxHeapSize(containerInfo.getMemoryMB(), reservedMemory,
- minHeapRatio);
+ maxHeapSizeMB = Resources.computeMaxHeapSize(containerInfo.getMemoryMB(), reservedMemory, minHeapRatio);
commandBuilder.add("-Djava.io.tmpdir=tmp",
"-Dyarn.container=$" + EnvKeys.YARN_CONTAINER_ID,
"-Dtwill.runnable=$" + Constants.TWILL_APP_NAME + ".$" + EnvKeys.TWILL_RUNNABLE_NAME,
"-cp", Constants.Files.LAUNCHER_JAR + ":" + classPath,
- "-Xmx" + memory + "m");
+ "-Xmx" + maxHeapSizeMB + "m");
if (jvmOpts.getExtraOptions() != null) {
commandBuilder.add(jvmOpts.getExtraOptions());
}
@@ -169,6 +169,19 @@ public final class TwillContainerLauncher {
return controller;
}
+ /**
+ * Returns the maximum heap memory size in MB of the Java process launched in the container.
+ * This method can only be called after the {@link #start(RunId, int, Class, String, Location)} method.
+ * <p>
+ * The value is the one computed with {@code Resources.computeMaxHeapSize} from the container memory, the
+ * reserved memory and the minimum heap ratio, and is the same value given to the JVM as {@code -Xmx}.
+ * A non-positive value is treated as "not computed yet".
+ *
+ * @return the maximum heap size in MB
+ * @throws IllegalStateException if the {@link #start(RunId, int, Class, String, Location)} was not called yet.
+ */
+ public int getMaxHeapMemoryMB() {
+ if (maxHeapSizeMB <= 0) {
+ throw new IllegalStateException("Unknown maximum heap memory size. Please make sure the container is started");
+ }
+ return maxHeapSizeMB;
+ }
+
private static final class TwillContainerControllerImpl extends AbstractZKServiceController
implements TwillContainerController {
http://git-wip-us.apache.org/repos/asf/twill/blob/f1931deb/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
index 831c831..636d94d 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
@@ -17,6 +17,7 @@
*/
package org.apache.twill.internal;
+import org.apache.twill.api.Configs;
import org.apache.twill.api.RunId;
import org.apache.twill.api.TwillApplication;
import org.apache.twill.api.TwillSpecification;
@@ -37,30 +38,28 @@ public class TwillRuntimeSpecification {
private final String zkConnectStr;
private final RunId twillRunId;
private final String twillAppName;
- private final int reservedMemory;
private final String rmSchedulerAddr;
private final Map<String, Map<String, String>> logLevels;
private final Map<String, Integer> maxRetries;
- private final double minHeapRatio;
- private final boolean logCollectionEnabled;
+ private final Map<String, String> config;
+ private final Map<String, Map<String, String>> runnableConfigs;
public TwillRuntimeSpecification(TwillSpecification twillSpecification, String fsUser, URI twillAppDir,
String zkConnectStr, RunId twillRunId, String twillAppName,
- int reservedMemory, @Nullable String rmSchedulerAddr,
- Map<String, Map<String, String>> logLevels, Map<String, Integer> maxRetries,
- double minHeapRatio, boolean logCollectionEnabled) {
+ @Nullable String rmSchedulerAddr, Map<String, Map<String, String>> logLevels,
+ Map<String, Integer> maxRetries, Map<String, String> config,
+ Map<String, Map<String, String>> runnableConfigs) {
this.twillSpecification = twillSpecification;
this.fsUser = fsUser;
this.twillAppDir = twillAppDir;
this.zkConnectStr = zkConnectStr;
this.twillRunId = twillRunId;
this.twillAppName = twillAppName;
- this.reservedMemory = reservedMemory;
this.rmSchedulerAddr = rmSchedulerAddr;
this.logLevels = logLevels;
this.maxRetries = maxRetries;
- this.minHeapRatio = minHeapRatio;
- this.logCollectionEnabled = logCollectionEnabled;
+ this.config = config;
+ this.runnableConfigs = runnableConfigs;
}
public TwillSpecification getTwillSpecification() {
@@ -87,19 +86,46 @@ public class TwillRuntimeSpecification {
return twillAppName;
}
- public int getReservedMemory() {
- return reservedMemory;
+ /**
+ * Returns the minimum heap ratio for the application master.
+ * <p>
+ * The value is read from the application-level configuration under
+ * {@link Configs.Keys#HEAP_RESERVED_MIN_RATIO}; per-runnable configurations are not consulted.
+ *
+ * @return the configured ratio, or {@link Configs.Defaults#HEAP_RESERVED_MIN_RATIO} if the key is not set
+ */
+ public double getAMMinHeapRatio() {
+ return getMinHeapRatio(config);
+ }
+
+ /**
+ * Returns the minimum heap ratio for the given runnable.
+ */
+ public double getMinHeapRatio(String runnableName) {
+ return getMinHeapRatio(runnableConfigs.containsKey(runnableName) ? runnableConfigs.get(runnableName) : config);
}
- public double getMinHeapRatio() {
- return minHeapRatio;
+ /**
+ * Returns the reserved non-heap memory size in MB for the application master.
+ * <p>
+ * The value is read from the application-level configuration under
+ * {@link Configs.Keys#YARN_AM_RESERVED_MEMORY_MB}; per-runnable configurations are not consulted.
+ *
+ * @return the configured size, or {@link Configs.Defaults#YARN_AM_RESERVED_MEMORY_MB} if the key is not set
+ * @throws NumberFormatException if the configured value is not a parsable integer
+ */
+ public int getAMReservedMemory() {
+ return config.containsKey(Configs.Keys.YARN_AM_RESERVED_MEMORY_MB) ?
+ Integer.parseInt(config.get(Configs.Keys.YARN_AM_RESERVED_MEMORY_MB)) :
+ Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB;
+ }
+
+ /**
+ * Returns the reserved non-heap memory size in MB for the given runnable.
+ */
+ public int getReservedMemory(String runnableName) {
+ Map<String, String> conf = runnableConfigs.containsKey(runnableName) ? runnableConfigs.get(runnableName) : config;
+ return conf.containsKey(Configs.Keys.JAVA_RESERVED_MEMORY_MB) ?
+ Integer.parseInt(conf.get(Configs.Keys.JAVA_RESERVED_MEMORY_MB)) :
+ Configs.Defaults.JAVA_RESERVED_MEMORY_MB;
}
/**
* Returns whether log collection is enabled.
*/
public boolean isLogCollectionEnabled() {
- return logCollectionEnabled;
+ return config.containsKey(Configs.Keys.LOG_COLLECTION_ENABLED) ?
+ Boolean.parseBoolean(config.get(Configs.Keys.LOG_COLLECTION_ENABLED)) :
+ Configs.Defaults.LOG_COLLECTION_ENABLED;
}
@Nullable
@@ -116,6 +142,20 @@ public class TwillRuntimeSpecification {
}
/**
+ * Returns the configuration for the application.
+ * <p>
+ * The returned map is the instance that was given to the constructor; no copy is made.
+ *
+ * @return the application-level configuration as key/value strings
+ */
+ public Map<String, String> getConfig() {
+ return config;
+ }
+
+ /**
+ * Returns the configurations for each runnable.
+ * <p>
+ * The outer map is keyed by runnable name. The returned map is the instance that was given to the
+ * constructor; no copy is made.
+ *
+ * @return a map from runnable name to that runnable's configuration
+ */
+ public Map<String, Map<String, String>> getRunnableConfigs() {
+ return runnableConfigs;
+ }
+
+ /**
* Returns the ZK connection string for the Kafka used for log collections,
* or {@code null} if log collection is disabled.
*/
@@ -127,4 +167,13 @@ public class TwillRuntimeSpecification {
// When addressing TWILL-147, a field can be introduced to carry this value.
return String.format("%s/%s/%s/kafka", getZkConnectStr(), getTwillAppName(), getTwillAppRunId());
}
+
+ /**
+ * Returns the minimum heap ratio ({@link Configs.Keys#HEAP_RESERVED_MIN_RATIO}) based on the given configuration.
+ *
+ * @param config the configuration to read; note that this parameter shadows the field of the same name
+ * @return the parsed ratio, or {@link Configs.Defaults#HEAP_RESERVED_MIN_RATIO} if the key is not present
+ * @throws NumberFormatException if the configured value is not a parsable double
+ */
+ private double getMinHeapRatio(Map<String, String> config) {
+ return config.containsKey(Configs.Keys.HEAP_RESERVED_MIN_RATIO) ?
+ Double.parseDouble(config.get(Configs.Keys.HEAP_RESERVED_MIN_RATIO)) :
+ Configs.Defaults.HEAP_RESERVED_MIN_RATIO;
+ }
}
http://git-wip-us.apache.org/repos/asf/twill/blob/f1931deb/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
index bb4d435..c9196c4 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
@@ -41,6 +41,7 @@ public final class TwillRunResourcesCodec implements JsonSerializer<TwillRunReso
private static final String INSTANCE_ID = "instanceId";
private static final String HOST = "host";
private static final String MEMORY_MB = "memoryMB";
+ private static final String MAX_HEAP_MEMORY_MB = "maxHeapMemoryMB";
private static final String VIRTUAL_CORES = "virtualCores";
private static final String DEBUG_PORT = "debugPort";
private static final String LOG_LEVELS = "logLevels";
@@ -53,6 +54,7 @@ public final class TwillRunResourcesCodec implements JsonSerializer<TwillRunReso
json.addProperty(INSTANCE_ID, src.getInstanceId());
json.addProperty(HOST, src.getHost());
json.addProperty(MEMORY_MB, src.getMemoryMB());
+ json.addProperty(MAX_HEAP_MEMORY_MB, src.getMaxHeapMemoryMB());
json.addProperty(VIRTUAL_CORES, src.getVirtualCores());
if (src.getDebugPort() != null) {
json.addProperty(DEBUG_PORT, src.getDebugPort());
@@ -69,12 +71,16 @@ public final class TwillRunResourcesCodec implements JsonSerializer<TwillRunReso
JsonObject jsonObj = json.getAsJsonObject();
Map<String, LogEntry.Level> logLevels =
context.deserialize(jsonObj.get(LOG_LEVELS), new TypeToken<Map<String, LogEntry.Level>>() { }.getType());
- return new DefaultTwillRunResources(jsonObj.get(INSTANCE_ID).getAsInt(),
- jsonObj.get(CONTAINER_ID).getAsString(),
- jsonObj.get(VIRTUAL_CORES).getAsInt(),
- jsonObj.get(MEMORY_MB).getAsInt(),
- jsonObj.get(HOST).getAsString(),
- jsonObj.has(DEBUG_PORT) ? jsonObj.get(DEBUG_PORT).getAsInt() : null,
- logLevels);
+ int memoryMB = jsonObj.get(MEMORY_MB).getAsInt();
+ return new DefaultTwillRunResources(
+ jsonObj.get(INSTANCE_ID).getAsInt(),
+ jsonObj.get(CONTAINER_ID).getAsString(),
+ jsonObj.get(VIRTUAL_CORES).getAsInt(),
+ memoryMB,
+ // For backward compatibility when a newer Twill client re-attaches to a running app that was started with an older version: fall back to the container memory size.
+ jsonObj.has(MAX_HEAP_MEMORY_MB) ? jsonObj.get(MAX_HEAP_MEMORY_MB).getAsInt() : memoryMB,
+ jsonObj.get(HOST).getAsString(),
+ jsonObj.has(DEBUG_PORT) ? jsonObj.get(DEBUG_PORT).getAsInt() : null,
+ logLevels);
}
}
http://git-wip-us.apache.org/repos/asf/twill/blob/f1931deb/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
index 5ff05e8..710a9f7 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
@@ -39,18 +39,20 @@ import java.util.Map;
final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntimeSpecification>,
JsonDeserializer<TwillRuntimeSpecification> {
+ private static final Type MAP_STRING_MAP_STRING_STRING_TYPE =
+ new TypeToken<Map<String, Map<String, String>>>() { }.getType();
+
private static final String FS_USER = "fsUser";
private static final String TWILL_APP_DIR = "twillAppDir";
private static final String ZK_CONNECT_STR = "zkConnectStr";
private static final String TWILL_RUNID = "twillRunId";
private static final String TWILL_APP_NAME = "twillAppName";
- private static final String RESERVED_MEMORY = "reservedMemory";
- private static final String HEAP_RESERVED_MIN_RATIO = "minHeapRatio";
private static final String RM_SCHEDULER_ADDR = "rmSchedulerAddr";
private static final String TWILL_SPEC = "twillSpecification";
private static final String LOG_LEVELS = "logLevels";
private static final String MAX_RETRIES = "maxRetries";
- private static final String LOG_COLLECTION_ENABLED = "logCollectionEnabled";
+ private static final String CONFIG = "config";
+ private static final String RUNNABLE_CONFIGS = "runnableConfigs";
@Override
public JsonElement serialize(TwillRuntimeSpecification src, Type typeOfSrc, JsonSerializationContext context) {
@@ -60,19 +62,19 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntim
json.addProperty(ZK_CONNECT_STR, src.getZkConnectStr());
json.addProperty(TWILL_RUNID, src.getTwillAppRunId().getId());
json.addProperty(TWILL_APP_NAME, src.getTwillAppName());
- json.addProperty(RESERVED_MEMORY, src.getReservedMemory());
- json.addProperty(HEAP_RESERVED_MIN_RATIO, src.getMinHeapRatio());
if (src.getRmSchedulerAddr() != null) {
json.addProperty(RM_SCHEDULER_ADDR, src.getRmSchedulerAddr());
}
json.add(TWILL_SPEC,
context.serialize(src.getTwillSpecification(), new TypeToken<TwillSpecification>() { }.getType()));
json.add(LOG_LEVELS,
- context.serialize(src.getLogLevels(), new TypeToken<Map<String, Map<String, String>>>() { }.getType()));
+ context.serialize(src.getLogLevels(), MAP_STRING_MAP_STRING_STRING_TYPE));
json.add(MAX_RETRIES,
context.serialize(src.getMaxRetries(), new TypeToken<Map<String, Integer>>() { }.getType()));
- json.addProperty(LOG_COLLECTION_ENABLED, src.isLogCollectionEnabled());
-
+ json.add(CONFIG,
+ context.serialize(src.getConfig(), new TypeToken<Map<String, String>>() { }.getType()));
+ json.add(RUNNABLE_CONFIGS,
+ context.serialize(src.getRunnableConfigs(), MAP_STRING_MAP_STRING_STRING_TYPE));
return json;
}
@@ -84,9 +86,13 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntim
TwillSpecification twillSpecification = context.deserialize(
jsonObj.get(TWILL_SPEC), new TypeToken<TwillSpecification>() { }.getType());
Map<String, Map<String, String>> logLevels =
- context.deserialize(jsonObj.get(LOG_LEVELS), new TypeToken<Map<String, Map<String, String>>>() { }.getType());
+ context.deserialize(jsonObj.get(LOG_LEVELS), MAP_STRING_MAP_STRING_STRING_TYPE);
Map<String, Integer> maxRetries =
context.deserialize(jsonObj.get(MAX_RETRIES), new TypeToken<Map<String, Integer>>() { }.getType());
+ Map<String, String> config =
+ context.deserialize(jsonObj.get(CONFIG), new TypeToken<Map<String, String>>() { }.getType());
+ Map<String, Map<String, String>> runnableConfigs =
+ context.deserialize(jsonObj.get(RUNNABLE_CONFIGS), MAP_STRING_MAP_STRING_STRING_TYPE);
return new TwillRuntimeSpecification(twillSpecification,
jsonObj.get(FS_USER).getAsString(),
@@ -94,12 +100,11 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntim
jsonObj.get(ZK_CONNECT_STR).getAsString(),
RunIds.fromString(jsonObj.get(TWILL_RUNID).getAsString()),
jsonObj.get(TWILL_APP_NAME).getAsString(),
- jsonObj.get(RESERVED_MEMORY).getAsInt(),
jsonObj.has(RM_SCHEDULER_ADDR) ?
jsonObj.get(RM_SCHEDULER_ADDR).getAsString() : null,
logLevels,
maxRetries,
- jsonObj.get(HEAP_RESERVED_MIN_RATIO).getAsDouble(),
- jsonObj.get(LOG_COLLECTION_ENABLED).getAsBoolean());
+ config,
+ runnableConfigs);
}
}
http://git-wip-us.apache.org/repos/asf/twill/blob/f1931deb/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 523ffce..a2ebf7b 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -71,6 +71,7 @@ import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter;
import org.apache.twill.internal.state.Message;
import org.apache.twill.internal.state.SystemMessages;
import org.apache.twill.internal.utils.Instances;
+import org.apache.twill.internal.utils.Resources;
import org.apache.twill.internal.yarn.AbstractYarnTwillService;
import org.apache.twill.internal.yarn.YarnAMClient;
import org.apache.twill.internal.yarn.YarnContainerInfo;
@@ -127,8 +128,6 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
private final ExpectedContainers expectedContainers;
private final YarnAMClient amClient;
private final JvmOptions jvmOpts;
- private final int reservedMemory;
- private final double minHeapRatio;
private final EventHandler eventHandler;
private final Location applicationLocation;
private final PlacementPolicyManager placementPolicyManager;
@@ -151,8 +150,6 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
this.amClient = amClient;
this.credentials = createCredentials();
this.jvmOpts = loadJvmOptions();
- this.reservedMemory = twillRuntimeSpec.getReservedMemory();
- this.minHeapRatio = twillRuntimeSpec.getMinHeapRatio();
this.twillSpec = twillRuntimeSpec.getTwillSpecification();
this.placementPolicyManager = new PlacementPolicyManager(twillSpec.getPlacementPolicies());
this.environments = getEnvironments();
@@ -198,11 +195,18 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
private RunningContainers createRunningContainers(ContainerId appMasterContainerId,
String appMasterHost) throws Exception {
+ int containerMemoryMB = Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_MEMORY_MB));
+
+ // We can't get the -Xmx easily, so just recompute the -Xmx in the same way that the client does
+ int maxHeapMemoryMB = Resources.computeMaxHeapSize(containerMemoryMB,
+ twillRuntimeSpec.getAMReservedMemory(),
+ twillRuntimeSpec.getAMMinHeapRatio());
TwillRunResources appMasterResources = new DefaultTwillRunResources(
0,
appMasterContainerId.toString(),
Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES)),
- Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_MEMORY_MB)),
+ containerMemoryMB,
+ maxHeapMemoryMB,
appMasterHost, null);
String appId = appMasterContainerId.getApplicationAttemptId().getApplicationId().toString();
return new RunningContainers(appId, appMasterResources, zkClient, applicationLocation,
@@ -667,7 +671,8 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
TwillContainerLauncher launcher = new TwillContainerLauncher(
twillSpec.getRunnables().get(runnableName), processLauncher.getContainerInfo(), launchContext,
ZKClients.namespace(zkClient, getZKNamespace(runnableName)),
- containerCount, jvmOpts, reservedMemory, getSecureStoreLocation(), minHeapRatio);
+ containerCount, jvmOpts, twillRuntimeSpec.getReservedMemory(runnableName), getSecureStoreLocation(),
+ twillRuntimeSpec.getMinHeapRatio(runnableName));
runningContainers.start(runnableName, processLauncher.getContainerInfo(), launcher);
http://git-wip-us.apache.org/repos/asf/twill/blob/f1931deb/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
index a950c46..c85c372 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
@@ -163,9 +163,9 @@ final class RunningContainers {
containerInfo.getId(),
containerInfo.getVirtualCores(),
containerInfo.getMemoryMB(),
+ launcher.getMaxHeapMemoryMB(),
containerInfo.getHost().getHostName(),
controller);
-
resourceReport.addRunResources(runnableName, resources);
containerStats.put(runnableName, containerInfo);
@@ -734,9 +734,9 @@ final class RunningContainers {
private Integer dynamicDebugPort = null;
private DynamicTwillRunResources(int instanceId, String containerId,
- int cores, int memoryMB, String host,
+ int cores, int memoryMB, int maxHeapMemoryMB, String host,
TwillContainerController controller) {
- super(instanceId, containerId, cores, memoryMB, host, null);
+ super(instanceId, containerId, cores, memoryMB, maxHeapMemoryMB, host, null);
this.controller = controller;
}
http://git-wip-us.apache.org/repos/asf/twill/blob/f1931deb/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index 52e18eb..5442fa0 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -147,6 +147,7 @@ final class YarnTwillPreparer implements TwillPreparer {
private final Map<String, Map<String, String>> logLevels = Maps.newHashMap();
private final LocationCache locationCache;
private final Map<String, Integer> maxRetries = Maps.newHashMap();
+ private final Map<String, Map<String, String>> runnableConfigs = Maps.newHashMap();
private String schedulerQueue;
private String extraOptions;
private JvmOptions.DebugOptions debugOptions = JvmOptions.DebugOptions.NO_DEBUG;
@@ -183,6 +184,13 @@ final class YarnTwillPreparer implements TwillPreparer {
}
@Override
+ public TwillPreparer withConfiguration(String runnableName, Map<String, String> config) {
+ confirmRunnableName(runnableName);
+ runnableConfigs.put(runnableName, Maps.newHashMap(config)); // stores a copy; a repeated call for the same runnable replaces the earlier map
+ return this;
+ }
+
+ @Override
public TwillPreparer addLogHandler(LogHandler handler) {
logHandlers.add(handler);
return this;
@@ -382,10 +390,11 @@ final class YarnTwillPreparer implements TwillPreparer {
createApplicationJar(createBundler(classAcceptor), localFiles);
createResourcesJar(createBundler(classAcceptor), localFiles);
+ TwillRuntimeSpecification twillRuntimeSpec;
Path runtimeConfigDir = Files.createTempDirectory(getLocalStagingDir().toPath(),
Constants.Files.RUNTIME_CONFIG_JAR);
try {
- saveSpecification(twillSpec, runtimeConfigDir.resolve(Constants.Files.TWILL_SPEC));
+ twillRuntimeSpec = saveSpecification(twillSpec, runtimeConfigDir.resolve(Constants.Files.TWILL_SPEC));
saveLogback(runtimeConfigDir.resolve(Constants.Files.LOGBACK_TEMPLATE));
saveClassPaths(runtimeConfigDir);
saveJvmOptions(extraOptions, debugOptions, runtimeConfigDir.resolve(Constants.Files.JVM_OPTIONS));
@@ -405,10 +414,9 @@ final class YarnTwillPreparer implements TwillPreparer {
// appMaster.jar
// org.apache.twill.internal.appmaster.ApplicationMasterMain
// false
-
- int reservedMemoryMB = config.getInt(Configs.Keys.YARN_AM_RESERVED_MEMORY_MB,
- Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB);
- int memory = Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(), reservedMemoryMB, getMinHeapRatio());
+ int memory = Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(),
+ twillRuntimeSpec.getAMReservedMemory(),
+ twillRuntimeSpec.getAMMinHeapRatio());
return launcher.prepareLaunch(ImmutableMap.<String, String>of(), localFiles.values(),
createSubmissionCredentials())
.addCommand(
@@ -439,22 +447,6 @@ final class YarnTwillPreparer implements TwillPreparer {
}
/**
- * Returns the minimum heap ratio based on the configuration.
- */
- private double getMinHeapRatio() {
- // doing this way to support hadoop-2.0 profile
- String minHeapRatioStr = config.get(Configs.Keys.HEAP_RESERVED_MIN_RATIO);
- return (minHeapRatioStr == null) ? Configs.Defaults.HEAP_RESERVED_MIN_RATIO : Double.parseDouble(minHeapRatioStr);
- }
-
- /**
- * Returns the reserved memory size in MB based on the configuration.
- */
- private int getReservedMemory() {
- return config.getInt(Configs.Keys.JAVA_RESERVED_MEMORY_MB, Configs.Defaults.JAVA_RESERVED_MEMORY_MB);
- }
-
- /**
* Returns the local staging directory based on the configuration.
*/
private File getLocalStagingDir() {
@@ -670,7 +662,7 @@ final class YarnTwillPreparer implements TwillPreparer {
return localFiles;
}
- private void saveSpecification(TwillSpecification spec, Path targetFile) throws IOException {
+ private TwillRuntimeSpecification saveSpecification(TwillSpecification spec, Path targetFile) throws IOException {
final Multimap<String, LocalFile> runnableLocalFiles = populateRunnableLocalFiles(spec);
// Rewrite LocalFiles inside twillSpec
@@ -692,15 +684,20 @@ final class YarnTwillPreparer implements TwillPreparer {
}
TwillSpecification newTwillSpec = new DefaultTwillSpecification(spec.getName(), runtimeSpec, spec.getOrders(),
spec.getPlacementPolicies(), eventHandler);
- boolean logCollectionEnabled = config.getBoolean(Configs.Keys.LOG_COLLECTION_ENABLED,
- Configs.Defaults.LOG_COLLECTION_ENABLED);
- TwillRuntimeSpecificationAdapter.create().toJson(
- new TwillRuntimeSpecification(newTwillSpec, appLocation.getLocationFactory().getHomeLocation().getName(),
- appLocation.toURI(), zkConnectString, runId, twillSpec.getName(),
- getReservedMemory(), config.get(YarnConfiguration.RM_SCHEDULER_ADDRESS),
- logLevels, maxRetries, getMinHeapRatio(), logCollectionEnabled), writer);
+ Map<String, String> configMap = Maps.newHashMap();
+ for (Map.Entry<String, String> entry : config) {
+ configMap.put(entry.getKey(), entry.getValue());
+ }
+
+ TwillRuntimeSpecification twillRuntimeSpec = new TwillRuntimeSpecification(
+ newTwillSpec, appLocation.getLocationFactory().getHomeLocation().getName(),
+ appLocation.toURI(), zkConnectString, runId, twillSpec.getName(),
+ config.get(YarnConfiguration.RM_SCHEDULER_ADDRESS),
+ logLevels, maxRetries, configMap, runnableConfigs);
+ TwillRuntimeSpecificationAdapter.create().toJson(twillRuntimeSpec, writer);
+ LOG.debug("Done {}", targetFile);
+ return twillRuntimeSpec;
}
- LOG.debug("Done {}", targetFile);
}
private void saveLogback(Path targetFile) throws IOException {
http://git-wip-us.apache.org/repos/asf/twill/blob/f1931deb/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
index f5143ce..73f1476 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
@@ -25,17 +25,19 @@ import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.ResourceSpecification;
import org.apache.twill.api.TwillApplication;
import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunResources;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.api.logging.PrinterLogHandler;
import org.apache.twill.discovery.ServiceDiscovered;
+import org.apache.twill.internal.utils.Resources;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.PrintWriter;
-import java.util.Collections;
+import java.util.Collection;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -67,9 +69,16 @@ public class ContainerSizeTestRun extends BaseYarnTest {
@Test
public void testMaxHeapSize() throws InterruptedException, TimeoutException, ExecutionException {
TwillRunner runner = getTwillRunner();
+ String runnableName = "sleep";
+
TwillController controller = runner.prepare(new MaxHeapApp())
- // Alter the AM container size
- .withConfiguration(Collections.singletonMap(Configs.Keys.YARN_AM_MEMORY_MB, "256"))
+ // Alter the AM container size and heap ratio
+ .withConfiguration(ImmutableMap.of(Configs.Keys.YARN_AM_MEMORY_MB, "256",
+ Configs.Keys.HEAP_RESERVED_MIN_RATIO, "0.65"))
+ // Use a different heap ratio and reserved memory size for the runnable
+ .withConfiguration(runnableName,
+ ImmutableMap.of(Configs.Keys.HEAP_RESERVED_MIN_RATIO, "0.8",
+ Configs.Keys.JAVA_RESERVED_MEMORY_MB, "1024"))
.addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
.start();
@@ -77,10 +86,20 @@ public class ContainerSizeTestRun extends BaseYarnTest {
ServiceDiscovered discovered = controller.discoverService("sleep");
Assert.assertTrue(waitForSize(discovered, 1, 120));
- // Verify the AM container size
+ // Verify the AM container size and heap size
ResourceReport resourceReport = controller.getResourceReport();
Assert.assertNotNull(resourceReport);
Assert.assertEquals(256, resourceReport.getAppMasterResources().getMemoryMB());
+ Assert.assertEquals(Resources.computeMaxHeapSize(256, Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB, 0.65d),
+ resourceReport.getAppMasterResources().getMaxHeapMemoryMB());
+
+ // Verify the runnable container heap size
+ Collection<TwillRunResources> runnableResources = resourceReport.getRunnableResources(runnableName);
+ Assert.assertFalse(runnableResources.isEmpty());
+ TwillRunResources resources = runnableResources.iterator().next();
+ Assert.assertEquals(Resources.computeMaxHeapSize(resources.getMemoryMB(), 1024, 0.8d),
+ resources.getMaxHeapMemoryMB());
+
} finally {
controller.terminate().get(120, TimeUnit.SECONDS);
}
[5/5] twill git commit: Only copy twill configurations into the
runtime spec.
Posted by ch...@apache.org.
Only copy twill configurations into the runtime spec.
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/6ccc7158
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/6ccc7158
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/6ccc7158
Branch: refs/heads/feature/TWILL-241-per-runnable-opts
Commit: 6ccc7158e13a0b7182197cc35c91d96fc9947541
Parents: 577e6c7
Author: Terence Yim <ch...@apache.org>
Authored: Fri Aug 4 21:16:00 2017 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Aug 4 21:16:00 2017 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/6ccc7158/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index c2b0ee5..0eba62b 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -698,7 +698,9 @@ final class YarnTwillPreparer implements TwillPreparer {
spec.getPlacementPolicies(), eventHandler);
Map<String, String> configMap = Maps.newHashMap();
for (Map.Entry<String, String> entry : config) {
- configMap.put(entry.getKey(), entry.getValue());
+ if (entry.getKey().startsWith("twill.")) {
+ configMap.put(entry.getKey(), entry.getValue());
+ }
}
TwillRuntimeSpecification twillRuntimeSpec = new TwillRuntimeSpecification(
[4/5] twill git commit: Fix check style issue.
Posted by ch...@apache.org.
Fix check style issue.
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/577e6c7c
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/577e6c7c
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/577e6c7c
Branch: refs/heads/feature/TWILL-241-per-runnable-opts
Commit: 577e6c7cf86abd576a3ac676faba96165c44d146
Parents: f9d5a59
Author: Terence Yim <ch...@apache.org>
Authored: Fri Aug 4 19:01:33 2017 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Aug 4 19:01:33 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/twill/internal/DefaultTwillRunResources.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/577e6c7c/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
index f05074e..6f8a052 100644
--- a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
+++ b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
@@ -45,7 +45,8 @@ public class DefaultTwillRunResources implements TwillRunResources {
*/
public DefaultTwillRunResources(int instanceId, String containerId, int cores, int memoryMB, int maxHeapMemoryMB,
String host, Integer debugPort) {
- this(instanceId, containerId, cores, memoryMB, maxHeapMemoryMB, host, debugPort, Collections.<String, Level>emptyMap());
+ this(instanceId, containerId, cores, memoryMB, maxHeapMemoryMB, host, debugPort,
+ Collections.<String, Level>emptyMap());
}
public DefaultTwillRunResources(int instanceId, String containerId, int cores, int memoryMB, int maxHeapMemoryMB,
[2/5] twill git commit: (TWILL-241) Added support for per runnable
JVM options - Also removed JvmOptionsCodec since JvmOptions only uses simple
types
Posted by ch...@apache.org.
(TWILL-241) Added support for per runnable JVM options
- Also removed JvmOptionsCodec since JvmOptions only uses simple types
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/29a7999f
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/29a7999f
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/29a7999f
Branch: refs/heads/feature/TWILL-241-per-runnable-opts
Commit: 29a7999f45859996595287fb1c28b225a2564ed9
Parents: f1931de
Author: Terence Yim <ch...@apache.org>
Authored: Fri Aug 4 16:19:32 2017 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Aug 4 18:06:33 2017 -0700
----------------------------------------------------------------------
.../org/apache/twill/api/TwillPreparer.java | 13 +++
.../org/apache/twill/internal/JvmOptions.java | 17 ++-
.../twill/internal/TwillContainerLauncher.java | 10 +-
.../twill/internal/json/JvmOptionsCodec.java | 111 -------------------
.../internal/json/JvmOptionsCodecTest.java | 107 ------------------
.../appmaster/ApplicationMasterService.java | 18 +--
.../apache/twill/yarn/YarnTwillPreparer.java | 73 +++++++-----
.../twill/yarn/CustomClassLoaderRunnable.java | 3 +-
.../twill/yarn/CustomClassLoaderTestRun.java | 3 +-
.../apache/twill/yarn/JvmOptionsTestRun.java | 103 +++++++++++++++++
.../org/apache/twill/yarn/YarnTestSuite.java | 1 +
11 files changed, 195 insertions(+), 264 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/29a7999f/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
index 35930d2..812a086 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
@@ -87,6 +87,19 @@ public interface TwillPreparer {
TwillPreparer setJVMOptions(String options);
/**
+ * This method sets the extra JVM options that will be passed to the java command line for the given runnable
+ * of the application started through this {@link org.apache.twill.api.TwillPreparer} instance.
+ * The options set for the given runnable will be appended to any global options set through the
+ * {@link #setJVMOptions(String)} or {@link #addJVMOptions(String)} method.
+ *
+ * This is intended for advanced usage. All options will be passed unchanged to the java command line. Invalid
+ * options could prevent the application from starting.
+ *
+ * @param options extra JVM options.
+ */
+ TwillPreparer setJVMOptions(String runnableName, String options);
+
+ /**
* This methods adds extra JVM options that will be passed to the java command line for every runnable
* of the application started through this {@link org.apache.twill.api.TwillPreparer} instance.
*
http://git-wip-us.apache.org/repos/asf/twill/blob/29a7999f/twill-core/src/main/java/org/apache/twill/internal/JvmOptions.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/JvmOptions.java b/twill-core/src/main/java/org/apache/twill/internal/JvmOptions.java
index 945561b..6e35c6c 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/JvmOptions.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/JvmOptions.java
@@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.primitives.Booleans;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -30,17 +31,29 @@ import java.util.Set;
public final class JvmOptions {
private final String extraOptions;
+ private final Map<String, String> runnableExtraOptions;
private final DebugOptions debugOptions;
- public JvmOptions(String extraOptions, DebugOptions debugOptions) {
+ public JvmOptions(String extraOptions, Map<String, String> runnableExtraOptions, DebugOptions debugOptions) {
this.extraOptions = extraOptions;
+ this.runnableExtraOptions = runnableExtraOptions;
this.debugOptions = debugOptions;
}
- public String getExtraOptions() {
+ /**
+ * Returns the extra options for the application master.
+ */
+ public String getAMExtraOptions() {
return extraOptions;
}
+ /**
+ * Returns the extra options for the given runnable.
+ */
+ public String getRunnableExtraOptions(String runnableName) {
+ return runnableExtraOptions.containsKey(runnableName) ? runnableExtraOptions.get(runnableName) : extraOptions;
+ }
+
public DebugOptions getDebugOptions() {
return debugOptions;
}
http://git-wip-us.apache.org/repos/asf/twill/blob/29a7999f/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
index 700c0f1..0b98ba6 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
@@ -65,8 +65,9 @@ public final class TwillContainerLauncher {
public TwillContainerLauncher(RuntimeSpecification runtimeSpec, ContainerInfo containerInfo,
ProcessLauncher.PrepareLaunchContext launchContext,
- ZKClient zkClient, int instanceCount, JvmOptions jvmOpts, int reservedMemory,
- Location secureStoreLocation, double minHeapRatio) {
+ ZKClient zkClient, int instanceCount, JvmOptions jvmOpts,
+ int reservedMemory, double minHeapRatio,
+ Location secureStoreLocation) {
this.runtimeSpec = runtimeSpec;
this.containerInfo = containerInfo;
this.launchContext = launchContext;
@@ -151,8 +152,9 @@ public final class TwillContainerLauncher {
"-Dtwill.runnable=$" + Constants.TWILL_APP_NAME + ".$" + EnvKeys.TWILL_RUNNABLE_NAME,
"-cp", Constants.Files.LAUNCHER_JAR + ":" + classPath,
"-Xmx" + maxHeapSizeMB + "m");
- if (jvmOpts.getExtraOptions() != null) {
- commandBuilder.add(jvmOpts.getExtraOptions());
+ String extraOptions = jvmOpts.getRunnableExtraOptions(runtimeSpec.getName());
+ if (!extraOptions.isEmpty()) {
+ commandBuilder.add(extraOptions);
}
commandBuilder.add(TwillLauncher.class.getName(),
mainClass.getName(),
http://git-wip-us.apache.org/repos/asf/twill/blob/29a7999f/twill-core/src/main/java/org/apache/twill/internal/json/JvmOptionsCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/JvmOptionsCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/JvmOptionsCodec.java
deleted file mode 100644
index 807840f..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/json/JvmOptionsCodec.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.io.InputSupplier;
-import com.google.common.io.OutputSupplier;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-import org.apache.twill.internal.JvmOptions;
-
-import java.io.IOException;
-import java.io.Reader;
-import java.io.Writer;
-import java.lang.reflect.Type;
-import java.util.Set;
-
-/**
- * Gson codec for {@link JvmOptions}.
- */
-public class JvmOptionsCodec implements JsonSerializer<JvmOptions>, JsonDeserializer<JvmOptions> {
-
- private static final Gson GSON = new GsonBuilder().registerTypeAdapter(JvmOptions.class, new JvmOptionsCodec())
- .registerTypeAdapter(JvmOptions.DebugOptions.class,
- new DebugOptionsCodec())
- .create();
-
- public static void encode(JvmOptions jvmOptions, OutputSupplier<? extends Writer> writerSupplier) throws IOException {
- try (Writer writer = writerSupplier.getOutput()) {
- GSON.toJson(jvmOptions, writer);
- }
- }
-
- public static JvmOptions decode(InputSupplier<? extends Reader> readerSupplier) throws IOException {
- try (Reader reader = readerSupplier.getInput()) {
- return GSON.fromJson(reader, JvmOptions.class);
- }
- }
-
- @Override
- public JvmOptions deserialize(JsonElement json, Type type, JsonDeserializationContext context)
- throws JsonParseException {
- JsonObject jsonObj = json.getAsJsonObject();
- String extraOptions = context.deserialize(jsonObj.get("extraOptions"), String.class);
- JvmOptions.DebugOptions debugOptions = context.deserialize(jsonObj.get("debugOptions"),
- JvmOptions.DebugOptions.class);
- return new JvmOptions(extraOptions, debugOptions);
- }
-
- @Override
- public JsonElement serialize(JvmOptions jvmOptions, Type type, JsonSerializationContext context) {
- JsonObject json = new JsonObject();
- json.add("extraOptions", context.serialize(jvmOptions.getExtraOptions()));
- json.add("debugOptions", context.serialize(jvmOptions.getDebugOptions()));
- return json;
- }
-
- private static class DebugOptionsCodec
- implements JsonSerializer<JvmOptions.DebugOptions>, JsonDeserializer<JvmOptions.DebugOptions> {
-
- @Override
- public JvmOptions.DebugOptions deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context)
- throws JsonParseException {
- JsonObject jsonObj = json.getAsJsonObject();
- Boolean doDebug = context.deserialize(jsonObj.get("doDebug"), Boolean.class);
- if (!doDebug) {
- return JvmOptions.DebugOptions.NO_DEBUG;
- }
- Boolean doSuspend = context.deserialize(jsonObj.get("doSuspend"), Boolean.class);
- Set<String> runnables = context.deserialize(jsonObj.get("runnables"),
- new TypeToken<Set<String>>() { }.getType());
- return new JvmOptions.DebugOptions(true, doSuspend, runnables == null ? null : ImmutableSet.copyOf(runnables));
- }
-
- @Override
- public JsonElement serialize(JvmOptions.DebugOptions src, Type typeOfSrc, JsonSerializationContext context) {
- JsonObject json = new JsonObject();
- json.add("doDebug", context.serialize(src.doDebug()));
- json.add("doSuspend", context.serialize(src.doSuspend()));
- if (src.getRunnables() != null) {
- json.add("runnables", context.serialize(src.getRunnables()));
- }
- return json;
- }
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/twill/blob/29a7999f/twill-core/src/test/java/org/apache/twill/internal/json/JvmOptionsCodecTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/json/JvmOptionsCodecTest.java b/twill-core/src/test/java/org/apache/twill/internal/json/JvmOptionsCodecTest.java
deleted file mode 100644
index 2791e72..0000000
--- a/twill-core/src/test/java/org/apache/twill/internal/json/JvmOptionsCodecTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.io.InputSupplier;
-import com.google.common.io.OutputSupplier;
-import org.apache.twill.internal.JvmOptions;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.Reader;
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.io.Writer;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Tests the JvmOptions Codec.
- */
-public class JvmOptionsCodecTest {
-
- @Test
- public void testNoNulls() throws Exception {
- JvmOptions options = new JvmOptions("-version",
- new JvmOptions.DebugOptions(true, false, ImmutableSet.of("one", "two")));
- final StringWriter writer = new StringWriter();
- JvmOptionsCodec.encode(options, new OutputSupplier<Writer>() {
- @Override
- public Writer getOutput() throws IOException {
- return writer;
- }
- });
- JvmOptions options1 = JvmOptionsCodec.decode(new InputSupplier<Reader>() {
- @Override
- public Reader getInput() throws IOException {
- return new StringReader(writer.toString());
- }
- });
- Assert.assertEquals(options.getExtraOptions(), options1.getExtraOptions());
- Assert.assertEquals(options.getDebugOptions().doDebug(), options1.getDebugOptions().doDebug());
- Assert.assertEquals(options.getDebugOptions().doSuspend(), options1.getDebugOptions().doSuspend());
- Assert.assertEquals(options.getDebugOptions().getRunnables(), options1.getDebugOptions().getRunnables());
- }
-
- @Test
- public void testSomeNulls() throws Exception {
- JvmOptions options = new JvmOptions(null, new JvmOptions.DebugOptions(false, false, null));
- final StringWriter writer = new StringWriter();
- JvmOptionsCodec.encode(options, new OutputSupplier<Writer>() {
- @Override
- public Writer getOutput() throws IOException {
- return writer;
- }
- });
- JvmOptions options1 = JvmOptionsCodec.decode(new InputSupplier<Reader>() {
- @Override
- public Reader getInput() throws IOException {
- return new StringReader(writer.toString());
- }
- });
- Assert.assertEquals(options.getExtraOptions(), options1.getExtraOptions());
- Assert.assertEquals(options.getDebugOptions().doDebug(), options1.getDebugOptions().doDebug());
- Assert.assertEquals(options.getDebugOptions().doSuspend(), options1.getDebugOptions().doSuspend());
- Assert.assertEquals(options.getDebugOptions().getRunnables(), options1.getDebugOptions().getRunnables());
- }
-
- @Test
- public void testNoRunnables() throws Exception {
- List<String> noRunnables = Collections.emptyList();
- JvmOptions options = new JvmOptions(null, new JvmOptions.DebugOptions(true, false, noRunnables));
- final StringWriter writer = new StringWriter();
- JvmOptionsCodec.encode(options, new OutputSupplier<Writer>() {
- @Override
- public Writer getOutput() throws IOException {
- return writer;
- }
- });
- JvmOptions options1 = JvmOptionsCodec.decode(new InputSupplier<Reader>() {
- @Override
- public Reader getInput() throws IOException {
- return new StringReader(writer.toString());
- }
- });
- Assert.assertEquals(options.getExtraOptions(), options1.getExtraOptions());
- Assert.assertEquals(options.getDebugOptions().doDebug(), options1.getDebugOptions().doDebug());
- Assert.assertEquals(options.getDebugOptions().doSuspend(), options1.getDebugOptions().doSuspend());
- Assert.assertEquals(options.getDebugOptions().getRunnables(), options1.getDebugOptions().getRunnables());
- }
-}
http://git-wip-us.apache.org/repos/asf/twill/blob/29a7999f/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index a2ebf7b..4917f4d 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -30,7 +30,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
import com.google.common.collect.Ranges;
import com.google.common.collect.Sets;
-import com.google.common.io.InputSupplier;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -65,7 +64,6 @@ import org.apache.twill.internal.JvmOptions;
import org.apache.twill.internal.ProcessLauncher;
import org.apache.twill.internal.TwillContainerLauncher;
import org.apache.twill.internal.TwillRuntimeSpecification;
-import org.apache.twill.internal.json.JvmOptionsCodec;
import org.apache.twill.internal.json.LocalFileCodec;
import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter;
import org.apache.twill.internal.state.Message;
@@ -84,7 +82,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
@@ -167,14 +164,11 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
private JvmOptions loadJvmOptions() throws IOException {
final File jvmOptsFile = new File(Constants.Files.RUNTIME_CONFIG_JAR, Constants.Files.JVM_OPTIONS);
if (!jvmOptsFile.exists()) {
- return new JvmOptions(null, JvmOptions.DebugOptions.NO_DEBUG);
+ return new JvmOptions("", Collections.<String, String>emptyMap(), JvmOptions.DebugOptions.NO_DEBUG);
+ }
+ try (Reader reader = Files.newBufferedReader(jvmOptsFile.toPath(), StandardCharsets.UTF_8)) {
+ return GSON.fromJson(reader, JvmOptions.class);
}
- return JvmOptionsCodec.decode(new InputSupplier<Reader>() {
- @Override
- public Reader getInput() throws IOException {
- return new FileReader(jvmOptsFile);
- }
- });
}
@SuppressWarnings("unchecked")
@@ -671,8 +665,8 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
TwillContainerLauncher launcher = new TwillContainerLauncher(
twillSpec.getRunnables().get(runnableName), processLauncher.getContainerInfo(), launchContext,
ZKClients.namespace(zkClient, getZKNamespace(runnableName)),
- containerCount, jvmOpts, twillRuntimeSpec.getReservedMemory(runnableName), getSecureStoreLocation(),
- twillRuntimeSpec.getMinHeapRatio(runnableName));
+ containerCount, jvmOpts, twillRuntimeSpec.getReservedMemory(runnableName),
+ twillRuntimeSpec.getMinHeapRatio(runnableName), getSecureStoreLocation());
runningContainers.start(runnableName, processLauncher.getContainerInfo(), launcher);
http://git-wip-us.apache.org/repos/asf/twill/blob/29a7999f/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index 5442fa0..c2b0ee5 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -75,7 +75,6 @@ import org.apache.twill.internal.appmaster.ApplicationMasterMain;
import org.apache.twill.internal.container.TwillContainerMain;
import org.apache.twill.internal.io.LocationCache;
import org.apache.twill.internal.json.ArgumentsCodec;
-import org.apache.twill.internal.json.JvmOptionsCodec;
import org.apache.twill.internal.json.LocalFileCodec;
import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter;
import org.apache.twill.internal.utils.Dependencies;
@@ -114,6 +113,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
+import javax.annotation.Nullable;
/**
* Implementation for {@link TwillPreparer} to prepare and launch distributed application on Hadoop YARN.
@@ -148,14 +148,15 @@ final class YarnTwillPreparer implements TwillPreparer {
private final LocationCache locationCache;
private final Map<String, Integer> maxRetries = Maps.newHashMap();
private final Map<String, Map<String, String>> runnableConfigs = Maps.newHashMap();
- private String schedulerQueue;
+ private final Map<String, String> runnableExtraOptions = Maps.newHashMap();
private String extraOptions;
private JvmOptions.DebugOptions debugOptions = JvmOptions.DebugOptions.NO_DEBUG;
+ private String schedulerQueue;
private ClassAcceptor classAcceptor;
private String classLoaderClassName;
YarnTwillPreparer(Configuration config, TwillSpecification twillSpec, RunId runId,
- String zkConnectString, Location appLocation, String extraOptions,
+ String zkConnectString, Location appLocation, @Nullable String extraOptions,
LocationCache locationCache, YarnTwillControllerFactory controllerFactory) {
this.config = config;
this.twillSpec = twillSpec;
@@ -164,7 +165,7 @@ final class YarnTwillPreparer implements TwillPreparer {
this.appLocation = appLocation;
this.controllerFactory = controllerFactory;
this.credentials = createCredentials();
- this.extraOptions = extraOptions;
+ this.extraOptions = extraOptions == null ? "" : extraOptions;
this.classAcceptor = new ClassAcceptor();
this.locationCache = locationCache;
}
@@ -209,13 +210,23 @@ final class YarnTwillPreparer implements TwillPreparer {
@Override
public TwillPreparer setJVMOptions(String options) {
+ Preconditions.checkArgument(options != null, "JVM options cannot be null.");
this.extraOptions = options;
return this;
}
@Override
+ public TwillPreparer setJVMOptions(String runnableName, String options) {
+ confirmRunnableName(runnableName);
+ Preconditions.checkArgument(options != null, "JVM options cannot be null.");
+ runnableExtraOptions.put(runnableName, options);
+ return this;
+ }
+
+ @Override
public TwillPreparer addJVMOptions(String options) {
- this.extraOptions = extraOptions == null ? options : extraOptions + " " + options;
+ Preconditions.checkArgument(options != null, "JVM options cannot be null.");
+ this.extraOptions = extraOptions.isEmpty() ? options : extraOptions + " " + options;
return this;
}
@@ -226,6 +237,9 @@ final class YarnTwillPreparer implements TwillPreparer {
@Override
public TwillPreparer enableDebugging(boolean doSuspend, String... runnables) {
+ for (String runnableName : runnables) {
+ confirmRunnableName(runnableName);
+ }
this.debugOptions = new JvmOptions.DebugOptions(true, doSuspend, ImmutableSet.copyOf(runnables));
return this;
}
@@ -379,9 +393,6 @@ final class YarnTwillPreparer implements TwillPreparer {
new Callable<ProcessController<YarnApplicationReport>>() {
@Override
public ProcessController<YarnApplicationReport> call() throws Exception {
-
- String extraOptions = getExtraOptions();
-
// Local files needed by AM
Map<String, LocalFile> localFiles = Maps.newHashMap();
@@ -391,13 +402,14 @@ final class YarnTwillPreparer implements TwillPreparer {
createResourcesJar(createBundler(classAcceptor), localFiles);
TwillRuntimeSpecification twillRuntimeSpec;
+ JvmOptions jvmOptions;
Path runtimeConfigDir = Files.createTempDirectory(getLocalStagingDir().toPath(),
Constants.Files.RUNTIME_CONFIG_JAR);
try {
twillRuntimeSpec = saveSpecification(twillSpec, runtimeConfigDir.resolve(Constants.Files.TWILL_SPEC));
saveLogback(runtimeConfigDir.resolve(Constants.Files.LOGBACK_TEMPLATE));
saveClassPaths(runtimeConfigDir);
- saveJvmOptions(extraOptions, debugOptions, runtimeConfigDir.resolve(Constants.Files.JVM_OPTIONS));
+ jvmOptions = saveJvmOptions(runtimeConfigDir.resolve(Constants.Files.JVM_OPTIONS));
saveArguments(new Arguments(arguments, runnableArgs),
runtimeConfigDir.resolve(Constants.Files.ARGUMENTS));
saveEnvironments(runtimeConfigDir.resolve(Constants.Files.ENVIRONMENTS));
@@ -426,7 +438,7 @@ final class YarnTwillPreparer implements TwillPreparer {
"-Dtwill.app=$" + Constants.TWILL_APP_NAME,
"-cp", Constants.Files.LAUNCHER_JAR + ":$HADOOP_CONF_DIR",
"-Xmx" + memory + "m",
- extraOptions,
+ jvmOptions.getAMExtraOptions(),
TwillLauncher.class.getName(),
ApplicationMasterMain.class.getName(),
Boolean.FALSE.toString())
@@ -456,12 +468,12 @@ final class YarnTwillPreparer implements TwillPreparer {
/**
* Returns the extra options for the container JVM.
*/
- private String getExtraOptions() {
- String extraOptions = this.extraOptions == null ? "" : this.extraOptions;
- if (classLoaderClassName != null) {
- extraOptions += " -D" + Constants.TWILL_CONTAINER_CLASSLOADER + "=" + classLoaderClassName;
+ private String addClassLoaderClassName(String extraOptions) {
+ if (classLoaderClassName == null) {
+ return extraOptions;
}
- return extraOptions;
+ String classLoaderProperty = "-D" + Constants.TWILL_CONTAINER_CLASSLOADER + "=" + classLoaderClassName;
+ return extraOptions.isEmpty() ? classLoaderProperty : extraOptions + " " + classLoaderProperty;
}
private void setEnv(String runnableName, Map<String, String> env, boolean overwrite) {
@@ -759,20 +771,31 @@ final class YarnTwillPreparer implements TwillPreparer {
Joiner.on(':').join(classPaths).getBytes(StandardCharsets.UTF_8));
}
- private void saveJvmOptions(String extraOptions,
- JvmOptions.DebugOptions debugOptions, final Path targetPath) throws IOException {
- if (extraOptions.isEmpty() && JvmOptions.DebugOptions.NO_DEBUG.equals(debugOptions)) {
+ private JvmOptions saveJvmOptions(final Path targetPath) throws IOException {
+ // Updates the extra options with the classloader name if necessary
+ final String globalOptions = addClassLoaderClassName(extraOptions);
+ // Prefix each runnable specific extra options with the global options (if any).
+ Map<String, String> runnableExtraOptions = Maps.newHashMap(
+ Maps.transformValues(this.runnableExtraOptions, new Function<String, String>() {
+ @Override
+ public String apply(String extraOptions) {
+ return globalOptions.isEmpty() ? extraOptions : globalOptions + " " + extraOptions;
+ }
+ }));
+
+ JvmOptions jvmOptions = new JvmOptions(globalOptions, runnableExtraOptions, debugOptions);
+ if (globalOptions.isEmpty() && runnableExtraOptions.isEmpty()
+ && JvmOptions.DebugOptions.NO_DEBUG.equals(debugOptions)) {
// If no vm options, no need to localize the file.
- return;
+ return jvmOptions;
}
+
LOG.debug("Creating {}", targetPath);
- JvmOptionsCodec.encode(new JvmOptions(extraOptions, debugOptions), new OutputSupplier<Writer>() {
- @Override
- public Writer getOutput() throws IOException {
- return Files.newBufferedWriter(targetPath, StandardCharsets.UTF_8);
- }
- });
+ try (Writer writer = Files.newBufferedWriter(targetPath, StandardCharsets.UTF_8)) {
+ new Gson().toJson(jvmOptions, writer);
+ }
LOG.debug("Done {}", targetPath);
+ return jvmOptions;
}
private void saveArguments(Arguments arguments, final Path targetPath) throws IOException {
http://git-wip-us.apache.org/repos/asf/twill/blob/29a7999f/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderRunnable.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderRunnable.java b/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderRunnable.java
index 66bcd42..591f931 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderRunnable.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderRunnable.java
@@ -30,7 +30,6 @@ import java.util.concurrent.CountDownLatch;
*/
public final class CustomClassLoaderRunnable extends AbstractTwillRunnable {
- static final String SERVICE_NAME = "custom.service";
static final String GENERATED_CLASS_NAME = "org.apache.twill.test.Generated";
private static final Logger LOG = LoggerFactory.getLogger(CustomClassLoaderRunnable.class);
@@ -42,7 +41,7 @@ public final class CustomClassLoaderRunnable extends AbstractTwillRunnable {
try {
Class<?> cls = Class.forName(GENERATED_CLASS_NAME);
java.lang.reflect.Method announce = cls.getMethod("announce", ServiceAnnouncer.class, String.class, int.class);
- announce.invoke(cls.newInstance(), getContext(), SERVICE_NAME, 54321);
+ announce.invoke(cls.newInstance(), getContext(), System.getProperty("service.name"), 54321);
Uninterruptibles.awaitUninterruptibly(stopLatch);
} catch (Exception e) {
LOG.error("Failed to call announce on " + GENERATED_CLASS_NAME, e);
http://git-wip-us.apache.org/repos/asf/twill/blob/29a7999f/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderTestRun.java
index 0ac43a6..f0a75b2 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderTestRun.java
@@ -34,9 +34,10 @@ public class CustomClassLoaderTestRun extends BaseYarnTest {
TwillController controller = getTwillRunner().prepare(new CustomClassLoaderRunnable())
.setClassLoader(CustomClassLoader.class.getName())
.addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .setJVMOptions(CustomClassLoaderRunnable.class.getSimpleName(), "-Dservice.name=custom")
.start();
- Assert.assertTrue(waitForSize(controller.discoverService(CustomClassLoaderRunnable.SERVICE_NAME), 1, 120));
+ Assert.assertTrue(waitForSize(controller.discoverService("custom"), 1, 120));
controller.terminate().get();
}
}
http://git-wip-us.apache.org/repos/asf/twill/blob/29a7999f/twill-yarn/src/test/java/org/apache/twill/yarn/JvmOptionsTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/JvmOptionsTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/JvmOptionsTestRun.java
new file mode 100644
index 0000000..3ec49e2
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/JvmOptionsTestRun.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Unit test for testing extra JVM options setting for runnables.
+ */
+public class JvmOptionsTestRun extends BaseYarnTest {
+
+ @Test
+ public void testExtraOptions() throws InterruptedException, ExecutionException {
+ // Start the testing app with jvm options at both global level as well as for the specific runnables.
+ TwillController controller = getTwillRunner()
+ .prepare(new JvmOptionsApplication())
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .setJVMOptions("-Dservice.name=default")
+ .setJVMOptions("r2", "-Dservice.name=r2")
+ .start();
+
+ // r1 and r3 have no runnable specific options, so both announce with the global "default" service name.
+ org.junit.Assert.assertTrue(waitForSize(controller.discoverService("default"), 2, 120));
+ // r2 overrides the service name with its runnable specific option.
+ org.junit.Assert.assertTrue(waitForSize(controller.discoverService("r2"), 1, 120));
+
+ controller.terminate().get();
+ }
+
+ /**
+ * Application for testing extra jvm options
+ */
+ public static final class JvmOptionsApplication implements TwillApplication {
+
+ @Override
+ public TwillSpecification configure() {
+ return TwillSpecification.Builder.with()
+ .setName(JvmOptionsApplication.class.getSimpleName())
+ .withRunnable()
+ .add("r1", new SimpleRunnable()).noLocalFiles()
+ .add("r2", new SimpleRunnable()).noLocalFiles()
+ .add("r3", new SimpleRunnable()).noLocalFiles()
+ .anyOrder()
+ .build();
+ }
+ }
+
+ /**
+ * A runnable that simply announces itself under the name given by the "service.name" system property and waits for the stop signal.
+ */
+ public static final class SimpleRunnable extends AbstractTwillRunnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleRunnable.class);
+
+ private final CountDownLatch stopLatch = new CountDownLatch(1);
+
+ @Override
+ public void run() {
+ String runnableName = getContext().getSpecification().getName();
+ String serviceName = System.getProperty("service.name");
+ LOG.info("Announcing with name {} for runnable {}", serviceName, runnableName);
+
+ // Compute a unique port number based on the runnable name (runnable names are r[0-9]+)
+ getContext().announce(serviceName, 12345 + Integer.parseInt(runnableName.substring(1)));
+ try {
+ stopLatch.await();
+ } catch (InterruptedException e) {
+ LOG.warn("Run thread interrupted", e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ stopLatch.countDown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/twill/blob/29a7999f/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
index 0911a3d..0bb7fce 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
@@ -33,6 +33,7 @@ import org.junit.runners.Suite;
EnvironmentTestRun.class,
FailureRestartTestRun.class,
InitializeFailTestRun.class,
+ JvmOptionsTestRun.class,
LocalFileTestRun.class,
LogHandlerTestRun.class,
LogLevelChangeTestRun.class,
[3/5] twill git commit: Updated the config reading logic.
Posted by ch...@apache.org.
Updated the config reading logic.
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/f9d5a596
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/f9d5a596
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/f9d5a596
Branch: refs/heads/feature/TWILL-241-per-runnable-opts
Commit: f9d5a596b0d99c0ca1830113d25d30f520f85bc9
Parents: 29a7999
Author: Terence Yim <ch...@apache.org>
Authored: Fri Aug 4 19:00:00 2017 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Aug 4 19:00:00 2017 -0700
----------------------------------------------------------------------
.../internal/TwillRuntimeSpecification.java | 41 +++++++++++++-------
.../apache/twill/yarn/ContainerSizeTestRun.java | 14 +++++--
2 files changed, 39 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/f9d5a596/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
index 636d94d..2974870 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
@@ -90,33 +90,32 @@ public class TwillRuntimeSpecification {
* Returns the minimum heap ratio for the application master.
*/
public double getAMMinHeapRatio() {
- return getMinHeapRatio(config);
+ return getMinHeapRatio(config, Configs.Defaults.HEAP_RESERVED_MIN_RATIO);
}
/**
* Returns the minimum heap ratio for the given runnable.
*/
public double getMinHeapRatio(String runnableName) {
- return getMinHeapRatio(runnableConfigs.containsKey(runnableName) ? runnableConfigs.get(runnableName) : config);
+ double ratio = getMinHeapRatio(runnableConfigs.get(runnableName), 0d);
+ return ratio <= 0d ? getMinHeapRatio(config, Configs.Defaults.HEAP_RESERVED_MIN_RATIO) : ratio;
}
/**
* Returns the reserved non-heap memory size in MB for the application master.
*/
public int getAMReservedMemory() {
- return config.containsKey(Configs.Keys.YARN_AM_RESERVED_MEMORY_MB) ?
- Integer.parseInt(config.get(Configs.Keys.YARN_AM_RESERVED_MEMORY_MB)) :
- Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB;
+ return getReservedMemory(config, Configs.Keys.YARN_AM_RESERVED_MEMORY_MB,
+ Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB);
}
/**
* Returns the reserved non-heap memory size in MB for the given runnable.
*/
public int getReservedMemory(String runnableName) {
- Map<String, String> conf = runnableConfigs.containsKey(runnableName) ? runnableConfigs.get(runnableName) : config;
- return conf.containsKey(Configs.Keys.JAVA_RESERVED_MEMORY_MB) ?
- Integer.parseInt(conf.get(Configs.Keys.JAVA_RESERVED_MEMORY_MB)) :
- Configs.Defaults.JAVA_RESERVED_MEMORY_MB;
+ String key = Configs.Keys.JAVA_RESERVED_MEMORY_MB;
+ int memory = getReservedMemory(runnableConfigs.get(runnableName), key, -1);
+ return memory < 0 ? getReservedMemory(config, key, Configs.Defaults.JAVA_RESERVED_MEMORY_MB) : memory;
}
/**
@@ -171,9 +170,25 @@ public class TwillRuntimeSpecification {
/**
* Returns the minimum heap ratio ({@link Configs.Keys#HEAP_RESERVED_MIN_RATIO}) based on the given configuration.
*/
- private double getMinHeapRatio(Map<String, String> config) {
- return config.containsKey(Configs.Keys.HEAP_RESERVED_MIN_RATIO) ?
- Double.parseDouble(config.get(Configs.Keys.HEAP_RESERVED_MIN_RATIO)) :
- Configs.Defaults.HEAP_RESERVED_MIN_RATIO;
+ private double getMinHeapRatio(@Nullable Map<String, String> config, double defaultValue) {
+ if (config == null) {
+ return defaultValue;
+ }
+
+ double ratio = config.containsKey(Configs.Keys.HEAP_RESERVED_MIN_RATIO) ?
+ Double.parseDouble(config.get(Configs.Keys.HEAP_RESERVED_MIN_RATIO)) : defaultValue;
+ // Ratio can't be <= 0
+ return ratio <= 0d ? defaultValue : ratio;
+ }
+
+ /**
+ * Returns the reserved memory size in MB read from {@code key}, or {@code defaultValue} if absent or negative.
+ */
+ private int getReservedMemory(@Nullable Map<String, String> config, String key, int defaultValue) {
+ if (config == null) {
+ return defaultValue;
+ }
+ int memory = config.containsKey(key) ? Integer.parseInt(config.get(key)) : defaultValue;
+ return memory < 0 ? defaultValue : memory;
}
}
http://git-wip-us.apache.org/repos/asf/twill/blob/f9d5a596/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
index 73f1476..10dcaf5 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
@@ -69,14 +69,13 @@ public class ContainerSizeTestRun extends BaseYarnTest {
@Test
public void testMaxHeapSize() throws InterruptedException, TimeoutException, ExecutionException {
TwillRunner runner = getTwillRunner();
- String runnableName = "sleep";
TwillController controller = runner.prepare(new MaxHeapApp())
// Alter the AM container size and heap ratio
.withConfiguration(ImmutableMap.of(Configs.Keys.YARN_AM_MEMORY_MB, "256",
Configs.Keys.HEAP_RESERVED_MIN_RATIO, "0.65"))
// Use a different heap ratio and reserved memory size for the runnable
- .withConfiguration(runnableName,
+ .withConfiguration("sleep",
ImmutableMap.of(Configs.Keys.HEAP_RESERVED_MIN_RATIO, "0.8",
Configs.Keys.JAVA_RESERVED_MEMORY_MB, "1024"))
.addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
@@ -94,12 +93,20 @@ public class ContainerSizeTestRun extends BaseYarnTest {
resourceReport.getAppMasterResources().getMaxHeapMemoryMB());
// Verify the runnable container heap size
- Collection<TwillRunResources> runnableResources = resourceReport.getRunnableResources(runnableName);
+ Collection<TwillRunResources> runnableResources = resourceReport.getRunnableResources("sleep");
Assert.assertFalse(runnableResources.isEmpty());
TwillRunResources resources = runnableResources.iterator().next();
Assert.assertEquals(Resources.computeMaxHeapSize(resources.getMemoryMB(), 1024, 0.8d),
resources.getMaxHeapMemoryMB());
+ // For the sleep2 runnable, we don't set any ratio and reserved memory.
+ // The ratio should default to the app level 0.65 and the reserved memory to the default runnable value.
+ runnableResources = resourceReport.getRunnableResources("sleep2");
+ Assert.assertFalse(runnableResources.isEmpty());
+ resources = runnableResources.iterator().next();
+ Assert.assertEquals(
+ Resources.computeMaxHeapSize(resources.getMemoryMB(), Configs.Defaults.JAVA_RESERVED_MEMORY_MB, 0.65d),
+ resources.getMaxHeapMemoryMB());
} finally {
controller.terminate().get(120, TimeUnit.SECONDS);
}
@@ -181,6 +188,7 @@ public class ContainerSizeTestRun extends BaseYarnTest {
.setName("MaxHeapApp")
.withRunnable()
.add("sleep", new MaxHeapRunnable(12345), res).noLocalFiles()
+ .add("sleep2", new MaxHeapRunnable(23456), res).noLocalFiles()
.anyOrder()
.build();
}