You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/01/24 22:37:07 UTC

[storm] branch 2.1.x-branch updated: [STORM-3567] fix UI showing wrong resource info when topo is not scheduled

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch 2.1.x-branch
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/2.1.x-branch by this push:
     new 15e0d4a  [STORM-3567] fix UI showing wrong resource info when topo is not scheduled
     new 3c2f700  Merge pull request #3199 from RuiLi8080/storm-3567-2.1
15e0d4a is described below

commit 15e0d4a9cd8d93a0ef6e57796ac110cd69847eff
Author: Rui Li <ru...@verizonmedia.com>
AuthorDate: Thu Jan 16 14:35:17 2020 -0600

    [STORM-3567] fix UI showing wrong resource info when topo is not scheduled
---
 .../org/apache/storm/daemon/nimbus/Nimbus.java     | 67 ++++++++++++++--------
 1 file changed, 44 insertions(+), 23 deletions(-)

diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index d9dd550..37bd01a 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -4059,7 +4059,6 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             Map<List<Integer>, Map<String, Object>> beats = common.beats;
             Map<Integer, String> taskToComp = common.taskToComponent;
             StormTopology topology = common.topology;
-            Map<String, Object> topoConf = Utils.merge(conf, common.topoConf);
             StormBase base = common.base;
             if (base == null) {
                 throw new WrappedNotAliveException(topoId);
@@ -4103,21 +4102,11 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 topoPageInfo.set_storm_version(topology.get_storm_version());
             }
 
-            Map<String, NormalizedResourceRequest> spoutResources = ResourceUtils.getSpoutsResources(topology, topoConf);
-            for (Entry<String, ComponentAggregateStats> entry : topoPageInfo.get_id_to_spout_agg_stats().entrySet()) {
-                CommonAggregateStats commonStats = entry.getValue().get_common_stats();
-                setResourcesDefaultIfNotSet(spoutResources, entry.getKey(), topoConf);
-                commonStats.set_resources_map(spoutResources.get(entry.getKey()).toNormalizedMap());
-            }
-            maybeAddPlaceholderSpoutAggStats(topoPageInfo, topology);
 
-            Map<String, NormalizedResourceRequest> boltResources = ResourceUtils.getBoltsResources(topology, topoConf);
-            for (Entry<String, ComponentAggregateStats> entry : topoPageInfo.get_id_to_bolt_agg_stats().entrySet()) {
-                CommonAggregateStats commonStats = entry.getValue().get_common_stats();
-                setResourcesDefaultIfNotSet(boltResources, entry.getKey(), topoConf);
-                commonStats.set_resources_map(boltResources.get(entry.getKey()).toNormalizedMap());
-            }
-            maybeAddPlaceholderBoltAggStats(topoPageInfo, topology, includeSys);
+            Map<String, Object> topoConf = Utils.merge(conf, common.topoConf);
+
+            addSpoutAggStats(topoPageInfo, topology, topoConf);
+            addBoltAggStats(topoPageInfo, topology, topoConf, includeSys);
 
             if (workerSummaries != null) {
                 topoPageInfo.set_workers(workerSummaries);
@@ -4173,20 +4162,31 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     }
 
     /**
-     * Add placeholder AggStats allowing topology page to show components before AggStats are populated.
+     * If aggStats are not populated, compute common and component(spout) agg and create placeholder stat.
+     * This allows the topology page to show the component spec even when the topo is not scheduled.
+     * Otherwise, just fetch data from current topoPageInfo.
      *
      * @param topoPageInfo topology page info holding spout AggStats
      * @param topology     storm topology used to get spout names
+     * @param topoConf     storm topology config
      */
