You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) was not preserved in this plain-text copy.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/09/19 18:21:39 UTC

[1/3] storm git commit: STORM-3147: Port ClusterSummary to StormMetricsRegistry

Repository: storm
Updated Branches:
  refs/heads/master 6d0f2ebd2 -> 7ec618c10


STORM-3147: Port ClusterSummary to StormMetricsRegistry


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/02428594
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/02428594
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/02428594

Branch: refs/heads/master
Commit: 02428594b53801572f657f4d0bff85b2213b3723
Parents: 8c90f12
Author: Zhengdai Hu <hu...@gmail.com>
Authored: Fri Jul 13 14:22:20 2018 -0500
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Mon Sep 17 21:27:33 2018 +0200

----------------------------------------------------------------------
 .../org/apache/storm/daemon/nimbus/Nimbus.java  | 291 +++++++++++++++++--
 1 file changed, 260 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/02428594/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 5c48271..36eca99 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -18,8 +18,12 @@
 
 package org.apache.storm.daemon.nimbus;
 
+import com.codahale.metrics.CachedGauge;
+import com.codahale.metrics.DerivativeGauge;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricSet;
+import com.codahale.metrics.SlidingTimeWindowReservoir;
 import com.codahale.metrics.Timer;
 import java.io.File;
 import java.io.FileInputStream;
@@ -50,6 +54,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.function.UnaryOperator;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -595,6 +600,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         this.principalToLocal = ClientAuthUtils.getPrincipalToLocalPlugin(conf);
         this.supervisorClasspaths = Collections.unmodifiableNavigableMap(
             Utils.getConfiguredClasspathVersions(conf, EMPTY_STRING_LIST));// We don't use the classpath part of this, so just an empty list
+        clusterMetricSet = new ClusterSummaryMetricSet();
     }
 
     // TOPOLOGY STATE TRANSITIONS
@@ -816,7 +822,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 LOG.info("Assign executors: {}", execToPort.keySet());
                 numAddedSlot += count;
                 numAddedExec += execToPort.size();
-                }
+            }
         } else if (newAssignments.isEmpty()) {
             for (Entry<String, Assignment> entry : existingAssignments.entrySet()) {
                 final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
@@ -825,7 +831,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 LOG.info("Remove executors: {}", execToPort.keySet());
                 numRemovedSlot += count;
                 numRemovedExec += execToPort.size();
-                    }
+            }
         } else {
             MapDifference<String, Assignment> difference = Maps.difference(existingAssignments, newAssignments);
             if (anyChanged = !difference.areEqual()) {
@@ -859,8 +865,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                         if (execEntry.getValue().equals(oldExecToSlot.get(execEntry.getKey()))) {
                             commonExecCount++;
                             commonSlots.add(execEntry.getValue());
-                }
-            }
+                        }
+                    }
                     long commonSlotCount = commonSlots.size();
 
                     //Treat reassign as remove and add
@@ -868,7 +874,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                     numRemovedExec += oldExecToSlot.size() - commonExecCount;
                     numAddedSlot += slots.size() - commonSlotCount;
                     numAddedExec += execToSlot.size() - commonExecCount;
-        }
+                }
             }
             LOG.debug("{} assignments unchanged: {}", difference.entriesInCommon().size(), difference.entriesInCommon().keySet());
         }
@@ -881,12 +887,21 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
 
         if (anyChanged) {
             LOG.info("Fragmentation after scheduling is: {} MB, {} PCore CPUs", fragmentedMemory(), fragmentedCpu());
-            nodeIdToResources.get().forEach((id, node) ->
+            nodeIdToResources.get().forEach((id, node) -> {
+                final double availableMem = node.getAvailableMem();
+                if (availableMem < 0) {
+                    LOG.warn("Memory over-scheduled on {}", id, availableMem);
+                }
+                final double availableCpu = node.getAvailableCpu();
+                if (availableCpu < 0) {
+                    LOG.warn("CPU over-scheduled on {}", id, availableCpu);
+                }
                 LOG.info(
                     "Node Id: {} Total Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used "
                         + "CPU: {}, Available CPU: {}, fragmented: {}",
-                    id, node.getTotalMem(), node.getUsedMem(), node.getAvailableMem(),
-                    node.getTotalCpu(), node.getUsedCpu(), node.getAvailableCpu(), isFragmented(node)));
+                        id, node.getTotalMem(), node.getUsedMem(), availableMem,
+                        node.getTotalCpu(), node.getUsedCpu(), availableCpu, isFragmented(node));
+            });
         }
         return anyChanged;
     }
