You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/06/09 21:35:55 UTC
[storm] branch master updated: STORM-3647: Adds OFF HEAP to worker child opts
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 712af2f STORM-3647: Adds OFF HEAP to worker child opts
new 15028d0 Merge pull request #3281 from govind-menon/master
712af2f is described below
commit 712af2f60cf8cc52156cf42a6b13ea1b53482133
Author: Govind Menon <go...@gmail.com>
AuthorDate: Thu Jun 4 13:26:24 2020 -0500
STORM-3647: Adds OFF HEAP to worker child opts
---
.../storm/daemon/supervisor/BasicContainer.java | 40 +++++++++++++++-------
.../daemon/supervisor/BasicContainerTest.java | 35 +++++++++++--------
2 files changed, 48 insertions(+), 27 deletions(-)
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
index 4db6f3f..4d1609c 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
@@ -444,7 +444,7 @@ public class BasicContainer extends Container {
return CPJ.join(workercp);
}
- private String substituteChildOptsInternal(String string, int memOnheap) {
+ private String substituteChildOptsInternal(String string, int memOnheap, int memOffheap) {
if (StringUtils.isNotBlank(string)) {
String p = String.valueOf(port);
string = string.replace("%ID%", p);
@@ -454,6 +454,9 @@ public class BasicContainer extends Container {
if (memOnheap > 0) {
string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap));
}
+ if (memOffheap > 0) {
+ string = string.replace("%OFF-HEAP-MEM%", String.valueOf(memOffheap));
+ }
if (memoryLimitMb > 0) {
string = string.replace("%LIMIT-MEM%", String.valueOf(memoryLimitMb));
}
@@ -462,13 +465,13 @@ public class BasicContainer extends Container {
}
protected List<String> substituteChildopts(Object value) {
- return substituteChildopts(value, -1);
+ return substituteChildopts(value, -1, -1);
}
- protected List<String> substituteChildopts(Object value, int memOnheap) {
+ protected List<String> substituteChildopts(Object value, int memOnheap, int memOffHeap) {
List<String> rets = new ArrayList<>();
if (value instanceof String) {
- String string = substituteChildOptsInternal((String) value, memOnheap);
+ String string = substituteChildOptsInternal((String) value, memOnheap, memOffHeap);
if (StringUtils.isNotBlank(string)) {
String[] strings = string.split("\\s+");
for (String s : strings) {
@@ -481,7 +484,7 @@ public class BasicContainer extends Container {
@SuppressWarnings("unchecked")
List<String> objects = (List<String>) value;
for (String object : objects) {
- String str = substituteChildOptsInternal(object, memOnheap);
+ String str = substituteChildOptsInternal(object, memOnheap, memOffHeap);
if (StringUtils.isNotBlank(str)) {
rets.add(str);
}
@@ -586,10 +589,20 @@ public class BasicContainer extends Container {
return memOnheap;
}
- private List<String> getWorkerProfilerChildOpts(int memOnheap) {
+ private int getMemOffHeap(WorkerResources resources) {
+ int memOffheap = 0;
+ if (resources != null && resources.is_set_mem_off_heap() && resources.get_mem_off_heap() > 0) {
+ memOffheap = (int) Math.ceil(resources.get_mem_off_heap());
+ }
+ return memOffheap;
+ }
+
+ private List<String> getWorkerProfilerChildOpts(int memOnheap, int memOffheap) {
List<String> workerProfilerChildopts = new ArrayList<>();
if (ObjectReader.getBoolean(conf.get(DaemonConfig.WORKER_PROFILER_ENABLED), false)) {
- workerProfilerChildopts = substituteChildopts(conf.get(DaemonConfig.WORKER_PROFILER_CHILDOPTS), memOnheap);
+ workerProfilerChildopts = substituteChildopts(
+ conf.get(DaemonConfig.WORKER_PROFILER_CHILDOPTS), memOnheap, memOffheap
+ );
}
return workerProfilerChildopts;
}
@@ -615,7 +628,7 @@ public class BasicContainer extends Container {
*
* @throws IOException on any error.
*/
- private List<String> mkLaunchCommand(final int memOnheap, final String stormRoot,
+ private List<String> mkLaunchCommand(final int memOnheap, final int memOffheap, final String stormRoot,
final String jlp, final String numaId) throws IOException {
final String javaCmd = javaCmd("java");
final String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
@@ -653,12 +666,12 @@ public class BasicContainer extends Container {
commandList.add("-server");
commandList.addAll(commonParams);
commandList.add("-Dlog4j.configurationFile=" + workerLog4jConfig);
- commandList.addAll(substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), memOnheap));
- commandList.addAll(substituteChildopts(topoConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), memOnheap));
+ commandList.addAll(substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), memOnheap, memOffheap));
+ commandList.addAll(substituteChildopts(topoConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), memOnheap, memOffheap));
commandList.addAll(substituteChildopts(Utils.OR(
topoConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS),
- conf.get(Config.WORKER_GC_CHILDOPTS)), memOnheap));
- commandList.addAll(getWorkerProfilerChildOpts(memOnheap));
+ conf.get(Config.WORKER_GC_CHILDOPTS)), memOnheap, memOffheap));
+ commandList.addAll(getWorkerProfilerChildOpts(memOnheap, memOffheap));
commandList.add("-Djava.library.path=" + jlp);
commandList.add("-Dstorm.conf.file=" + topoConfFile);
commandList.add("-Dstorm.options=" + stormOptions);
@@ -833,6 +846,7 @@ public class BasicContainer extends Container {
final WorkerResources resources = assignment.get_resources();
final int memOnHeap = getMemOnHeap(resources);
+ final int memOffHeap = getMemOffHeap(resources);
memoryLimitMb = calculateMemoryLimit(resources, memOnHeap);
final String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, topologyId);
String jlp = javaLibraryPath(stormRoot, conf);
@@ -857,7 +871,7 @@ public class BasicContainer extends Container {
resourceIsolationManager.reserveResourcesForWorker(workerId, (int) memoryLimitMb, cpu, numaId);
}
- List<String> commandList = mkLaunchCommand(memOnHeap, stormRoot, jlp, numaId);
+ List<String> commandList = mkLaunchCommand(memOnHeap, memOffHeap, stormRoot, jlp, numaId);
LOG.info("Launching worker with command: {}. ", ServerUtils.shellCmd(commandList));
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
index e8bb376..cb95229 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
@@ -599,6 +599,7 @@ public class BasicContainerTest {
int supervisorPort = 6628;
int port = 9999;
int memOnheap = 512;
+ int memOffheap = 256;
LocalAssignment la = new LocalAssignment();
la.set_topology_id(topoId);
@@ -617,18 +618,24 @@ public class BasicContainerTest {
assertListEquals(Arrays.asList(
"-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log",
"-Xms256m",
- "-Xmx512m"),
- mc.substituteChildopts(
- "-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m",
- memOnheap));
-
- assertListEquals(Arrays.asList(
- "-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log",
- "-Xms256m",
- "-Xmx512m"),
- mc.substituteChildopts(Arrays.asList(
- "-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log", "-Xms256m",
- "-Xmx%HEAP-MEM%m"), memOnheap));
+ "-Xmx512m", "-XX:MaxDirectMemorySize=256m"),
+ mc.substituteChildopts(
+ "-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m -XX:MaxDirectMemorySize=%OFF-HEAP-MEM%m",
+ memOnheap, memOffheap));
+
+ assertListEquals(
+ Arrays.asList(
+ "-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log",
+ "-Xms256m",
+ "-Xmx512m"
+ ),
+ mc.substituteChildopts(
+ Arrays.asList(
+ "-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log", "-Xms256m",
+ "-Xmx%HEAP-MEM%m"
+ ), memOnheap, memOffheap
+ )
+ );
assertListEquals(Collections.emptyList(),
mc.substituteChildopts(null));
@@ -672,8 +679,8 @@ public class BasicContainerTest {
}
@Override
- public List<String> substituteChildopts(Object value, int memOnheap) {
- return super.substituteChildopts(value, memOnheap);
+ public List<String> substituteChildopts(Object value, int memOnheap, int memOffHeap) {
+ return super.substituteChildopts(value, memOnheap, memOffHeap);
}
@Override