Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 19:14:43 UTC

[02/18] storm git commit: STORM-2702: storm-loadgen

http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/TopologyLoadConf.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/TopologyLoadConf.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/TopologyLoadConf.java
new file mode 100644
index 0000000..1a45ccc
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/TopologyLoadConf.java
@@ -0,0 +1,432 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.storm.Config;
+import org.apache.storm.generated.GlobalStreamId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.SafeConstructor;
+
+/**
+ * Configuration for a simulated topology.
+ */
+public class TopologyLoadConf {
+    private static final Logger LOG = LoggerFactory.getLogger(TopologyLoadConf.class);
+    static final Set<String> IMPORTANT_CONF_KEYS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+        Config.TOPOLOGY_WORKERS,
+        Config.TOPOLOGY_ACKER_EXECUTORS,
+        Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT,
+        Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB,
+        Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB,
+        Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING,
+        Config.TOPOLOGY_DEBUG,
+        Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE,
+        Config.TOPOLOGY_ISOLATED_MACHINES,
+        Config.TOPOLOGY_MAX_SPOUT_PENDING,
+        Config.TOPOLOGY_MAX_TASK_PARALLELISM,
+        Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS,
+        Config.TOPOLOGY_PRIORITY,
+        Config.TOPOLOGY_SCHEDULER_STRATEGY,
+        Config.TOPOLOGY_SHELLBOLT_MAX_PENDING,
+        Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS,
+        Config.TOPOLOGY_SPOUT_WAIT_STRATEGY,
+        Config.TOPOLOGY_WORKER_CHILDOPTS,
+        Config.TOPOLOGY_WORKER_GC_CHILDOPTS,
+        Config.TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE,
+        Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB
+    )));
+    private static final AtomicInteger topoUniquifier = new AtomicInteger(0);
+
+    public final String name;
+    public final Map<String, Object> topoConf;
+    public final List<LoadCompConf> spouts;
+    public final List<LoadCompConf> bolts;
+    public final List<InputStream> streams;
+    private final AtomicInteger boltUniquifier = new AtomicInteger(0);
+    private final AtomicInteger spoutUniquifier = new AtomicInteger(0);
+    private final AtomicInteger streamUniquifier = new AtomicInteger(0);
+
+    /**
+     * Parse the TopologyLoadConf from a file in YAML format.
+     * @param file the file to read from
+     * @return the parsed conf
+     * @throws IOException if there is an issue reading the file.
+     */
+    public static TopologyLoadConf fromConf(File file) throws IOException {
+        Yaml yaml = new Yaml(new SafeConstructor());
+        try (FileReader reader = new FileReader(file)) {
+            Map<String, Object> yamlConf = (Map<String, Object>) yaml.load(reader);
+            return TopologyLoadConf.fromConf(yamlConf);
+        }
+    }
+
+    /**
+     * Parse the TopologyLoadConf from a config map.
+     * @param conf the config with the TopologyLoadConf in it
+     * @return the parsed instance.
+     */
+    public static TopologyLoadConf fromConf(Map<String, Object> conf) {
+        Map<String, Object> topoConf = null;
+        if (conf.containsKey("config")) {
+            topoConf = new HashMap<>((Map<String, Object>)conf.get("config"));
+        }
+
+        List<LoadCompConf> spouts = new ArrayList<>();
+        for (Map<String, Object> spoutInfo: (List<Map<String, Object>>) conf.get("spouts")) {
+            spouts.add(LoadCompConf.fromConf(spoutInfo));
+        }
+
+        List<LoadCompConf> bolts = new ArrayList<>();
+        List<Map<String, Object>> boltInfos = (List<Map<String, Object>>) conf.get("bolts");
+        if (boltInfos != null) {
+            for (Map<String, Object> boltInfo : boltInfos) {
+                bolts.add(LoadCompConf.fromConf(boltInfo));
+            }
+        }
+
+        List<InputStream> streams = new ArrayList<>();
+        List<Map<String, Object>> streamInfos = (List<Map<String, Object>>) conf.get("streams");
+        if (streamInfos != null) {
+            for (Map<String, Object> streamInfo: streamInfos) {
+                streams.add(InputStream.fromConf(streamInfo));
+            }
+        }
+
+        return new TopologyLoadConf((String)conf.get("name"), topoConf, spouts, bolts, streams);
+    }
+
+    /**
+     * Write this out to a file in YAML format.
+     * @param file the file to write to.
+     * @throws IOException if there is an error writing to the file.
+     */
+    public void writeTo(File file) throws IOException {
+        Yaml yaml = new Yaml(new SafeConstructor());
+        try (FileWriter writer = new FileWriter(file)) {
+            yaml.dump(toConf(), writer);
+        }
+    }
+
+    /**
+     * Convert this into a YAML String.
+     * @return this as a YAML String.
+     */
+    public String toYamlString() {
+        Yaml yaml = new Yaml(new SafeConstructor());
+        StringWriter writer = new StringWriter();
+        yaml.dump(toConf(), writer);
+        return writer.toString();
+    }
+
+    /**
+     * Convert this into a Map config.
+     * @return this as a Map config.
+     */
+    public Map<String, Object> toConf() {
+        Map<String, Object> ret = new HashMap<>();
+        if (name != null) {
+            ret.put("name", name);
+        }
+        if (topoConf != null) {
+            ret.put("config", topoConf);
+        }
+        if (spouts != null && !spouts.isEmpty()) {
+            ret.put("spouts", spouts.stream().map(LoadCompConf::toConf)
+                .collect(Collectors.toList()));
+        }
+
+        if (bolts != null && !bolts.isEmpty()) {
+            ret.put("bolts", bolts.stream().map(LoadCompConf::toConf)
+                .collect(Collectors.toList()));
+        }
+
+        if (streams != null && !streams.isEmpty()) {
+            ret.put("streams", streams.stream().map(InputStream::toConf)
+                .collect(Collectors.toList()));
+        }
+        return ret;
+    }
+
+    /**
+     * Constructor.
+     * @param name the name of the topology.
+     * @param topoConf the config for the topology
+     * @param spouts the spouts for the topology
+     * @param bolts the bolts for the topology
+     * @param streams the streams for the topology
+     */
+    public TopologyLoadConf(String name, Map<String, Object> topoConf,
+                            List<LoadCompConf> spouts, List<LoadCompConf> bolts, List<InputStream> streams) {
+        this.name = name;
+        this.topoConf = topoConf;
+        this.spouts = spouts;
+        this.bolts = bolts;
+        this.streams = streams;
+    }
+
+    private static String getUniqueTopoName() {
+        return "topology_" + asCharString(topoUniquifier.getAndIncrement());
+    }
+
+    private String getUniqueBoltName() {
+        return "bolt_" + asCharString(boltUniquifier.getAndIncrement());
+    }
+
+    private String getUniqueSpoutName() {
+        return "spout_" + asCharString(spoutUniquifier.getAndIncrement());
+    }
+
+    private String getUniqueStreamName() {
+        return "stream_" + asCharString(spoutUniquifier.getAndIncrement());
+    }
+
+    private static String asCharString(int value) {
+        int div = value / 26;
+        int remainder = value % 26;
+        String ret = "";
+        if (div > 0) {
+            ret = asCharString(div);
+        }
+        ret += (char)((int)'a' + remainder);
+        return ret;
+    }
+
+    public TopologyLoadConf withName(String baseName) {
+        return new TopologyLoadConf(baseName, topoConf, spouts, bolts, streams);
+    }
+
+    /**
+     * Scale all of the components in the topology by a factor (but keep the overall throughput the same).
+     * @param v the amount to scale them by.  1.0 is nothing, 0.5 cuts them in half, 2.0 doubles them.
+     * @return a copy of this with the needed adjustments made.
+     */
+    public TopologyLoadConf scaleParallel(double v) {
+        List<LoadCompConf> scaledSpouts = spouts.stream().map((c) -> c.scaleParallel(v)).collect(Collectors.toList());
+        List<LoadCompConf> scaledBolts = bolts.stream().map((c) -> c.scaleParallel(v)).collect(Collectors.toList());
+        return new TopologyLoadConf(name, topoConf, scaledSpouts, scaledBolts, streams);
+    }
+
+    /**
+     * Scale the throughput of the entire topology by a factor.
+     * @param v the amount to scale it by. 1.0 is nothing, 0.5 cuts it in half, and 2.0 doubles it.
+     * @return a copy of this with the needed adjustments made.
+     */
+    public TopologyLoadConf scaleThroughput(double v) {
+        List<LoadCompConf> scaledSpouts = spouts.stream()
+            .map((c) -> c.scaleThroughput(v)).collect(Collectors.toList());
+        List<LoadCompConf> scaledBolts = bolts.stream()
+            .map((c) -> c.scaleThroughput(v)).collect(Collectors.toList());
+        return new TopologyLoadConf(name, topoConf, scaledSpouts, scaledBolts, streams);
+    }
+
+    /**
+     * Create a new version of this topology with identifiable information removed.
+     * @return the anonymized version of the TopologyLoadConf.
+     */
+    public TopologyLoadConf anonymize() {
+        Map<String, String> remappedComponents = new HashMap<>();
+        Map<GlobalStreamId, GlobalStreamId> remappedStreams = new HashMap<>();
+        for (LoadCompConf comp: bolts) {
+            String newId = getUniqueBoltName();
+            remappedComponents.put(comp.id, newId);
+            if (comp.streams != null) {
+                for (OutputStream out : comp.streams) {
+                    GlobalStreamId orig = new GlobalStreamId(comp.id, out.id);
+                    GlobalStreamId remapped = new GlobalStreamId(newId, getUniqueStreamName());
+                    remappedStreams.put(orig, remapped);
+                }
+            }
+        }
+
+        for (LoadCompConf comp: spouts) {
+            String newId = getUniqueSpoutName();
+            remappedComponents.put(comp.id, newId);
+            if (comp.streams != null) {
+                for (OutputStream out : comp.streams) {
+                    GlobalStreamId orig = new GlobalStreamId(comp.id, out.id);
+                    GlobalStreamId remapped = new GlobalStreamId(newId, getUniqueStreamName());
+                    remappedStreams.put(orig, remapped);
+                }
+            }
+        }
+
+        for (InputStream in : streams) {
+            if (!remappedComponents.containsKey(in.toComponent)) {
+                remappedComponents.put(in.toComponent, getUniqueSpoutName());
+            }
+            GlobalStreamId orig = in.gsid();
+            if (!remappedStreams.containsKey(orig)) {
+                //Even if the topology is not valid we still need to remap it all
+                String remappedComp = remappedComponents.computeIfAbsent(in.fromComponent, (key) -> {
+                    LOG.warn("stream {}'s 'from' component {} is not defined", in.id, in.fromComponent);
+                    return getUniqueBoltName();
+                });
+                remappedStreams.put(orig, new GlobalStreamId(remappedComp, getUniqueStreamName()));
+            }
+        }
+
+        //Now we need to map them all back again
+        List<LoadCompConf> remappedSpouts = spouts.stream()
+            .map((orig) -> orig.remap(remappedComponents, remappedStreams))
+            .collect(Collectors.toList());
+        List<LoadCompConf> remappedBolts = bolts.stream()
+            .map((orig) -> orig.remap(remappedComponents, remappedStreams))
+            .collect(Collectors.toList());
+        List<InputStream> remappedInputStreams = streams.stream()
+            .map((orig) -> orig.remap(remappedComponents, remappedStreams))
+            .collect(Collectors.toList());
+        return new TopologyLoadConf(getUniqueTopoName(), anonymizeTopoConf(topoConf), remappedSpouts, remappedBolts, remappedInputStreams);
+    }
+
+    private static Map<String,Object> anonymizeTopoConf(Map<String, Object> topoConf) {
+        //Only keep important conf keys
+        Map<String, Object> ret = new HashMap<>();
+        for (Map.Entry<String, Object> entry: topoConf.entrySet()) {
+            String key = entry.getKey();
+            Object value = entry.getValue();
+            if (IMPORTANT_CONF_KEYS.contains(key)) {
+                if (Config.TOPOLOGY_WORKER_CHILDOPTS.equals(key)
+                    || Config.TOPOLOGY_WORKER_GC_CHILDOPTS.equals(key)) {
+                    value = cleanupChildOpts(value);
+                }
+                ret.put(key, value);
+            }
+        }
+        return ret;
+    }
+
+    private static Object cleanupChildOpts(Object value) {
+        if (value instanceof String) {
+            String sv = (String) value;
+            StringBuilder ret = new StringBuilder();
+            for (String part: sv.split("\\s+")) {
+                if (part.startsWith("-X")) {
+                    ret.append(part).append(" ");
+                }
+            }
+            return ret.toString();
+        } else {
+            List<String> ret = new ArrayList<>();
+            for (String subValue: (Collection<String>)value) {
+                ret.add((String)cleanupChildOpts(subValue));
+            }
+            return ret.stream().filter((item) -> item != null && !item.isEmpty()).collect(Collectors.toList());
+        }
+    }
+
+    /**
+     * Try to see if this looks like a trident topology.
+     * NOTE: this will not work for anonymized configs.
+     * @return true if it does else false.
+     */
+    public boolean looksLikeTrident() {
+        for (LoadCompConf spout: spouts) {
+            if (spout.id.startsWith("$mastercoord")) {
+                return true;
+            }
+        }
+
+        for (LoadCompConf bolt: bolts) {
+            if (bolt.id.startsWith("$spoutcoord")) {
+                return true;
+            }
+        }
+
+        for (InputStream in: streams) {
+            if (in.id.equals("$batch")) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Get the messages emitted per second in aggregate across all streams in the topology.
+     * @return messages per second.
+     */
+    public double getAllEmittedAggregate() {
+        double ret = getSpoutEmittedAggregate();
+        for (LoadCompConf bolt: bolts) {
+            ret += bolt.getAllEmittedAggregate() * bolt.parallelism;
+        }
+        return ret;
+    }
+
+    /**
+     * Get the messages emitted per second in aggregate for all of the spouts in the topology.
+     * @return messages per second.
+     */
+    public double getSpoutEmittedAggregate() {
+        double ret = 0;
+        for (LoadCompConf spout: spouts) {
+            ret += spout.getAllEmittedAggregate() * spout.parallelism;
+        }
+        return ret;
+    }
+
+    /**
+     * Try to estimate the actual number of messages emitted per second by a trident topology (not the number of batches).
+     * This does not work on an anonymized conf.
+     * @return messages per second or 0 if this does not look like a trident topology.
+     */
+    public double getTridentEstimatedEmittedAggregate() {
+        //In this case we are ignoring the coordination streams, and only looking at the user spout output streams.
+        double ret = 0;
+        if (looksLikeTrident()) {
+            List<LoadCompConf> all = new ArrayList<>(bolts);
+            all.addAll(spouts);
+            for (LoadCompConf comp : all) {
+                if (comp.id.startsWith("spout-")) {
+                    if (comp.streams != null) {
+                        for (OutputStream out: comp.streams) {
+                            if (!out.id.startsWith("$")
+                                && !out.id.startsWith("__")
+                                && out.rate != null) {
+                                ret += out.rate.mean * comp.parallelism;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        return ret;
+    }
+
+    public TopologyLoadConf replaceShuffleWithLocalOrShuffle() {
+        List<InputStream> modified = streams.stream().map((in) -> in.replaceShuffleWithLocalOrShuffle()).collect(Collectors.toList());
+        return new TopologyLoadConf(name, topoConf, spouts, bolts, modified);
+    }
+}
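
A minimal sketch of driving TopologyLoadConf programmatically, assuming the LoadCompConf.Builder, OutputStream, and NormalDistStats signatures exercised by the tests later in this patch; the class name and the empty bolt/stream lists here are illustrative only:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.storm.Config;
    import org.apache.storm.loadgen.LoadCompConf;
    import org.apache.storm.loadgen.NormalDistStats;
    import org.apache.storm.loadgen.OutputStream;
    import org.apache.storm.loadgen.TopologyLoadConf;

    public class LoadConfExample {
        public static void main(String[] args) {
            // One spout emitting ~500 msg/s on its default stream (mean, stddev, min, max).
            LoadCompConf spout = new LoadCompConf.Builder()
                .withId("spout_a")
                .withParallelism(1)
                .withStream(new OutputStream("default",
                    new NormalDistStats(500.0, 100.0, 300.0, 600.0), false))
                .build();

            Map<String, Object> conf = new HashMap<>();
            conf.put(Config.TOPOLOGY_WORKERS, 2);

            // No bolts or inter-component streams in this toy topology.
            TopologyLoadConf topo = new TopologyLoadConf("example", conf,
                Collections.singletonList(spout), new ArrayList<>(), new ArrayList<>());

            // Double the simulated throughput and dump the result as YAML.
            TopologyLoadConf doubled = topo.scaleThroughput(2.0);
            System.out.println(doubled.getAllEmittedAggregate()); // ~1000.0 msg/s
            System.out.println(doubled.toYamlString());
        }
    }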

http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/test/java/org/apache/storm/loadgen/LoadCompConfTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/test/java/org/apache/storm/loadgen/LoadCompConfTest.java b/examples/storm-loadgen/src/test/java/org/apache/storm/loadgen/LoadCompConfTest.java
new file mode 100644
index 0000000..27c7389
--- /dev/null
+++ b/examples/storm-loadgen/src/test/java/org/apache/storm/loadgen/LoadCompConfTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class LoadCompConfTest {
+    @Test
+    public void scaleParallel() throws Exception {
+        LoadCompConf orig = new LoadCompConf.Builder()
+            .withId("SOME_SPOUT")
+            .withParallelism(1)
+            .withStream(new OutputStream("default", new NormalDistStats(500.0, 100.0, 300.0, 600.0), false))
+            .build();
+        assertEquals(500.0, orig.getAllEmittedAggregate(), 0.001);
+        LoadCompConf scaled = orig.scaleParallel(2);
+        //Parallelism is doubled
+        assertEquals(2, scaled.parallelism);
+        assertEquals("SOME_SPOUT", scaled.id);
+        //But throughput is the same
+        assertEquals(500.0, scaled.getAllEmittedAggregate(), 0.001);
+    }
+
+    @Test
+    public void scaleThroughput() throws Exception {
+        LoadCompConf orig = new LoadCompConf.Builder()
+            .withId("SOME_SPOUT")
+            .withParallelism(1)
+            .withStream(new OutputStream("default", new NormalDistStats(500.0, 100.0, 300.0, 600.0), false))
+            .build();
+        assertEquals(500.0, orig.getAllEmittedAggregate(), 0.001);
+        LoadCompConf scaled = orig.scaleThroughput(2.0);
+        //Parallelism is the same
+        assertEquals(1, scaled.parallelism);
+        assertEquals("SOME_SPOUT", scaled.id);
+        //But throughput is doubled
+        assertEquals(1000.0, scaled.getAllEmittedAggregate(), 0.001);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/test/java/org/apache/storm/loadgen/LoadMetricsServerTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/test/java/org/apache/storm/loadgen/LoadMetricsServerTest.java b/examples/storm-loadgen/src/test/java/org/apache/storm/loadgen/LoadMetricsServerTest.java
new file mode 100644
index 0000000..ca58d7e
--- /dev/null
+++ b/examples/storm-loadgen/src/test/java/org/apache/storm/loadgen/LoadMetricsServerTest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.apache.storm.loadgen.LoadMetricsServer.convert;
+
+public class LoadMetricsServerTest {
+    @Test
+    public void convertTest() throws Exception {
+        for (TimeUnit from : TimeUnit.values()) {
+            for (TimeUnit to : TimeUnit.values()) {
+                assertEquals(from + " to " + to + " and back", 1.0, convert(convert(1.0, from, to), to, from), 0.0001);
+            }
+        }
+    }
+}
\ No newline at end of file
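
The LoadMetricsServer.convert helper exercised above is not shown in this portion of the patch. A round-trip-safe conversion along the following lines (illustrative only; the committed implementation may differ) is why the test only needs a small tolerance:

    import java.util.concurrent.TimeUnit;

    final class TimeUnitConvert {
        // Illustrative sketch, not the committed LoadMetricsServer.convert. Converting through
        // nanoseconds keeps from -> to -> from round trips well within the test's 0.0001 tolerance.
        static double convert(double value, TimeUnit from, TimeUnit to) {
            double nanos = value * from.toNanos(1); // scale into nanoseconds
            return nanos / to.toNanos(1);           // then into the target unit
        }
    }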

http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/test/java/org/apache/storm/loadgen/NormalDistStatsTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/test/java/org/apache/storm/loadgen/NormalDistStatsTest.java b/examples/storm-loadgen/src/test/java/org/apache/storm/loadgen/NormalDistStatsTest.java
new file mode 100644
index 0000000..10cc18b
--- /dev/null
+++ b/examples/storm-loadgen/src/test/java/org/apache/storm/loadgen/NormalDistStatsTest.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class NormalDistStatsTest {
+    public static void assertNDSEquals(NormalDistStats a, NormalDistStats b) {
+        assertEquals("mean", a.mean, b.mean, 0.0001);
+        assertEquals("min", a.min, b.min, 0.0001);
+        assertEquals("max", a.max, b.max, 0.0001);
+        assertEquals("stddev", a.stddev, b.stddev, 0.0001);
+    }
+
+    @Test
+    public void scaleBy() throws Exception {
+        NormalDistStats orig = new NormalDistStats(1.0, 0.5, 0.0, 2.0);
+        assertNDSEquals(orig, orig.scaleBy(1.0));
+        NormalDistStats expectedDouble = new NormalDistStats(2.0, 0.5, 1.0, 3.0);
+        assertNDSEquals(expectedDouble, orig.scaleBy(2.0));
+        NormalDistStats expectedHalf = new NormalDistStats(0.5, 0.5, 0.0, 1.5);
+        assertNDSEquals(expectedHalf, orig.scaleBy(0.5));
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/test/java/org/apache/storm/loadgen/OutputStreamTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/test/java/org/apache/storm/loadgen/OutputStreamTest.java b/examples/storm-loadgen/src/test/java/org/apache/storm/loadgen/OutputStreamTest.java
new file mode 100644
index 0000000..9200104
--- /dev/null
+++ b/examples/storm-loadgen/src/test/java/org/apache/storm/loadgen/OutputStreamTest.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class OutputStreamTest {
+    @Test
+    public void scaleThroughput() throws Exception {
+        OutputStream orig = new OutputStream("ID", new NormalDistStats(100.0, 1.0, 99.0, 101.0), false);
+        OutputStream scaled = orig.scaleThroughput(2.0);
+        assertEquals(orig.id, scaled.id);
+        assertEquals(orig.areKeysSkewed, scaled.areKeysSkewed);
+        assertEquals(scaled.rate.mean, 200.0, 0.0001);
+        assertEquals(scaled.rate.stddev, 1.0, 0.0001);
+        assertEquals(scaled.rate.min, 199.0, 0.0001);
+        assertEquals(scaled.rate.max, 201.0, 0.0001);
+    }
+}
\ No newline at end of file