You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/06/09 21:35:55 UTC

[storm] branch master updated: STORM-3647: Adds OFF HEAP to worker child opts

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 712af2f  STORM-3647: Adds OFF HEAP to worker child opts
     new 15028d0  Merge pull request #3281 from govind-menon/master
712af2f is described below

commit 712af2f60cf8cc52156cf42a6b13ea1b53482133
Author: Govind Menon <go...@gmail.com>
AuthorDate: Thu Jun 4 13:26:24 2020 -0500

    STORM-3647: Adds OFF HEAP to worker child opts
---
 .../storm/daemon/supervisor/BasicContainer.java    | 40 +++++++++++++++-------
 .../daemon/supervisor/BasicContainerTest.java      | 35 +++++++++++--------
 2 files changed, 48 insertions(+), 27 deletions(-)

diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
index 4db6f3f..4d1609c 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
@@ -444,7 +444,7 @@ public class BasicContainer extends Container {
         return CPJ.join(workercp);
     }
 
-    private String substituteChildOptsInternal(String string, int memOnheap) {
+    private String substituteChildOptsInternal(String string, int memOnheap, int memOffheap) {
         if (StringUtils.isNotBlank(string)) {
             String p = String.valueOf(port);
             string = string.replace("%ID%", p);
@@ -454,6 +454,9 @@ public class BasicContainer extends Container {
             if (memOnheap > 0) {
                 string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap));
             }
+            if (memOffheap > 0) {
+                string = string.replace("%OFF-HEAP-MEM%", String.valueOf(memOffheap));
+            }
             if (memoryLimitMb > 0) {
                 string = string.replace("%LIMIT-MEM%", String.valueOf(memoryLimitMb));
             }
@@ -462,13 +465,13 @@ public class BasicContainer extends Container {
     }
 
     protected List<String> substituteChildopts(Object value) {
-        return substituteChildopts(value, -1);
+        return substituteChildopts(value, -1, -1);
     }
 
-    protected List<String> substituteChildopts(Object value, int memOnheap) {
+    protected List<String> substituteChildopts(Object value, int memOnheap, int memOffHeap) {
         List<String> rets = new ArrayList<>();
         if (value instanceof String) {
-            String string = substituteChildOptsInternal((String) value, memOnheap);
+            String string = substituteChildOptsInternal((String) value, memOnheap, memOffHeap);
             if (StringUtils.isNotBlank(string)) {
                 String[] strings = string.split("\\s+");
                 for (String s : strings) {
@@ -481,7 +484,7 @@ public class BasicContainer extends Container {
             @SuppressWarnings("unchecked")
             List<String> objects = (List<String>) value;
             for (String object : objects) {
-                String str = substituteChildOptsInternal(object, memOnheap);
+                String str = substituteChildOptsInternal(object, memOnheap, memOffHeap);
                 if (StringUtils.isNotBlank(str)) {
                     rets.add(str);
                 }
@@ -586,10 +589,20 @@ public class BasicContainer extends Container {
         return memOnheap;
     }
 
-    private List<String> getWorkerProfilerChildOpts(int memOnheap) {
+    private int getMemOffHeap(WorkerResources resources) {
+        int memOffheap = 0;
+        if (resources != null && resources.is_set_mem_off_heap() && resources.get_mem_off_heap() > 0) {
+            memOffheap = (int) Math.ceil(resources.get_mem_off_heap());
+        }
+        return memOffheap;
+    }
+
+    private List<String> getWorkerProfilerChildOpts(int memOnheap, int memOffheap) {
         List<String> workerProfilerChildopts = new ArrayList<>();
         if (ObjectReader.getBoolean(conf.get(DaemonConfig.WORKER_PROFILER_ENABLED), false)) {
-            workerProfilerChildopts = substituteChildopts(conf.get(DaemonConfig.WORKER_PROFILER_CHILDOPTS), memOnheap);
+            workerProfilerChildopts = substituteChildopts(
+                    conf.get(DaemonConfig.WORKER_PROFILER_CHILDOPTS), memOnheap, memOffheap
+            );
         }
         return workerProfilerChildopts;
     }
@@ -615,7 +628,7 @@ public class BasicContainer extends Container {
      *
      * @throws IOException on any error.
      */
-    private List<String> mkLaunchCommand(final int memOnheap, final String stormRoot,
+    private List<String> mkLaunchCommand(final int memOnheap, final int memOffheap, final String stormRoot,
                                          final String jlp, final String numaId) throws IOException {
         final String javaCmd = javaCmd("java");
         final String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
@@ -653,12 +666,12 @@ public class BasicContainer extends Container {
         commandList.add("-server");
         commandList.addAll(commonParams);
         commandList.add("-Dlog4j.configurationFile=" + workerLog4jConfig);
-        commandList.addAll(substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), memOnheap));
-        commandList.addAll(substituteChildopts(topoConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), memOnheap));
+        commandList.addAll(substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), memOnheap, memOffheap));
+        commandList.addAll(substituteChildopts(topoConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), memOnheap, memOffheap));
         commandList.addAll(substituteChildopts(Utils.OR(
             topoConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS),
-            conf.get(Config.WORKER_GC_CHILDOPTS)), memOnheap));
-        commandList.addAll(getWorkerProfilerChildOpts(memOnheap));
+            conf.get(Config.WORKER_GC_CHILDOPTS)), memOnheap, memOffheap));
+        commandList.addAll(getWorkerProfilerChildOpts(memOnheap, memOffheap));
         commandList.add("-Djava.library.path=" + jlp);
         commandList.add("-Dstorm.conf.file=" + topoConfFile);
         commandList.add("-Dstorm.options=" + stormOptions);
@@ -833,6 +846,7 @@ public class BasicContainer extends Container {
 
         final WorkerResources resources = assignment.get_resources();
         final int memOnHeap = getMemOnHeap(resources);
+        final int memOffHeap = getMemOffHeap(resources);
         memoryLimitMb = calculateMemoryLimit(resources, memOnHeap);
         final String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, topologyId);
         String jlp = javaLibraryPath(stormRoot, conf);
@@ -857,7 +871,7 @@ public class BasicContainer extends Container {
             resourceIsolationManager.reserveResourcesForWorker(workerId, (int) memoryLimitMb, cpu, numaId);
         }
 
-        List<String> commandList = mkLaunchCommand(memOnHeap, stormRoot, jlp, numaId);
+        List<String> commandList = mkLaunchCommand(memOnHeap, memOffHeap, stormRoot, jlp, numaId);
 
         LOG.info("Launching worker with command: {}. ", ServerUtils.shellCmd(commandList));
 
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
index e8bb376..cb95229 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
@@ -599,6 +599,7 @@ public class BasicContainerTest {
         int supervisorPort = 6628;
         int port = 9999;
         int memOnheap = 512;
+        int memOffheap = 256;
 
         LocalAssignment la = new LocalAssignment();
         la.set_topology_id(topoId);
@@ -617,18 +618,24 @@ public class BasicContainerTest {
         assertListEquals(Arrays.asList(
             "-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log",
             "-Xms256m",
-            "-Xmx512m"),
-                         mc.substituteChildopts(
-                             "-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m",
-                             memOnheap));
-
-        assertListEquals(Arrays.asList(
-            "-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log",
-            "-Xms256m",
-            "-Xmx512m"),
-                         mc.substituteChildopts(Arrays.asList(
-                             "-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log", "-Xms256m",
-                             "-Xmx%HEAP-MEM%m"), memOnheap));
+                "-Xmx512m", "-XX:MaxDirectMemorySize=256m"),
+                mc.substituteChildopts(
+                        "-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m -XX:MaxDirectMemorySize=%OFF-HEAP-MEM%m",
+                        memOnheap, memOffheap));
+
+        assertListEquals(
+                Arrays.asList(
+                        "-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log",
+                        "-Xms256m",
+                        "-Xmx512m"
+                ),
+                mc.substituteChildopts(
+                        Arrays.asList(
+                        "-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log", "-Xms256m",
+                        "-Xmx%HEAP-MEM%m"
+                        ), memOnheap, memOffheap
+                )
+        );
 
         assertListEquals(Collections.emptyList(),
                          mc.substituteChildopts(null));
@@ -672,8 +679,8 @@ public class BasicContainerTest {
         }
 
         @Override
-        public List<String> substituteChildopts(Object value, int memOnheap) {
-            return super.substituteChildopts(value, memOnheap);
+        public List<String> substituteChildopts(Object value, int memOnheap, int memOffHeap) {
+            return super.substituteChildopts(value, memOnheap, memOffHeap);
         }
 
         @Override