@@ -2277,9 +2292,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                     execToNodePort = new HashMap<>();
                 }
                 Set<String> allNodes = new HashSet<>();
-                    for (List<Object> nodePort : execToNodePort.values()) {
-                        allNodes.add((String) nodePort.get(0));
-                    }
+                for (List<Object> nodePort : execToNodePort.values()) {
+                    allNodes.add((String) nodePort.get(0));
+                }
                 Map<String, String> allNodeHost = new HashMap<>();
                 Assignment existingAssignment = existingAssignments.get(topoId);
                 if (existingAssignment != null) {
@@ -2713,8 +2728,16 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             ret.set_used_mem(resources.getUsedMem());
             ret.set_used_cpu(resources.getUsedCpu());
             if (isFragmented(resources)) {
-                ret.set_fragmented_cpu(resources.getAvailableCpu());
-                ret.set_fragmented_mem(resources.getAvailableMem());
+                final double availableCpu = resources.getAvailableCpu();
+                if (availableCpu < 0) {
+                    LOG.warn("Negative fragmented CPU on {}", supervisorId);
+                }
+                ret.set_fragmented_cpu(availableCpu);
+                final double availableMem = resources.getAvailableMem();
+                if (availableMem < 0) {
+                    LOG.warn("Negative fragmented Mem on {}", supervisorId);
+                }
+                ret.set_fragmented_mem(availableMem);
             }
         }
         if (info.is_set_version()) {
@@ -2872,6 +2895,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 for (String topoId : state.activeStorms()) {
                     transition(topoId, TopologyActions.STARTUP, null);
                 }
+                clusterMetricSet.setActive(true);
             }
 
             final boolean doNotReassign = (Boolean) conf.getOrDefault(ServerConfigUtils.NIMBUS_DO_NOT_REASSIGN, false);
@@ -2922,19 +2946,20 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                                         }
                                     });
 
-            metricsRegistry.registerGauge("nimbus:num-supervisors", () -> state.supervisors(null).size());
-            metricsRegistry.registerGauge("nimbus:fragmented-memory", this::fragmentedMemory);
-            metricsRegistry.registerGauge("nimbus:fragmented-cpu", this::fragmentedCpu);
-            metricsRegistry.registerGauge("nimbus:available-memory", () -> nodeIdToResources.get().values().parallelStream()
-                .mapToDouble(SupervisorResources::getAvailableMem)
+            StormMetricsRegistry.registerGauge("nimbus:total-available-memory-non-negative", () -> nodeIdToResources.get().values()
+                .parallelStream()
+                .mapToDouble(supervisorResources -> Math.max(supervisorResources.getAvailableMem(), 0))
                 .sum());
-            metricsRegistry.registerGauge("nimbus:available-cpu", () -> nodeIdToResources.get().values().parallelStream()
-                .mapToDouble(SupervisorResources::getAvailableCpu)
+            StormMetricsRegistry.registerGauge("nimbus:available-cpu-non-negative", () -> nodeIdToResources.get().values()
+                .parallelStream()
+                .mapToDouble(supervisorResources -> Math.max(supervisorResources.getAvailableCpu(), 0))
                 .sum());
-            metricsRegistry.registerGauge("nimbus:total-memory", () -> nodeIdToResources.get().values().parallelStream()
+            StormMetricsRegistry.registerGauge("nimbus:total-memory", () -> nodeIdToResources.get().values()
+                .parallelStream()
                 .mapToDouble(SupervisorResources::getTotalMem)
                 .sum());
-            metricsRegistry.registerGauge("nimbus:total-cpu", () -> nodeIdToResources.get().values().parallelStream()
+            StormMetricsRegistry.registerGauge("nimbus:total-cpu", () -> nodeIdToResources.get().values()
+                .parallelStream()
                 .mapToDouble(SupervisorResources::getTotalCpu)
                 .sum());
             metricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
@@ -2947,16 +2972,20 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             });
             metricsRegistry.registerMeter("nimbus:num-launched").mark();
 
-                timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
-                                        () -> {
-                                            try {
-                                                if (isLeader()) {
-                                                    sendClusterMetricsToExecutors();
-                                                }
-                                            } catch (Exception e) {
-                                                throw new RuntimeException(e);
+            timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
+                                    () -> {
+                                        try {
+                                            if (isLeader()) {
+                                                sendClusterMetricsToExecutors();
                                             }
-                                        });
+                                        } catch (Exception e) {
+                                            throw new RuntimeException(e);
+                                        }
+                                    });
+
+            //Should we make the delaySecs and recurSecs in sync with any conf value?
+            // They should be around the reporting interval, but it's not configurable
+            timer.scheduleRecurring(5, 5, clusterMetricSet);
         } catch (Exception e) {
             if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
                 throw e;
@@ -4632,6 +4661,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             if (metricsStore != null) {
                 metricsStore.close();
             }
+            //Put after timer close to avoid race condition
+            clusterMetricSet.setActive(false);
             LOG.info("Shut down master");
         } catch (Exception e) {
             throw new RuntimeException(e);
@@ -4767,4 +4798,202 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
 
     }
 
