You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2019/09/26 21:10:36 UTC
[storm] branch master updated: STORM-3506 prevent topo conf from
overriding some system properties (#3125)
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new a2a2028 STORM-3506 prevent topo conf from overriding some system properties (#3125)
a2a2028 is described below
commit a2a202805eadf1bf5c6761e4304f0afc961da163
Author: agresch <ag...@gmail.com>
AuthorDate: Thu Sep 26 16:10:23 2019 -0500
STORM-3506 prevent topo conf from overriding some system properties (#3125)
---
.../org/apache/storm/daemon/nimbus/Nimbus.java | 22 ++++++++++++++++++++++
1 file changed, 22 insertions(+)
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 69b2bb1..d2abeb0 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -1086,6 +1086,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
@SuppressWarnings("unchecked")
+ /**
+ * Create a normalized topology conf.
+ *
+ * @param conf the nimbus conf
+ * @param topoConf initial topology conf
+ * @param topology the Storm topology
+ */
private static Map<String, Object> normalizeConf(Map<String, Object> conf, Map<String, Object> topoConf, StormTopology topology) {
//ensure that serializations are same for all tasks no matter what's on
// the supervisors. this also allows you to declare the serializations as a sequence
@@ -1114,6 +1121,20 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
ret.put(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS, mergedConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS));
ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, mergedConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
+ // Don't allow topoConf to override various cluster-specific properties.
+ // Specifically adding the cluster settings to the topoConf here will make sure these settings
+ // also override the subsequently generated conf picked up locally on the classpath.
+ //
+ // We will be dealing with 3 confs:
+ // 1) the submitted topoConf created here
+ // 2) the combined classpath conf with the topoConf added on top
+ // 3) the nimbus conf with conf 2 above added on top.
+ //
+ // By first forcing the topology conf to contain the nimbus settings, we guarantee all three confs
+ // will have the correct settings that cannot be overridden by the submitter.
+ ret.put(Config.STORM_CGROUP_HIERARCHY_DIR, conf.get(Config.STORM_CGROUP_HIERARCHY_DIR));
+ ret.put(Config.WORKER_METRICS, conf.get(Config.WORKER_METRICS));
+
if (mergedConf.containsKey(Config.TOPOLOGY_WORKER_TIMEOUT_SECS)) {
int workerTimeoutSecs = (Integer) ObjectReader.getInt(mergedConf.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS));
int workerMaxTimeoutSecs = (Integer) ObjectReader.getInt(mergedConf.get(Config.WORKER_MAX_TIMEOUT_SECS));
@@ -3100,6 +3121,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
if (!(Boolean) conf.getOrDefault(DaemonConfig.STORM_TOPOLOGY_CLASSPATH_BEGINNING_ENABLED, false)) {
topoConf.remove(Config.TOPOLOGY_CLASSPATH_BEGINNING);
}
+
String topoVersionString = topology.get_storm_version();
if (topoVersionString == null) {
topoVersionString = (String) conf.getOrDefault(Config.SUPERVISOR_WORKER_DEFAULT_VERSION, VersionInfo.getVersion());