-    private void maybeAddPlaceholderSpoutAggStats(TopologyPageInfo topoPageInfo, StormTopology topology) {
+    private void addSpoutAggStats(TopologyPageInfo topoPageInfo, StormTopology topology, Map<String, Object> topoConf) {
+        Map<String, NormalizedResourceRequest> spoutResources = ResourceUtils.getSpoutsResources(topology, topoConf);
+
+        // if agg stats were not populated yet, create placeholder
         if (topoPageInfo.get_id_to_spout_agg_stats().isEmpty()) {
             for (Entry<String, SpoutSpec> entry : topology.get_spouts().entrySet()) {
+                String spoutName = entry.getKey();
+                SpoutSpec spoutSpec = entry.getValue();
+
                 // component
                 ComponentAggregateStats placeholderComponentStats = new ComponentAggregateStats();
                 placeholderComponentStats.set_type(ComponentType.SPOUT);
 
                 // common aggregate
-                CommonAggregateStats commonStats = getPlaceholderCommonAggregateStats(entry.getValue());
+                CommonAggregateStats commonStats = getPlaceholderCommonAggregateStats(spoutSpec);
+                commonStats.set_resources_map(spoutResources.getOrDefault(spoutName, new NormalizedResourceRequest())
+                            .toNormalizedMap());
                 placeholderComponentStats.set_common_stats(commonStats);
 
                 // spout aggregate
@@ -4195,23 +4195,36 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 SpecificAggregateStats specificStats = new SpecificAggregateStats();
                 specificStats.set_spout(spoutAggStats);
                 placeholderComponentStats.set_specific_stats(specificStats);
-
-                topoPageInfo.get_id_to_spout_agg_stats().put(entry.getKey(), placeholderComponentStats);
+                topoPageInfo.get_id_to_spout_agg_stats().put(spoutName, placeholderComponentStats);
+            }
+        } else {
+            for (Entry<String, ComponentAggregateStats> entry : topoPageInfo.get_id_to_spout_agg_stats().entrySet()) {
+                CommonAggregateStats commonStats = entry.getValue().get_common_stats();
+                setResourcesDefaultIfNotSet(spoutResources, entry.getKey(), topoConf);
+                commonStats.set_resources_map(spoutResources.get(entry.getKey()).toNormalizedMap());
             }
         }
     }
 
     /**
-     * Add placeholder AggStats allowing topology page to show components before AggStats are populated.
+     * If aggStats are not populated, compute common and component(bolt) agg and create placeholder stat.
+     * This allows the topology page to show the component spec even when the topo is not scheduled.
+     * Otherwise, just fetch data from current topoPageInfo.
      *
      * @param topoPageInfo  topology page info holding bolt AggStats
      * @param topology      storm topology used to get bolt names
+     * @param topoConf      storm topology config
      * @param includeSys    whether to show system bolts
      */
-    private void maybeAddPlaceholderBoltAggStats(TopologyPageInfo topoPageInfo, StormTopology topology, boolean includeSys) {
+    private void addBoltAggStats(TopologyPageInfo topoPageInfo, StormTopology topology,
+                                                 Map<String, Object> topoConf, boolean includeSys) {
+        Map<String, NormalizedResourceRequest> boltResources = ResourceUtils.getBoltsResources(topology, topoConf);
+
+        // if agg stats were not populated yet, create placeholder
         if (topoPageInfo.get_id_to_bolt_agg_stats().isEmpty()) {
             for (Entry<String, Bolt> entry : topology.get_bolts().entrySet()) {
                 String boltName = entry.getKey();
+                Bolt bolt = entry.getValue();
                 if ((!includeSys && Utils.isSystemId(boltName)) || boltName.equals(Constants.SYSTEM_COMPONENT_ID)) {
                     continue;
                 }
@@ -4221,7 +4234,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 placeholderComponentStats.set_type(ComponentType.BOLT);
 
                 // common aggregate
-                CommonAggregateStats commonStats = getPlaceholderCommonAggregateStats(entry.getValue());
+                CommonAggregateStats commonStats = getPlaceholderCommonAggregateStats(bolt);
+                commonStats.set_resources_map(boltResources.getOrDefault(boltName, new NormalizedResourceRequest())
+                            .toNormalizedMap());
                 placeholderComponentStats.set_common_stats(commonStats);
 
                 // bolt aggregate
@@ -4236,6 +4251,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
 
                 topoPageInfo.get_id_to_bolt_agg_stats().put(boltName, placeholderComponentStats);
             }
+        } else {
+            for (Entry<String, ComponentAggregateStats> entry : topoPageInfo.get_id_to_bolt_agg_stats().entrySet()) {
+                CommonAggregateStats commonStats = entry.getValue().get_common_stats();
+                setResourcesDefaultIfNotSet(boltResources, entry.getKey(), topoConf);
+                commonStats.set_resources_map(boltResources.get(entry.getKey()).toNormalizedMap());
+            }
         }
     }