You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 19:14:44 UTC
[03/18] storm git commit: STORM-2702: storm-loadgen
http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
new file mode 100644
index 0000000..c126ca5
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
@@ -0,0 +1,784 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.HdrHistogram.Histogram;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.ExecutorSummary;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.SpoutStats;
+import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.generated.TopologySummary;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A metrics server that records and reports metrics for a set of running topologies.
+ */
+public class LoadMetricsServer extends HttpForwardingMetricsServer {
+ private static final Logger LOG = LoggerFactory.getLogger(HttpForwardingMetricsServer.class);
+
+ private static class MemMeasure {
+ private long mem = 0;
+ private long time = 0;
+
+ synchronized void update(long mem) {
+ this.mem = mem;
+ time = System.currentTimeMillis();
+ }
+
+ public synchronized long get() {
+ return isExpired() ? 0L : mem;
+ }
+
+ synchronized boolean isExpired() {
+ return (System.currentTimeMillis() - time) >= 20000;
+ }
+ }
+
+ @VisibleForTesting
+ static double convert(double value, TimeUnit from, TimeUnit target) {
+ if (target.compareTo(from) > 0) {
+ return value / from.convert(1, target);
+ }
+ return value * target.convert(1, from);
+ }
+
+ public static class Measurements {
+ private final Histogram histo;
+ private double userMs;
+ private double sysMs;
+ private double gcMs;
+ private long memBytes;
+ private long uptimeSecs;
+ private long timeWindow;
+ private long acked;
+ private long failed;
+ private Set<String> topologyIds;
+
+ /**
+ * Constructor.
+ * @param histo latency histogram.
+ * @param userMs user CPU in ms.
+ * @param sysMs system CPU in ms.
+ * @param gcMs GC CPU in ms.
+ */
+ public Measurements(long uptimeSecs, long acked, long timeWindow, long failed, Histogram histo,
+ double userMs, double sysMs, double gcMs, long memBytes, Set<String> topologyIds) {
+ this.uptimeSecs = uptimeSecs;
+ this.acked = acked;
+ this.timeWindow = timeWindow;
+ this.failed = failed;
+ this.userMs = userMs;
+ this.sysMs = sysMs;
+ this.gcMs = gcMs;
+ this.histo = histo;
+ this.memBytes = memBytes;
+ this.topologyIds = topologyIds;
+ }
+
+ /**
+ * Default Constructor.
+ */
+ public Measurements() {
+ histo = new Histogram(3600000000000L, 3);
+ sysMs = 0;
+ userMs = 0;
+ gcMs = 0;
+ memBytes = 0;
+ uptimeSecs = 0;
+ timeWindow = 0;
+ acked = 0;
+ failed = 0;
+ topologyIds = new HashSet<>();
+ }
+
+ /**
+ * Add other to this.
+ * @param other meaurements to add in.
+ */
+ public void add(Measurements other) {
+ histo.add(other.histo);
+ sysMs += other.sysMs;
+ userMs += other.userMs;
+ gcMs += other.gcMs;
+ memBytes = Math.max(memBytes, other.memBytes);
+ acked += other.acked;
+ failed += other.failed;
+ uptimeSecs = Math.max(uptimeSecs, other.uptimeSecs);
+ timeWindow += other.timeWindow;
+ topologyIds.addAll(other.topologyIds);
+ }
+
+ public double getLatencyAtPercentile(double percential, TimeUnit unit) {
+ return convert(histo.getValueAtPercentile(percential), TimeUnit.NANOSECONDS, unit);
+ }
+
+ public double getMinLatency(TimeUnit unit) {
+ return convert(histo.getMinValue(), TimeUnit.NANOSECONDS, unit);
+ }
+
+ public double getMaxLatency(TimeUnit unit) {
+ return convert(histo.getMaxValue(), TimeUnit.NANOSECONDS, unit);
+ }
+
+ public double getMeanLatency(TimeUnit unit) {
+ return convert(histo.getMean(), TimeUnit.NANOSECONDS, unit);
+ }
+
+ public double getLatencyStdDeviation(TimeUnit unit) {
+ return convert(histo.getStdDeviation(), TimeUnit.NANOSECONDS, unit);
+ }
+
+ public double getUserTime(TimeUnit unit) {
+ return convert(userMs, TimeUnit.MILLISECONDS, unit);
+ }
+
+ public double getSysTime(TimeUnit unit) {
+ return convert(sysMs, TimeUnit.MILLISECONDS, unit);
+ }
+
+ public double getGc(TimeUnit unit) {
+ return convert(gcMs, TimeUnit.MILLISECONDS, unit);
+ }
+
+ public double getMemMb() {
+ return memBytes / (1024.0 * 1024.0);
+ }
+
+ public long getUptimeSecs() {
+ return uptimeSecs;
+ }
+
+ public long getCompleted() {
+ return histo.getTotalCount();
+ }
+
+ public double getCompletedPerSec() {
+ return getCompleted() / (double)timeWindow;
+ }
+
+ public long getAcked() {
+ return acked;
+ }
+
+ public double getAckedPerSec() {
+ return acked / (double)timeWindow;
+ }
+
+ public long getFailed() {
+ return failed;
+ }
+
+ public long startTime() {
+ return uptimeSecs - timeWindow;
+ }
+
+ public long endTime() {
+ return uptimeSecs;
+ }
+
+ public double getTimeWindow() {
+ return timeWindow;
+ }
+
+ public Set<String> getTopologyIds() {
+ return topologyIds;
+ }
+
+ static Measurements combine(List<Measurements> measurements, Integer start, Integer count) {
+ if (count == null) {
+ count = measurements.size();
+ }
+
+ if (start == null) {
+ start = measurements.size() - count;
+ }
+ start = Math.max(0, start);
+ count = Math.min(count, measurements.size() - start);
+
+ Measurements ret = new Measurements();
+ for (int i = start; i < start + count; i ++) {
+ ret.add(measurements.get(i));
+ }
+ return ret;
+ }
+ }
+
+ interface MetricResultsReporter {
+ void start();
+
+ void reportWindow(Measurements inWindow, List<Measurements> allTime);
+
+ void finish(List<Measurements> allTime) throws Exception;
+ }
+
+ abstract static class FileReporter implements MetricResultsReporter {
+ protected final PrintStream out;
+ private final boolean needsClose;
+
+ public FileReporter() throws FileNotFoundException {
+ this(null, null);
+ }
+
+ public FileReporter(String path, Map<String, String> query) throws FileNotFoundException {
+ boolean append = Boolean.parseBoolean(query.getOrDefault("append", "false"));
+
+ if (path == null || "/dev/stdout".equals(path)) {
+ out = System.out;
+ needsClose = false;
+ } else if ("/dev/stderr".equals(path)) {
+ out = System.err;
+ needsClose = false;
+ } else {
+ out = new PrintStream(new FileOutputStream(path, append));
+ needsClose = true;
+ }
+ }
+
+ @Override
+ public void start() {
+ //NOOP
+ }
+
+ @Override
+ public void finish(List<Measurements> allTime) throws Exception {
+ if (needsClose && out != null) {
+ out.close();
+ }
+ }
+ }
+
+ private static final Map<String, TimeUnit> UNIT_MAP;
+
+ static {
+ HashMap<String, TimeUnit> tmp = new HashMap<>();
+ tmp.put("NS", TimeUnit.NANOSECONDS);
+ tmp.put("NANO", TimeUnit.NANOSECONDS);
+ tmp.put("NANOSEC", TimeUnit.NANOSECONDS);
+ tmp.put("NANOSECOND", TimeUnit.NANOSECONDS);
+ tmp.put("NANOSECONDS", TimeUnit.NANOSECONDS);
+ tmp.put("μS", TimeUnit.MICROSECONDS);
+ tmp.put("US", TimeUnit.MICROSECONDS);
+ tmp.put("MICRO", TimeUnit.MICROSECONDS);
+ tmp.put("MICROSEC", TimeUnit.MICROSECONDS);
+ tmp.put("MICROSECOND", TimeUnit.MICROSECONDS);
+ tmp.put("MICROSECONDS", TimeUnit.MICROSECONDS);
+ tmp.put("MS", TimeUnit.MILLISECONDS);
+ tmp.put("MILLI", TimeUnit.MILLISECONDS);
+ tmp.put("MILLISEC", TimeUnit.MILLISECONDS);
+ tmp.put("MILLISECOND", TimeUnit.MILLISECONDS);
+ tmp.put("MILLISECONDS", TimeUnit.MILLISECONDS);
+ tmp.put("S", TimeUnit.SECONDS);
+ tmp.put("SEC", TimeUnit.SECONDS);
+ tmp.put("SECOND", TimeUnit.SECONDS);
+ tmp.put("SECONDS", TimeUnit.SECONDS);
+ tmp.put("M", TimeUnit.MINUTES);
+ tmp.put("MIN", TimeUnit.MINUTES);
+ tmp.put("MINUTE", TimeUnit.MINUTES);
+ tmp.put("MINUTES", TimeUnit.MINUTES);
+ UNIT_MAP = Collections.unmodifiableMap(tmp);
+ }
+
+ private static final Map<TimeUnit, String> TIME_UNIT_NAME;
+
+ static {
+ HashMap<TimeUnit, String> tmp = new HashMap<>();
+ tmp.put(TimeUnit.NANOSECONDS, "ns");
+ tmp.put(TimeUnit.MICROSECONDS, "μs");
+ tmp.put(TimeUnit.MILLISECONDS, "ms");
+ tmp.put(TimeUnit.SECONDS, "s");
+ tmp.put(TimeUnit.MINUTES, "m");
+ TIME_UNIT_NAME = Collections.unmodifiableMap(tmp);
+ }
+
+ private static final Map<String, MetricExtractor> NAMED_EXTRACTORS;
+
+ static {
+ //Perhaps there is a better way to do this???
+ HashMap<String, MetricExtractor> tmp = new HashMap<>();
+ tmp.put("99%ile", new MetricExtractor((m, unit) -> m.getLatencyAtPercentile(99.0, unit)));
+ tmp.put("99.9%ile", new MetricExtractor((m, unit) -> m.getLatencyAtPercentile(99.9, unit)));
+ tmp.put("median", new MetricExtractor((m, unit) -> m.getLatencyAtPercentile(50, unit)));
+ tmp.put("mean", new MetricExtractor((m, unit) -> m.getMeanLatency(unit)));
+ tmp.put("min", new MetricExtractor((m, unit) -> m.getMinLatency(unit)));
+ tmp.put("max", new MetricExtractor((m, unit) -> m.getMaxLatency(unit)));
+ tmp.put("stddev", new MetricExtractor((m, unit) -> m.getLatencyStdDeviation(unit)));
+ tmp.put("user_cpu", new MetricExtractor((m, unit) -> m.getUserTime(unit)));
+ tmp.put("sys_cpu", new MetricExtractor((m, unit) -> m.getSysTime(unit)));
+ tmp.put("gc_cpu", new MetricExtractor((m, unit) -> m.getGc(unit)));
+ tmp.put("cores", new MetricExtractor(
+ (m, unit) -> (m.getSysTime(TimeUnit.SECONDS) + m.getUserTime(TimeUnit.SECONDS)) / m.getTimeWindow(),
+ ""));
+ tmp.put("uptime", new MetricExtractor((m, unit) -> m.getUptimeSecs(), "s"));
+ tmp.put("acked", new MetricExtractor((m, unit) -> m.getAcked(), ""));
+ tmp.put("rate", new MetricExtractor((m, unit) -> m.getAckedPerSec(), "tuple/s"));
+ tmp.put("completed", new MetricExtractor((m, unit) -> m.getCompleted(), ""));
+ tmp.put("completion_rate", new MetricExtractor((m, unit) -> m.getCompletedPerSec(), "tuple/s"));
+ tmp.put("mem", new MetricExtractor((m, unit) -> m.getMemMb(), "MB"));
+ tmp.put("failed", new MetricExtractor((m, unit) -> m.getFailed(), ""));
+ tmp.put("start_time", new MetricExtractor((m, unit) -> m.startTime(),"s"));
+ tmp.put("end_time", new MetricExtractor((m, unit) -> m.endTime(), "s"));
+ tmp.put("time_window", new MetricExtractor((m, unit) -> m.getTimeWindow(), "s"));
+ tmp.put("ids", new MetricExtractor((m, unit) -> m.getTopologyIds(), ""));
+ NAMED_EXTRACTORS = Collections.unmodifiableMap(tmp);
+ }
+
+ static class MetricExtractor {
+ private final String unit;
+ private final BiFunction<Measurements, TimeUnit, Object> func;
+
+ public MetricExtractor(BiFunction<Measurements, TimeUnit, Object> func) {
+ this.func = func;
+ this.unit = null;
+ }
+
+ public MetricExtractor(BiFunction<Measurements, TimeUnit, Object> func, String unit) {
+ this.func = func;
+ this.unit = unit;
+ }
+
+ public Object get(Measurements m, TimeUnit unit) {
+ return func.apply(m, unit);
+ }
+
+ public String formatName(String name, TimeUnit targetUnit) {
+ StringBuilder ret = new StringBuilder();
+ ret.append(name);
+ if (unit == null || !unit.isEmpty()) {
+ ret.append("(");
+ if (unit == null) {
+ ret.append(TIME_UNIT_NAME.get(targetUnit));
+ } else {
+ ret.append(unit);
+ }
+ ret.append(")");
+ }
+ return ret.toString();
+ }
+ }
+
+ static class SepValReporter extends FileReporter {
+ private final TimeUnit targetUnit;
+ private final String separator;
+ private final List<String> extractors;
+ private final String meta;
+
+ public SepValReporter(String separator, String path, Map<String, String> query) throws FileNotFoundException {
+ super(path, query);
+ this.separator = separator;
+ targetUnit = UNIT_MAP.get(query.getOrDefault("time", "MILLISECONDS").toUpperCase());
+ if (targetUnit == null) {
+ throw new IllegalArgumentException(query.get("time") + " is not a supported time unit");
+ }
+ if (query.containsKey("columns")) {
+ extractors = Arrays.asList(query.get("columns").split("\\s*,\\s*"));
+ HashSet<String> notFound = new HashSet<>(extractors);
+ notFound.removeAll(NAMED_EXTRACTORS.keySet());
+ if (notFound.size() > 0) {
+ throw new IllegalArgumentException(notFound + " columns are not supported");
+ }
+ } else {
+ //Wrapping it makes it mutable
+ extractors = new ArrayList<>(Arrays.asList("start_time", "end_time", "completion_rate",
+ "mean", "99%ile", "99.9%ile", "cores", "mem", "failed"));
+ }
+
+ if (query.containsKey("extraColumns")) {
+ List<String> moreExtractors = Arrays.asList(query.get("extraColumns").split("\\s*,\\s*"));
+ for (String extractor: moreExtractors) {
+ if (!NAMED_EXTRACTORS.containsKey(extractor)) {
+ throw new IllegalArgumentException(extractor + " is not a supported column");
+ }
+ if (!extractors.contains(extractor)) {
+ extractors.add(extractor);
+ }
+ }
+ }
+
+ meta = query.get("meta");
+ }
+
+ @Override
+ public void start() {
+ boolean first = true;
+ for (String name: extractors) {
+ if (!first) {
+ out.print(separator);
+ }
+ first = false;
+ out.print(NAMED_EXTRACTORS.get(name).formatName(name, targetUnit));
+ }
+ if (meta != null) {
+ out.print(separator);
+ out.print("meta");
+ }
+ out.println();
+ }
+
+ @Override
+ public void reportWindow(Measurements m, List<Measurements> allTime) {
+ boolean first = true;
+ for (String name: extractors) {
+ if (!first) {
+ out.print(separator);
+ }
+ first = false;
+ Object value = NAMED_EXTRACTORS.get(name).get(m, targetUnit);
+ String svalue = value == null ? "" : value.toString();
+ out.print(escape(svalue));
+ }
+ if (meta != null) {
+ out.print(separator);
+ out.print(escape(meta));
+ }
+ out.println();
+ }
+
+ private String escape(String svalue) {
+ return svalue.replace("\\", "\\\\").replace(separator, "\\" + separator);
+ }
+ }
+
+ static class LegacyReporter extends FileReporter {
+ private final TimeUnit targetUnitOverride;
+
+ public LegacyReporter() throws FileNotFoundException {
+ super();
+ targetUnitOverride = null;
+ }
+
+ public LegacyReporter(String path, Map<String, String> query) throws FileNotFoundException {
+ super(path, query);
+ if (query.containsKey("time")) {
+ targetUnitOverride = UNIT_MAP.get(query.get("time").toUpperCase());
+ if (targetUnitOverride == null) {
+ throw new IllegalArgumentException(query.get("time") + " is not a supported time unit");
+ }
+ } else {
+ targetUnitOverride = null;
+ }
+ }
+
+ @Override
+ public void reportWindow(Measurements m, List<Measurements> allTime) {
+ TimeUnit nsOr = TimeUnit.NANOSECONDS;
+ TimeUnit msOr = TimeUnit.MILLISECONDS;
+ if (targetUnitOverride != null) {
+ nsOr = targetUnitOverride;
+ msOr = targetUnitOverride;
+ }
+
+ Measurements total = Measurements.combine(allTime, null, null);
+ out.printf("uptime: %,4d acked: %,9d acked/sec: %,10.2f failed: %,8d "
+ + "99%%: %,15.0f 99.9%%: %,15.0f min: %,15.0f max: %,15.0f mean: %,15.2f "
+ + "stddev: %,15.2f user: %,10.0f sys: %,10.0f gc: %,10.0f mem: %,10.2f\n",
+ m.getUptimeSecs(), m.getAcked(), m.getAckedPerSec(), total.getFailed(),
+ m.getLatencyAtPercentile(99.0, nsOr),
+ m.getLatencyAtPercentile(99.9, nsOr),
+ m.getMinLatency(nsOr),
+ m.getMaxLatency(nsOr),
+ m.getMeanLatency(nsOr),
+ m.getLatencyStdDeviation(nsOr),
+ m.getUserTime(msOr),
+ m.getSysTime(msOr),
+ m.getGc(msOr),
+ m.getMemMb());
+ }
+ }
+
+ /**
+ * Add Command line options for configuring the output of this.
+ * @param options command line options to update
+ */
+ public static void addCommandLineOptions(Options options) {
+ //We want to be able to select the measurement interval
+ // reporting window (We don't need 3 different reports)
+ // We want to be able to specify format (and configs specific to the format)
+ // With perhaps defaults overall
+ options.addOption(Option.builder("r")
+ .longOpt("report-interval")
+ .hasArg()
+ .argName("INTERVAL_SECS")
+ .desc("How long in between reported metrics. Will be rounded up to the next 10 sec boundary.\n"
+ + "default " + DEFAULT_REPORT_INTERVAL)
+ .build());
+
+ options.addOption(Option.builder("w")
+ .longOpt("report-window")
+ .hasArg()
+ .argName("INTERVAL_SECS")
+ .desc("How long of a rolling window should be in each report. Will be rounded up to the next report interval boundary.\n"
+ + "default " + DEFAULT_WINDOW_INTERVAL)
+ .build());
+
+ options.addOption(Option.builder()
+ .longOpt("reporter")
+ .hasArg()
+ .argName("TYPE:PATH?OPTIONS")
+ .desc("Provide the config for a reporter to run. Supported types are:\n"
+ + "LEGACY - (write things out in the legacy format)\n"
+ + "TSV - tab separated values\n"
+ + "CSV - comma separated values")
+ .build());
+
+ }
+
+ public static final long DEFAULT_REPORT_INTERVAL = 30;
+ public static final long DEFAULT_WINDOW_INTERVAL = DEFAULT_REPORT_INTERVAL;
+ private static final Pattern REPORTER_PATTERN = Pattern.compile(
+ "(?<type>[^:?]+)(?::(?<path>[^?]+))?(?:\\?(?<query>.*))?");
+
+ private final Histogram histo = new Histogram(3600000000000L, 3);
+ private final AtomicLong systemCpu = new AtomicLong(0);
+ private final AtomicLong userCpu = new AtomicLong(0);
+ private final AtomicLong gcCount = new AtomicLong(0);
+ private final AtomicLong gcMs = new AtomicLong(0);
+ private final ConcurrentHashMap<String, MemMeasure> memoryBytes = new ConcurrentHashMap<>();
+ private final List<MetricResultsReporter> reporters;
+ private long prevAcked = 0;
+ private long prevFailed = 0;
+ private long prevUptime = 0;
+ private int windowLength = 1;
+ private long reportIntervalSecs = DEFAULT_REPORT_INTERVAL;
+
+ private final LinkedList<Measurements> allCombined = new LinkedList<>();
+
+ LoadMetricsServer(Map<String, Object> conf, CommandLine commandLine) throws URISyntaxException, FileNotFoundException {
+ super(conf);
+ if (commandLine.hasOption("r")) {
+ reportIntervalSecs = Long.parseLong(commandLine.getOptionValue("r"));
+ reportIntervalSecs = ((reportIntervalSecs + 1) / 10) * 10;
+ }
+ if (commandLine.hasOption("w")) {
+ long window = Long.parseLong(commandLine.getOptionValue("w"));
+ windowLength = (int) ((window + 1) / reportIntervalSecs);
+ }
+ reporters = new ArrayList<>();
+ if (commandLine.hasOption("reporter")) {
+ for (String reporterString: commandLine.getOptionValues("reporter")) {
+ Matcher m = REPORTER_PATTERN.matcher(reporterString);
+ if (!m.matches()) {
+ throw new IllegalArgumentException(reporterString + " does nto look like it is a reporter");
+ }
+ String type = m.group("type");
+ String path = m.group("path");
+ Map<String, String> query = new HashMap<>();
+ String queryString = m.group("query");
+ if (queryString != null) {
+ for (String param : queryString.split("&")) {
+ String[] pair = param.split("=");
+ String key = pair[0];
+ String value = pair.length > 1 ? pair[1] : "true";
+ query.put(key, value);
+ }
+ }
+ type = type.toUpperCase();
+ switch (type) {
+ case "LEGACY":
+ reporters.add(new LegacyReporter(path, query));
+ break;
+ case "TSV":
+ reporters.add(new SepValReporter("\t", path, query));
+ break;
+ case "CSV":
+ reporters.add(new SepValReporter(",", path, query));
+ break;
+ default:
+ throw new RuntimeException(type + " is not a supported reporter type");
+ }
+ }
+ } else {
+ reporters.add(new LegacyReporter());
+ }
+ }
+
+ private long readMemory() {
+ long total = 0;
+ for (MemMeasure mem: memoryBytes.values()) {
+ total += mem.get();
+ }
+ return total;
+ }
+
+ private void startMetricsOutput() {
+ for (MetricResultsReporter reporter: reporters) {
+ reporter.start();
+ }
+ }
+
+ private void finishMetricsOutput() throws Exception {
+ for (MetricResultsReporter reporter: reporters) {
+ reporter.finish(allCombined);
+ }
+ }
+
+ /**
+ * Monitor the list of topologies for the given time frame.
+ * @param execTimeMins how long to monitor for
+ * @param client the client to use when monitoring
+ * @param topoNames the names of the topologies to monitor
+ * @throws Exception on any error
+ */
+ public void monitorFor(double execTimeMins, Nimbus.Iface client, Collection<String> topoNames) throws Exception {
+ startMetricsOutput();
+ long iterations = (long) ((execTimeMins * 60) / reportIntervalSecs);
+ for (int i = 0; i < iterations; i++) {
+ Thread.sleep(reportIntervalSecs * 1000);
+ outputMetrics(client, topoNames);
+ }
+ finishMetricsOutput();
+ }
+
+ private void outputMetrics(Nimbus.Iface client, Collection<String> names) throws Exception {
+ ClusterSummary summary = client.getClusterInfo();
+ Set<String> ids = new HashSet<>();
+ for (TopologySummary ts: summary.get_topologies()) {
+ if (names.contains(ts.get_name())) {
+ ids.add(ts.get_id());
+ }
+ }
+ if (ids.size() != names.size()) {
+ throw new Exception("Could not find all topologies: " + names);
+ }
+ int uptime = 0;
+ long acked = 0;
+ long failed = 0;
+ for (String id: ids) {
+ TopologyInfo info = client.getTopologyInfo(id);
+ uptime = Math.max(uptime, info.get_uptime_secs());
+ for (ExecutorSummary exec : info.get_executors()) {
+ if (exec.get_stats() != null && exec.get_stats().get_specific() != null
+ && exec.get_stats().get_specific().is_set_spout()) {
+ SpoutStats stats = exec.get_stats().get_specific().get_spout();
+ Map<String, Long> failedMap = stats.get_failed().get(":all-time");
+ Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
+ if (ackedMap != null) {
+ for (String key : ackedMap.keySet()) {
+ if (failedMap != null) {
+ Long tmp = failedMap.get(key);
+ if (tmp != null) {
+ failed += tmp;
+ }
+ }
+ long ackVal = ackedMap.get(key);
+ acked += ackVal;
+ }
+ }
+ }
+ }
+ }
+ @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
+ long failedThisTime = failed - prevFailed;
+ @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
+ long ackedThisTime = acked - prevAcked;
+ @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
+ long thisTime = uptime - prevUptime;
+ prevUptime = uptime;
+ prevAcked = acked;
+ prevFailed = failed;
+
+ Histogram copy = new Histogram(3600000000000L, 3);;
+ synchronized (histo) {
+ copy.add(histo);
+ histo.reset();
+ }
+ long user = userCpu.getAndSet(0);
+ long sys = systemCpu.getAndSet(0);
+ long gc = gcMs.getAndSet(0);
+ long memBytes = readMemory();
+
+ allCombined.add(new Measurements(uptime, ackedThisTime, thisTime, failedThisTime, copy, user, sys, gc, memBytes, ids));
+ Measurements inWindow = Measurements.combine(allCombined, null, windowLength);
+ for (MetricResultsReporter reporter: reporters) {
+ reporter.reportWindow(inWindow, allCombined);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void handle(IMetricsConsumer.TaskInfo taskInfo, Collection<IMetricsConsumer.DataPoint> dataPoints) {
+ //crud no simple way to tie this to a given topology :(
+ String worker = taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort;
+ for (IMetricsConsumer.DataPoint dp: dataPoints) {
+ if (dp.name.startsWith("comp-lat-histo") && dp.value instanceof Histogram) {
+ synchronized (histo) {
+ histo.add((Histogram)dp.value);
+ }
+ } else if ("CPU".equals(dp.name) && dp.value instanceof Map) {
+ Map<Object, Object> m = (Map<Object, Object>)dp.value;
+ Object sys = m.get("sys-ms");
+ if (sys instanceof Number) {
+ systemCpu.getAndAdd(((Number)sys).longValue());
+ }
+ Object user = m.get("user-ms");
+ if (user instanceof Number) {
+ userCpu.getAndAdd(((Number)user).longValue());
+ }
+ } else if (dp.name.startsWith("GC/") && dp.value instanceof Map) {
+ Map<Object, Object> m = (Map<Object, Object>)dp.value;
+ Object count = m.get("count");
+ if (count instanceof Number) {
+ gcCount.getAndAdd(((Number)count).longValue());
+ }
+ Object time = m.get("timeMs");
+ if (time instanceof Number) {
+ gcMs.getAndAdd(((Number)time).longValue());
+ }
+ } else if (dp.name.startsWith("memory/") && dp.value instanceof Map) {
+ Map<Object, Object> m = (Map<Object, Object>)dp.value;
+ Object val = m.get("usedBytes");
+ if (val instanceof Number) {
+ MemMeasure mm = memoryBytes.get(worker);
+ if (mm == null) {
+ mm = new MemMeasure();
+ MemMeasure tmp = memoryBytes.putIfAbsent(worker, mm);
+ mm = tmp == null ? mm : tmp;
+ }
+ mm.update(((Number)val).longValue());
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadSpout.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadSpout.java
new file mode 100644
index 0000000..95ade59
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadSpout.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.storm.metrics.hdrhistogram.HistogramMetric;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+/**
+ * A spout that simulates a real world spout based off of statistics about it.
+ */
+public class LoadSpout extends BaseRichSpout {
+ private static class OutputStreamEngineWithHisto extends OutputStreamEngine {
+ public final HistogramMetric histogram;
+
+ public OutputStreamEngineWithHisto(OutputStream stats, TopologyContext context) {
+ super(stats);
+ histogram = new HistogramMetric(3600000000000L, 3);
+ //TODO perhaps we can adjust the frequency later...
+ context.registerMetric("comp-lat-histo-" + stats.id, histogram, 10);
+ }
+ }
+
+ private static class SentWithTime {
+ public final String streamName;
+ public final Values keyValue;
+ public final long time;
+ public final HistogramMetric histogram;
+
+ SentWithTime(String streamName, Values keyValue, long time, HistogramMetric histogram) {
+ this.streamName = streamName;
+ this.keyValue = keyValue;
+ this.time = time;
+ this.histogram = histogram;
+ }
+
+ public void done() {
+ histogram.recordValue(Math.max(0, System.nanoTime() - time));
+ }
+ }
+
+ private final List<OutputStream> streamStats;
+ private List<OutputStreamEngineWithHisto> streams;
+ private SpoutOutputCollector collector;
+ //This is an attempt to give all of the streams an equal opportunity to emit something.
+ private long nextStreamCounter = 0;
+ private final int numStreams;
+
+ /**
+ * Create a simple load spout with just a set rate per second on the default stream.
+ * @param ratePerSecond the rate to send messages at.
+ */
+ public LoadSpout(double ratePerSecond) {
+ OutputStream test = new OutputStream.Builder()
+ .withId("default")
+ .withRate(new NormalDistStats(ratePerSecond, 0.0, ratePerSecond, ratePerSecond))
+ .build();
+ streamStats = Arrays.asList(test);
+ numStreams = 1;
+ }
+
+ public LoadSpout(LoadCompConf conf) {
+ this.streamStats = Collections.unmodifiableList(new ArrayList<>(conf.streams));
+ numStreams = streamStats.size();
+ }
+
+ @Override
+ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
+ streams = Collections.unmodifiableList(streamStats.stream()
+ .map((ss) -> new OutputStreamEngineWithHisto(ss, context)).collect(Collectors.toList()));
+ this.collector = collector;
+ }
+
+ @Override
+ public void nextTuple() {
+ int size = numStreams;
+ for (int tries = 0; tries < size; tries++) {
+ int index = Math.abs((int) (nextStreamCounter++ % size));
+ OutputStreamEngineWithHisto se = streams.get(index);
+ Long emitTupleTime = se.shouldEmit();
+ if (emitTupleTime != null) {
+ SentWithTime swt =
+ new SentWithTime(se.streamName, getNextValues(se), emitTupleTime, se.histogram);
+ collector.emit(swt.streamName, swt.keyValue, swt);
+ break;
+ }
+ }
+ }
+
+ protected Values getNextValues(OutputStreamEngine se) {
+ return new Values(se.nextKey(), "JUST_SOME_VALUE");
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ for (OutputStream s: streamStats) {
+ declarer.declareStream(s.id, new Fields("key", "value"));
+ }
+ }
+
+ @Override
+ public void ack(Object id) {
+ ((SentWithTime)id).done();
+ }
+
+ @Override
+ public void fail(Object id) {
+ SentWithTime swt = (SentWithTime)id;
+ collector.emit(swt.streamName, swt.keyValue, swt);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/NormalDistStats.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/NormalDistStats.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/NormalDistStats.java
new file mode 100644
index 0000000..20b2926
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/NormalDistStats.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.storm.utils.ObjectReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stats related to something with a normal distribution, and a way to randomly simulate it.
+ */
+public class NormalDistStats implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(NormalDistStats.class);
+ public final double mean;
+ public final double stddev;
+ public final double min;
+ public final double max;
+
+ /**
+ * Read the stats from a config.
+ * @param conf the config.
+ * @return the corresponding stats.
+ */
+ public static NormalDistStats fromConf(Map<String, Object> conf) {
+ return fromConf(conf, null);
+ }
+
+ /**
+ * Read the stats from a config.
+ * @param conf the config.
+ * @param def the default mean.
+ * @return the corresponding stats.
+ */
+ public static NormalDistStats fromConf(Map<String, Object> conf, Double def) {
+ if (conf == null) {
+ conf = Collections.emptyMap();
+ }
+ double mean = ObjectReader.getDouble(conf.get("mean"), def);
+ double stddev = ObjectReader.getDouble(conf.get("stddev"), mean / 4);
+ double min = ObjectReader.getDouble(conf.get("min"), 0.0);
+ double max = ObjectReader.getDouble(conf.get("max"), Double.MAX_VALUE);
+ return new NormalDistStats(mean, stddev, min, max);
+ }
+
+ /**
+ * Return this as a config.
+ * @return the config version of this.
+ */
+ public Map<String, Object> toConf() {
+ Map<String, Object> ret = new HashMap<>();
+ ret.put("mean", mean);
+ ret.put("stddev", stddev);
+ ret.put("min", min);
+ ret.put("max", max);
+ return ret;
+ }
+
+ /**
+ * Create an instance of this from a list of values. The metrics will be computed from the values.
+ * @param values the values to compute metrics from.
+ */
+ public NormalDistStats(List<Double> values) {
+ //Compute the stats for these and save them
+ double min = values.isEmpty() ? 0.0 : values.get(0);
+ double max = values.isEmpty() ? 0.0 : values.get(0);
+ double sum = 0.0;
+ long count = values.size();
+ for (Double v: values) {
+ sum += v;
+ min = Math.min(min, v);
+ max = Math.max(max,v);
+ }
+ double mean = sum / Math.max(count, 1);
+ double sdPartial = 0;
+ for (Double v: values) {
+ sdPartial += Math.pow(v - mean, 2);
+ }
+ double stddev = 0.0;
+ if (count >= 2) {
+ stddev = Math.sqrt(sdPartial / (count - 1));
+ }
+ this.min = min;
+ this.max = max;
+ this.mean = mean;
+ this.stddev = stddev;
+ LOG.debug("Stats for {} are {}", values, this);
+ }
+
+ /**
+ * A Constructor for the pre computed stats.
+ * @param mean the mean of the values.
+ * @param stddev the standard deviation of the values.
+ * @param min the min of the values.
+ * @param max the max of the values.
+ */
+ public NormalDistStats(double mean, double stddev, double min, double max) {
+ this.mean = mean;
+ this.stddev = stddev;
+ this.min = min;
+ this.max = max;
+ }
+
+ /**
+ * Generate a random number that follows the statistical distribution
+ * @param rand the random number generator to use
+ * @return the next number that should follow the statistical distribution.
+ */
+ public double nextRandom(Random rand) {
+ return Math.max(Math.min((rand.nextGaussian() * stddev) + mean, max), min);
+ }
+
+ @Override
+ public String toString() {
+ return "mean: " + mean + " min: " + min + " max: " + max + " stddev: " + stddev;
+ }
+
+ /**
+ * Scale the stats by v. This is not scaling everything proportionally. We don't want the stddev to increase
+ * so instead we scale the mean and shift everything up or down by the same amount.
+ * @param v the amount to scale by 1.0 is nothing 0.5 is half.
+ * @return a copy of this with the needed adjustments.
+ */
+ public NormalDistStats scaleBy(double v) {
+ double newMean = mean * v;
+ double shiftAmount = newMean - mean;
+ return new NormalDistStats(Math.max(0, mean + shiftAmount), stddev,
+ Math.max(0, min + shiftAmount), Math.max(0, max + shiftAmount));
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStream.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStream.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStream.java
new file mode 100644
index 0000000..99357d9
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStream.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import jdk.nashorn.internal.objects.Global;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.loadgen.NormalDistStats;
+
+/**
+ * A set of measurements about a stream so we can statistically reproduce it.
+ */
+public class OutputStream implements Serializable {
+ //The global stream id is this + the from component it must be a part of.
+ public final String id;
+ public final NormalDistStats rate;
+ public final boolean areKeysSkewed;
+
+ /**
+ * Create an output stream from a config.
+ * @param conf the config to read from.
+ * @return the read OutputStream.
+ */
+ public static OutputStream fromConf(Map<String, Object> conf) {
+ String streamId = (String) conf.getOrDefault("streamId", "default");
+ NormalDistStats rate = NormalDistStats.fromConf((Map<String, Object>) conf.get("rate"));
+ boolean areKeysSkewed = (Boolean) conf.getOrDefault("areKeysSkewed", false);
+ return new OutputStream(streamId, rate, areKeysSkewed);
+ }
+
+ /**
+ * Convert this to a conf.
+ * @return the conf.
+ */
+ public Map<String, Object> toConf() {
+ Map<String, Object> ret = new HashMap<>();
+ ret.put("streamId", id);
+ ret.put("rate", rate.toConf());
+ ret.put("areKeysSkewed", areKeysSkewed);
+ return ret;
+ }
+
+ public OutputStream remap(String origId, Map<GlobalStreamId, GlobalStreamId> remappedStreams) {
+ GlobalStreamId remapped = remappedStreams.get(new GlobalStreamId(origId, id));
+ return new OutputStream(remapped.get_streamId(), rate, areKeysSkewed);
+ }
+
+ public OutputStream scaleThroughput(double v) {
+ return new OutputStream(id, rate.scaleBy(v), areKeysSkewed);
+ }
+
+ public static class Builder {
+ private String id;
+ private NormalDistStats rate;
+ private boolean areKeysSkewed;
+
+ public String getId() {
+ return id;
+ }
+
+ public Builder withId(String id) {
+ this.id = id;
+ return this;
+ }
+
+ public NormalDistStats getRate() {
+ return rate;
+ }
+
+ public Builder withRate(NormalDistStats rate) {
+ this.rate = rate;
+ return this;
+ }
+
+ public boolean isAreKeysSkewed() {
+ return areKeysSkewed;
+ }
+
+ public Builder withAreKeysSkewed(boolean areKeysSkewed) {
+ this.areKeysSkewed = areKeysSkewed;
+ return this;
+ }
+
+ public OutputStream build() {
+ return new OutputStream(id, rate, areKeysSkewed);
+ }
+ }
+
+ /**
+ * Create a new stream with stats
+ * @param id the id of the stream
+ * @param rate the rate of tuples being emitted on this stream
+ * @param areKeysSkewed true if keys are skewed else false. For skewed keys
+ * we only simulate it by using a gaussian distribution to the keys instead
+ * of an even distribution. Tere is no effort made right not to measure the
+ * skewness and reproduce it.
+ */
+ public OutputStream(String id, NormalDistStats rate, boolean areKeysSkewed) {
+ this.id = id;
+ this.rate = rate;
+ this.areKeysSkewed = areKeysSkewed;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStreamEngine.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStreamEngine.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStreamEngine.java
new file mode 100644
index 0000000..80111c8
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStreamEngine.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Provides an API to simulate the output of a stream.
+ * <p>
+ * Right now it is just rate, but in the future we expect to do data skew as well...
+ * </p>
+ */
+public class OutputStreamEngine {
+ private static final double NANO_PER_SEC = 1_000_000_000.0;
+ private static final long UPDATE_RATE_PERIOD_NS = ((long)NANO_PER_SEC * 2);
+ private static final String[] KEYS = new String[2048];
+
+ static {
+ //We get a new random number and seed it to make sure that runs are consistent where possible.
+ Random r = new Random(KEYS.length);
+ for (int i = 0; i < KEYS.length; i++) {
+ KEYS[i] = String.valueOf(r.nextDouble());
+ }
+ }
+
+ private long periodNano;
+ private long emitAmount;
+ private final Random rand;
+ private long nextEmitTime;
+ private long nextRateRandomizeTime;
+ private long emitsLeft;
+ private final OutputStream stats;
+ public final String streamName;
+
+ /**
+ * Create an engine that can simulate the given stats.
+ * @param stats the stats to follow
+ */
+ public OutputStreamEngine(OutputStream stats) {
+ this.stats = stats;
+ rand = ThreadLocalRandom.current();
+ selectNewRate();
+ //Start emitting right now
+ nextEmitTime = System.nanoTime();
+ nextRateRandomizeTime = nextEmitTime + UPDATE_RATE_PERIOD_NS;
+ emitsLeft = emitAmount;
+ streamName = stats.id;
+ }
+
+ private void selectNewRate() {
+ double ratePerSecond = stats.rate.nextRandom(rand);
+ if (ratePerSecond > 0) {
+ periodNano = Math.max(1, (long)(NANO_PER_SEC / ratePerSecond));
+ emitAmount = Math.max(1, (long)((ratePerSecond / NANO_PER_SEC) * periodNano));
+ } else {
+ //if it is is 0 or less it really is 1 per 10 seconds.
+ periodNano = (long)NANO_PER_SEC * 10;
+ emitAmount = 1;
+ }
+ }
+
+ /**
+ * Should we emit or not.
+ * @return the start time of the message, or null of nothing should be emitted.
+ */
+ public Long shouldEmit() {
+ long time = System.nanoTime();
+ if (emitsLeft <= 0 && nextEmitTime <= time) {
+ emitsLeft = emitAmount;
+ nextEmitTime = nextEmitTime + periodNano;
+ }
+
+ if (nextRateRandomizeTime <= time) {
+ //Once every UPDATE_RATE_PERIOD_NS
+ selectNewRate();
+ nextRateRandomizeTime = nextEmitTime + UPDATE_RATE_PERIOD_NS;
+ }
+
+ if (emitsLeft > 0) {
+ emitsLeft--;
+ return nextEmitTime - periodNano;
+ }
+ return null;
+ }
+
+ /**
+ * Get the next key to emit.
+ * @return the key that should be emitted.
+ */
+ public String nextKey() {
+ int keyIndex;
+ if (stats.areKeysSkewed) {
+ //We set the stddev of the skewed keys to be 1/5 of the length, but then we use the absolute value
+ // of that so everything is skewed towards 0
+ keyIndex = Math.min(KEYS.length - 1 , Math.abs((int)(rand.nextGaussian() * KEYS.length / 5)));
+ } else {
+ keyIndex = rand.nextInt(KEYS.length);
+ }
+ return KEYS[keyIndex];
+ }
+
+ public int nextInt(int bound) {
+ return rand.nextInt(bound);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ScopedTopologySet.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ScopedTopologySet.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ScopedTopologySet.java
new file mode 100644
index 0000000..1b6ed74
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ScopedTopologySet.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import org.apache.storm.generated.KillOptions;
+import org.apache.storm.generated.Nimbus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A set of topology names that will be killed when this is closed.
+ */
+public class ScopedTopologySet extends HashSet<String> implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(ScopedTopologySet.class);
+ private static final KillOptions NO_WAIT_KILL = new KillOptions();
+
+ static {
+ NO_WAIT_KILL.set_wait_secs(0);
+ }
+
+ private final Nimbus.Iface client;
+ private final Set<String> unmodWrapper;
+
+ public ScopedTopologySet(Nimbus.Iface client) {
+ this.client = client;
+ unmodWrapper = Collections.unmodifiableSet(this);
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ throw new RuntimeException("Unmodifiable Set");
+ }
+
+ @Override
+ public void clear() {
+ throw new RuntimeException("Unmodifiable Set");
+ }
+
+ @Override
+ public boolean removeAll(Collection<?> c) {
+ throw new RuntimeException("Unmodifiable Set");
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> c) {
+ throw new RuntimeException("Unmodifiable Set");
+ }
+
+ @Override
+ public void close() {
+ RuntimeException saved = null;
+ for (Iterator<String> it = super.iterator(); it.hasNext();) {
+ String name = it.next();
+ try {
+ client.killTopologyWithOpts(name, NO_WAIT_KILL);
+ } catch (Exception e) {
+ RuntimeException wrapped = new RuntimeException("Error trying to kill " + name, e);
+ if (saved != null) {
+ saved.addSuppressed(wrapped);
+ } else {
+ saved = wrapped;
+ }
+ }
+ }
+ super.clear();
+ if (saved != null) {
+ throw saved;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
index 96c13c5..2c22b42 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
@@ -15,114 +15,67 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.storm.starter;
-import java.util.Collection;
+package org.apache.storm.loadgen;
+
import java.util.HashMap;
import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicLong;
-import org.HdrHistogram.Histogram;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.ClusterSummary;
-import org.apache.storm.generated.ExecutorSummary;
-import org.apache.storm.generated.KillOptions;
-import org.apache.storm.generated.Nimbus;
-import org.apache.storm.generated.SpoutStats;
-import org.apache.storm.generated.TopologyInfo;
-import org.apache.storm.generated.TopologySummary;
-import org.apache.storm.metric.api.IMetricsConsumer.DataPoint;
-import org.apache.storm.metric.api.IMetricsConsumer.TaskInfo;
-import org.apache.storm.metrics.hdrhistogram.HistogramMetric;
-import org.apache.storm.misc.metric.HttpForwardingMetricsServer;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
+import org.apache.storm.metric.LoggingMetricsConsumer;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.NimbusClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* WordCount but the spout goes at a predefined rate and we collect
* proper latency statistics.
*/
public class ThroughputVsLatency {
- private static class SentWithTime {
- public final String sentence;
- public final long time;
-
- SentWithTime(String sentence, long time) {
- this.sentence = sentence;
- this.time = time;
- }
- }
-
- public static class FastRandomSentenceSpout extends BaseRichSpout {
- static final String[] SENTENCES = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
- "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
-
- SpoutOutputCollector _collector;
- long _periodNano;
- long _emitAmount;
- Random _rand;
- long _nextEmitTime;
- long _emitsLeft;
- HistogramMetric _histo;
+ private static final Logger LOG = LoggerFactory.getLogger(GenLoad.class);
+ private static final int TEST_EXECUTE_TIME_DEFAULT = 5;
+ private static final long DEFAULT_RATE_PER_SECOND = 500;
+ private static final String DEFAULT_TOPO_NAME = "wc-test";
+ private static final int DEFAULT_NUM_SPOUTS = 1;
+ private static final int DEFAULT_NUM_SPLITS = 1;
+ private static final int DEFAULT_NUM_COUNTS = 1;
+
+ public static class FastRandomSentenceSpout extends LoadSpout {
+ static final String[] SENTENCES = new String[] {
+ "the cow jumped over the moon",
+ "an apple a day keeps the doctor away",
+ "four score and seven years ago",
+ "snow white and the seven dwarfs",
+ "i am at two with nature"
+ };
+ /**
+ * Constructor.
+ * @param ratePerSecond the rate to emite tuples at.
+ */
public FastRandomSentenceSpout(long ratePerSecond) {
- if (ratePerSecond > 0) {
- _periodNano = Math.max(1, 1000000000/ratePerSecond);
- _emitAmount = Math.max(1, (long)((ratePerSecond / 1000000000.0) * _periodNano));
- } else {
- _periodNano = Long.MAX_VALUE - 1;
- _emitAmount = 1;
- }
+ super(ratePerSecond);
}
@Override
- public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
- _collector = collector;
- _rand = ThreadLocalRandom.current();
- _nextEmitTime = System.nanoTime();
- _emitsLeft = _emitAmount;
- _histo = new HistogramMetric(3600000000000L, 3);
- context.registerMetric("comp-lat-histo", _histo, 10); //Update every 10 seconds, so we are not too far behind
- }
-
- @Override
- public void nextTuple() {
- if (_emitsLeft <= 0 && _nextEmitTime <= System.nanoTime()) {
- _emitsLeft = _emitAmount;
- _nextEmitTime = _nextEmitTime + _periodNano;
- }
-
- if (_emitsLeft > 0) {
- String sentence = SENTENCES[_rand.nextInt(SENTENCES.length)];
- _collector.emit(new Values(sentence), new SentWithTime(sentence, _nextEmitTime - _periodNano));
- _emitsLeft--;
- }
- }
-
- @Override
- public void ack(Object id) {
- long end = System.nanoTime();
- SentWithTime st = (SentWithTime)id;
- _histo.recordValue(end-st.time);
- }
-
- @Override
- public void fail(Object id) {
- SentWithTime st = (SentWithTime)id;
- _collector.emit(new Values(st.sentence), id);
+ protected Values getNextValues(OutputStreamEngine se) {
+ String sentence = SENTENCES[se.nextInt(SENTENCES.length)];
+ return new Values(sentence);
}
@Override
@@ -147,14 +100,15 @@ public class ThroughputVsLatency {
}
public static class WordCount extends BaseBasicBolt {
- Map<String, Integer> counts = new HashMap<String, Integer>();
+ Map<String, Integer> counts = new HashMap<>();
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
Integer count = counts.get(word);
- if (count == null)
+ if (count == null) {
count = 0;
+ }
count++;
counts.put(word, count);
collector.emit(new Values(word, count));
@@ -166,184 +120,103 @@ public class ThroughputVsLatency {
}
}
- private static class MemMeasure {
- private long _mem = 0;
- private long _time = 0;
-
- public synchronized void update(long mem) {
- _mem = mem;
- _time = System.currentTimeMillis();
- }
-
- public synchronized long get() {
- return isExpired() ? 0l : _mem;
- }
-
- public synchronized boolean isExpired() {
- return (System.currentTimeMillis() - _time) >= 20000;
- }
- }
-
- private static final Histogram _histo = new Histogram(3600000000000L, 3);
- private static final AtomicLong _systemCPU = new AtomicLong(0);
- private static final AtomicLong _userCPU = new AtomicLong(0);
- private static final AtomicLong _gcCount = new AtomicLong(0);
- private static final AtomicLong _gcMs = new AtomicLong(0);
- private static final ConcurrentHashMap<String, MemMeasure> _memoryBytes = new ConcurrentHashMap<String, MemMeasure>();
-
- private static long readMemory() {
- long total = 0;
- for (MemMeasure mem: _memoryBytes.values()) {
- total += mem.get();
- }
- return total;
- }
-
- private static long _prev_acked = 0;
- private static long _prev_uptime = 0;
-
- public static void printMetrics(Nimbus.Iface client, String name) throws Exception {
- ClusterSummary summary = client.getClusterInfo();
- String id = null;
- for (TopologySummary ts: summary.get_topologies()) {
- if (name.equals(ts.get_name())) {
- id = ts.get_id();
+ /**
+ * The main entry point for ThroughputVsLatency.
+ * @param args the command line args
+ * @throws Exception on any error.
+ */
+ public static void main(String[] args) throws Exception {
+ Options options = new Options();
+ options.addOption(Option.builder("h")
+ .longOpt("help")
+ .desc("Print a help message")
+ .build());
+ options.addOption(Option.builder("t")
+ .longOpt("test-time")
+ .argName("MINS")
+ .hasArg()
+ .desc("How long to run the tests for in mins (defaults to " + TEST_EXECUTE_TIME_DEFAULT + ")")
+ .build());
+ options.addOption(Option.builder()
+ .longOpt("rate")
+ .argName("SENTENCES/SEC")
+ .hasArg()
+ .desc("How many sentences per second to run. (defaults to " + DEFAULT_RATE_PER_SECOND + ")")
+ .build());
+ options.addOption(Option.builder()
+ .longOpt("name")
+ .argName("TOPO_NAME")
+ .hasArg()
+ .desc("Name of the topology to run (defaults to " + DEFAULT_TOPO_NAME + ")")
+ .build());
+ options.addOption(Option.builder()
+ .longOpt("spouts")
+ .argName("NUM")
+ .hasArg()
+ .desc("Number of spouts to use (defaults to " + DEFAULT_NUM_SPOUTS + ")")
+ .build());
+ options.addOption(Option.builder()
+ .longOpt("splitters")
+ .argName("NUM")
+ .hasArg()
+ .desc("Number of splitter bolts to use (defaults to " + DEFAULT_NUM_SPLITS + ")")
+ .build());
+ options.addOption(Option.builder()
+ .longOpt("counters")
+ .argName("NUM")
+ .hasArg()
+ .desc("Number of counter bolts to use (defaults to " + DEFAULT_NUM_COUNTS + ")")
+ .build());
+ LoadMetricsServer.addCommandLineOptions(options);
+ CommandLineParser parser = new DefaultParser();
+ CommandLine cmd = null;
+ Exception commandLineException = null;
+ double numMins = TEST_EXECUTE_TIME_DEFAULT;
+ double ratePerSecond = DEFAULT_RATE_PER_SECOND;
+ String name = DEFAULT_TOPO_NAME;
+ int numSpouts = DEFAULT_NUM_SPOUTS;
+ int numSplits = DEFAULT_NUM_SPLITS;
+ int numCounts = DEFAULT_NUM_COUNTS;
+ try {
+ cmd = parser.parse(options, args);
+ if (cmd.hasOption("t")) {
+ numMins = Double.valueOf(cmd.getOptionValue("t"));
}
- }
- if (id == null) {
- throw new Exception("Could not find a topology named "+name);
- }
- TopologyInfo info = client.getTopologyInfo(id);
- int uptime = info.get_uptime_secs();
- long acked = 0;
- long failed = 0;
- for (ExecutorSummary exec: info.get_executors()) {
- if ("spout".equals(exec.get_component_id()) && exec.get_stats() != null && exec.get_stats().get_specific() != null) {
- SpoutStats stats = exec.get_stats().get_specific().get_spout();
- Map<String, Long> failedMap = stats.get_failed().get(":all-time");
- Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
- if (ackedMap != null) {
- for (String key: ackedMap.keySet()) {
- if (failedMap != null) {
- Long tmp = failedMap.get(key);
- if (tmp != null) {
- failed += tmp;
- }
- }
- long ackVal = ackedMap.get(key);
- acked += ackVal;
- }
- }
+ if (cmd.hasOption("rate")) {
+ ratePerSecond = Double.parseDouble(cmd.getOptionValue("rate"));
}
+ if (cmd.hasOption("name")) {
+ name = cmd.getOptionValue("name");
+ }
+ if (cmd.hasOption("spouts")) {
+ numSpouts = Integer.parseInt(cmd.getOptionValue("spouts"));
+ }
+ if (cmd.hasOption("splitters")) {
+ numSplits = Integer.parseInt(cmd.getOptionValue("splitters"));
+ }
+ if (cmd.hasOption("counters")) {
+ numCounts = Integer.parseInt(cmd.getOptionValue("counters"));
+ }
+ } catch (ParseException | NumberFormatException e) {
+ commandLineException = e;
}
- long ackedThisTime = acked - _prev_acked;
- long thisTime = uptime - _prev_uptime;
- long nnpct, nnnpct, min, max;
- double mean, stddev;
- synchronized(_histo) {
- nnpct = _histo.getValueAtPercentile(99.0);
- nnnpct = _histo.getValueAtPercentile(99.9);
- min = _histo.getMinValue();
- max = _histo.getMaxValue();
- mean = _histo.getMean();
- stddev = _histo.getStdDeviation();
- _histo.reset();
- }
- long user = _userCPU.getAndSet(0);
- long sys = _systemCPU.getAndSet(0);
- long gc = _gcMs.getAndSet(0);
- double memMB = readMemory() / (1024.0 * 1024.0);
- System.out.printf("uptime: %,4d acked: %,9d acked/sec: %,10.2f failed: %,8d " +
- "99%%: %,15d 99.9%%: %,15d min: %,15d max: %,15d mean: %,15.2f " +
- "stddev: %,15.2f user: %,10d sys: %,10d gc: %,10d mem: %,10.2f\n",
- uptime, ackedThisTime, (((double)ackedThisTime)/thisTime), failed, nnpct, nnnpct,
- min, max, mean, stddev, user, sys, gc, memMB);
- _prev_uptime = uptime;
- _prev_acked = acked;
- }
-
- public static void kill(Nimbus.Iface client, String name) throws Exception {
- KillOptions opts = new KillOptions();
- opts.set_wait_secs(0);
- client.killTopologyWithOpts(name, opts);
- }
-
- public static void main(String[] args) throws Exception {
- long ratePerSecond = 500;
- if (args != null && args.length > 0) {
- ratePerSecond = Long.valueOf(args[0]);
- }
-
- int parallelism = 4;
- if (args != null && args.length > 1) {
- parallelism = Integer.valueOf(args[1]);
- }
-
- int numMins = 5;
- if (args != null && args.length > 2) {
- numMins = Integer.valueOf(args[2]);
- }
-
- String name = "wc-test";
- if (args != null && args.length > 3) {
- name = args[3];
+ if (commandLineException != null || cmd.hasOption('h')) {
+ if (commandLineException != null) {
+ System.err.println("ERROR " + commandLineException.getMessage());
+ }
+ new HelpFormatter().printHelp("ThroughputVsLatency [options]", options);
+ return;
}
Config conf = new Config();
- HttpForwardingMetricsServer metricServer = new HttpForwardingMetricsServer(conf) {
- @Override
- public void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
- String worker = taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort;
- for (DataPoint dp: dataPoints) {
- if ("comp-lat-histo".equals(dp.name) && dp.value instanceof Histogram) {
- synchronized(_histo) {
- _histo.add((Histogram)dp.value);
- }
- } else if ("CPU".equals(dp.name) && dp.value instanceof Map) {
- Map<Object, Object> m = (Map<Object, Object>)dp.value;
- Object sys = m.get("sys-ms");
- if (sys instanceof Number) {
- _systemCPU.getAndAdd(((Number)sys).longValue());
- }
- Object user = m.get("user-ms");
- if (user instanceof Number) {
- _userCPU.getAndAdd(((Number)user).longValue());
- }
- } else if (dp.name.startsWith("GC/") && dp.value instanceof Map) {
- Map<Object, Object> m = (Map<Object, Object>)dp.value;
- Object count = m.get("count");
- if (count instanceof Number) {
- _gcCount.getAndAdd(((Number)count).longValue());
- }
- Object time = m.get("timeMs");
- if (time instanceof Number) {
- _gcMs.getAndAdd(((Number)time).longValue());
- }
- } else if (dp.name.startsWith("memory/") && dp.value instanceof Map) {
- Map<Object, Object> m = (Map<Object, Object>)dp.value;
- Object val = m.get("usedBytes");
- if (val instanceof Number) {
- MemMeasure mm = _memoryBytes.get(worker);
- if (mm == null) {
- mm = new MemMeasure();
- MemMeasure tmp = _memoryBytes.putIfAbsent(worker, mm);
- mm = tmp == null ? mm : tmp;
- }
- mm.update(((Number)val).longValue());
- }
- }
- }
- }
- };
-
+ LoadMetricsServer metricServer = new LoadMetricsServer(conf, cmd);
metricServer.serve();
String url = metricServer.getUrl();
NimbusClient client = NimbusClient.getConfiguredClient(conf);
- conf.setNumWorkers(parallelism);
- conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class);
- conf.registerMetricsConsumer(org.apache.storm.misc.metric.HttpForwardingMetricsConsumer.class, url, 1);
- Map<String, String> workerMetrics = new HashMap<String, String>();
+ conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
+ conf.registerMetricsConsumer(HttpForwardingMetricsConsumer.class, url, 1);
+ Map<String, String> workerMetrics = new HashMap<>();
if (!NimbusClient.isLocalOverride()) {
//sigar uses JNI and does not work in local mode
workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
@@ -351,27 +224,27 @@ public class ThroughputVsLatency {
conf.put(Config.TOPOLOGY_WORKER_METRICS, workerMetrics);
conf.put(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS, 10);
conf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS,
- "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled");
+ "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC "
+ + "-XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled");
conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx2g");
TopologyBuilder builder = new TopologyBuilder();
- int numEach = 4 * parallelism;
- builder.setSpout("spout", new FastRandomSentenceSpout(ratePerSecond/numEach), numEach);
+ builder.setSpout("spout", new FastRandomSentenceSpout((long) ratePerSecond / numSpouts), numSpouts);
+ builder.setBolt("split", new SplitSentence(), numSplits).shuffleGrouping("spout");
+ builder.setBolt("count", new WordCount(), numCounts).fieldsGrouping("split", new Fields("word"));
- builder.setBolt("split", new SplitSentence(), numEach).shuffleGrouping("spout");
- builder.setBolt("count", new WordCount(), numEach).fieldsGrouping("split", new Fields("word"));
-
- try {
+ int exitStatus = -1;
+ try (ScopedTopologySet topologyNames = new ScopedTopologySet(client.getClient())) {
StormSubmitter.submitTopology(name, conf, builder.createTopology());
+ topologyNames.add(name);
- for (int i = 0; i < numMins * 2; i++) {
- Thread.sleep(30 * 1000);
- printMetrics(client.getClient(), name);
- }
+ metricServer.monitorFor(numMins, client.getClient(), topologyNames);
+ exitStatus = 0;
+ } catch (Exception e) {
+ LOG.error("Error while running test", e);
} finally {
- kill(client.getClient(), name);
- System.exit(0);
+ System.exit(exitStatus);
}
}
}