+    private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+        static final int CACHING_WINDOW = 5;
+        static final String SUMMARY = "summary";
+
+        private final Map<String, com.codahale.metrics.Metric> clusterSummaryMetrics = new HashMap<String, com.codahale.metrics.Metric>() {
+            @Override
+            public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) {
+                return super.put(StormMetricsRegistry.name(SUMMARY, key), value);
 }
+        };
+        private final Function<String, Histogram> registerHistogram = (name) -> {
+            //This histogram reflects the data distribution across only one ClusterSummary, i.e.,
+            // data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment.
+            // Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update
+            final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
+            clusterSummaryMetrics.put(name, histogram);
+            return histogram;
+        };
+        private volatile boolean active = false;
+
+        //NImbus metrics distribution
+        private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs");
+
+        //Supervisor metrics distribution
+        private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs");
+        private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers");
+        private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers");
+        private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem");
+        private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu");
+        private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem");
+        private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu");
+
+        //Topology metrics distribution
+        private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks");
+        private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors");
+        private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers");
+        private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs");
+        private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count");
+        private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap");
+        private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap");
+        private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu");
+        private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap");
+        private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap");
+        private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu");
+
+        /**
+         * Constructor to put all items in ClusterSummary in MetricSet as a metric.
+         * All metrics are derived from a cached ClusterSummary object,
+         * expired {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after first query in a while from reporters.
+         * In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than
+         * reporting interval to avoid outdated reporting.
+         */
+        ClusterSummaryMetricSet() {
+            //Break the code if out of sync to thrift protocol
+            assert ClusterSummary._Fields.values().length == 3
+                && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
+                && ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES
+                && ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES;
+
+            final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
+                @Override
+                protected ClusterSummary loadValue() {
+                    try {
+                        ClusterSummary newSummary = getClusterInfoImpl();
+                        LOG.debug("the new summary is {}", newSummary);
+                        //Force update histogram upon each cache refresh
+                        //This behavior relies on the fact that most common implementation of Reporter
+                        // reports Gauges before Histograms. Because DerivativeGauge will trigger cache
+                        // refresh upon reporter's query, histogram will also be updated before query
+                        updateHistogram(newSummary);
+                        return newSummary;
+                    } catch (Exception e) {
+                        LOG.warn("Get cluster info exception.", e);
+                        throw new RuntimeException(e);
+                    }
+                }
+            };
+
+            clusterSummaryMetrics.put("cluster:num-nimbus-leaders", new DerivativeGauge<ClusterSummary, Long>(cachedSummary) {
+                @Override
+                protected Long transform(ClusterSummary clusterSummary) {
+                    return clusterSummary.get_nimbuses().stream()
+                            .filter(NimbusSummary::is_isLeader)
+                            .count();
+                }
+            });
+            clusterSummaryMetrics.put("cluster:num-nimbuses", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
+                @Override
+                protected Integer transform(ClusterSummary clusterSummary) {
+                    return clusterSummary.get_nimbuses_size();
+                }
+            });
+            clusterSummaryMetrics.put("cluster:num-supervisors", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
+                @Override
+                protected Integer transform(ClusterSummary clusterSummary) {
+                    return clusterSummary.get_supervisors_size();
+                }
+            });
+            clusterSummaryMetrics.put("cluster:num-topologies", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
+                @Override
+                protected Integer transform(ClusterSummary clusterSummary) {
+                    return clusterSummary.get_topologies_size();
+                }
+            });
+            clusterSummaryMetrics.put("cluster:num-total-workers", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
+                @Override
+                protected Integer transform(ClusterSummary clusterSummary) {
+                    return clusterSummary.get_supervisors().stream()
+                            .mapToInt(SupervisorSummary::get_num_workers)
+                            .sum();
+                }
+            });
+            clusterSummaryMetrics.put("cluster:num-total-used-workers", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
+                @Override
+                protected Integer transform(ClusterSummary clusterSummary) {
+                    return clusterSummary.get_supervisors().stream()
+                            .mapToInt(SupervisorSummary::get_num_used_workers)
+                            .sum();
+                }
+            });
+            clusterSummaryMetrics.put("cluster:total-fragmented-memory-non-negative", new DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
+                @Override
+                protected Double transform(ClusterSummary clusterSummary) {
+                    return clusterSummary.get_supervisors().stream()
+                            //Filtered negative value
+                            .mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_mem(), 0))
+                            .sum();
+                }
+            });
+            clusterSummaryMetrics.put("cluster:total-fragmented-cpu-non-negative", new DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
+                @Override
+                protected Double transform(ClusterSummary clusterSummary) {
+                    return clusterSummary.get_supervisors().stream()
+                            //Filtered negative value
+                            .mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_cpu(), 0))
+                            .sum();
+                }
+            });
+        }
+
+        private void updateHistogram(ClusterSummary newSummary) {
+            for (NimbusSummary nimbusSummary : newSummary.get_nimbuses()) {
+                nimbusUptime.update(nimbusSummary.get_uptime_secs());
+            }
+            for (SupervisorSummary summary : newSummary.get_supervisors()) {
+                supervisorsUptime.update(summary.get_uptime_secs());
+                supervisorsNumWorkers.update(summary.get_num_workers());
+                supervisorsNumUsedWorkers.update(summary.get_num_used_workers());
+                supervisorsUsedMem.update(Math.round(summary.get_used_mem()));
+                supervisorsUsedCpu.update(Math.round(summary.get_used_cpu()));
+                supervisorsFragmentedMem.update(Math.round(summary.get_fragmented_mem()));
+                supervisorsFragmentedCpu.update(Math.round(summary.get_fragmented_cpu()));
+            }
+            for (TopologySummary summary : newSummary.get_topologies()) {
+                topologiesNumTasks.update(summary.get_num_tasks());
+                topologiesNumExecutors.update(summary.get_num_executors());
+                topologiesNumWorker.update(summary.get_num_workers());
+                topologiesUptime.update(summary.get_uptime_secs());
+                topologiesReplicationCount.update(summary.get_replication_count());
+                topologiesRequestedMemOnHeap.update(Math.round(summary.get_requested_memonheap()));
+                topologiesRequestedMemOffHeap.update(Math.round(summary.get_requested_memoffheap()));
+                topologiesRequestedCpu.update(Math.round(summary.get_requested_cpu()));
+                topologiesAssignedMemOnHeap.update(Math.round(summary.get_assigned_memonheap()));
+                topologiesAssignedMemOffHeap.update(Math.round(summary.get_assigned_memoffheap()));
+                topologiesAssignedCpu.update(Math.round(summary.get_assigned_cpu()));
+            }
+        }
+
+        //This is not thread safe
+        void setActive(final boolean active) {
+            if (this.active != active) {
+                this.active = active;
+                if (active) {
+                    StormMetricsRegistry.registerMetricSet(this);
+                } else {
+                    //Could be replaced when metrics support remove all functions
+                    // https://github.com/dropwizard/metrics/pull/1280
+                    StormMetricsRegistry.unregisterMetricSet(this);
+                }
+            }
+        }
+
+        @Override
+        public Map<String, com.codahale.metrics.Metric> getMetrics() {
+            return clusterSummaryMetrics;
+        }
+
+        @Override
+        public void run() {
+            try {
+                //State changed
+                setActive(isLeader());
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}
+


[2/3] storm git commit: STORM-3147: Fix minor nits, rebase to use non-static metrics registry

Posted by bo...@apache.org.
STORM-3147: Fix minor nits, rebase to use non-static metrics registry


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/392803c9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/392803c9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/392803c9

Branch: refs/heads/master
Commit: 392803c9bb2fe81a5df57f695a0e6d7c37bd1f2e
Parents: 0242859
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Mon Sep 17 22:21:12 2018 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Mon Sep 17 22:47:33 2018 +0200

----------------------------------------------------------------------
 .../org/apache/storm/daemon/nimbus/Nimbus.java  | 72 ++++++++++----------
 .../storm/metric/StormMetricsRegistry.java      | 13 ++++
 2 files changed, 50 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/392803c9/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 36eca99..45c1c87 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -22,6 +22,7 @@ import com.codahale.metrics.CachedGauge;
 import com.codahale.metrics.DerivativeGauge;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.MetricSet;
 import com.codahale.metrics.SlidingTimeWindowReservoir;
 import com.codahale.metrics.Timer;
@@ -449,6 +450,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     private final IPrincipalToLocal principalToLocal;
     private final StormMetricsRegistry metricsRegistry;
     private final ResourceMetrics resourceMetrics;
+    private final ClusterSummaryMetricSet clusterMetricSet;
     private MetricStore metricsStore;
     private IAuthorizer authorizationHandler;
     //Cached CuratorFramework, mainly used for BlobStore.
@@ -600,7 +602,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         this.principalToLocal = ClientAuthUtils.getPrincipalToLocalPlugin(conf);
         this.supervisorClasspaths = Collections.unmodifiableNavigableMap(
             Utils.getConfiguredClasspathVersions(conf, EMPTY_STRING_LIST));// We don't use the classpath part of this, so just an empty list
-        clusterMetricSet = new ClusterSummaryMetricSet();
+        clusterMetricSet = new ClusterSummaryMetricSet(metricsRegistry);
     }
 
     // TOPOLOGY STATE TRANSITIONS
