You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2019/09/26 21:23:03 UTC
[storm] branch 2.1.x-branch updated: STORM-3506 prevent topo conf
from overriding some system properties (#3125)
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch 2.1.x-branch
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/2.1.x-branch by this push:
new 5de9e09 STORM-3506 prevent topo conf from overriding some system properties (#3125)
5de9e09 is described below
commit 5de9e0951815adfed26deae6a293a23360b12f97
Author: agresch <ag...@gmail.com>
AuthorDate: Thu Sep 26 16:10:23 2019 -0500
STORM-3506 prevent topo conf from overriding some system properties (#3125)
---
.../org/apache/storm/daemon/nimbus/Nimbus.java | 23 ++++++++++++++++++++++
1 file changed, 23 insertions(+)
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 96d6c21..6c8734c 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -1086,6 +1086,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
@SuppressWarnings("unchecked")
+ /**
+ * Create a normalized topology conf.
+ *
+ * @param conf the nimbus conf
+ * @param topoConf initial topology conf
+ * @param topology the Storm topology
+ */
private static Map<String, Object> normalizeConf(Map<String, Object> conf, Map<String, Object> topoConf, StormTopology topology) {
//ensure that serializations are same for all tasks no matter what's on
// the supervisors. this also allows you to declare the serializations as a sequence
@@ -1113,6 +1120,21 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
ret.put(Config.TOPOLOGY_ACKER_EXECUTORS, mergedConf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
ret.put(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS, mergedConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS));
ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, mergedConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
+
+ // Don't allow topoConf to override various cluster-specific properties.
+ // Specifically adding the cluster settings to the topoConf here will make sure these settings
+ // also override the subsequently generated conf picked up locally on the classpath.
+ //
+ // We will be dealing with 3 confs:
+ // 1) the submitted topoConf created here
+ // 2) the combined classpath conf with the topoConf added on top
+ // 3) the nimbus conf with conf 2 above added on top.
+ //
+ // By first forcing the topology conf to contain the nimbus settings, we guarantee all three confs
+ // will have the correct settings that cannot be overridden by the submitter.
+ ret.put(Config.STORM_CGROUP_HIERARCHY_DIR, conf.get(Config.STORM_CGROUP_HIERARCHY_DIR));
+ ret.put(Config.WORKER_METRICS, conf.get(Config.WORKER_METRICS));
+
return ret;
}
@@ -3042,6 +3064,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
if (!(Boolean) conf.getOrDefault(DaemonConfig.STORM_TOPOLOGY_CLASSPATH_BEGINNING_ENABLED, false)) {
topoConf.remove(Config.TOPOLOGY_CLASSPATH_BEGINNING);
}
+
String topoVersionString = topology.get_storm_version();
if (topoVersionString == null) {
topoVersionString = (String) conf.getOrDefault(Config.SUPERVISOR_WORKER_DEFAULT_VERSION, VersionInfo.getVersion());