You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 19:14:51 UTC

[10/18] storm git commit: STORM-2702: added in simple congestion detection

STORM-2702: added in simple congestion detection


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a4c372c9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a4c372c9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a4c372c9

Branch: refs/heads/master
Commit: a4c372c95e5ad97beedc534e55d35cc09a962520
Parents: 0e3fc5a
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Aug 24 17:13:50 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Aug 24 17:13:50 2017 -0500

----------------------------------------------------------------------
 examples/storm-loadgen/README.md                |  3 +-
 .../loadgen/HttpForwardingMetricsConsumer.java  |  4 ++-
 .../loadgen/HttpForwardingMetricsServer.java    |  4 +--
 .../apache/storm/loadgen/LoadMetricsServer.java | 34 ++++++++++++++++----
 4 files changed, 34 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a4c372c9/examples/storm-loadgen/README.md
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/README.md b/examples/storm-loadgen/README.md
index 1e5c69d..29d27ab 100644
--- a/examples/storm-loadgen/README.md
+++ b/examples/storm-loadgen/README.md
@@ -86,7 +86,7 @@ Not all options are supported by all reporters.
 |Reporter Option| Description | Supported Reporters|
 |---------------|-------------|--------------------|
 |time | Set the time unit that you want latency and CPU reported in.  This can be from nanoseconds up to seconds.  Most names are supported for the types| legacy, csv, tsv, fixed|
-|columns | A comma separated list of columns to output (see below for the metrics supported).  A `*` is replaced by all metrics. Defaults to "start_time", "end_time", "rate", "mean", "99%ile", "99.9%ile", "cores", "mem", "failed", "ids" | csv, tsv, fixed |
+|columns | A comma separated list of columns to output (see below for the metrics supported).  A `*` is replaced by all metrics. Defaults to "start_time", "end_time", "rate", "mean", "99%ile", "99.9%ile", "cores", "mem", "failed", "ids", "congested" | csv, tsv, fixed |
 |extraColumns | Like columns but ones that should be added to the defaults instead of replacing them. A `*` is replaced by all metrics. | csv, tsv, fixed |
 |meta | An arbitrary string that will appear as a "meta" column at the end.  This helps when appending to files to keep different runs separated | csv, tsv, fixed|
 |columnWidth | The width of each field | fixed|
@@ -118,6 +118,7 @@ There are a lot of different metrics supported
 |end_time| The ending time of the metrics window from when the first topology was launched. | all
 |time_window| the length in seconds for the time window. | all
 |ids| The topology ids that are being tracked | all
+|congested| Components that appear to be congested | all
 |storm_version| The version of storm as reported by the client | all
 |java_version| The version of java as reported by the client | all
 |os_arch| The OS architecture as reported by the client | all