@@ -2946,19 +2948,19 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                                         }
                                     });
 
-            StormMetricsRegistry.registerGauge("nimbus:total-available-memory-non-negative", () -> nodeIdToResources.get().values()
+            metricsRegistry.registerGauge("nimbus:total-available-memory-non-negative", () -> nodeIdToResources.get().values()
                 .parallelStream()
                 .mapToDouble(supervisorResources -> Math.max(supervisorResources.getAvailableMem(), 0))
                 .sum());
-            StormMetricsRegistry.registerGauge("nimbus:available-cpu-non-negative", () -> nodeIdToResources.get().values()
+            metricsRegistry.registerGauge("nimbus:available-cpu-non-negative", () -> nodeIdToResources.get().values()
                 .parallelStream()
                 .mapToDouble(supervisorResources -> Math.max(supervisorResources.getAvailableCpu(), 0))
                 .sum());
-            StormMetricsRegistry.registerGauge("nimbus:total-memory", () -> nodeIdToResources.get().values()
+            metricsRegistry.registerGauge("nimbus:total-memory", () -> nodeIdToResources.get().values()
                 .parallelStream()
                 .mapToDouble(SupervisorResources::getTotalMem)
                 .sum());
-            StormMetricsRegistry.registerGauge("nimbus:total-cpu", () -> nodeIdToResources.get().values()
+            metricsRegistry.registerGauge("nimbus:total-cpu", () -> nodeIdToResources.get().values()
                 .parallelStream()
                 .mapToDouble(SupervisorResources::getTotalCpu)
                 .sum());
@@ -2982,9 +2984,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                                             throw new RuntimeException(e);
                                         }
                                     });
-
-            //Should we make the delaySecs and recurSecs in sync with any conf value?
-            // They should be around the reporting interval, but it's not configurable
+            
             timer.scheduleRecurring(5, 5, clusterMetricSet);
         } catch (Exception e) {
             if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
@@ -4661,7 +4661,6 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             if (metricsStore != null) {
                 metricsStore.close();
             }
-            //Put after timer close to avoid race condition
             clusterMetricSet.setActive(false);
             LOG.info("Shut down master");
         } catch (Exception e) {
@@ -4798,16 +4797,25 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
 
     }
 
