You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 19:14:51 UTC
[10/18] storm git commit: STORM-2702: added in simple congestion
detection
STORM-2702: added in simple congestion detection
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a4c372c9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a4c372c9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a4c372c9
Branch: refs/heads/master
Commit: a4c372c95e5ad97beedc534e55d35cc09a962520
Parents: 0e3fc5a
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Aug 24 17:13:50 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Aug 24 17:13:50 2017 -0500
----------------------------------------------------------------------
examples/storm-loadgen/README.md | 3 +-
.../loadgen/HttpForwardingMetricsConsumer.java | 4 ++-
.../loadgen/HttpForwardingMetricsServer.java | 4 +--
.../apache/storm/loadgen/LoadMetricsServer.java | 34 ++++++++++++++++----
4 files changed, 34 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/a4c372c9/examples/storm-loadgen/README.md
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/README.md b/examples/storm-loadgen/README.md
index 1e5c69d..29d27ab 100644
--- a/examples/storm-loadgen/README.md
+++ b/examples/storm-loadgen/README.md
@@ -86,7 +86,7 @@ Not all options are supported by all reporters.
|Reporter Option| Description | Supported Reporters|
|---------------|-------------|--------------------|
|time | Set the time unit that you want latency and CPU reported in. This can be from nanoseconds up to seconds. Most names are supported for the types| legacy, csv, tsv, fixed|
-|columns | A comma separated list of columns to output (see below for the metrics supported). A `*` is replaced by all metrics. Defaults to "start_time", "end_time", "rate", "mean", "99%ile", "99.9%ile", "cores", "mem", "failed", "ids" | csv, tsv, fixed |
+|columns | A comma separated list of columns to output (see below for the metrics supported). A `*` is replaced by all metrics. Defaults to "start_time", "end_time", "rate", "mean", "99%ile", "99.9%ile", "cores", "mem", "failed", "ids", "congested" | csv, tsv, fixed |
|extraColumns | Like columns but ones that should be added to the defaults instead of replacing them. A `*` is replaced by all metrics. | csv, tsv, fixed |
|meta | An arbitrary string that will appear as a "meta" column at the end. This helps when appending to files to keep different runs separated | csv, tsv, fixed|
|columnWidth | The width of each field | fixed|
@@ -118,6 +118,7 @@ There are a lot of different metrics supported
|end_time| The ending time of the metrics window from when the first topology was launched. | all
|time_window| the length in seconds for the time window. | all
|ids| The topology ids that are being tracked | all
+|congested| Components that appear to be congested | all
|storm_version| The version of storm as reported by the client | all
|java_version| The version of java as reported by the client | all
|os_arch| The OS architecture as reported by the client | all
http://git-wip-us.apache.org/repos/asf/storm/blob/a4c372c9/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsConsumer.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsConsumer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsConsumer.java
index 5829e9d..645000d 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsConsumer.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsConsumer.java
@@ -48,6 +48,7 @@ public class HttpForwardingMetricsConsumer implements IMetricsConsumer {
private transient URL url;
private transient IErrorReporter errorReporter;
private transient KryoValuesSerializer serializer;
+ private transient String topologyId;
@Override
public void prepare(Map<String, Object> topoConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
@@ -55,6 +56,7 @@ public class HttpForwardingMetricsConsumer implements IMetricsConsumer {
url = new URL((String)registrationArgument);
this.errorReporter = errorReporter;
serializer = new KryoValuesSerializer(topoConf);
+ topologyId = context.getStormId();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -67,7 +69,7 @@ public class HttpForwardingMetricsConsumer implements IMetricsConsumer {
con.setRequestMethod("POST");
con.setDoOutput(true);
try (Output out = new Output(con.getOutputStream())) {
- serializer.serializeInto(Arrays.asList(taskInfo, dataPoints), out);
+ serializer.serializeInto(Arrays.asList(taskInfo, dataPoints, topologyId), out);
out.flush();
}
//The connection is not sent unless a response is requested
http://git-wip-us.apache.org/repos/asf/storm/blob/a4c372c9/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java
index 99a980b..247d017 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java
@@ -57,7 +57,7 @@ public abstract class HttpForwardingMetricsServer {
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
Input in = new Input(request.getInputStream());
List<Object> metrics = des.get().deserializeFrom(in);
- handle((TaskInfo)metrics.get(0), (Collection<DataPoint>)metrics.get(1));
+ handle((TaskInfo)metrics.get(0), (Collection<DataPoint>)metrics.get(1), (String)metrics.get(2));
response.setStatus(HttpServletResponse.SC_OK);
}
}
@@ -74,7 +74,7 @@ public abstract class HttpForwardingMetricsServer {
}
//This needs to be thread safe
- public abstract void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints);
+ public abstract void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints, String topologyId);
/**
* Start the server.
http://git-wip-us.apache.org/repos/asf/storm/blob/a4c372c9/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
index 457dedf..36050ae 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
@@ -19,11 +19,9 @@
package org.apache.storm.loadgen;
import com.google.common.annotations.VisibleForTesting;
-import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
-import java.io.PrintWriter;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -39,6 +37,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -104,6 +103,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
private long workers;
private long executors;
private long hosts;
+ private Map<String, String> congested;
/**
* Constructor.
@@ -114,7 +114,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
*/
public Measurements(long uptimeSecs, long acked, long timeWindow, long failed, Histogram histo,
double userMs, double sysMs, double gcMs, long memBytes, Set<String> topologyIds,
- long workers, long executors, long hosts) {
+ long workers, long executors, long hosts, Map<String, String> congested) {
this.uptimeSecs = uptimeSecs;
this.acked = acked;
this.timeWindow = timeWindow;
@@ -128,6 +128,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
this.workers = workers;
this.executors = executors;
this.hosts = hosts;
+ this.congested = congested;
}
/**
@@ -147,6 +148,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
workers = 0;
executors = 0;
hosts = 0;
+ congested = new HashMap<>();
}
/**
@@ -167,6 +169,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
workers = Math.max(workers, other.workers);
executors = Math.max(executors, other.executors);
hosts = Math.max(hosts, other.hosts);
+ congested.putAll(other.congested);
}
public double getLatencyAtPercentile(double percential, TimeUnit unit) {
@@ -257,6 +260,10 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
return executors;
}
+ public Map<String, String> getCongested() {
+ return congested;
+ }
+
static Measurements combine(List<Measurements> measurements, Integer start, Integer count) {
if (count == null) {
count = measurements.size();
@@ -396,6 +403,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
tmp.put("uptime", new MetricExtractor((m, unit) -> m.getUptimeSecs(), "s"));
tmp.put("time_window", new MetricExtractor((m, unit) -> m.getTimeWindow(), "s"));
tmp.put("ids", new MetricExtractor((m, unit) -> m.getTopologyIds(), ""));
+ tmp.put("congested", new MetricExtractor((m, unit) -> m.getCongested(), ""));
tmp.put("workers", new MetricExtractor((m, unit) -> m.getWorkers(), ""));
tmp.put("hosts", new MetricExtractor((m, unit) -> m.getHosts(), ""));
tmp.put("executors", new MetricExtractor((m, unit) -> m.getExecutors(), ""));
@@ -468,7 +476,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
} else {
//Wrapping it makes it mutable
extractors = new ArrayList<>(Arrays.asList("start_time", "end_time", "rate",
- "mean", "99%ile", "99.9%ile", "cores", "mem", "failed", "ids"));
+ "mean", "99%ile", "99.9%ile", "cores", "mem", "failed", "ids", "congested"));
}
if (query.containsKey("extraColumns")) {
@@ -723,6 +731,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
private final AtomicLong gcCount = new AtomicLong(0);
private final AtomicLong gcMs = new AtomicLong(0);
private final ConcurrentHashMap<String, MemMeasure> memoryBytes = new ConcurrentHashMap<>();
+ private final AtomicReference<ConcurrentHashMap<String, String>> congested = new AtomicReference<>(new ConcurrentHashMap<>());
private final List<MetricResultsReporter> reporters;
private long prevAcked = 0;
private long prevFailed = 0;
@@ -903,7 +912,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
long memBytes = readMemory();
allCombined.add(new Measurements(uptime, ackedThisTime, thisTime, failedThisTime, copy, user, sys, gc, memBytes,
- ids, workers.size(), executors, hosts.size()));
+ ids, workers.size(), executors, hosts.size(), congested.getAndSet(new ConcurrentHashMap<>())));
Measurements inWindow = Measurements.combine(allCombined, null, windowLength);
for (MetricResultsReporter reporter: reporters) {
reporter.reportWindow(inWindow, allCombined);
@@ -912,8 +921,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
@Override
@SuppressWarnings("unchecked")
- public void handle(IMetricsConsumer.TaskInfo taskInfo, Collection<IMetricsConsumer.DataPoint> dataPoints) {
- //crud no simple way to tie this to a given topology :(
+ public void handle(IMetricsConsumer.TaskInfo taskInfo, Collection<IMetricsConsumer.DataPoint> dataPoints, String topologyId) {
String worker = taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort;
for (IMetricsConsumer.DataPoint dp: dataPoints) {
if (dp.name.startsWith("comp-lat-histo") && dp.value instanceof Histogram) {
@@ -952,6 +960,18 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
}
mm.update(((Number)val).longValue());
}
+ } else if (dp.name.equals("__receive")) {
+ Map<Object, Object> m = (Map<Object, Object>)dp.value;
+ Object pop = m.get("population");
+ Object cap = m.get("capacity");
+ if (pop instanceof Number && cap instanceof Number) {
+ double full = ((Number) pop).doubleValue() / ((Number) cap).doubleValue();
+ if (full >= 0.8) {
+ congested.get().put(
+ topologyId + ":" + taskInfo.srcComponentId + ":" + taskInfo.srcTaskId,
+ "receive " + pop + "/" + cap);
+ }
+ }
}
}
}