You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/01/24 22:37:07 UTC
[storm] branch 2.1.x-branch updated: [STORM-3567] fix UI showing
wrong resource info when topo is not scheduled
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch 2.1.x-branch
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/2.1.x-branch by this push:
new 15e0d4a [STORM-3567] fix UI showing wrong resource info when topo is not scheduled
new 3c2f700 Merge pull request #3199 from RuiLi8080/storm-3567-2.1
15e0d4a is described below
commit 15e0d4a9cd8d93a0ef6e57796ac110cd69847eff
Author: Rui Li <ru...@verizonmedia.com>
AuthorDate: Thu Jan 16 14:35:17 2020 -0600
[STORM-3567] fix UI showing wrong resource info when topo is not scheduled
---
.../org/apache/storm/daemon/nimbus/Nimbus.java | 67 ++++++++++++++--------
1 file changed, 44 insertions(+), 23 deletions(-)
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index d9dd550..37bd01a 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -4059,7 +4059,6 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
Map<List<Integer>, Map<String, Object>> beats = common.beats;
Map<Integer, String> taskToComp = common.taskToComponent;
StormTopology topology = common.topology;
- Map<String, Object> topoConf = Utils.merge(conf, common.topoConf);
StormBase base = common.base;
if (base == null) {
throw new WrappedNotAliveException(topoId);
@@ -4103,21 +4102,11 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
topoPageInfo.set_storm_version(topology.get_storm_version());
}
- Map<String, NormalizedResourceRequest> spoutResources = ResourceUtils.getSpoutsResources(topology, topoConf);
- for (Entry<String, ComponentAggregateStats> entry : topoPageInfo.get_id_to_spout_agg_stats().entrySet()) {
- CommonAggregateStats commonStats = entry.getValue().get_common_stats();
- setResourcesDefaultIfNotSet(spoutResources, entry.getKey(), topoConf);
- commonStats.set_resources_map(spoutResources.get(entry.getKey()).toNormalizedMap());
- }
- maybeAddPlaceholderSpoutAggStats(topoPageInfo, topology);
- Map<String, NormalizedResourceRequest> boltResources = ResourceUtils.getBoltsResources(topology, topoConf);
- for (Entry<String, ComponentAggregateStats> entry : topoPageInfo.get_id_to_bolt_agg_stats().entrySet()) {
- CommonAggregateStats commonStats = entry.getValue().get_common_stats();
- setResourcesDefaultIfNotSet(boltResources, entry.getKey(), topoConf);
- commonStats.set_resources_map(boltResources.get(entry.getKey()).toNormalizedMap());
- }
- maybeAddPlaceholderBoltAggStats(topoPageInfo, topology, includeSys);
+ Map<String, Object> topoConf = Utils.merge(conf, common.topoConf);
+
+ addSpoutAggStats(topoPageInfo, topology, topoConf);
+ addBoltAggStats(topoPageInfo, topology, topoConf, includeSys);
if (workerSummaries != null) {
topoPageInfo.set_workers(workerSummaries);
@@ -4173,20 +4162,31 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
/**
- * Add placeholder AggStats allowing topology page to show components before AggStats are populated.
+ * If aggStats are not populated, compute common and component(spout) agg and create placeholder stat.
+ * This allows the topology page to show the component spec even when the topo is not scheduled.
+ * Otherwise, just fetch data from current topoPageInfo.
*
* @param topoPageInfo topology page info holding spout AggStats
* @param topology storm topology used to get spout names
+ * @param topoConf storm topology config
*/
- private void maybeAddPlaceholderSpoutAggStats(TopologyPageInfo topoPageInfo, StormTopology topology) {
+ private void addSpoutAggStats(TopologyPageInfo topoPageInfo, StormTopology topology, Map<String, Object> topoConf) {
+ Map<String, NormalizedResourceRequest> spoutResources = ResourceUtils.getSpoutsResources(topology, topoConf);
+
+ // if agg stats were not populated yet, create placeholder
if (topoPageInfo.get_id_to_spout_agg_stats().isEmpty()) {
for (Entry<String, SpoutSpec> entry : topology.get_spouts().entrySet()) {
+ String spoutName = entry.getKey();
+ SpoutSpec spoutSpec = entry.getValue();
+
// component
ComponentAggregateStats placeholderComponentStats = new ComponentAggregateStats();
placeholderComponentStats.set_type(ComponentType.SPOUT);
// common aggregate
- CommonAggregateStats commonStats = getPlaceholderCommonAggregateStats(entry.getValue());
+ CommonAggregateStats commonStats = getPlaceholderCommonAggregateStats(spoutSpec);
+ commonStats.set_resources_map(spoutResources.getOrDefault(spoutName, new NormalizedResourceRequest())
+ .toNormalizedMap());
placeholderComponentStats.set_common_stats(commonStats);
// spout aggregate
@@ -4195,23 +4195,36 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
SpecificAggregateStats specificStats = new SpecificAggregateStats();
specificStats.set_spout(spoutAggStats);
placeholderComponentStats.set_specific_stats(specificStats);
-
- topoPageInfo.get_id_to_spout_agg_stats().put(entry.getKey(), placeholderComponentStats);
+ topoPageInfo.get_id_to_spout_agg_stats().put(spoutName, placeholderComponentStats);
+ }
+ } else {
+ for (Entry<String, ComponentAggregateStats> entry : topoPageInfo.get_id_to_spout_agg_stats().entrySet()) {
+ CommonAggregateStats commonStats = entry.getValue().get_common_stats();
+ setResourcesDefaultIfNotSet(spoutResources, entry.getKey(), topoConf);
+ commonStats.set_resources_map(spoutResources.get(entry.getKey()).toNormalizedMap());
}
}
}
/**
- * Add placeholder AggStats allowing topology page to show components before AggStats are populated.
+ * If aggStats are not populated, compute common and component(bolt) agg and create placeholder stat.
+ * This allows the topology page to show the component spec even when the topo is not scheduled.
+ * Otherwise, just fetch data from current topoPageInfo.
*
* @param topoPageInfo topology page info holding bolt AggStats
* @param topology storm topology used to get bolt names
+ * @param topoConf storm topology config
* @param includeSys whether to show system bolts
*/
- private void maybeAddPlaceholderBoltAggStats(TopologyPageInfo topoPageInfo, StormTopology topology, boolean includeSys) {
+ private void addBoltAggStats(TopologyPageInfo topoPageInfo, StormTopology topology,
+ Map<String, Object> topoConf, boolean includeSys) {
+ Map<String, NormalizedResourceRequest> boltResources = ResourceUtils.getBoltsResources(topology, topoConf);
+
+ // if agg stats were not populated yet, create placeholder
if (topoPageInfo.get_id_to_bolt_agg_stats().isEmpty()) {
for (Entry<String, Bolt> entry : topology.get_bolts().entrySet()) {
String boltName = entry.getKey();
+ Bolt bolt = entry.getValue();
if ((!includeSys && Utils.isSystemId(boltName)) || boltName.equals(Constants.SYSTEM_COMPONENT_ID)) {
continue;
}
@@ -4221,7 +4234,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
placeholderComponentStats.set_type(ComponentType.BOLT);
// common aggregate
- CommonAggregateStats commonStats = getPlaceholderCommonAggregateStats(entry.getValue());
+ CommonAggregateStats commonStats = getPlaceholderCommonAggregateStats(bolt);
+ commonStats.set_resources_map(boltResources.getOrDefault(boltName, new NormalizedResourceRequest())
+ .toNormalizedMap());
placeholderComponentStats.set_common_stats(commonStats);
// bolt aggregate
@@ -4236,6 +4251,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
topoPageInfo.get_id_to_bolt_agg_stats().put(boltName, placeholderComponentStats);
}
+ } else {
+ for (Entry<String, ComponentAggregateStats> entry : topoPageInfo.get_id_to_bolt_agg_stats().entrySet()) {
+ CommonAggregateStats commonStats = entry.getValue().get_common_stats();
+ setResourcesDefaultIfNotSet(boltResources, entry.getKey(), topoConf);
+ commonStats.set_resources_map(boltResources.get(entry.getKey()).toNormalizedMap());
+ }
}
}