-    private class ClusterSummaryMetricSet implements MetricSet, Runnable {
-        static final int CACHING_WINDOW = 5;
-        static final String SUMMARY = "summary";
+    private static class ClusterSummaryMetrics implements MetricSet {
+        private static final String SUMMARY = "summary";
+        private final Map<String, com.codahale.metrics.Metric> metrics = new HashMap<>();
+        
+        public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) {
+            return metrics.put(MetricRegistry.name(SUMMARY, key), value);
+        }
 
-        private final Map<String, com.codahale.metrics.Metric> clusterSummaryMetrics = new HashMap<String, com.codahale.metrics.Metric>() {
-            @Override
-            public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) {
-                return super.put(StormMetricsRegistry.name(SUMMARY, key), value);
-}
-        };
+        @Override
+        public Map<String, com.codahale.metrics.Metric> getMetrics() {
+            return metrics;
+        }
+    }
+    
+    private class ClusterSummaryMetricSet implements Runnable {
+        private static final int CACHING_WINDOW = 5;
+        
+        private final ClusterSummaryMetrics clusterSummaryMetrics = new ClusterSummaryMetrics();
+        
         private final Function<String, Histogram> registerHistogram = (name) -> {
             //This histogram reflects the data distribution across only one ClusterSummary, i.e.,
             // data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment.
@@ -4842,6 +4850,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap");
         private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap");
         private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu");
+        private final StormMetricsRegistry metricsRegistry;
 
         /**
          * Constructor to put all items in ClusterSummary in MetricSet as a metric.
@@ -4850,7 +4859,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
          * In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than
          * reporting interval to avoid outdated reporting.
          */
-        ClusterSummaryMetricSet() {
+        ClusterSummaryMetricSet(StormMetricsRegistry metricsRegistry) {
+            this.metricsRegistry = metricsRegistry;
             //Break the code if out of sync to thrift protocol
             assert ClusterSummary._Fields.values().length == 3
                 && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
@@ -4862,11 +4872,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 protected ClusterSummary loadValue() {
                     try {
                         ClusterSummary newSummary = getClusterInfoImpl();
-                        LOG.debug("the new summary is {}", newSummary);
-                        //Force update histogram upon each cache refresh
-                        //This behavior relies on the fact that most common implementation of Reporter
-                        // reports Gauges before Histograms. Because DerivativeGauge will trigger cache
-                        // refresh upon reporter's query, histogram will also be updated before query
+                        LOG.debug("The new summary is {}", newSummary);
+                        /*
+                         * Update histograms based on the new summary. Most common implementation of Reporter reports Gauges before
+                         * Histograms. Because DerivativeGauge will trigger cache refresh upon reporter's query, histogram will also be
+                         * updated before query
+                         */
                         updateHistogram(newSummary);
                         return newSummary;
                     } catch (Exception e) {
@@ -4966,29 +4977,20 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             }
         }
 
-        //This is not thread safe
+        /**
+         * Registers the cluster-summary metrics with {@code metricsRegistry} when switching to active,
+         * and removes them when switching to inactive. A call that does not change the state is a no-op.
+         *
+         * <p>NOTE(review): the check-then-act on {@code active} is not synchronized, and this method is
+         * invoked both from {@link #run()} and from the Nimbus shutdown path ({@code setActive(false)}).
+         * Confirm those calls cannot race, e.g. a late {@code run()} re-registering after shutdown.
+         *
+         * @param active {@code true} to register the metrics ({@code run()} passes {@code isLeader()}),
+         *     {@code false} to remove them
+         */
         void setActive(final boolean active) {
             if (this.active != active) {
                 this.active = active;
                 if (active) {
-                    StormMetricsRegistry.registerMetricSet(this);
+                    metricsRegistry.registerAll(clusterSummaryMetrics);
                 } else {
-                    //Could be replaced when metrics support remove all functions
-                    // https://github.com/dropwizard/metrics/pull/1280
-                    StormMetricsRegistry.unregisterMetricSet(this);
+                    metricsRegistry.removeAll(clusterSummaryMetrics);
                 }
             }
         }
 
         @Override
-        public Map<String, com.codahale.metrics.Metric> getMetrics() {
-            return clusterSummaryMetrics;
-        }
-
-        @Override
         public void run() {
             try {
-                //State changed
                 setActive(isLeader());
             } catch (Exception e) {
                 throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/storm/blob/392803c9/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
index c4d2f3f..cc98804 100644
--- a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
+++ b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
@@ -15,7 +15,9 @@ package org.apache.storm.metric;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.MetricSet;
 import com.codahale.metrics.Reservoir;
 import com.codahale.metrics.Timer;
 import java.util.List;
@@ -54,6 +56,17 @@ public class StormMetricsRegistry {
     public <V> Gauge<V> registerGauge(final String name, Gauge<V> gauge) {
         return registry.gauge(name, () -> gauge);
     }
+    
+    /**
+     * Registers every metric in {@code metrics} with the underlying registry by delegating to
+     * {@code MetricRegistry#registerAll}. Undo with {@link #removeAll(MetricSet)}.
+     *
+     * <p>NOTE(review): Dropwizard's registration presumably rejects a name that is already
+     * registered (IllegalArgumentException). Confirm against the Metrics version in use.
+     *
+     * @param metrics the metric set to register
+     */
+    public void registerAll(MetricSet metrics) {
+        registry.registerAll(metrics);
+    }
+    
+    /**
+     * Removes from the underlying registry every metric whose name is a key of
+     * {@code metrics.getMetrics()}. Intended as the inverse of {@link #registerAll(MetricSet)}.
+     *
+     * <p>Matching is by name only. The registered {@link Metric} instance is not compared, so a
+     * different metric registered under the same name is removed as well.
+     *
+     * <p>NOTE(review): if {@code metrics} contains nested {@link MetricSet}s, registerAll may have
+     * registered them under prefixed names that this key lookup would not match. Confirm whether
+     * nested sets are ever passed here.
+     *
+     * @param metrics the metric set whose names should be unregistered
+     */
+    public void removeAll(MetricSet metrics) {
+        // MetricRegistry has no removeAll(MetricSet) counterpart to registerAll yet, so remove by
+        // name via removeMatching. Replace this once the library provides it; see
+        // https://github.com/dropwizard/metrics/pull/1280
+        Map<String, Metric> nameToMetric = metrics.getMetrics();
+        registry.removeMatching((name, metric) -> nameToMetric.containsKey(name));
+    }
 
     public void startMetricsReporters(Map<String, Object> daemonConf) {
         reporters = MetricsUtils.getPreparableReporters(daemonConf);


[3/3] storm git commit: Merge branch 'STORM-3147' of https://github.com/srdo/storm into STORM-3147

Posted by bo...@apache.org.
Merge branch 'STORM-3147' of https://github.com/srdo/storm into STORM-3147

 STORM-3147: Add metrics based on ClusterSummary

This closes #2840


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7ec618c1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7ec618c1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7ec618c1

Branch: refs/heads/master
Commit: 7ec618c10b95392fa9a033e360f1b2ac035b05ad
Parents: 6d0f2eb 392803c
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Wed Sep 19 12:52:53 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Wed Sep 19 12:52:53 2018 -0500

----------------------------------------------------------------------
 .../org/apache/storm/daemon/nimbus/Nimbus.java  | 293 +++++++++++++++++--
 .../storm/metric/StormMetricsRegistry.java      |  13 +
 2 files changed, 275 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/7ec618c1/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------