You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/09/19 18:21:39 UTC
[1/3] storm git commit: STORM-3147: Port ClusterSummary to StormMetricsRegistry
Repository: storm
Updated Branches:
refs/heads/master 6d0f2ebd2 -> 7ec618c10
STORM-3147: Port ClusterSummary to StormMetricsRegistry
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/02428594
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/02428594
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/02428594
Branch: refs/heads/master
Commit: 02428594b53801572f657f4d0bff85b2213b3723
Parents: 8c90f12
Author: Zhengdai Hu <hu...@gmail.com>
Authored: Fri Jul 13 14:22:20 2018 -0500
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Mon Sep 17 21:27:33 2018 +0200
----------------------------------------------------------------------
.../org/apache/storm/daemon/nimbus/Nimbus.java | 291 +++++++++++++++++--
1 file changed, 260 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/02428594/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 5c48271..36eca99 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -18,8 +18,12 @@
package org.apache.storm.daemon.nimbus;
+import com.codahale.metrics.CachedGauge;
+import com.codahale.metrics.DerivativeGauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricSet;
+import com.codahale.metrics.SlidingTimeWindowReservoir;
import com.codahale.metrics.Timer;
import java.io.File;
import java.io.FileInputStream;
@@ -50,6 +54,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -595,6 +600,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
this.principalToLocal = ClientAuthUtils.getPrincipalToLocalPlugin(conf);
this.supervisorClasspaths = Collections.unmodifiableNavigableMap(
Utils.getConfiguredClasspathVersions(conf, EMPTY_STRING_LIST));// We don't use the classpath part of this, so just an empty list
+ clusterMetricSet = new ClusterSummaryMetricSet();
}
// TOPOLOGY STATE TRANSITIONS
@@ -816,7 +822,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
LOG.info("Assign executors: {}", execToPort.keySet());
numAddedSlot += count;
numAddedExec += execToPort.size();
- }
+ }
} else if (newAssignments.isEmpty()) {
for (Entry<String, Assignment> entry : existingAssignments.entrySet()) {
final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
@@ -825,7 +831,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
LOG.info("Remove executors: {}", execToPort.keySet());
numRemovedSlot += count;
numRemovedExec += execToPort.size();
- }
+ }
} else {
MapDifference<String, Assignment> difference = Maps.difference(existingAssignments, newAssignments);
if (anyChanged = !difference.areEqual()) {
@@ -859,8 +865,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
if (execEntry.getValue().equals(oldExecToSlot.get(execEntry.getKey()))) {
commonExecCount++;
commonSlots.add(execEntry.getValue());
- }
- }
+ }
+ }
long commonSlotCount = commonSlots.size();
//Treat reassign as remove and add
@@ -868,7 +874,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
numRemovedExec += oldExecToSlot.size() - commonExecCount;
numAddedSlot += slots.size() - commonSlotCount;
numAddedExec += execToSlot.size() - commonExecCount;
- }
+ }
}
LOG.debug("{} assignments unchanged: {}", difference.entriesInCommon().size(), difference.entriesInCommon().keySet());
}
@@ -881,12 +887,21 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
if (anyChanged) {
LOG.info("Fragmentation after scheduling is: {} MB, {} PCore CPUs", fragmentedMemory(), fragmentedCpu());
- nodeIdToResources.get().forEach((id, node) ->
+ nodeIdToResources.get().forEach((id, node) -> {
+ final double availableMem = node.getAvailableMem();
+ if (availableMem < 0) {
+ LOG.warn("Memory over-scheduled on {}", id, availableMem);
+ }
+ final double availableCpu = node.getAvailableCpu();
+ if (availableCpu < 0) {
+ LOG.warn("CPU over-scheduled on {}", id, availableCpu);
+ }
LOG.info(
"Node Id: {} Total Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used "
+ "CPU: {}, Available CPU: {}, fragmented: {}",
- id, node.getTotalMem(), node.getUsedMem(), node.getAvailableMem(),
- node.getTotalCpu(), node.getUsedCpu(), node.getAvailableCpu(), isFragmented(node)));
+ id, node.getTotalMem(), node.getUsedMem(), availableMem,
+ node.getTotalCpu(), node.getUsedCpu(), availableCpu, isFragmented(node));
+ });
}
return anyChanged;
}
@@ -2277,9 +2292,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
execToNodePort = new HashMap<>();
}
Set<String> allNodes = new HashSet<>();
- for (List<Object> nodePort : execToNodePort.values()) {
- allNodes.add((String) nodePort.get(0));
- }
+ for (List<Object> nodePort : execToNodePort.values()) {
+ allNodes.add((String) nodePort.get(0));
+ }
Map<String, String> allNodeHost = new HashMap<>();
Assignment existingAssignment = existingAssignments.get(topoId);
if (existingAssignment != null) {
@@ -2713,8 +2728,16 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
ret.set_used_mem(resources.getUsedMem());
ret.set_used_cpu(resources.getUsedCpu());
if (isFragmented(resources)) {
- ret.set_fragmented_cpu(resources.getAvailableCpu());
- ret.set_fragmented_mem(resources.getAvailableMem());
+ final double availableCpu = resources.getAvailableCpu();
+ if (availableCpu < 0) {
+ LOG.warn("Negative fragmented CPU on {}", supervisorId);
+ }
+ ret.set_fragmented_cpu(availableCpu);
+ final double availableMem = resources.getAvailableMem();
+ if (availableMem < 0) {
+ LOG.warn("Negative fragmented Mem on {}", supervisorId);
+ }
+ ret.set_fragmented_mem(availableMem);
}
}
if (info.is_set_version()) {
@@ -2872,6 +2895,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
for (String topoId : state.activeStorms()) {
transition(topoId, TopologyActions.STARTUP, null);
}
+ clusterMetricSet.setActive(true);
}
final boolean doNotReassign = (Boolean) conf.getOrDefault(ServerConfigUtils.NIMBUS_DO_NOT_REASSIGN, false);
@@ -2922,19 +2946,20 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
});
- metricsRegistry.registerGauge("nimbus:num-supervisors", () -> state.supervisors(null).size());
- metricsRegistry.registerGauge("nimbus:fragmented-memory", this::fragmentedMemory);
- metricsRegistry.registerGauge("nimbus:fragmented-cpu", this::fragmentedCpu);
- metricsRegistry.registerGauge("nimbus:available-memory", () -> nodeIdToResources.get().values().parallelStream()
- .mapToDouble(SupervisorResources::getAvailableMem)
+ StormMetricsRegistry.registerGauge("nimbus:total-available-memory-non-negative", () -> nodeIdToResources.get().values()
+ .parallelStream()
+ .mapToDouble(supervisorResources -> Math.max(supervisorResources.getAvailableMem(), 0))
.sum());
- metricsRegistry.registerGauge("nimbus:available-cpu", () -> nodeIdToResources.get().values().parallelStream()
- .mapToDouble(SupervisorResources::getAvailableCpu)
+ StormMetricsRegistry.registerGauge("nimbus:available-cpu-non-negative", () -> nodeIdToResources.get().values()
+ .parallelStream()
+ .mapToDouble(supervisorResources -> Math.max(supervisorResources.getAvailableCpu(), 0))
.sum());
- metricsRegistry.registerGauge("nimbus:total-memory", () -> nodeIdToResources.get().values().parallelStream()
+ StormMetricsRegistry.registerGauge("nimbus:total-memory", () -> nodeIdToResources.get().values()
+ .parallelStream()
.mapToDouble(SupervisorResources::getTotalMem)
.sum());
- metricsRegistry.registerGauge("nimbus:total-cpu", () -> nodeIdToResources.get().values().parallelStream()
+ StormMetricsRegistry.registerGauge("nimbus:total-cpu", () -> nodeIdToResources.get().values()
+ .parallelStream()
.mapToDouble(SupervisorResources::getTotalCpu)
.sum());
metricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
@@ -2947,16 +2972,20 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
});
metricsRegistry.registerMeter("nimbus:num-launched").mark();
- timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
- () -> {
- try {
- if (isLeader()) {
- sendClusterMetricsToExecutors();
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
+ timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
+ () -> {
+ try {
+ if (isLeader()) {
+ sendClusterMetricsToExecutors();
}
- });
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ //Should we make the delaySecs and recurSecs in sync with any conf value?
+ // They should be around the reporting interval, but it's not configurable
+ timer.scheduleRecurring(5, 5, clusterMetricSet);
} catch (Exception e) {
if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
throw e;
@@ -4632,6 +4661,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
if (metricsStore != null) {
metricsStore.close();
}
+ //Put after timer close to avoid race condition
+ clusterMetricSet.setActive(false);
LOG.info("Shut down master");
} catch (Exception e) {
throw new RuntimeException(e);
@@ -4767,4 +4798,202 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
+ private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+ static final int CACHING_WINDOW = 5;
+ static final String SUMMARY = "summary";
+
+ private final Map<String, com.codahale.metrics.Metric> clusterSummaryMetrics = new HashMap<String, com.codahale.metrics.Metric>() {
+ @Override
+ public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) {
+ return super.put(StormMetricsRegistry.name(SUMMARY, key), value);
}
+ };
+ private final Function<String, Histogram> registerHistogram = (name) -> {
+ //This histogram reflects the data distribution across only one ClusterSummary, i.e.,
+ // data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment.
+ // Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update
+ final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
+ clusterSummaryMetrics.put(name, histogram);
+ return histogram;
+ };
+ private volatile boolean active = false;
+
+ //Nimbus metrics distribution
+ private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs");
+
+ //Supervisor metrics distribution
+ private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs");
+ private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers");
+ private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers");
+ private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem");
+ private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu");
+ private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem");
+ private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu");
+
+ //Topology metrics distribution
+ private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks");
+ private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors");
+ private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers");
+ private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs");
+ private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count");
+ private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap");
+ private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap");
+ private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu");
+ private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap");
+ private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap");
+ private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu");
+
+ /**
+ * Constructor to put all items in ClusterSummary in MetricSet as a metric.
+ * All metrics are derived from a cached ClusterSummary object,
+ * expired {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after first query in a while from reporters.
+ * In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than
+ * reporting interval to avoid outdated reporting.
+ */
+ ClusterSummaryMetricSet() {
+ //Break the code if out of sync to thrift protocol
+ assert ClusterSummary._Fields.values().length == 3
+ && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
+ && ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES
+ && ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES;
+
+ final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
+ @Override
+ protected ClusterSummary loadValue() {
+ try {
+ ClusterSummary newSummary = getClusterInfoImpl();
+ LOG.debug("the new summary is {}", newSummary);
+ //Force update histogram upon each cache refresh
+ //This behavior relies on the fact that most common implementation of Reporter
+ // reports Gauges before Histograms. Because DerivativeGauge will trigger cache
+ // refresh upon reporter's query, histogram will also be updated before query
+ updateHistogram(newSummary);
+ return newSummary;
+ } catch (Exception e) {
+ LOG.warn("Get cluster info exception.", e);
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ clusterSummaryMetrics.put("cluster:num-nimbus-leaders", new DerivativeGauge<ClusterSummary, Long>(cachedSummary) {
+ @Override
+ protected Long transform(ClusterSummary clusterSummary) {
+ return clusterSummary.get_nimbuses().stream()
+ .filter(NimbusSummary::is_isLeader)
+ .count();
+ }
+ });
+ clusterSummaryMetrics.put("cluster:num-nimbuses", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
+ @Override
+ protected Integer transform(ClusterSummary clusterSummary) {
+ return clusterSummary.get_nimbuses_size();
+ }
+ });
+ clusterSummaryMetrics.put("cluster:num-supervisors", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
+ @Override
+ protected Integer transform(ClusterSummary clusterSummary) {
+ return clusterSummary.get_supervisors_size();
+ }
+ });
+ clusterSummaryMetrics.put("cluster:num-topologies", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
+ @Override
+ protected Integer transform(ClusterSummary clusterSummary) {
+ return clusterSummary.get_topologies_size();
+ }
+ });
+ clusterSummaryMetrics.put("cluster:num-total-workers", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
+ @Override
+ protected Integer transform(ClusterSummary clusterSummary) {
+ return clusterSummary.get_supervisors().stream()
+ .mapToInt(SupervisorSummary::get_num_workers)
+ .sum();
+ }
+ });
+ clusterSummaryMetrics.put("cluster:num-total-used-workers", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
+ @Override
+ protected Integer transform(ClusterSummary clusterSummary) {
+ return clusterSummary.get_supervisors().stream()
+ .mapToInt(SupervisorSummary::get_num_used_workers)
+ .sum();
+ }
+ });
+ clusterSummaryMetrics.put("cluster:total-fragmented-memory-non-negative", new DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
+ @Override
+ protected Double transform(ClusterSummary clusterSummary) {
+ return clusterSummary.get_supervisors().stream()
+ //Clamp negative values to zero before summing
+ .mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_mem(), 0))
+ .sum();
+ }
+ });
+ clusterSummaryMetrics.put("cluster:total-fragmented-cpu-non-negative", new DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
+ @Override
+ protected Double transform(ClusterSummary clusterSummary) {
+ return clusterSummary.get_supervisors().stream()
+ //Clamp negative values to zero before summing
+ .mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_cpu(), 0))
+ .sum();
+ }
+ });
+ }
+
+ private void updateHistogram(ClusterSummary newSummary) {
+ for (NimbusSummary nimbusSummary : newSummary.get_nimbuses()) {
+ nimbusUptime.update(nimbusSummary.get_uptime_secs());
+ }
+ for (SupervisorSummary summary : newSummary.get_supervisors()) {
+ supervisorsUptime.update(summary.get_uptime_secs());
+ supervisorsNumWorkers.update(summary.get_num_workers());
+ supervisorsNumUsedWorkers.update(summary.get_num_used_workers());
+ supervisorsUsedMem.update(Math.round(summary.get_used_mem()));
+ supervisorsUsedCpu.update(Math.round(summary.get_used_cpu()));
+ supervisorsFragmentedMem.update(Math.round(summary.get_fragmented_mem()));
+ supervisorsFragmentedCpu.update(Math.round(summary.get_fragmented_cpu()));
+ }
+ for (TopologySummary summary : newSummary.get_topologies()) {
+ topologiesNumTasks.update(summary.get_num_tasks());
+ topologiesNumExecutors.update(summary.get_num_executors());
+ topologiesNumWorker.update(summary.get_num_workers());
+ topologiesUptime.update(summary.get_uptime_secs());
+ topologiesReplicationCount.update(summary.get_replication_count());
+ topologiesRequestedMemOnHeap.update(Math.round(summary.get_requested_memonheap()));
+ topologiesRequestedMemOffHeap.update(Math.round(summary.get_requested_memoffheap()));
+ topologiesRequestedCpu.update(Math.round(summary.get_requested_cpu()));
+ topologiesAssignedMemOnHeap.update(Math.round(summary.get_assigned_memonheap()));
+ topologiesAssignedMemOffHeap.update(Math.round(summary.get_assigned_memoffheap()));
+ topologiesAssignedCpu.update(Math.round(summary.get_assigned_cpu()));
+ }
+ }
+
+ //This is not thread safe
+ void setActive(final boolean active) {
+ if (this.active != active) {
+ this.active = active;
+ if (active) {
+ StormMetricsRegistry.registerMetricSet(this);
+ } else {
+ //Could be replaced when metrics support remove all functions
+ // https://github.com/dropwizard/metrics/pull/1280
+ StormMetricsRegistry.unregisterMetricSet(this);
+ }
+ }
+ }
+
+ @Override
+ public Map<String, com.codahale.metrics.Metric> getMetrics() {
+ return clusterSummaryMetrics;
+ }
+
+ @Override
+ public void run() {
+ try {
+ //State changed
+ setActive(isLeader());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
+
[2/3] storm git commit: STORM-3147: Fix minor nits, rebase to use non-static metrics registry
Posted by bo...@apache.org.
STORM-3147: Fix minor nits, rebase to use non-static metrics registry
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/392803c9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/392803c9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/392803c9
Branch: refs/heads/master
Commit: 392803c9bb2fe81a5df57f695a0e6d7c37bd1f2e
Parents: 0242859
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Mon Sep 17 22:21:12 2018 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Mon Sep 17 22:47:33 2018 +0200
----------------------------------------------------------------------
.../org/apache/storm/daemon/nimbus/Nimbus.java | 72 ++++++++++----------
.../storm/metric/StormMetricsRegistry.java | 13 ++++
2 files changed, 50 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/392803c9/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 36eca99..45c1c87 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -22,6 +22,7 @@ import com.codahale.metrics.CachedGauge;
import com.codahale.metrics.DerivativeGauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
import com.codahale.metrics.SlidingTimeWindowReservoir;
import com.codahale.metrics.Timer;
@@ -449,6 +450,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
private final IPrincipalToLocal principalToLocal;
private final StormMetricsRegistry metricsRegistry;
private final ResourceMetrics resourceMetrics;
+ private final ClusterSummaryMetricSet clusterMetricSet;
private MetricStore metricsStore;
private IAuthorizer authorizationHandler;
//Cached CuratorFramework, mainly used for BlobStore.
@@ -600,7 +602,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
this.principalToLocal = ClientAuthUtils.getPrincipalToLocalPlugin(conf);
this.supervisorClasspaths = Collections.unmodifiableNavigableMap(
Utils.getConfiguredClasspathVersions(conf, EMPTY_STRING_LIST));// We don't use the classpath part of this, so just an empty list
- clusterMetricSet = new ClusterSummaryMetricSet();
+ clusterMetricSet = new ClusterSummaryMetricSet(metricsRegistry);
}
// TOPOLOGY STATE TRANSITIONS
@@ -2946,19 +2948,19 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
});
- StormMetricsRegistry.registerGauge("nimbus:total-available-memory-non-negative", () -> nodeIdToResources.get().values()
+ metricsRegistry.registerGauge("nimbus:total-available-memory-non-negative", () -> nodeIdToResources.get().values()
.parallelStream()
.mapToDouble(supervisorResources -> Math.max(supervisorResources.getAvailableMem(), 0))
.sum());
- StormMetricsRegistry.registerGauge("nimbus:available-cpu-non-negative", () -> nodeIdToResources.get().values()
+ metricsRegistry.registerGauge("nimbus:available-cpu-non-negative", () -> nodeIdToResources.get().values()
.parallelStream()
.mapToDouble(supervisorResources -> Math.max(supervisorResources.getAvailableCpu(), 0))
.sum());
- StormMetricsRegistry.registerGauge("nimbus:total-memory", () -> nodeIdToResources.get().values()
+ metricsRegistry.registerGauge("nimbus:total-memory", () -> nodeIdToResources.get().values()
.parallelStream()
.mapToDouble(SupervisorResources::getTotalMem)
.sum());
- StormMetricsRegistry.registerGauge("nimbus:total-cpu", () -> nodeIdToResources.get().values()
+ metricsRegistry.registerGauge("nimbus:total-cpu", () -> nodeIdToResources.get().values()
.parallelStream()
.mapToDouble(SupervisorResources::getTotalCpu)
.sum());
@@ -2982,9 +2984,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
throw new RuntimeException(e);
}
});
-
- //Should we make the delaySecs and recurSecs in sync with any conf value?
- // They should be around the reporting interval, but it's not configurable
+
timer.scheduleRecurring(5, 5, clusterMetricSet);
} catch (Exception e) {
if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
@@ -4661,7 +4661,6 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
if (metricsStore != null) {
metricsStore.close();
}
- //Put after timer close to avoid race condition
clusterMetricSet.setActive(false);
LOG.info("Shut down master");
} catch (Exception e) {
@@ -4798,16 +4797,25 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
- private class ClusterSummaryMetricSet implements MetricSet, Runnable {
- static final int CACHING_WINDOW = 5;
- static final String SUMMARY = "summary";
+ private static class ClusterSummaryMetrics implements MetricSet {
+ private static final String SUMMARY = "summary";
+ private final Map<String, com.codahale.metrics.Metric> metrics = new HashMap<>();
+
+ public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) {
+ return metrics.put(MetricRegistry.name(SUMMARY, key), value);
+ }
- private final Map<String, com.codahale.metrics.Metric> clusterSummaryMetrics = new HashMap<String, com.codahale.metrics.Metric>() {
- @Override
- public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) {
- return super.put(StormMetricsRegistry.name(SUMMARY, key), value);
-}
- };
+ @Override
+ public Map<String, com.codahale.metrics.Metric> getMetrics() {
+ return metrics;
+ }
+ }
+
+ private class ClusterSummaryMetricSet implements Runnable {
+ private static final int CACHING_WINDOW = 5;
+
+ private final ClusterSummaryMetrics clusterSummaryMetrics = new ClusterSummaryMetrics();
+
private final Function<String, Histogram> registerHistogram = (name) -> {
//This histogram reflects the data distribution across only one ClusterSummary, i.e.,
// data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment.
@@ -4842,6 +4850,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap");
private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap");
private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu");
+ private final StormMetricsRegistry metricsRegistry;
/**
* Constructor to put all items in ClusterSummary in MetricSet as a metric.
@@ -4850,7 +4859,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
* In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than
* reporting interval to avoid outdated reporting.
*/
- ClusterSummaryMetricSet() {
+ ClusterSummaryMetricSet(StormMetricsRegistry metricsRegistry) {
+ this.metricsRegistry = metricsRegistry;
//Break the code if out of sync to thrift protocol
assert ClusterSummary._Fields.values().length == 3
&& ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
@@ -4862,11 +4872,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
protected ClusterSummary loadValue() {
try {
ClusterSummary newSummary = getClusterInfoImpl();
- LOG.debug("the new summary is {}", newSummary);
- //Force update histogram upon each cache refresh
- //This behavior relies on the fact that most common implementation of Reporter
- // reports Gauges before Histograms. Because DerivativeGauge will trigger cache
- // refresh upon reporter's query, histogram will also be updated before query
+ LOG.debug("The new summary is {}", newSummary);
+ /*
+ * Update histograms based on the new summary. Most common implementation of Reporter reports Gauges before
+ * Histograms. Because DerivativeGauge will trigger cache refresh upon reporter's query, histogram will also be
+ * updated before query
+ */
updateHistogram(newSummary);
return newSummary;
} catch (Exception e) {
@@ -4966,29 +4977,20 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
}
- //This is not thread safe
void setActive(final boolean active) {
if (this.active != active) {
this.active = active;
if (active) {
- StormMetricsRegistry.registerMetricSet(this);
+ metricsRegistry.registerAll(clusterSummaryMetrics);
} else {
- //Could be replaced when metrics support remove all functions
- // https://github.com/dropwizard/metrics/pull/1280
- StormMetricsRegistry.unregisterMetricSet(this);
+ metricsRegistry.removeAll(clusterSummaryMetrics);
}
}
}
@Override
- public Map<String, com.codahale.metrics.Metric> getMetrics() {
- return clusterSummaryMetrics;
- }
-
- @Override
public void run() {
try {
- //State changed
setActive(isLeader());
} catch (Exception e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/storm/blob/392803c9/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
index c4d2f3f..cc98804 100644
--- a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
+++ b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
@@ -15,7 +15,9 @@ package org.apache.storm.metric;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.MetricSet;
import com.codahale.metrics.Reservoir;
import com.codahale.metrics.Timer;
import java.util.List;
@@ -54,6 +56,17 @@ public class StormMetricsRegistry {
public <V> Gauge<V> registerGauge(final String name, Gauge<V> gauge) {
return registry.gauge(name, () -> gauge);
}
+
+ public void registerAll(MetricSet metrics) {
+ registry.registerAll(metrics);
+ }
+
+ public void removeAll(MetricSet metrics) {
+ //Could be replaced when metrics support remove all functions
+ // https://github.com/dropwizard/metrics/pull/1280
+ Map<String, Metric> nameToMetric = metrics.getMetrics();
+ registry.removeMatching((name, metric) -> nameToMetric.containsKey(name));
+ }
public void startMetricsReporters(Map<String, Object> daemonConf) {
reporters = MetricsUtils.getPreparableReporters(daemonConf);
[3/3] storm git commit: Merge branch 'STORM-3147' of https://github.com/srdo/storm into STORM-3147
Posted by bo...@apache.org.
Merge branch 'STORM-3147' of https://github.com/srdo/storm into STORM-3147
STORM-3147: Add metrics based on ClusterSummary
This closes #2840
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7ec618c1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7ec618c1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7ec618c1
Branch: refs/heads/master
Commit: 7ec618c10b95392fa9a033e360f1b2ac035b05ad
Parents: 6d0f2eb 392803c
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Wed Sep 19 12:52:53 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Wed Sep 19 12:52:53 2018 -0500
----------------------------------------------------------------------
.../org/apache/storm/daemon/nimbus/Nimbus.java | 293 +++++++++++++++++--
.../storm/metric/StormMetricsRegistry.java | 13 +
2 files changed, 275 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/7ec618c1/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------