http://git-wip-us.apache.org/repos/asf/storm/blob/a4c372c9/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsConsumer.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsConsumer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsConsumer.java
index 5829e9d..645000d 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsConsumer.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsConsumer.java
@@ -48,6 +48,7 @@ public class HttpForwardingMetricsConsumer implements IMetricsConsumer {
     private transient URL url;
     private transient IErrorReporter errorReporter;
     private transient KryoValuesSerializer serializer;
+    private transient String topologyId;
 
     @Override
     public void prepare(Map<String, Object> topoConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) { 
@@ -55,6 +56,7 @@ public class HttpForwardingMetricsConsumer implements IMetricsConsumer {
             url = new URL((String)registrationArgument);
             this.errorReporter = errorReporter;
             serializer = new KryoValuesSerializer(topoConf);
+            topologyId = context.getStormId();
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -67,7 +69,7 @@ public class HttpForwardingMetricsConsumer implements IMetricsConsumer {
             con.setRequestMethod("POST");
             con.setDoOutput(true);
             try (Output out = new Output(con.getOutputStream())) {
-                serializer.serializeInto(Arrays.asList(taskInfo, dataPoints), out);
+                serializer.serializeInto(Arrays.asList(taskInfo, dataPoints, topologyId), out);
                 out.flush();
             }
             //The connection is not sent unless a response is requested

http://git-wip-us.apache.org/repos/asf/storm/blob/a4c372c9/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java
index 99a980b..247d017 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java
@@ -57,7 +57,7 @@ public abstract class HttpForwardingMetricsServer {
         protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
             Input in = new Input(request.getInputStream());
             List<Object> metrics = des.get().deserializeFrom(in);
-            handle((TaskInfo)metrics.get(0), (Collection<DataPoint>)metrics.get(1));
+            handle((TaskInfo)metrics.get(0), (Collection<DataPoint>)metrics.get(1), (String)metrics.get(2));
             response.setStatus(HttpServletResponse.SC_OK);
         }
     }
@@ -74,7 +74,7 @@ public abstract class HttpForwardingMetricsServer {
     }
 
     //This needs to be thread safe
-    public abstract void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints);
+    public abstract void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints, String topologyId);
 
     /**
      * Start the server.

http://git-wip-us.apache.org/repos/asf/storm/blob/a4c372c9/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
index 457dedf..36050ae 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
@@ -19,11 +19,9 @@
 package org.apache.storm.loadgen;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
-import java.io.PrintWriter;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -39,6 +37,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -104,6 +103,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
         private long workers;
         private long executors;
         private long hosts;
+        private Map<String, String> congested;
 
         /**
          * Constructor.
@@ -114,7 +114,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
          */
         public Measurements(long uptimeSecs, long acked, long timeWindow, long failed, Histogram histo,
                             double userMs, double sysMs, double gcMs, long memBytes, Set<String> topologyIds,
-                            long workers, long executors, long hosts) {
+                            long workers, long executors, long hosts, Map<String, String> congested) {
             this.uptimeSecs = uptimeSecs;
             this.acked = acked;
             this.timeWindow = timeWindow;
@@ -128,6 +128,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             this.workers = workers;
             this.executors = executors;
             this.hosts = hosts;
+            this.congested = congested;
         }
 
         /**
@@ -147,6 +148,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             workers = 0;
             executors = 0;
             hosts = 0;
+            congested = new HashMap<>();
         }
 
         /**
@@ -167,6 +169,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             workers = Math.max(workers, other.workers);
             executors = Math.max(executors, other.executors);
             hosts = Math.max(hosts, other.hosts);
+            congested.putAll(other.congested);
         }
 
         public double getLatencyAtPercentile(double percential, TimeUnit unit) {
@@ -257,6 +260,10 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             return executors;
         }
 
+        public Map<String, String> getCongested() {
+            return congested;
+        }
+
         static Measurements combine(List<Measurements> measurements, Integer start, Integer count) {
             if (count == null) {
                 count = measurements.size();
@@ -396,6 +403,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
         tmp.put("uptime",  new MetricExtractor((m, unit) -> m.getUptimeSecs(), "s"));
         tmp.put("time_window",  new MetricExtractor((m, unit) -> m.getTimeWindow(), "s"));
         tmp.put("ids",  new MetricExtractor((m, unit) -> m.getTopologyIds(), ""));
+        tmp.put("congested",  new MetricExtractor((m, unit) -> m.getCongested(), ""));
         tmp.put("workers",  new MetricExtractor((m, unit) -> m.getWorkers(), ""));
         tmp.put("hosts",  new MetricExtractor((m, unit) -> m.getHosts(), ""));
         tmp.put("executors",  new MetricExtractor((m, unit) -> m.getExecutors(), ""));
@@ -468,7 +476,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             } else {
                 //Wrapping it makes it mutable
                 extractors = new ArrayList<>(Arrays.asList("start_time", "end_time", "rate",
-                    "mean", "99%ile", "99.9%ile", "cores", "mem", "failed", "ids"));
+                    "mean", "99%ile", "99.9%ile", "cores", "mem", "failed", "ids", "congested"));
             }
 
             if (query.containsKey("extraColumns")) {
@@ -723,6 +731,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
     private final AtomicLong gcCount = new AtomicLong(0);
     private final AtomicLong gcMs = new AtomicLong(0);
     private final ConcurrentHashMap<String, MemMeasure> memoryBytes = new ConcurrentHashMap<>();
+    private final AtomicReference<ConcurrentHashMap<String, String>> congested = new AtomicReference<>(new ConcurrentHashMap<>());
     private final List<MetricResultsReporter> reporters;
     private long prevAcked = 0;
     private long prevFailed = 0;
@@ -903,7 +912,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
         long memBytes = readMemory();
 
         allCombined.add(new Measurements(uptime, ackedThisTime, thisTime, failedThisTime, copy, user, sys, gc, memBytes,
-            ids, workers.size(), executors, hosts.size()));
+            ids, workers.size(), executors, hosts.size(), congested.getAndSet(new ConcurrentHashMap<>())));
         Measurements inWindow = Measurements.combine(allCombined, null, windowLength);
         for (MetricResultsReporter reporter: reporters) {
             reporter.reportWindow(inWindow, allCombined);
@@ -912,8 +921,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
 
     @Override
     @SuppressWarnings("unchecked")
-    public void handle(IMetricsConsumer.TaskInfo taskInfo, Collection<IMetricsConsumer.DataPoint> dataPoints) {
-        //crud no simple way to tie this to a given topology :(
+    public void handle(IMetricsConsumer.TaskInfo taskInfo, Collection<IMetricsConsumer.DataPoint> dataPoints, String topologyId) {
         String worker = taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort;
         for (IMetricsConsumer.DataPoint dp: dataPoints) {
             if (dp.name.startsWith("comp-lat-histo") && dp.value instanceof Histogram) {
@@ -952,6 +960,18 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
                     }
                     mm.update(((Number)val).longValue());
                 }
+            } else if (dp.name.equals("__receive")) {
+                Map<Object, Object> m = (Map<Object, Object>)dp.value;
+                Object pop = m.get("population");
+                Object cap = m.get("capacity");
+                if (pop instanceof Number && cap instanceof Number) {
+                    double full = ((Number) pop).doubleValue() / ((Number) cap).doubleValue();
+                    if (full >= 0.8) {
+                        congested.get().put(
+                            topologyId + ":" + taskInfo.srcComponentId + ":" + taskInfo.srcTaskId,
+                            "receive " + pop + "/" + cap);
+                    }
+                }
             }
         }
     }