Posted to dev@storm.apache.org by revans2 <gi...@git.apache.org> on 2017/08/21 19:37:49 UTC

[GitHub] storm pull request #2289: STORM-2702: storm-loadgen

GitHub user revans2 opened a pull request:

    https://github.com/apache/storm/pull/2289

    STORM-2702: storm-loadgen

    Some tools for generating load on a cluster for testing.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/revans2/incubator-storm STORM-2702

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2289.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2289
    
----
commit 0d10b8afe7e282d04b67f1a0c1c90db801842b14
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Date:   2017-08-21T18:29:59Z

    STORM-2702: Part 1.  Move files as needed

commit 6c2dcbedabb88970697f42b6f66bf64177e2ac9c
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Date:   2017-08-21T19:36:10Z

    STORM-2702: storm-loadgen

----


---

[GitHub] storm issue #2289: STORM-2702: storm-loadgen

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on the issue:

    https://github.com/apache/storm/pull/2289
  
    Partial review done.



---

[GitHub] storm issue #2289: STORM-2702: storm-loadgen

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2289
  
    @knusbaum I think I have addressed all of your review comments so far.


---

[GitHub] storm pull request #2289: STORM-2702: storm-loadgen

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2289#discussion_r135086409
  
    --- Diff: examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/InputStream.java ---
    @@ -0,0 +1,263 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.loadgen;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ObjectInputStream;
    +import java.io.Serializable;
    +import java.util.HashMap;
    +import java.util.Map;
    +import org.apache.storm.generated.GlobalStreamId;
    +import org.apache.storm.generated.Grouping;
    +import org.apache.storm.grouping.PartialKeyGrouping;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * A set of measurements about a stream so we can statistically reproduce it.
    + */
    +public class InputStream implements Serializable {
    +    private static final Logger LOG = LoggerFactory.getLogger(InputStream.class);
    +    public final String fromComponent;
    +    public final String toComponent;
    +    public final String id;
    +    public final NormalDistStats execTime;
    +    public final NormalDistStats processTime;
    +    public final GroupingType groupingType;
    +    //Cached GlobalStreamId
    +    private GlobalStreamId gsid = null;
    +
    +    /**
    +     * Create an output stream from a config.
    +     * @param conf the config to read from.
    +     * @return the read OutputStream.
    +     */
    +    public static InputStream fromConf(Map<String, Object> conf) {
    +        String component = (String) conf.get("from");
    +        String toComp = (String) conf.get("to");
    +        NormalDistStats execTime = NormalDistStats.fromConf((Map<String, Object>) conf.get("execTime"));
    +        NormalDistStats processTime = NormalDistStats.fromConf((Map<String, Object>) conf.get("processTime"));
    +        Map<String, Object> grouping = (Map<String, Object>) conf.get("grouping");
    +        GroupingType groupingType = GroupingType.fromConf((String) grouping.get("type"));
    +        String streamId = (String) grouping.getOrDefault("streamId", "default");
    +        return new InputStream(component, toComp, streamId, execTime, processTime, groupingType);
    +    }
    +
    +    /**
    +     * Convert this to a conf.
    +     * @return the conf.
    +     */
    +    public Map<String, Object> toConf() {
    +        Map<String, Object> ret = new HashMap<>();
    +        ret.put("from", fromComponent);
    +        ret.put("to", toComponent);
    +        ret.put("execTime", execTime.toConf());
    +        ret.put("processTime", processTime.toConf());
    +
    +        Map<String, Object> grouping = new HashMap<>();
    +        grouping.put("streamId", id);
    +        grouping.put("type", groupingType.toConf());
    +        ret.put("grouping", grouping);
    +
    +        return ret;
    +    }
    +
    +    public static class Builder {
    +        private String fromComponent;
    +        private String toComponent;
    +        private String id;
    +        private NormalDistStats execTime;
    +        private NormalDistStats processTime;
    +        private GroupingType groupingType = GroupingType.SHUFFLE;
    +
    +        public String getFromComponent() {
    --- End diff --
    
    It doesn't really bother me, but do builders usually have getters?


---

[GitHub] storm issue #2289: STORM-2702: storm-loadgen

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2289
  
    @knusbaum do you have any other comments?


---

[GitHub] storm pull request #2289: STORM-2702: storm-loadgen

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2289#discussion_r135074782
  
    --- Diff: examples/storm-loadgen/README.md ---
    @@ -0,0 +1,195 @@
    +# Storm Load Generation Tools
    +
    +A set of tools to place an artificial load on a storm cluster to compare against a different storm cluster.  This is particularly helpful when making changes to the data path in storm to see what if any impact the changes had.  This is also useful for end users that want to compare different hardware setups to see what the trade-offs are, although actually running your real topologies is going to be more accurate.
    +
    +## Methodology
    +The idea behind all of these tools is to measure the trade-offs between latency, throughput, and cost when processing data using Apache Storm.
    +
    +When processing data you typically will know a few things.  First you will know about how much data you are going to be processing.  This will typically be a range of values that change throughput the day.  You also will have an idea of how quickly you need the data processed by.  Often this is measured in terms of the latency it takes to process data at the some percentile or set of percentiles.  This is because of most use cases the value of the data declines over time, and being able to react to the data quickly is more valuable.  You probably also have a budget for how much you are willing to spend to be able to process this data.  There are always trade-offs in how quickly you can process some data and how efficiently you can processes that data both in terms of resource usage (cost) and latency.  These tools are designed to help you explore that space.
    +
    +A note on how latency is measured.  Storm typically measures latency from when a message is emitted by a spout until the point it is fully acked or failed (in many versions of storm it actually does this in the acker instead of the spout so it is trying to be a measure of how long it takes for the actual processing, removing as much of the acker overhead as possible).  For these tools we do it differently.  We simulate a throughput and measure the start time of the tuple from when it would have been emitted if the topology could keep up with the load.  In the normal case this should not be an issue, but if the topology cannot keep up with the throughput you will see the latency grow very high compared to the latency reported by storm.
    +
    +## Tools
    +### CaptureLoad 
    +
    +`CaptureLoad` will look at the topologies on a running cluster and store the structure of and metrics about each of theses topologies storing them in a format that can be used later to reproduce a similar load on the cluster.
    +
    +#### Usage
    +```
    +storm jar storm-loadgen.jar org.apache.storm.loadgen.CaptureLoad [options] [topologyName]*
    +```
    +|Option| Description|
    +|-----|-----|
    +|-a,--anonymize | Strip out any possibly identifiable information|
    +| -h,--help | Print a help message |
    +| -o,--output-dir <file> | Where to write (defaults to ./loadgen/)|
    +
    +#### Limitations
    +This is still a work in progress.  It does not currently capture CPU or memory usage of a topology.  Resource requests (used by RAS when scheduling) within the topology are also not captured yet, nor is the user that actually ran the topology.
    +
    +### GenLoad
    +
    +`GenLoad` will take the files produced by `CaptureLoad` and replay them in a simulated way on a cluster.  It also offers lots of ways to capture metrics about those simulated topologies to be able to compare different software versions of different hardware setups.  You can also make adjustments to the topology before submitting it to change the size or throughput of the topology.
    +
    +### Usage
    +```
    +storm jar storm-loadgen.jar org.apache.storm.loadgen.GenLoad [options] [capture_file]*
    +```
    +
    +|Option| Description|
    +|-----|-----|
    +| --debug | Print debug information about the adjusted topology before submitting it. |
    +|-h,--help | Print a help message |
    +| --local-or-shuffle | Replace shuffle grouping with local or shuffle grouping. |
    +| --parallel &lt;MULTIPLIER(:TOPO:COMP)?> | How much to scale the topology up or down in parallelism. The new parallelism will round up to the next whole number. If a topology + component is supplied only that component will be scaled. If topo or component is blank or a `'*'` all topologies or components matched the other part will be scaled. Only 1 scaling rule, the most specific, will be applied to a component. Providing a topology name is considered more specific than not providing one. (defaults to 1.0 no scaling) |
    +| -r,--report-interval &lt;INTERVAL_SECS> | How long in between reported metrics.  Will be rounded up to the next 10 sec boundary. default 30 |
    +| --reporter &lt;TYPE:FILE?OPTIONS>  | Provide the config for a reporter to run. See below for more information about these |
    +| -t,--test-time &lt;MINS> | How long to run the tests for in mins (defaults to 5) |
    +| --throughput &lt;MULTIPLIER(:TOPO:COMP)?> | How much to scale the topology up or down in throughput. If a topology + component is supplied only that component will be scaled. If topo or component is blank or a `'*'` all topologies or components matched will be scaled. Only 1 scaling rule, the most specific, will be applied to a component. Providing a topology name is considered more specific than not providing one.(defaults to 1.0 no scaling)|
    +| -w,--report-window &lt;INTERVAL_SECS> | How long of a rolling window should be in each report.  Will be rounded up to the next report interval boundary. default 30|
    +
    +## ThroughputVsLatency
    +This is a topology similar to `GenLoad` in most ways, except instead of simulating a load it runs a word count algorithm.
    --- End diff --
    
    In what ways is it similar? This sentence implies that it does not simulate load, which seems to be the main purpose of GenLoad.


---

[GitHub] storm issue #2289: STORM-2702: storm-loadgen

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2289
  
    @roshannaik I just added support for a new default reporter that writes the data out in a fixed-width format, which should be much more human readable. It is added automatically if nothing else is writing to stdout or stderr.
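
    For illustration only (this is not the reporter's actual code), fixed-width rows can be produced with `String.format`; the column names and values below are made up:

    ```java
    // Illustrative fixed-width formatting; columns and values are invented.
    System.out.println(String.format("%-20s %12s %12s", "topology", "mean(ms)", "99%ile(ms)"));
    System.out.println(String.format("%-20s %12.3f %12.3f", "wc-test-1", 8.423, 25.167));
    ```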


---

[GitHub] storm issue #2289: STORM-2702: storm-loadgen

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/2289
  
    Sure, thanks.


---

[GitHub] storm issue #2289: STORM-2702: storm-loadgen

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/2289
  
    @revans2 sorry, I missed your messages here.
    - Yes, it would be nice to have the actual latency (as reported in the UI) also included here. I also run TVL in modes where the topology can't keep up, so I have been having to collect it separately. It can report 0 latency if the ACKer is disabled.
    - I often copy and paste these CSV results into Excel for easy viewing, so limiting the decimals in the CSV to 3 digits would be nice, unless the extra precision is useful for machine readability.
    - One usability improvement would be for the tool to *tee* whatever it is printing into the report onto the console as well (a sketch follows below). That helps in multiple ways.
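
    A minimal sketch of such a tee, assuming the reporter writes through a `java.io.OutputStream` (the class name and wiring here are illustrative, not part of the patch):

    ```java
    import java.io.IOException;
    import java.io.OutputStream;

    /** Illustrative tee: every byte written goes to both targets. */
    public class TeeOutputStream extends OutputStream {
        private final OutputStream first;
        private final OutputStream second;

        public TeeOutputStream(OutputStream first, OutputStream second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public void write(int b) throws IOException {
            first.write(b);
            second.write(b);
        }

        @Override
        public void flush() throws IOException {
            first.flush();
            second.flush();
        }

        @Override
        public void close() throws IOException {
            try {
                first.close();
            } finally {
                second.close();
            }
        }
    }
    ```

    The reporter could then wrap its file stream with something like `new TeeOutputStream(fileOut, System.out)`.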


---

[GitHub] storm pull request #2289: STORM-2702: storm-loadgen

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2289#discussion_r135073224
  
    --- Diff: examples/storm-loadgen/README.md ---
    @@ -0,0 +1,195 @@
    +# Storm Load Generation Tools
    +
    +A set of tools to place an artificial load on a storm cluster to compare against a different storm cluster.  This is particularly helpful when making changes to the data path in storm to see what if any impact the changes had.  This is also useful for end users that want to compare different hardware setups to see what the trade-offs are, although actually running your real topologies is going to be more accurate.
    +
    +## Methodology
    +The idea behind all of these tools is to measure the trade-offs between latency, throughput, and cost when processing data using Apache Storm.
    +
    +When processing data you typically will know a few things.  First you will know about how much data you are going to be processing.  This will typically be a range of values that change throughput the day.  You also will have an idea of how quickly you need the data processed by.  Often this is measured in terms of the latency it takes to process data at the some percentile or set of percentiles.  This is because of most use cases the value of the data declines over time, and being able to react to the data quickly is more valuable.  You probably also have a budget for how much you are willing to spend to be able to process this data.  There are always trade-offs in how quickly you can process some data and how efficiently you can processes that data both in terms of resource usage (cost) and latency.  These tools are designed to help you explore that space.
    --- End diff --
    
    Couple nits:
    1. 
    ```
     Often this is measured in terms of the latency it takes to process data at the some percentile or set of percentiles.
    ```
    `the some`
    
    
    2. 
    ```This is because of most use cases ...```
    *in* most use cases? *for* most use cases?


---

[GitHub] storm issue #2289: STORM-2702: storm-loadgen

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2289
  
    @roshannaik I think I have added in all of your feature requests so far.


---

[GitHub] storm issue #2289: STORM-2702: storm-loadgen

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/2289
  
    Another useful thing would be to kill the topo with a simple Ctrl-C on the cmd line, using a shutdown handler.
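
    A minimal sketch of what that could look like using Storm's client API; `conf` and `topoName` are assumed to be in scope, and the exact wiring is illustrative:

    ```java
    import org.apache.storm.generated.KillOptions;
    import org.apache.storm.utils.NimbusClient;

    // Illustrative shutdown hook: on Ctrl-C, kill the submitted topology.
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        try {
            NimbusClient client = NimbusClient.getConfiguredClient(conf);
            try {
                KillOptions opts = new KillOptions();
                opts.set_wait_secs(0); // no grace period on shutdown
                client.getClient().killTopologyWithOpts(topoName, opts);
            } finally {
                client.close();
            }
        } catch (Exception e) {
            // Best effort during shutdown; nothing else we can do here.
            e.printStackTrace();
        }
    }));
    ```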


---

[GitHub] storm pull request #2289: STORM-2702: storm-loadgen

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2289#discussion_r135078488
  
    --- Diff: examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java ---
    @@ -0,0 +1,468 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.loadgen;
    +
    +import java.io.File;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.CommandLineParser;
    +import org.apache.commons.cli.DefaultParser;
    +import org.apache.commons.cli.HelpFormatter;
    +import org.apache.commons.cli.Option;
    +import org.apache.commons.cli.Options;
    +import org.apache.commons.cli.ParseException;
    +import org.apache.storm.Config;
    +import org.apache.storm.generated.Bolt;
    +import org.apache.storm.generated.BoltStats;
    +import org.apache.storm.generated.ClusterSummary;
    +import org.apache.storm.generated.ComponentCommon;
    +import org.apache.storm.generated.ExecutorSummary;
    +import org.apache.storm.generated.GlobalStreamId;
    +import org.apache.storm.generated.Grouping;
    +import org.apache.storm.generated.Nimbus;
    +import org.apache.storm.generated.SpoutSpec;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.generated.StreamInfo;
    +import org.apache.storm.generated.TopologyInfo;
    +import org.apache.storm.generated.TopologyPageInfo;
    +import org.apache.storm.generated.TopologySummary;
    +import org.apache.storm.generated.WorkerSummary;
    +import org.apache.storm.utils.NimbusClient;
    +import org.apache.storm.utils.ObjectReader;
    +import org.json.simple.JSONObject;
    +import org.json.simple.JSONValue;
    +import org.json.simple.parser.JSONParser;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Capture running topologies for load gen later on.
    + */
    +public class CaptureLoad {
    +    private static final Logger LOG = LoggerFactory.getLogger(CaptureLoad.class);
    +    public static final String DEFAULT_OUT_DIR = "./loadgen/";
    +
    +    private static List<Double> extractBoltValues(List<ExecutorSummary> summaries,
    +                                                  GlobalStreamId id,
    +                                                  Function<BoltStats, Map<String, Map<GlobalStreamId, Double>>> func) {
    +
    +        List<Double> ret = new ArrayList<>();
    +        if (summaries != null) {
    +            for (ExecutorSummary summ : summaries) {
    +                if (summ != null && summ.is_set_stats()) {
    +                    Map<String, Map<GlobalStreamId, Double>> data = func.apply(summ.get_stats().get_specific().get_bolt());
    +                    if (data != null) {
    +                        List<Double> subvalues = data.values().stream()
    +                            .map((subMap) -> subMap.get(id))
    +                            .filter((value) -> value != null)
    +                            .mapToDouble((value) -> value.doubleValue())
    +                            .boxed().collect(Collectors.toList());
    --- End diff --
    
    I think the `mapToDouble` and the `boxed` are unnecessary. 
    They are inverse operations, first taking a `Stream<Double>` from `filter(...)` and making a `DoubleStream`, then reboxing and turning the `DoubleStream` into a `Stream<Double>`
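
    For reference, a sketch of the simplified pipeline (same `data` and `id` as in the diff above):

    ```java
    // Stream<Double> from map(), nulls filtered out, collected directly;
    // no DoubleStream round-trip needed.
    List<Double> subvalues = data.values().stream()
        .map((subMap) -> subMap.get(id))
        .filter((value) -> value != null)
        .collect(Collectors.toList());
    ```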


---

[GitHub] storm issue #2289: STORM-2702: storm-loadgen

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on the issue:

    https://github.com/apache/storm/pull/2289
  
    +1


---

[GitHub] storm issue #2289: STORM-2702: storm-loadgen

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/2289
  
    Would be nice if the latencies were trimmed to 1 digit after the decimal point.
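
    For example (illustrative, not the tool's actual code):

    ```java
    // One digit after the decimal point for latency values.
    String mean = String.format("%.1f", 84246.15838892311); // -> "84246.2"
    ```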


---

[GitHub] storm pull request #2289: STORM-2702: storm-loadgen

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2289#discussion_r135085622
  
    --- Diff: examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/EstimateThroughput.java ---
    @@ -0,0 +1,108 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.loadgen;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.CommandLineParser;
    +import org.apache.commons.cli.DefaultParser;
    +import org.apache.commons.cli.HelpFormatter;
    +import org.apache.commons.cli.Option;
    +import org.apache.commons.cli.Options;
    +import org.apache.commons.cli.ParseException;
    +import org.apache.storm.Config;
    +import org.apache.storm.generated.ClusterSummary;
    +import org.apache.storm.generated.Nimbus;
    +import org.apache.storm.generated.TopologySummary;
    +import org.apache.storm.loadgen.CaptureLoad;
    +import org.apache.storm.utils.NimbusClient;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Estimate the throughput of all topologies.
    + */
    +public class EstimateThroughput {
    +    private static final Logger LOG = LoggerFactory.getLogger(EstimateThroughput.class);
    +
    +    /**
    +     * Main entry point for estimate throughput command.
    +     * @param args the command line arguments.
    +     * @throws Exception on any error.
    +     */
    +    public static void main(String[] args) throws Exception {
    +        Options options = new Options();
    +        options.addOption(Option.builder("h")
    +            .longOpt("help")
    +            .desc("Print a help message")
    +            .build());
    +        CommandLineParser parser = new DefaultParser();
    +        CommandLine cmd = null;
    +        ParseException pe = null;
    +        try {
    +            cmd = parser.parse(options, args);
    +        } catch (ParseException e) {
    +            pe = e;
    +        }
    +        if (pe != null || cmd.hasOption('h')) {
    +            if (pe != null) {
    +                System.err.println("ERROR " + pe.getMessage());
    --- End diff --
    
    (same)


---

[GitHub] storm issue #2289: STORM-2702: storm-loadgen

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/2289
  
    The latency reported in the TVL report differs from what is shown in the UI by a factor of 10k. Division error, maybe?

    Here is a sample report indicating a mean latency of 84,246 ms, but the UI for the same run shows 8.4 ms.
    
    start_time(s),end_time(s),completion_rate(tuple/s),mean(ms),99%ile(ms),99.9%ile(ms),cores,mem(MB),failed
    0,30,73695.16666666667,1700.898775864534,2562.719743,2621.439999,1.6000666666666667,75.75511169433594,0
    ....
    389,419,230711.53333333333,84246.15838892311,87308.632063,87375.740927,4.228833333333333,105.1185302734375,0
    
    



---

[GitHub] storm pull request #2289: STORM-2702: storm-loadgen

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2289#discussion_r135307748
  
    --- Diff: examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/InputStream.java ---
    @@ -0,0 +1,263 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.loadgen;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ObjectInputStream;
    +import java.io.Serializable;
    +import java.util.HashMap;
    +import java.util.Map;
    +import org.apache.storm.generated.GlobalStreamId;
    +import org.apache.storm.generated.Grouping;
    +import org.apache.storm.grouping.PartialKeyGrouping;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * A set of measurements about a stream so we can statistically reproduce it.
    + */
    +public class InputStream implements Serializable {
    +    private static final Logger LOG = LoggerFactory.getLogger(InputStream.class);
    +    public final String fromComponent;
    +    public final String toComponent;
    +    public final String id;
    +    public final NormalDistStats execTime;
    +    public final NormalDistStats processTime;
    +    public final GroupingType groupingType;
    +    //Cached GlobalStreamId
    +    private GlobalStreamId gsid = null;
    +
    +    /**
    +     * Create an output stream from a config.
    +     * @param conf the config to read from.
    +     * @return the read OutputStream.
    +     */
    +    public static InputStream fromConf(Map<String, Object> conf) {
    +        String component = (String) conf.get("from");
    +        String toComp = (String) conf.get("to");
    +        NormalDistStats execTime = NormalDistStats.fromConf((Map<String, Object>) conf.get("execTime"));
    +        NormalDistStats processTime = NormalDistStats.fromConf((Map<String, Object>) conf.get("processTime"));
    +        Map<String, Object> grouping = (Map<String, Object>) conf.get("grouping");
    +        GroupingType groupingType = GroupingType.fromConf((String) grouping.get("type"));
    +        String streamId = (String) grouping.getOrDefault("streamId", "default");
    +        return new InputStream(component, toComp, streamId, execTime, processTime, groupingType);
    +    }
    +
    +    /**
    +     * Convert this to a conf.
    +     * @return the conf.
    +     */
    +    public Map<String, Object> toConf() {
    +        Map<String, Object> ret = new HashMap<>();
    +        ret.put("from", fromComponent);
    +        ret.put("to", toComponent);
    +        ret.put("execTime", execTime.toConf());
    +        ret.put("processTime", processTime.toConf());
    +
    +        Map<String, Object> grouping = new HashMap<>();
    +        grouping.put("streamId", id);
    +        grouping.put("type", groupingType.toConf());
    +        ret.put("grouping", grouping);
    +
    +        return ret;
    +    }
    +
    +    public static class Builder {
    +        private String fromComponent;
    +        private String toComponent;
    +        private String id;
    +        private NormalDistStats execTime;
    +        private NormalDistStats processTime;
    +        private GroupingType groupingType = GroupingType.SHUFFLE;
    +
    +        public String getFromComponent() {
    --- End diff --
    
    No, builders don't usually have getters, but this was the most convenient way to pull out the needed information while still building the object.
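
    For context, a sketch of the pattern as used in `CaptureLoad` (types from the diff above; the component and stream names here are made up, and the stats calls are elided):

    ```java
    // Read fields back from the Builder before build() finalizes the object.
    InputStream.Builder builder = new InputStream.Builder()
        .withFromComponent("spout-1")
        .withToComponent("bolt-1")
        .withId("default");
    GlobalStreamId streamId = new GlobalStreamId(builder.getFromComponent(), builder.getId());
    // ...look up stats for streamId, then withExecTime(...)/withProcessTime(...)
    InputStream stream = builder.build();
    ```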


---

[GitHub] storm pull request #2289: STORM-2702: storm-loadgen

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2289#discussion_r135084038
  
    --- Diff: examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java ---
    @@ -0,0 +1,468 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.loadgen;
    +
    +import java.io.File;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.CommandLineParser;
    +import org.apache.commons.cli.DefaultParser;
    +import org.apache.commons.cli.HelpFormatter;
    +import org.apache.commons.cli.Option;
    +import org.apache.commons.cli.Options;
    +import org.apache.commons.cli.ParseException;
    +import org.apache.storm.Config;
    +import org.apache.storm.generated.Bolt;
    +import org.apache.storm.generated.BoltStats;
    +import org.apache.storm.generated.ClusterSummary;
    +import org.apache.storm.generated.ComponentCommon;
    +import org.apache.storm.generated.ExecutorSummary;
    +import org.apache.storm.generated.GlobalStreamId;
    +import org.apache.storm.generated.Grouping;
    +import org.apache.storm.generated.Nimbus;
    +import org.apache.storm.generated.SpoutSpec;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.generated.StreamInfo;
    +import org.apache.storm.generated.TopologyInfo;
    +import org.apache.storm.generated.TopologyPageInfo;
    +import org.apache.storm.generated.TopologySummary;
    +import org.apache.storm.generated.WorkerSummary;
    +import org.apache.storm.utils.NimbusClient;
    +import org.apache.storm.utils.ObjectReader;
    +import org.json.simple.JSONObject;
    +import org.json.simple.JSONValue;
    +import org.json.simple.parser.JSONParser;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Capture running topologies for load gen later on.
    + */
    +public class CaptureLoad {
    +    private static final Logger LOG = LoggerFactory.getLogger(CaptureLoad.class);
    +    public static final String DEFAULT_OUT_DIR = "./loadgen/";
    +
    +    private static List<Double> extractBoltValues(List<ExecutorSummary> summaries,
    +                                                  GlobalStreamId id,
    +                                                  Function<BoltStats, Map<String, Map<GlobalStreamId, Double>>> func) {
    +
    +        List<Double> ret = new ArrayList<>();
    +        if (summaries != null) {
    +            for (ExecutorSummary summ : summaries) {
    +                if (summ != null && summ.is_set_stats()) {
    +                    Map<String, Map<GlobalStreamId, Double>> data = func.apply(summ.get_stats().get_specific().get_bolt());
    +                    if (data != null) {
    +                        List<Double> subvalues = data.values().stream()
    +                            .map((subMap) -> subMap.get(id))
    +                            .filter((value) -> value != null)
    +                            .mapToDouble((value) -> value.doubleValue())
    +                            .boxed().collect(Collectors.toList());
    +                        ret.addAll(subvalues);
    +                    }
    +                }
    +            }
    +        }
    +        return ret;
    +    }
    +
    +    static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary topologySummary) throws Exception {
    +        String topologyName = topologySummary.get_name();
    +        LOG.info("Capturing {}...", topologyName);
    +        String topologyId = topologySummary.get_id();
    +        TopologyInfo info = client.getTopologyInfo(topologyId);
    +        TopologyPageInfo tpinfo = client.getTopologyPageInfo(topologyId, ":all-time", false);
    +        @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
    +        StormTopology topo = client.getUserTopology(topologyId);
    +        //Done capturing topology information...
    +
    +        Map<String, Object> savedTopoConf = new HashMap<>();
    +        Map<String, Object> topoConf = (Map<String, Object>) JSONValue.parse(client.getTopologyConf(topologyId));
    +        for (String key: TopologyLoadConf.IMPORTANT_CONF_KEYS) {
    +            Object o = topoConf.get(key);
    +            if (o != null) {
    +                savedTopoConf.put(key, o);
    +                LOG.info("with config {}: {}", key, o);
    +            }
    +        }
    +        //Lets use the number of actually scheduled workers as a way to bridge RAS and non-RAS
    +        int numWorkers = tpinfo.get_num_workers();
    +        if (savedTopoConf.containsKey(Config.TOPOLOGY_WORKERS)) {
    +            numWorkers = Math.max(numWorkers, ((Number)savedTopoConf.get(Config.TOPOLOGY_WORKERS)).intValue());
    +        }
    +        savedTopoConf.put(Config.TOPOLOGY_WORKERS, numWorkers);
    +
    +        Map<String, LoadCompConf.Builder> boltBuilders = new HashMap<>();
    +        Map<String, LoadCompConf.Builder> spoutBuilders = new HashMap<>();
    +        List<InputStream.Builder> inputStreams = new ArrayList<>();
    +        Map<GlobalStreamId, OutputStream.Builder> outStreams = new HashMap<>();
    +
    +        //Bolts
    +        if (topo.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> boltSpec : topo.get_bolts().entrySet()) {
    +                String boltComp = boltSpec.getKey();
    +                LOG.info("Found bolt {}...", boltComp);
    +                Bolt bolt = boltSpec.getValue();
    +                ComponentCommon common = bolt.get_common();
    +                Map<GlobalStreamId, Grouping> inputs = common.get_inputs();
    +                if (inputs != null) {
    +                    for (Map.Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) {
    +                        GlobalStreamId id = input.getKey();
    +                        LOG.info("with input {}...", id);
    +                        Grouping grouping = input.getValue();
    +                        InputStream.Builder builder = new InputStream.Builder()
    +                            .withId(id.get_streamId())
    +                            .withFromComponent(id.get_componentId())
    +                            .withToComponent(boltComp)
    +                            .withGroupingType(grouping);
    +                        inputStreams.add(builder);
    +                    }
    +                }
    +                Map<String, StreamInfo> outputs = common.get_streams();
    +                if (outputs != null) {
    +                    for (String name : outputs.keySet()) {
    +                        GlobalStreamId id = new GlobalStreamId(boltComp, name);
    +                        LOG.info("and output {}...", id);
    +                        OutputStream.Builder builder = new OutputStream.Builder()
    +                            .withId(name);
    +                        outStreams.put(id, builder);
    +                    }
    +                }
    +                LoadCompConf.Builder builder = new LoadCompConf.Builder()
    +                    .withParallelism(common.get_parallelism_hint())
    +                    .withId(boltComp);
    +                boltBuilders.put(boltComp, builder);
    +            }
    +
    +            Map<String, Map<String, Double>> boltResources = getBoltsResources(topo, topoConf);
    +            for (Map.Entry<String, Map<String, Double>> entry: boltResources.entrySet()) {
    +                LoadCompConf.Builder bd = boltBuilders.get(entry.getKey());
    +                if (bd != null) {
    +                    Map<String, Double> resources = entry.getValue();
    +                    Double cpu = resources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
    +                    if (cpu != null) {
    +                        bd.withCpuLoad(cpu);
    +                    }
    +                    Double mem = resources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
    +                    if (mem != null) {
    +                        bd.withMemoryLoad(mem);
    +                    }
    +                }
    +            }
    +        }
    +
    +        //Spouts
    +        if (topo.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spoutSpec : topo.get_spouts().entrySet()) {
    +                String spoutComp = spoutSpec.getKey();
    +                LOG.info("Found Spout {}...", spoutComp);
    +                SpoutSpec spout = spoutSpec.getValue();
    +                ComponentCommon common = spout.get_common();
    +
    +                Map<String, StreamInfo> outputs = common.get_streams();
    +                if (outputs != null) {
    +                    for (String name : outputs.keySet()) {
    +                        GlobalStreamId id = new GlobalStreamId(spoutComp, name);
    +                        LOG.info("with output {}...", id);
    +                        OutputStream.Builder builder = new OutputStream.Builder()
    +                            .withId(name);
    +                        outStreams.put(id, builder);
    +                    }
    +                }
    +                LoadCompConf.Builder builder = new LoadCompConf.Builder()
    +                    .withParallelism(common.get_parallelism_hint())
    +                    .withId(spoutComp);
    +                spoutBuilders.put(spoutComp, builder);
    +            }
    +
    +            Map<String, Map<String, Double>> spoutResources = getSpoutsResources(topo, topoConf);
    +            for (Map.Entry<String, Map<String, Double>> entry: spoutResources.entrySet()) {
    +                LoadCompConf.Builder sd = spoutBuilders.get(entry.getKey());
    +                if (sd != null) {
    +                    Map<String, Double> resources = entry.getValue();
    +                    Double cpu = resources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
    +                    if (cpu != null) {
    +                        sd.withCpuLoad(cpu);
    +                    }
    +                    Double mem = resources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
    +                    if (mem != null) {
    +                        sd.withMemoryLoad(mem);
    +                    }
    +                }
    +            }
    +        }
    +
    +        //Stats...
    +        Map<String, List<ExecutorSummary>> byComponent = new HashMap<>();
    +        for (ExecutorSummary executor: info.get_executors()) {
    +            String component = executor.get_component_id();
    +            List<ExecutorSummary> list = byComponent.get(component);
    +            if (list == null) {
    +                list = new ArrayList<>();
    +                byComponent.put(component, list);
    +            }
    +            list.add(executor);
    +        }
    +
    +        List<InputStream> streams = new ArrayList<>(inputStreams.size());
    +        //Compute the stats for the different input streams
    +        for (InputStream.Builder builder : inputStreams) {
    +            GlobalStreamId streamId = new GlobalStreamId(builder.getFromComponent(), builder.getId());
    +            List<ExecutorSummary> summaries = byComponent.get(builder.getToComponent());
    +            //Execute and process latency...
    +            builder.withProcessTime(new NormalDistStats(
    +                extractBoltValues(summaries, streamId, BoltStats::get_process_ms_avg)));
    +            builder.withExecTime(new NormalDistStats(
    +                extractBoltValues(summaries, streamId, BoltStats::get_execute_ms_avg)));
    +            //InputStream is done
    +            streams.add(builder.build());
    +        }
    +
    +        //There is a bug in some versions that returns 0 for the uptime.
    +        // To work around it we should get it an alternative (working) way.
    +        Map<String, Integer> workerToUptime = new HashMap<>();
    +        for (WorkerSummary ws : tpinfo.get_workers()) {
    +            workerToUptime.put(ws.get_supervisor_id() + ":" + ws.get_port(), ws.get_uptime_secs());
    +        }
    +        LOG.debug("WORKER TO UPTIME {}", workerToUptime);
    +
    +        for (Map.Entry<GlobalStreamId, OutputStream.Builder> entry : outStreams.entrySet()) {
    +            OutputStream.Builder builder = entry.getValue();
    +            GlobalStreamId id = entry.getKey();
    +            List<Double> emittedRate = new ArrayList<>();
    +            List<ExecutorSummary> summaries = byComponent.get(id.get_componentId());
    +            if (summaries != null) {
    +                for (ExecutorSummary summary: summaries) {
    +                    if (summary.is_set_stats()) {
    +                        int uptime = summary.get_uptime_secs();
    +                        LOG.debug("UPTIME {}", uptime);
    +                        if (uptime <= 0) {
    +                            //Likely it is because of a bug, so try to get it another way
    +                            String key = summary.get_host() + ":" + summary.get_port();
    +                            uptime = workerToUptime.getOrDefault(key, 1);
    +                            LOG.debug("Getting uptime for worker {}, {}", key, uptime);
    +                        }
    +                        for (Map.Entry<String, Map<String, Long>> statEntry : summary.get_stats().get_emitted().entrySet()) {
    +                            String timeWindow = statEntry.getKey();
    +                            long timeSecs = uptime;
    +                            try {
    +                                timeSecs = Long.valueOf(timeWindow);
    +                            } catch (NumberFormatException e) {
    +                                //Ignored...
    +                            }
    +                            timeSecs = Math.min(timeSecs, uptime);
    +                            Long count = statEntry.getValue().get(id.get_streamId());
    +                            if (count != null) {
    +                                LOG.debug("{} emitted {} for {} secs or {} tuples/sec",
    +                                    id, count, timeSecs, count.doubleValue() / timeSecs);
    +                                emittedRate.add(count.doubleValue() / timeSecs);
    +                            }
    +                        }
    +                    }
    +                }
    +            }
    +            builder.withRate(new NormalDistStats(emittedRate));
    +
    +            //The OutputStream is done
    +            LoadCompConf.Builder comp = boltBuilders.get(id.get_componentId());
    +            if (comp == null) {
    +                comp = spoutBuilders.get(id.get_componentId());
    +            }
    +            comp.withStream(builder.build());
    +        }
    +
    +        List<LoadCompConf> spouts = spoutBuilders.values().stream()
    +            .map((b) -> b.build())
    +            .collect(Collectors.toList());
    +
    +        List<LoadCompConf> bolts = boltBuilders.values().stream()
    +            .map((b) -> b.build())
    +            .collect(Collectors.toList());
    +
    +        return new TopologyLoadConf(topologyName, savedTopoConf, spouts, bolts, streams);
    +    }
    +
    +    /**
    +     * Main entry point for CaptureLoad command.
    +     * @param args the arguments to the command
    +     * @throws Exception on any error
    +     */
    +    public static void main(String[] args) throws Exception {
    +        Options options = new Options();
    +        options.addOption(Option.builder("a")
    +            .longOpt("anonymize")
    +            .desc("Strip out any possibly identifiable information")
    +            .build());
    +        options.addOption(Option.builder("o")
    +            .longOpt("output-dir")
    +            .argName("<file>")
    +            .hasArg()
    +            .desc("Where to write (defaults to " + DEFAULT_OUT_DIR + ")")
    +            .build());
    +        options.addOption(Option.builder("h")
    +            .longOpt("help")
    +            .desc("Print a help message")
    +            .build());
    +        CommandLineParser parser = new DefaultParser();
    +        CommandLine cmd = null;
    +        ParseException pe = null;
    +        try {
    +            cmd = parser.parse(options, args);
    +        } catch (ParseException e) {
    +            pe = e;
    +        }
    +        if (pe != null || cmd.hasOption('h')) {
    +            if (pe != null) {
    +                System.err.println("ERROR " + pe.getMessage());
    --- End diff --
    
    Minor Nit: Can probably just dump this `println` in the `catch {}`
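
    A sketch of that restructuring (same variables as in the diff; the `pe` field is replaced with a boolean):

    ```java
    CommandLine cmd = null;
    boolean printHelp = false;
    try {
        cmd = parser.parse(options, args);
    } catch (ParseException e) {
        System.err.println("ERROR " + e.getMessage());
        printHelp = true;
    }
    if (printHelp || cmd.hasOption('h')) {
        new HelpFormatter().printHelp("CaptureLoad [options] [topologyName]*", options);
        return;
    }
    ```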


---

[GitHub] storm issue #2289: STORM-2702: storm-loadgen

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/2289
  
    Another observation of the reported latency differing between the UI and the report (this time by ~120x):

    start_time(s),end_time(s),completion_rate(tuple/s),mean(ms),99%ile(ms),99.9%ile(ms),cores,mem(MB),failed
    ....
    181,211,145589.76666666666,99707.19604026663,107374.182399,107508.400127,4.9433333333333325,2169.0212020874023,0
    211,241,145994.63333333333,114849.46619218022,122876.329983,122943.438847,4.990833333333334,1467.2594909667969,0
    241,271,144041.5,130208.78991536044,138512.695295,138781.130751,4.888733333333334,1794.4858627319336,0

    (screenshot "stats": https://user-images.githubusercontent.com/2366541/29654810-8dd78d6a-8863-11e7-889a-7568f249f88c.png)



---

[GitHub] storm pull request #2289: STORM-2702: storm-loadgen

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2289


---

[GitHub] storm pull request #2289: STORM-2702: storm-loadgen

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2289#discussion_r135074180
  
    --- Diff: examples/storm-loadgen/README.md ---
    @@ -0,0 +1,195 @@
    +# Storm Load Generation Tools
    +
    +A set of tools to place an artificial load on a storm cluster to compare against a different storm cluster.  This is particularly helpful when making changes to the data path in storm to see what if any impact the changes had.  This is also useful for end users that want to compare different hardware setups to see what the trade-offs are, although actually running your real topologies is going to be more accurate.
    +
    +## Methodology
    +The idea behind all of these tools is to measure the trade-offs between latency, throughput, and cost when processing data using Apache Storm.
    +
    +When processing data you typically will know a few things.  First you will know about how much data you are going to be processing.  This will typically be a range of values that change throughput the day.  You also will have an idea of how quickly you need the data processed by.  Often this is measured in terms of the latency it takes to process data at the some percentile or set of percentiles.  This is because of most use cases the value of the data declines over time, and being able to react to the data quickly is more valuable.  You probably also have a budget for how much you are willing to spend to be able to process this data.  There are always trade-offs in how quickly you can process some data and how efficiently you can processes that data both in terms of resource usage (cost) and latency.  These tools are designed to help you explore that space.
    +
    +A note on how latency is measured.  Storm typically measures latency from when a message is emitted by a spout until the point it is fully acked or failed (in many versions of storm it actually does this in the acker instead of the spout so it is trying to be a measure of how long it takes for the actual processing, removing as much of the acker overhead as possible).  For these tools we do it differently.  We simulate a throughput and measure the start time of the tuple from when it would have been emitted if the topology could keep up with the load.  In the normal case this should not be an issue, but if the topology cannot keep up with the throughput you will see the latency grow very high compared to the latency reported by storm.
    +
    +## Tools
    +### CaptureLoad 
    +
    +`CaptureLoad` will look at the topologies on a running cluster and capture the structure of, and metrics about, each of theses topologies, storing them in a format that can be used later to reproduce a similar load on the cluster.
    --- End diff --
    
    theses
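
    To make the latency semantics in the README diff above concrete, here is a minimal sketch of rate-controlled emission with "scheduled" start times.  Everything in it (the class name, targetRatePerSec, nextEmitTime) is a hypothetical illustration of the idea, not the actual storm-loadgen implementation:

        // Emit at a fixed target rate.  A tuple's start time is the moment it
        // was *scheduled* to be emitted, not the moment it actually was, so if
        // processing falls behind, the measured latency includes the backlog,
        // unlike Storm's built-in emit-to-ack latency.
        public class ScheduledLatencyDemo {
            public static void main(String[] args) throws InterruptedException {
                double targetRatePerSec = 1000.0;
                long periodNanos = (long) (1_000_000_000L / targetRatePerSec);
                long nextEmitTime = System.nanoTime();
                for (int i = 0; i < 10_000; i++) {
                    long sleepNanos = nextEmitTime - System.nanoTime();
                    if (sleepNanos > 0) {
                        Thread.sleep(sleepNanos / 1_000_000L, (int) (sleepNanos % 1_000_000L));
                    }
                    process(nextEmitTime);
                    nextEmitTime += periodNanos;
                }
            }

            private static void process(long scheduledStartNanos) {
                // Simulated work would go here.  This latency grows without
                // bound if the work cannot keep up with the target rate.
                long latencyNanos = System.nanoTime() - scheduledStartNanos;
            }
        }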


---

[GitHub] storm issue #2289: STORM-2702: storm-loadgen

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2289
  
    I think I am done with new features unless someone comes up with something I missed.  I would really appreciate some reviews.  @knusbaum, if you have the time, I know you expressed interest in this patch.
    
    @roshannaik if you really want metrics on the emit-to-ack/fail latency I can add those too, but they only become useful when the topology cannot keep up with the load placed on it.


---

[GitHub] storm issue #2289: STORM-2702: storm-loadgen

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2289
  
    @roshannaik I merged this into master, but if you want any changes to it please just ask and I'll see what I can do.


---

[GitHub] storm pull request #2289: STORM-2702: storm-loadgen

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2289#discussion_r135317773
  
    --- Diff: examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/InputStream.java ---
    @@ -0,0 +1,263 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.loadgen;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ObjectInputStream;
    +import java.io.Serializable;
    +import java.util.HashMap;
    +import java.util.Map;
    +import org.apache.storm.generated.GlobalStreamId;
    +import org.apache.storm.generated.Grouping;
    +import org.apache.storm.grouping.PartialKeyGrouping;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * A set of measurements about a stream so we can statistically reproduce it.
    + */
    +public class InputStream implements Serializable {
    +    private static final Logger LOG = LoggerFactory.getLogger(InputStream.class);
    +    public final String fromComponent;
    +    public final String toComponent;
    +    public final String id;
    +    public final NormalDistStats execTime;
    +    public final NormalDistStats processTime;
    +    public final GroupingType groupingType;
    +    //Cached GlobalStreamId
    +    private GlobalStreamId gsid = null;
    +
    +    /**
    +     * Create an input stream from a config.
    +     * @param conf the config to read from.
    +     * @return the read InputStream.
    +     */
    +    public static InputStream fromConf(Map<String, Object> conf) {
    +        String component = (String) conf.get("from");
    +        String toComp = (String) conf.get("to");
    +        NormalDistStats execTime = NormalDistStats.fromConf((Map<String, Object>) conf.get("execTime"));
    +        NormalDistStats processTime = NormalDistStats.fromConf((Map<String, Object>) conf.get("processTime"));
    +        Map<String, Object> grouping = (Map<String, Object>) conf.get("grouping");
    +        GroupingType groupingType = GroupingType.fromConf((String) grouping.get("type"));
    +        String streamId = (String) grouping.getOrDefault("streamId", "default");
    +        return new InputStream(component, toComp, streamId, execTime, processTime, groupingType);
    +    }
    +
    +    /**
    +     * Convert this to a conf.
    +     * @return the conf.
    +     */
    +    public Map<String, Object> toConf() {
    +        Map<String, Object> ret = new HashMap<>();
    +        ret.put("from", fromComponent);
    +        ret.put("to", toComponent);
    +        ret.put("execTime", execTime.toConf());
    +        ret.put("processTime", processTime.toConf());
    +
    +        Map<String, Object> grouping = new HashMap<>();
    +        grouping.put("streamId", id);
    +        grouping.put("type", groupingType.toConf());
    +        ret.put("grouping", grouping);
    +
    +        return ret;
    +    }
    +
    +    public static class Builder {
    +        private String fromComponent;
    +        private String toComponent;
    +        private String id;
    +        private NormalDistStats execTime;
    +        private NormalDistStats processTime;
    +        private GroupingType groupingType = GroupingType.SHUFFLE;
    +
    +        public String getFromComponent() {
    --- End diff --
    
    Ahh, sorry. I missed their use.
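
    For readers following the diff above, here is a hypothetical round-trip through InputStream.fromConf and toConf.  The top-level keys (from, to, execTime, processTime, grouping) and the grouping keys (type, streamId) come straight from the code; the keys inside the execTime/processTime maps and the "shuffle" spelling are assumptions, since NormalDistStats.fromConf and GroupingType.fromConf are not part of this diff:

        import java.util.HashMap;
        import java.util.Map;
        import org.apache.storm.loadgen.InputStream;

        public class InputStreamConfDemo {
            public static void main(String[] args) {
                // Keys assumed for NormalDistStats.fromConf; not shown in this diff.
                Map<String, Object> execTime = new HashMap<>();
                execTime.put("mean", 1.5);
                execTime.put("stddev", 0.2);
                Map<String, Object> processTime = new HashMap<>(execTime);

                Map<String, Object> grouping = new HashMap<>();
                grouping.put("type", "shuffle"); // assumed spelling for GroupingType
                grouping.put("streamId", "default");

                Map<String, Object> conf = new HashMap<>();
                conf.put("from", "spout-1");
                conf.put("to", "bolt-1");
                conf.put("execTime", execTime);
                conf.put("processTime", processTime);
                conf.put("grouping", grouping);

                InputStream in = InputStream.fromConf(conf);
                // toConf() should give back an equivalent map, so a captured load
                // can be edited by hand and replayed.
                System.out.println(in.toConf());
            }
        }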



---

[GitHub] storm issue #2289: STORM-2702: storm-loadgen

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2289
  
    @roshannaik The latency reported is a simulation of an external queue like kafka.  The start time is not when the message is emitted by the spout; it is when the message would have been inserted into something like kafka to be processed.  So if the topology cannot keep up with the throughput, the latency grows accordingly.  I should document it better in the README.
    
    I made the decision not to cut the numbers off in CSV/TSV because those files are really intended to be read by a machine.  They are not really human readable anyway.
    
    You can adjust the columns to what you want, and ids is the column that prints out the ids of the running topologies.
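    
    To illustrate with assumed numbers: if the simulated load inserts 1,000 tuples per second but the topology can only drain 800 per second, the backlog grows by 200 tuples every second, so the measured latency of newly inserted tuples grows by roughly 200/800 = 0.25 seconds for every second of wall-clock time, while storm's own emit-to-ack latency would stay roughly flat.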


---