You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 19:14:56 UTC
[15/18] storm git commit: Added in latency as reported by the ui
Added in latency as reported by the ui
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/19508b94
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/19508b94
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/19508b94
Branch: refs/heads/master
Commit: 19508b94253a813aca4adca8ef373cef87a69e6d
Parents: 96e68c0
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Sep 1 10:28:22 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Sep 1 10:28:22 2017 -0500
----------------------------------------------------------------------
examples/storm-loadgen/README.md | 1 +
.../apache/storm/loadgen/LoadMetricsServer.java | 26 ++++++++++++++++++--
2 files changed, 25 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/19508b94/examples/storm-loadgen/README.md
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/README.md b/examples/storm-loadgen/README.md
index 6c476bd..1858ea4 100644
--- a/examples/storm-loadgen/README.md
+++ b/examples/storm-loadgen/README.md
@@ -129,6 +129,7 @@ There are a lot of different metrics supported
|executors| The number of running executors in the monitored topologies | all
|workers| The number of workers the monitored topologies are running on | all
|skipped\_max\_spout| The number of ms in total that the spout reported it skipped trying to emit because of `topology.max.spout.pending`. This is the sum for all spouts and can be used to decide if setting the value higher will likely improve throughput. `congested` reports individual spouts that appear to be slowed down by this to a large degree. | all
+|ui\_complete\_latency| This is a special metric, as it is the average completion latency as reported on the UI for `:all-time`. Because it comes from the UI, it does not follow the normal windows. Within a window the maximum value reported is used. | all
|target_rate| The target rate in sentences per second for the ThroughputVsLatency topology | ThroughputVsLatency
|spout_parallel| The parallelism of the spout for the `ThroughputVsLatency` topology. | ThroughputVsLatency
|split_parallel| The parallelism of the split bolt for the `ThroughputVsLatency` topology. | ThroughputVsLatency
http://git-wip-us.apache.org/repos/asf/storm/blob/19508b94/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
index e6c1616..a44ab45 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
@@ -50,6 +50,7 @@ import org.apache.storm.generated.ExecutorSummary;
import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.SpoutStats;
import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.generated.TopologyPageInfo;
import org.apache.storm.generated.TopologySummary;
import org.apache.storm.metric.api.IMetricsConsumer;
import org.apache.storm.utils.Utils;
@@ -91,6 +92,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
public static class Measurements {
private final Histogram histo;
+ private double uiCompleteLatency;
private long skippedMaxSpoutMs;
private double userMs;
private double sysMs;
@@ -115,7 +117,8 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
*/
public Measurements(long uptimeSecs, long acked, long timeWindow, long failed, Histogram histo,
double userMs, double sysMs, double gcMs, long memBytes, Set<String> topologyIds,
- long workers, long executors, long hosts, Map<String, String> congested, long skippedMaxSpoutMs) {
+ long workers, long executors, long hosts, Map<String, String> congested, long skippedMaxSpoutMs,
+ double uiCompleteLatency) {
this.uptimeSecs = uptimeSecs;
this.acked = acked;
this.timeWindow = timeWindow;
@@ -131,6 +134,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
this.hosts = hosts;
this.congested = congested;
this.skippedMaxSpoutMs = skippedMaxSpoutMs;
+ this.uiCompleteLatency = uiCompleteLatency;
}
/**
@@ -152,6 +156,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
hosts = 0;
congested = new HashMap<>();
skippedMaxSpoutMs = 0;
+ uiCompleteLatency = 0.0;
}
/**
@@ -174,6 +179,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
hosts = Math.max(hosts, other.hosts);
congested.putAll(other.congested);
skippedMaxSpoutMs += other.skippedMaxSpoutMs;
+ uiCompleteLatency = Math.max(uiCompleteLatency, other.uiCompleteLatency);
}
public double getLatencyAtPercentile(double percential, TimeUnit unit) {
@@ -196,6 +202,10 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
return convert(histo.getStdDeviation(), TimeUnit.NANOSECONDS, unit);
}
+ public double getUiCompleteLatency(TimeUnit unit) {
+ return convert(uiCompleteLatency, TimeUnit.MILLISECONDS, unit);
+ }
+
public double getUserTime(TimeUnit unit) {
return convert(userMs, TimeUnit.MILLISECONDS, unit);
}
@@ -424,6 +434,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
tmp.put("os_name", new MetricExtractor((m, unit) -> System.getProperty("os.name"), ""));
tmp.put("os_version", new MetricExtractor((m, unit) -> System.getProperty("os.version"), ""));
tmp.put("config_override", new MetricExtractor((m, unit) -> Utils.readCommandLineOpts(), ""));
+ tmp.put("ui_complete_latency", new MetricExtractor((m, unit) -> m.getUiCompleteLatency(unit)));
NAMED_EXTRACTORS = Collections.unmodifiableMap(tmp);
}
@@ -874,8 +885,12 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
int uptime = 0;
long acked = 0;
long failed = 0;
+ double totalLatMs = 0;
+ long totalLatCount = 0;
for (String id: ids) {
TopologyInfo info = client.getTopologyInfo(id);
+ @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
+ TopologyPageInfo tpi = client.getTopologyPageInfo(id, ":all-time", false);
uptime = Math.max(uptime, info.get_uptime_secs());
for (ExecutorSummary exec : info.get_executors()) {
hosts.add(exec.get_host());
@@ -900,6 +915,12 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
}
}
}
+ Double latency = tpi.get_topology_stats().get_window_to_complete_latencies_ms().get(":all-time");
+ Long latAcked = tpi.get_topology_stats().get_window_to_acked().get(":all-time");
+ if (latency != null && latAcked != null) {
+ totalLatCount += latAcked;
+ totalLatMs += (latAcked * latency);
+ }
}
@SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
long failedThisTime = failed - prevFailed;
@@ -923,7 +944,8 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
long memBytes = readMemory();
allCombined.add(new Measurements(uptime, ackedThisTime, thisTime, failedThisTime, copy, user, sys, gc, memBytes,
- ids, workers.size(), executors, hosts.size(), congested.getAndSet(new ConcurrentHashMap<>()), skippedMaxSpout));
+ ids, workers.size(), executors, hosts.size(), congested.getAndSet(new ConcurrentHashMap<>()), skippedMaxSpout,
+ totalLatMs / totalLatCount));
Measurements inWindow = Measurements.combine(allCombined, null, windowLength);
for (MetricResultsReporter reporter: reporters) {
reporter.reportWindow(inWindow, allCombined);