Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 19:14:56 UTC

[15/18] storm git commit: Added in latency as reported by the ui

Added in latency as reported by the ui


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/19508b94
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/19508b94
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/19508b94

Branch: refs/heads/master
Commit: 19508b94253a813aca4adca8ef373cef87a69e6d
Parents: 96e68c0
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Sep 1 10:28:22 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Sep 1 10:28:22 2017 -0500

----------------------------------------------------------------------
 examples/storm-loadgen/README.md                |  1 +
 .../apache/storm/loadgen/LoadMetricsServer.java | 26 ++++++++++++++++++--
 2 files changed, 25 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/19508b94/examples/storm-loadgen/README.md
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/README.md b/examples/storm-loadgen/README.md
index 6c476bd..1858ea4 100644
--- a/examples/storm-loadgen/README.md
+++ b/examples/storm-loadgen/README.md
@@ -129,6 +129,7 @@ There are a lot of different metrics supported
 |executors| The number of running executors in the monitored topologies | all
 |workers| The number of workers the monitored topologies are running on | all
 |skipped\_max\_spout| The number of ms in total that the spout reported it skipped trying to emit because of `topology.max.spout.pending`. This is the sum for all spouts and can be used to decide if setting the value higher will likely improve throughput. `congested` reports individual spouts that appear to be slowed down by this to a large degree. | all
+|ui\_complete\_latency| This is a special metric: the average completion latency as reported on the UI for `:all-time`. Because it comes from the UI it does not follow the normal windows; within a window the maximum value reported is used. | all
 |target_rate| The target rate in sentences per second for the ThroughputVsLatency topology | ThroughputVsLatency
 |spout_parallel| The parallelism of the spout for the `ThroughputVsLatency` topology. | ThroughputVsLatency
 |split_parallel| The parallelism of the split bolt for the `ThroughputVsLatency` topology. | ThroughputVsLatency
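
To make the ui_complete_latency row above concrete, here is a minimal, self-contained sketch of the aggregation it describes: each monitored topology's ":all-time" complete latency is weighted by its acked count, and within a reporting window the maximum sampled value is kept. The UiLatencySketch and TopoStats names are hypothetical stand-ins; the committed code (the LoadMetricsServer.java hunks below) reads the same values from TopologyPageInfo via the Nimbus client.

import java.util.List;

public class UiLatencySketch {
    // One monitored topology's ":all-time" stats as shown on the UI.
    record TopoStats(long acked, double completeLatencyMs) { }

    // Acked-weighted average of the ":all-time" complete latency across topologies.
    static double weightedUiLatencyMs(List<TopoStats> stats) {
        long totalAcked = 0;
        double totalLatMs = 0.0;
        for (TopoStats s : stats) {
            totalAcked += s.acked();
            totalLatMs += s.acked() * s.completeLatencyMs();
        }
        return totalAcked == 0 ? 0.0 : totalLatMs / totalAcked;
    }

    // Within a reporting window the maximum sampled value is kept,
    // matching the Measurements.add() change below.
    static double combineWindow(double soFar, double sample) {
        return Math.max(soFar, sample);
    }

    public static void main(String[] args) {
        double sample = weightedUiLatencyMs(List.of(
            new TopoStats(1000, 12.5),
            new TopoStats(3000, 20.0)));
        System.out.printf("ui_complete_latency sample: %.2f ms%n", sample); // roughly 18.13 ms
    }
}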

http://git-wip-us.apache.org/repos/asf/storm/blob/19508b94/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
index e6c1616..a44ab45 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
@@ -50,6 +50,7 @@ import org.apache.storm.generated.ExecutorSummary;
 import org.apache.storm.generated.Nimbus;
 import org.apache.storm.generated.SpoutStats;
 import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.generated.TopologyPageInfo;
 import org.apache.storm.generated.TopologySummary;
 import org.apache.storm.metric.api.IMetricsConsumer;
 import org.apache.storm.utils.Utils;
@@ -91,6 +92,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
 
     public static class Measurements {
         private final Histogram histo;
+        private double uiCompleteLatency;
         private long skippedMaxSpoutMs;
         private double userMs;
         private double sysMs;
@@ -115,7 +117,8 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
          */
         public Measurements(long uptimeSecs, long acked, long timeWindow, long failed, Histogram histo,
                             double userMs, double sysMs, double gcMs, long memBytes, Set<String> topologyIds,
-                            long workers, long executors, long hosts, Map<String, String> congested, long skippedMaxSpoutMs) {
+                            long workers, long executors, long hosts, Map<String, String> congested, long skippedMaxSpoutMs,
+                            double uiCompleteLatency) {
             this.uptimeSecs = uptimeSecs;
             this.acked = acked;
             this.timeWindow = timeWindow;
@@ -131,6 +134,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             this.hosts = hosts;
             this.congested = congested;
             this.skippedMaxSpoutMs = skippedMaxSpoutMs;
+            this.uiCompleteLatency = uiCompleteLatency;
         }
 
         /**
@@ -152,6 +156,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             hosts = 0;
             congested = new HashMap<>();
             skippedMaxSpoutMs = 0;
+            uiCompleteLatency = 0.0;
         }
 
         /**
@@ -174,6 +179,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             hosts = Math.max(hosts, other.hosts);
             congested.putAll(other.congested);
             skippedMaxSpoutMs += other.skippedMaxSpoutMs;
+            uiCompleteLatency = Math.max(uiCompleteLatency, other.uiCompleteLatency);
         }
 
         public double getLatencyAtPercentile(double percential, TimeUnit unit) {
@@ -196,6 +202,10 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             return convert(histo.getStdDeviation(), TimeUnit.NANOSECONDS, unit);
         }
 
+        public double getUiCompleteLatency(TimeUnit unit) {
+            return convert(uiCompleteLatency, TimeUnit.MILLISECONDS, unit);
+        }
+
         public double getUserTime(TimeUnit unit) {
             return convert(userMs, TimeUnit.MILLISECONDS, unit);
         }
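
The new getUiCompleteLatency accessor relies on the class's existing convert helper, which this hunk does not show. As an assumption about its behavior rather than the committed code, it presumably scales a fractional value between time units, roughly like:

    // Assumed shape of the existing convert(...) helper used above (not shown
    // in this hunk): scale a double from one TimeUnit to another, avoiding the
    // truncation of TimeUnit.convert(long, TimeUnit).
    static double convert(double value, TimeUnit from, TimeUnit to) {
        return value * from.toNanos(1) / to.toNanos(1);
    }
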
@@ -424,6 +434,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
         tmp.put("os_name", new MetricExtractor((m, unit) -> System.getProperty("os.name"), ""));
         tmp.put("os_version", new MetricExtractor((m, unit) -> System.getProperty("os.version"), ""));
         tmp.put("config_override", new MetricExtractor((m, unit) -> Utils.readCommandLineOpts(), ""));
+        tmp.put("ui_complete_latency", new MetricExtractor((m, unit) -> m.getUiCompleteLatency(unit)));
         NAMED_EXTRACTORS = Collections.unmodifiableMap(tmp);
     }
 
@@ -874,8 +885,12 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
         int uptime = 0;
         long acked = 0;
         long failed = 0;
+        double totalLatMs = 0;
+        long totalLatCount = 0;
         for (String id: ids) {
             TopologyInfo info = client.getTopologyInfo(id);
+            @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
+            TopologyPageInfo tpi = client.getTopologyPageInfo(id, ":all-time", false);
             uptime = Math.max(uptime, info.get_uptime_secs());
             for (ExecutorSummary exec : info.get_executors()) {
                 hosts.add(exec.get_host());
@@ -900,6 +915,12 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
                     }
                 }
             }
+            Double latency = tpi.get_topology_stats().get_window_to_complete_latencies_ms().get(":all-time");
+            Long latAcked = tpi.get_topology_stats().get_window_to_acked().get(":all-time");
+            if (latency != null && latAcked != null) {
+                totalLatCount += latAcked;
+                totalLatMs += (latAcked * latency);
+            }
         }
         @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
         long failedThisTime = failed - prevFailed;
@@ -923,7 +944,8 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
         long memBytes = readMemory();
 
         allCombined.add(new Measurements(uptime, ackedThisTime, thisTime, failedThisTime, copy, user, sys, gc, memBytes,
-            ids, workers.size(), executors, hosts.size(), congested.getAndSet(new ConcurrentHashMap<>()), skippedMaxSpout));
+            ids, workers.size(), executors, hosts.size(), congested.getAndSet(new ConcurrentHashMap<>()), skippedMaxSpout,
+            totalLatMs / totalLatCount));
         Measurements inWindow = Measurements.combine(allCombined, null, windowLength);
         for (MetricResultsReporter reporter: reporters) {
             reporter.reportWindow(inWindow, allCombined);
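
One caveat in the collection loop above: if none of the monitored topologies has acked anything yet, totalLatCount stays 0 and totalLatMs / totalLatCount evaluates to NaN for that sample. A defensive variant, purely illustrative and not part of this commit, would guard the division:

    // Purely illustrative, not part of this commit: guard the division so the
    // sample is 0.0 rather than NaN before any tuples have been acked.
    double uiLatMs = totalLatCount == 0 ? 0.0 : totalLatMs / totalLatCount;
    allCombined.add(new Measurements(uptime, ackedThisTime, thisTime, failedThisTime, copy, user, sys, gc, memBytes,
        ids, workers.size(), executors, hosts.size(), congested.getAndSet(new ConcurrentHashMap<>()), skippedMaxSpout,
        uiLatMs));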