You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2019/09/26 21:23:03 UTC

[storm] branch 2.1.x-branch updated: STORM-3506 prevent topo conf from overriding some system properties (#3125)

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch 2.1.x-branch
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/2.1.x-branch by this push:
     new 5de9e09  STORM-3506 prevent topo conf from overriding some system properties (#3125)
5de9e09 is described below

commit 5de9e0951815adfed26deae6a293a23360b12f97
Author: agresch <ag...@gmail.com>
AuthorDate: Thu Sep 26 16:10:23 2019 -0500

    STORM-3506 prevent topo conf from overriding some system properties (#3125)
---
 .../org/apache/storm/daemon/nimbus/Nimbus.java     | 23 ++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 96d6c21..6c8734c 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -1086,6 +1086,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     }
 
     @SuppressWarnings("unchecked")
+    /**
+     * Create a normalized topology conf.
+     *
+     * @param conf the nimbus conf
+     * @param topoConf initial topology conf
+     * @param topology the Storm topology
+     */
     private static Map<String, Object> normalizeConf(Map<String, Object> conf, Map<String, Object> topoConf, StormTopology topology) {
         //ensure that serializations are same for all tasks no matter what's on
         // the supervisors. this also allows you to declare the serializations as a sequence
@@ -1113,6 +1120,21 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         ret.put(Config.TOPOLOGY_ACKER_EXECUTORS, mergedConf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
         ret.put(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS, mergedConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS));
         ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, mergedConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
+
+        // Don't allow topoConf to override various cluster-specific properties.
+        // Specifically adding the cluster settings to the topoConf here will make sure these settings
+        // also override the subsequently generated conf picked up locally on the classpath.
+        //
+        // We will be dealing with 3 confs:
+        // 1) the submitted topoConf created here
+        // 2) the combined classpath conf with the topoConf added on top
+        // 3) the nimbus conf with conf 2 above added on top.
+        //
+        // By first forcing the topology conf to contain the nimbus settings, we guarantee all three confs
+        // will have the correct settings that cannot be overridden by the submitter.
+        ret.put(Config.STORM_CGROUP_HIERARCHY_DIR, conf.get(Config.STORM_CGROUP_HIERARCHY_DIR));
+        ret.put(Config.WORKER_METRICS, conf.get(Config.WORKER_METRICS));
+
         return ret;
     }
 
@@ -3042,6 +3064,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             if (!(Boolean) conf.getOrDefault(DaemonConfig.STORM_TOPOLOGY_CLASSPATH_BEGINNING_ENABLED, false)) {
                 topoConf.remove(Config.TOPOLOGY_CLASSPATH_BEGINNING);
             }
+
             String topoVersionString = topology.get_storm_version();
             if (topoVersionString == null) {
                 topoVersionString = (String) conf.getOrDefault(Config.SUPERVISOR_WORKER_DEFAULT_VERSION, VersionInfo.getVersion());