You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ot...@apache.org on 2018/04/18 14:59:47 UTC

[17/52] [abbrv] metron git commit: METRON-1483: Create a tool to monitor performance of the topologies closes apache/incubator-metron#958

METRON-1483: Create a tool to monitor performance of the topologies closes apache/incubator-metron#958


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/46ad9d93
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/46ad9d93
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/46ad9d93

Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: 46ad9d93b4385da0f8668f2ba84212d54d00ba4b
Parents: e3eeec3
Author: cstella <ce...@gmail.com>
Authored: Tue Mar 20 09:36:32 2018 -0400
Committer: cstella <ce...@gmail.com>
Committed: Tue Mar 20 09:36:32 2018 -0400

----------------------------------------------------------------------
 metron-contrib/metron-performance/README.md     | 205 ++++++++
 .../performance_measurement.png                 | Bin 0 -> 5790 bytes
 metron-contrib/metron-performance/pom.xml       | 134 +++++
 .../src/main/assembly/assembly.xml              |  42 ++
 .../metron/performance/load/LoadGenerator.java  | 175 +++++++
 .../metron/performance/load/LoadOptions.java    | 499 +++++++++++++++++++
 .../performance/load/MessageGenerator.java      |  48 ++
 .../metron/performance/load/SendToKafka.java    | 107 ++++
 .../load/monitor/AbstractMonitor.java           |  49 ++
 .../load/monitor/EPSGeneratedMonitor.java       |  53 ++
 .../monitor/EPSThroughputWrittenMonitor.java    |  77 +++
 .../performance/load/monitor/MonitorNaming.java |  23 +
 .../performance/load/monitor/MonitorTask.java   |  44 ++
 .../performance/load/monitor/Results.java       |  51 ++
 .../load/monitor/writers/CSVWriter.java         |  67 +++
 .../load/monitor/writers/ConsoleWriter.java     |  65 +++
 .../load/monitor/writers/Writable.java          |  40 ++
 .../load/monitor/writers/Writer.java            |  86 ++++
 .../performance/sampler/BiasedSampler.java      | 113 +++++
 .../metron/performance/sampler/Sampler.java     |  24 +
 .../performance/sampler/UnbiasedSampler.java    |  28 ++
 .../metron/performance/util/KafkaUtil.java      |  56 +++
 .../src/main/scripts/load_tool.sh               |  36 ++
 .../performance/load/LoadOptionsTest.java       |  93 ++++
 .../performance/load/SendToKafkaTest.java       |  49 ++
 .../metron/performance/sampler/SamplerTest.java | 145 ++++++
 metron-contrib/pom.xml                          |  15 +
 .../common-services/METRON/CURRENT/metainfo.xml |   4 +
 .../packaging/docker/deb-docker/pom.xml         |   6 +
 .../docker/rpm-docker/SPECS/metron.spec         |  21 +
 .../packaging/docker/rpm-docker/pom.xml         |   6 +
 31 files changed, 2361 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/README.md
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/README.md b/metron-contrib/metron-performance/README.md
new file mode 100644
index 0000000..8981349
--- /dev/null
+++ b/metron-contrib/metron-performance/README.md
@@ -0,0 +1,205 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+# Performance Utilities
+
+This project creates some useful performance monitoring and measurement
+utilities.
+
+## `load-tool.sh`
+
+The Load tool is intended to do the following:
+* Generate a load at a specific events per second into kafka
+  * The messages are taken from a template file, where there is a message template per line
+  * The load can be biased (e.g. 80% of the load can be comprised of 20% of the templates)
+* Monitor the kafka offsets for a topic to determine the events per second written
+  * This could be the topic that you are generating load on
+  * This could be another topic that represents the output of some topology (e.g. generate load on `enrichments` and monitor `indexing` to determine the throughput of the enrichment topology).
+
+```
+usage: Generator
+ -bs,--sample_bias <BIAS_FILE>         The discrete distribution to bias
+                                       the sampling. This is a CSV of 2
+                                       columns.  The first column is the %
+                                       of the templates and the 2nd column
+                                       is the probability (0-100) that
+                                       it's chosen.  For instance:
+                                       20,80
+                                       80,20
+                                       implies that 20% of the templates
+                                       will comprise 80% of the output and
+                                       the remaining 80% of the templates
+                                       will comprise 20% of the output.
+ -c,--csv <CSV_FILE>                   A CSV file to emit monitoring data
+                                       to.  The format is a CSV with the
+                                       following schema: timestamp, (name,
+                                       eps, historical_mean,
+                                       historical_stddev)+
+ -cg,--consumer_group <GROUP_ID>       Consumer Group.  The default is
+                                       load.group
+ -e,--eps <EPS>                        The target events per second
+ -h,--help                             Generate Help screen
+ -k,--kafka_config <CONFIG_FILE>       The kafka config.  This is a file
+                                       containing a JSON map with the
+                                       kafka config.
+ -l,--lookback <LOOKBACK>              When summarizing, how many
+                                       monitoring periods should we
+                                       summarize over?  If 0, then no
+                                       summary.  Default: 5
+ -md,--monitor_delta_ms <TIME_IN_MS>   The time (in ms) between monitoring
+                                       output. Default is 10000
+ -mt,--monitor_topic <TOPIC>           The kafka topic to monitor.
+ -ot,--output_topic <TOPIC>            The kafka topic to write to
+ -p,--threads <NUM_THREADS>            The number of threads to use when
+                                       extracting data.  The default is
+                                       the number of cores of your
+                                       machine.
+ -sd,--send_delta_ms <TIME_IN_MS>      The time (in ms) between sending a
+                                       batch of messages. Default is 100
+ -t,--template <TEMPLATE_FILE>         The template file to use for
+                                       generation.  This should be a file
+                                       with a template per line with
+                                       $METRON_TS and $METRON_GUID in the
+                                       spots for timestamp and guid, if
+                                       you so desire them.
+ -tl,--time_limit_ms <MS>              The total amount of time to run
+                                       this in milliseconds.  By default,
+                                       it never stops.
+ -z,--zk_quorum <QUORUM>               zookeeper quorum
+```
+
+## Templates
+Messages are drawn from a template file.  A template file has a message template per line.  
+For instance, let's say we want to generate JSON maps with fields: `source.type`, `ip_src_addr` 
+and `ip_dst_addr`.  We can generate a template file with a template like the following per line:
+```
+{ "source.type" : "asa", "ip_src_addr" : "127.0.0.1", "ip_dst_addr" : "191.168.1.1" }
+```
+
+When messages are generated, there are some special replacements that can be used: `$METRON_TS` and `$METRON_GUID`.
+We can adjust our previous template to use these like so:
+```
+{ "source.type" : "asa", "ip_src_addr" : "127.0.0.1", "ip_dst_addr" : "191.168.1.1", "timestamp" : $METRON_TS, "guid" : "$METRON_GUID" }
+```
+One note about GUIDs generated.  We do not generate global UUIDs, they are unique only within the context of a given generator run.  
+
+## Biased Sampling
+
+This load tool can be configured to use biased sampling.  This is useful if, for instance, you are trying to model data which is not distributed
+uniformly, like many types of network data.  Generating synthetic data with similar distribution to your regular data will enable the caches
+to be exercised in the same way, for instance, and yield a more realistic scenario.
+
+You specify the biases in a csv file with 2 columns:
+* The first column represents the % of the templates
+* The second column represents the % of the generated output. 
+
+A simple example would be to generate samples based on Pareto's principle:
+```
+20,80
+80,20
+``` 
+This would yield biases that mean the first 20% of the templates in the template file would comprise 80% of the output.
+
+A more complex example might be:
+```
+20,80
+20,5
+50,1
+10,14
+``` 
+This would would imply:
+* The first 20% of the templates would comprise 80% of the output
+* The next 20% of the templates would comprise 5% of the output
+* The next 50% of the templates would comprise 1% of the output
+* The next 10% of the templates would comprise 14% of the output.
+
+## CSV Output
+
+For those who would prefer a different visualization or wish to incorporate the output of this tool into an automated test,
+you can specify a file to emit data in CSV format to via the `-c` or `--csv` option.
+
+The CSV columns are as follows:
+* timestamp in epoch millis
+
+If you are generating synthetic data, then:
+* "generated"
+* The events per second generated
+* The mean of the events per second generated for the the last `k` runs, where `k` is the lookback (set via `-l` and defaulted to `5`)
+* The standard deviation of the events per second generated for the last `k` runs, where `k` is the lookback (set via `-l` and defaulted to `5`)
+
+If you are monitoring a topic, then:
+* "throughput measured"
+* The events per second measured
+* The mean of the events per second measured for the the last `k` runs, where `k` is the lookback (set via `-l` and defaulted to `5`)
+* The standard deviation of the events per second measured for the last `k` runs, where `k` is the lookback (set via `-l` and defaulted to `5`)
+
+Obviously, if you are doing both generating and monitoring the throughput of a topic, then all of the columns are added.
+
+An example of CSV output is:
+```
+1520506955047,generated,,,,throughput measured,,,
+1520506964896,generated,1045,1045,0,throughput measured,,,
+1520506974896,generated,1000,1022,31,throughput measured,1002,1002,0
+1520506984904,generated,999,1014,26,throughput measured,999,1000,2
+1520506994896,generated,1000,1011,22,throughput measured,1000,1000,1
+1520507004896,generated,1000,1008,20,throughput measured,1000,1000,1
+```
+
+## Use-cases for the Load Tool
+
+### Measure Throughput of a Topology
+
+One can use the load tool to monitor performance of a kafka-to-kafka topology.
+For instance, we could monitor the throughput of the enrichment topology by monitoring the `enrichments` kafka topic:
+```
+$METRON_HOME/bin/load_tool.sh -mt enrichments -z $ZOOKEEPER
+```
+
+### Generate Synthetic Load and Measure Performance
+
+One can use the load tool to generate synthetic load and monitor performance of a kafka-to-kafka topology.  For instance, we could
+monitor the performance of the enrichment topology.  It is advised to start the enrichment topology against a new topic and write 
+to a new topic so as to not pollute your downstream indices.  So, for instance we could create a kafka topic called 
+`enrichments_load` by generating load on it.  We could also create a new  kafka topic called `indexing_load` and configure the enrichment
+topology to output to it.  We would then generate load on `enrichments_load` and monitor `indexing_load`.
+```
+#Threadpool of size 5, you want somewhere between 5 and 10 depending on the throughput numbers you're trying to drive
+#Messages drawn from ~/dummy.templates, which is a message template per line
+#Generate at a rate of 9000 messages per second
+#Emit the data to a CSV file ~/measurements.csv
+$METRON_HOME/bin/load_tool.sh -p 5 -ot enrichments_load -mt indexing_load -t ~/dummy.templates -eps 9000 -z $ZOOKEEPER -c ~/measurements.csv
+```
+
+Now, with the help of a bash function and gnuplot we can generate a plot
+of the historical throughput measurements for `indexing_load`:
+```
+# Ensure that you have installed gnuplot and the liberation font package
+# via yum install -y gnuplot liberation-sans-fonts
+# We will define a plot function that will generate a png plot.  It takes
+# one arg, the output file.  It expects to have a 2 column CSV streamed
+#  with the first dimension being the timestamp and the second dimension
+# being what you want plotted.
+plot() {
+  awk -F, '{printf "%d %d\n", $1/1000, $2} END { print "e" }' | gnuplot -e "reset;clear;set style fill solid 1.0 border -1; set nokey;set title 'Throughput Measured'; set xlabel 'Time'; set boxwidth 0.5; set xtics rotate; set ylabel 'events/sec';set xdata time; set timefmt '%s';set format x '%H:%M:%S';set term png enhanced font '/usr/share/fonts/liberation/LiberationSans-Regular.ttf' 12 size 900,400; set output '$1';plot '< cat -' using 1:2 with line lt -1 lw 2;"
+}
+
+# We want to transform the CSV file into a space separated file with the
+# timestamp followed by the throughput measurements.
+cat ~/measurements.csv | awk -F, '{printf "%d,%d\n", $1, $8 }' | plot performance_measurement.png
+```
+This generates a plot like so to `performance_measurement.png`:
+![Performance Measurement](performance_measurement.png)

http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/performance_measurement.png
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/performance_measurement.png b/metron-contrib/metron-performance/performance_measurement.png
new file mode 100644
index 0000000..c4dcfb1
Binary files /dev/null and b/metron-contrib/metron-performance/performance_measurement.png differ

http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/pom.xml
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/pom.xml b/metron-contrib/metron-performance/pom.xml
new file mode 100644
index 0000000..4242110
--- /dev/null
+++ b/metron-contrib/metron-performance/pom.xml
@@ -0,0 +1,134 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software
+	Foundation (ASF) under one or more contributor license agreements. See the
+	NOTICE file distributed with this work for additional information regarding
+	copyright ownership. The ASF licenses this file to You under the Apache License,
+	Version 2.0 (the "License"); you may not use this file except in compliance
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+	Unless required by applicable law or agreed to in writing, software distributed
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+  the specific language governing permissions and limitations under the License.
+  -->
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <name>metron-performance</name>
+  <groupId>org.apache.metron</groupId>
+  <artifactId>metron-performance</artifactId>
+  <packaging>jar</packaging>
+  <parent>
+    <groupId>org.apache.metron</groupId>
+    <artifactId>metron-contrib</artifactId>
+    <version>0.4.3</version>
+  </parent>
+  <description>Performance Testing Utilities</description>
+  <url>https://metron.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${global_guava_version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.metron</groupId>
+      <artifactId>metron-common</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.10</artifactId>
+      <version>${global_kafka_version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.12</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>${global_shade_version}</version>
+        <configuration>
+          <createDependencyReducedPom>true</createDependencyReducedPom>
+          <artifactSet>
+            <excludes>
+              <exclude>*slf4j*</exclude>
+            </excludes>
+          </artifactSet>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+              <relocations>
+                <relocation>
+                  <pattern>com.google.common</pattern>
+                  <shadedPattern>org.apache.metron.perf.guava</shadedPattern>
+                </relocation>
+              </relocations>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+                  <resources>
+                    <resource>.yaml</resource>
+                    <resource>LICENSE.txt</resource>
+                    <resource>ASL2.0</resource>
+                    <resource>NOTICE.txt</resource>
+                  </resources>
+                </transformer>
+                <!-- UNCOMMENT THIS IF YOU NEED TO REGENERATE THE BEST GUESS NOTICES FILE WHICH REQUIRES PRUNING EVERY RELEASE -->
+                <!--transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
+                                <addHeader>false</addHeader>
+                                <projectName>${project.name}</projectName>
+                </transformer-->
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                  <mainClass></mainClass>
+                </transformer>
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptor>src/main/assembly/assembly.xml</descriptor>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id> <!-- this is used for inheritance merges -->
+            <phase>package</phase> <!-- bind to the packaging phase -->
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/assembly/assembly.xml b/metron-contrib/metron-performance/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..3595284
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/assembly/assembly.xml
@@ -0,0 +1,42 @@
+<!--
+  Licensed to the Apache Software
+	Foundation (ASF) under one or more contributor license agreements. See the
+	NOTICE file distributed with this work for additional information regarding
+	copyright ownership. The ASF licenses this file to You under the Apache License,
+	Version 2.0 (the "License"); you may not use this file except in compliance
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+	Unless required by applicable law or agreed to in writing, software distributed
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+  the specific language governing permissions and limitations under the License.
+  -->
+
+<assembly>
+  <id>archive</id>
+  <formats>
+    <format>tar.gz</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${project.basedir}/src/main/scripts</directory>
+      <outputDirectory>bin</outputDirectory>
+      <useDefaultExcludes>true</useDefaultExcludes>
+      <excludes>
+        <exclude>**/*.formatted</exclude>
+        <exclude>**/*.filtered</exclude>
+      </excludes>
+      <fileMode>0755</fileMode>
+      <lineEnding>unix</lineEnding>
+      <filtered>true</filtered>
+    </fileSet>
+    <fileSet>
+      <directory>${project.basedir}/target</directory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+      <outputDirectory>lib</outputDirectory>
+      <useDefaultExcludes>true</useDefaultExcludes>
+    </fileSet>
+  </fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java
new file mode 100644
index 0000000..33f777b
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.PosixParser;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.metron.performance.load.monitor.AbstractMonitor;
+import org.apache.metron.performance.load.monitor.EPSGeneratedMonitor;
+import org.apache.metron.performance.load.monitor.EPSThroughputWrittenMonitor;
+import org.apache.metron.performance.load.monitor.MonitorTask;
+import org.apache.metron.performance.load.monitor.writers.CSVWriter;
+import org.apache.metron.performance.load.monitor.writers.ConsoleWriter;
+import org.apache.metron.performance.load.monitor.writers.Writable;
+import org.apache.metron.performance.load.monitor.writers.Writer;
+import org.apache.metron.performance.sampler.BiasedSampler;
+import org.apache.metron.performance.sampler.Sampler;
+import org.apache.metron.performance.sampler.UnbiasedSampler;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+public class LoadGenerator
+{
+  public static String CONSUMER_GROUP = "metron.load.group";
+  public static long SEND_PERIOD_MS = 100;
+  public static long MONITOR_PERIOD_MS = 1000*10;
+  private static ExecutorService pool;
+  private static ThreadLocal<KafkaProducer<String, String>> kafkaProducer;
+  public static AtomicLong numSent = new AtomicLong(0);
+
+  public static void main( String[] args ) throws Exception {
+    CommandLine cli = LoadOptions.parse(new PosixParser(), args);
+    EnumMap<LoadOptions, Optional<Object>> evaluatedArgs = LoadOptions.createConfig(cli);
+    Map<String, Object> kafkaConfig = new HashMap<>();
+    kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    kafkaConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+    kafkaConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+    if(LoadOptions.ZK.has(cli)) {
+      String zkQuorum = (String) evaluatedArgs.get(LoadOptions.ZK).get();
+      kafkaConfig.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
+                     , Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(zkQuorum))
+                     );
+    }
+    String groupId = evaluatedArgs.get(LoadOptions.CONSUMER_GROUP).get().toString();
+    System.out.println("Consumer Group: " + groupId);
+    kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+    if(LoadOptions.KAFKA_CONFIG.has(cli)) {
+      kafkaConfig.putAll((Map<String, Object>) evaluatedArgs.get(LoadOptions.KAFKA_CONFIG).get());
+    }
+    kafkaProducer = ThreadLocal.withInitial(() -> new KafkaProducer<>(kafkaConfig));
+    int numThreads = (int)evaluatedArgs.get(LoadOptions.NUM_THREADS).get();
+    System.out.println("Thread pool size: " + numThreads);
+    pool = Executors.newFixedThreadPool(numThreads);
+    Optional<Object> eps = evaluatedArgs.get(LoadOptions.EPS);
+
+    Optional<Object> outputTopic = evaluatedArgs.get(LoadOptions.OUTPUT_TOPIC);
+    Optional<Object> monitorTopic = evaluatedArgs.get(LoadOptions.MONITOR_TOPIC);
+    long sendDelta = (long) evaluatedArgs.get(LoadOptions.SEND_DELTA).get();
+    long monitorDelta = (long) evaluatedArgs.get(LoadOptions.MONITOR_DELTA).get();
+    if((eps.isPresent() && outputTopic.isPresent()) || monitorTopic.isPresent()) {
+      Timer timer = new Timer(false);
+      long startTimeMs = System.currentTimeMillis();
+      if(outputTopic.isPresent() && eps.isPresent()) {
+        List<String> templates = (List<String>)evaluatedArgs.get(LoadOptions.TEMPLATE).get();
+        if(templates.isEmpty()) {
+          System.out.println("Empty templates, so nothing to do.");
+          return;
+        }
+        Optional<Object> biases = evaluatedArgs.get(LoadOptions.BIASED_SAMPLE);
+        Sampler sampler = new UnbiasedSampler();
+        if(biases.isPresent()){
+          sampler = new BiasedSampler((List<Map.Entry<Integer, Integer>>) biases.get(), templates.size());
+        }
+        MessageGenerator generator = new MessageGenerator(templates, sampler);
+        Long targetLoad = (Long)eps.get();
+        int periodsPerSecond = (int)(1000/sendDelta);
+        long messagesPerPeriod = targetLoad/periodsPerSecond;
+        String outputTopicStr = (String)outputTopic.get();
+        System.out.println("Generating data to " + outputTopicStr + " at " + targetLoad + " events per second");
+        System.out.println("Sending " + messagesPerPeriod + " messages to " + outputTopicStr + " every " + sendDelta + "ms");
+        timer.scheduleAtFixedRate(new SendToKafka( outputTopicStr
+                                                 , messagesPerPeriod
+                                                 , numThreads
+                                                 , generator
+                                                 , pool
+                                                 , numSent
+                                                 , kafkaProducer
+                                                 )
+                                 , 0, sendDelta);
+      }
+      List<AbstractMonitor> monitors = new ArrayList<>();
+      if(outputTopic.isPresent() && monitorTopic.isPresent()) {
+        System.out.println("Monitoring " + monitorTopic.get() + " every " + monitorDelta + " ms");
+        monitors.add(new EPSGeneratedMonitor(outputTopic, numSent));
+        monitors.add(new EPSThroughputWrittenMonitor(monitorTopic, kafkaConfig));
+      }
+      else if(outputTopic.isPresent() && !monitorTopic.isPresent()) {
+        System.out.println("Monitoring " + outputTopic.get() + " every " + monitorDelta + " ms");
+        monitors.add(new EPSGeneratedMonitor(outputTopic, numSent));
+        monitors.add(new EPSThroughputWrittenMonitor(outputTopic, kafkaConfig));
+      }
+      else if(!outputTopic.isPresent() && monitorTopic.isPresent()) {
+        System.out.println("Monitoring " + monitorTopic.get() + " every " + monitorDelta + " ms");
+        monitors.add(new EPSThroughputWrittenMonitor(monitorTopic, kafkaConfig));
+      }
+      else if(!outputTopic.isPresent() && !monitorTopic.isPresent()) {
+        System.out.println("You have not specified an output topic or a monitoring topic, so I have nothing to do here.");
+      }
+      int lookback = (int) evaluatedArgs.get(LoadOptions.SUMMARY_LOOKBACK).get();
+      if(lookback > 0) {
+        System.out.println("Summarizing over the last " + lookback + " monitoring periods (" + lookback*monitorDelta + "ms)");
+      }
+      else {
+        System.out.println("Turning off summarization.");
+      }
+      final CSVWriter csvWriter = new CSVWriter((File) evaluatedArgs.get(LoadOptions.CSV).orElse(null));
+      Writer writer = new Writer(monitors, lookback, new ArrayList<Consumer<Writable>>() {{
+        add(new ConsoleWriter());
+        add(csvWriter);
+      }});
+      timer.scheduleAtFixedRate(new MonitorTask(writer), 0, monitorDelta);
+      Optional<Object> timeLimit = evaluatedArgs.get(LoadOptions.TIME_LIMIT);
+      if(timeLimit.isPresent()) {
+        System.out.println("Ending in " + timeLimit.get() + " ms.");
+        timer.schedule(new TimerTask() {
+                         @Override
+                         public void run() {
+                           timer.cancel();
+                           long durationS = (System.currentTimeMillis() - startTimeMs)/1000;
+                           System.out.println("\nGenerated " + numSent.get() + " in " + durationS + " seconds." );
+                           csvWriter.close();
+                           System.exit(0);
+                         }
+                       }
+
+                , (Long) timeLimit.get());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadOptions.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadOptions.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadOptions.java
new file mode 100644
index 0000000..b4d217d
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadOptions.java
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.*;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.utils.cli.CLIOptions;
+import org.apache.metron.performance.sampler.BiasedSampler;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.common.utils.cli.OptionHandler;
+
+import javax.annotation.Nullable;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Optional;
+
+public enum LoadOptions implements CLIOptions<LoadOptions> {
+  HELP(new OptionHandler<LoadOptions>() {
+
+    @Override
+    public String getShortCode() {
+      return "h";
+    }
+
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      return new Option(s, "help", false, "Generate Help screen");
+    }
+  }),
+  ZK(new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "zk_quorum", true, "zookeeper quorum");
+      o.setArgName("QUORUM");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      if(option.has(cli)) {
+        return Optional.ofNullable(option.get(cli));
+      }
+      else {
+        return Optional.empty();
+      }
+    }
+
+    @Override
+    public String getShortCode() {
+      return "z";
+    }
+  }),
+  CONSUMER_GROUP(new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "consumer_group", true, "Consumer Group.  The default is " + LoadGenerator.CONSUMER_GROUP);
+      o.setArgName("GROUP_ID");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      if(option.has(cli)) {
+        return Optional.ofNullable(option.get(cli));
+      }
+      else {
+        return Optional.of(LoadGenerator.CONSUMER_GROUP);
+      }
+    }
+
+    @Override
+    public String getShortCode() {
+      return "cg";
+    }
+  }),
+  BIASED_SAMPLE(new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "sample_bias", true, "The discrete distribution to bias the sampling. " +
+              "This is a CSV of 2 columns.  The first column is the % of the templates " +
+              "and the 2nd column is the probability (0-100) that it's chosen.  For instance:\n" +
+              "  20,80\n" +
+              "  80,20\n" +
+              "implies that 20% of the templates will comprise 80% of the output and the remaining 80% of the templates will comprise 20% of the output.");
+      o.setArgName("BIAS_FILE");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      if(!option.has(cli)) {
+        return Optional.empty();
+      }
+      File discreteDistributionFile  = new File(option.get(cli));
+      if(discreteDistributionFile.exists()) {
+        try (BufferedReader br = new BufferedReader(new FileReader(discreteDistributionFile))){
+          return Optional.ofNullable(BiasedSampler.readDistribution(br));
+        } catch (IOException e) {
+          throw new IllegalStateException("Unable to read distribution file: " + option.get(cli), e);
+        }
+      }
+      else {
+        throw new IllegalStateException("Unable to read distribution file: " + option.get(cli) + " file doesn't exist.");
+      }
+    }
+
+    @Override
+    public String getShortCode() {
+      return "bs";
+    }
+  })
+  ,CSV(new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "csv", true, "A CSV file to emit monitoring data to.  " +
+              "The format is a CSV with the following schema: timestamp, (name, eps, historical_mean, historical_stddev)+");
+      o.setArgName("CSV_FILE");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      if(!option.has(cli)) {
+        return Optional.empty();
+      }
+      return Optional.of(new File(option.get(cli)));
+    }
+
+    @Override
+    public String getShortCode() {
+      return "c";
+    }
+  })
+  ,TEMPLATE(new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "template", true, "The template file to use for generation.  This should be a file with a template per line with $METRON_TS and $METRON_GUID in the spots for timestamp and guid, if you so desire them.");
+      o.setArgName("TEMPLATE_FILE");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      if(!option.has(cli)) {
+        return Optional.empty();
+      }
+      File templateFile = new File(option.get(cli));
+      if(templateFile.exists()) {
+        List<String> templates = new ArrayList<>();
+        try(BufferedReader br = new BufferedReader(new FileReader(templateFile))) {
+          for(String line = null;(line = br.readLine()) != null;) {
+            templates.add(line);
+          }
+          return Optional.of(templates);
+        } catch (IOException e) {
+          throw new IllegalStateException("Unable to read template file: " + option.get(cli), e);
+        }
+      }
+      else {
+        throw new IllegalStateException("Unable to read template file: " + option.get(cli) + " file doesn't exist.");
+      }
+    }
+
+    @Override
+    public String getShortCode() {
+      return "t";
+    }
+  })
+  ,SUMMARY_LOOKBACK(new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "lookback", true, "When summarizing, how many monitoring periods should we summarize over?  If 0, then no summary.  Default: 5");
+      o.setArgName("LOOKBACK");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      if(option.has(cli)) {
+        return Optional.of(ConversionUtils.convert(option.get(cli), Integer.class));
+      }
+      else {
+        return Optional.of(5);
+      }
+    }
+
+    @Override
+    public String getShortCode() {
+      return "l";
+    }
+  })
+  ,EPS(new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "eps", true, "The target events per second");
+      o.setArgName("EPS");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      if(option.has(cli)) {
+        return Optional.of(ConversionUtils.convert(option.get(cli), Long.class));
+      }
+      else {
+        return Optional.empty();
+      }
+    }
+
+    @Override
+    public String getShortCode() {
+      return "e";
+    }
+  })
+  ,KAFKA_CONFIG(new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "kafka_config", true, "The kafka config.  This is a file containing a JSON map with the kafka config.");
+      o.setArgName("CONFIG_FILE");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      if(!option.has(cli)) {
+        return Optional.empty();
+      }
+      File configFile = new File(option.get(cli));
+      if(configFile.exists()) {
+        try {
+          return Optional.ofNullable(JSONUtils.INSTANCE.load(configFile, JSONUtils.MAP_SUPPLIER));
+        } catch (FileNotFoundException e) {
+          throw new IllegalStateException("Unable to read file: " + option.get(cli), e);
+        } catch (IOException e) {
+          throw new IllegalStateException("Unable to read file: " + option.get(cli), e);
+        }
+      }
+      else {
+        throw new IllegalStateException("Unable to read file: " + option.get(cli) + " file doesn't exist.");
+      }
+    }
+
+    @Override
+    public String getShortCode() {
+      return "k";
+    }
+  }),
+  SEND_DELTA(new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "send_delta_ms", true, "The time (in ms) between sending a batch of messages. Default is " + LoadGenerator.SEND_PERIOD_MS);
+      o.setArgName("TIME_IN_MS");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      if(option.has(cli)) {
+        Object res = option.get(cli);
+        return Optional.ofNullable(ConversionUtils.convert(res, Long.class));
+      }
+      return Optional.of(LoadGenerator.SEND_PERIOD_MS);
+
+    }
+
+    @Override
+    public String getShortCode() {
+      return "sd";
+    }
+  }),
+  MONITOR_DELTA(new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "monitor_delta_ms", true, "The time (in ms) between monitoring output. Default is " + LoadGenerator.MONITOR_PERIOD_MS);
+      o.setArgName("TIME_IN_MS");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      if(option.has(cli)) {
+        Object res = option.get(cli);
+        return Optional.ofNullable(ConversionUtils.convert(res, Long.class));
+      }
+      return Optional.of(LoadGenerator.MONITOR_PERIOD_MS);
+
+    }
+
+    @Override
+    public String getShortCode() {
+      return "md";
+    }
+  })
+  ,TIME_LIMIT(new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "time_limit_ms", true, "The total amount of time to run this in milliseconds.  By default, it never stops.");
+      o.setArgName("MS");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      if(option.has(cli)) {
+        Object res = option.get(cli);
+        Long timeMs = ConversionUtils.convert(res, Long.class);
+        return Optional.ofNullable(timeMs);
+      }
+      return Optional.empty();
+
+    }
+
+    @Override
+    public String getShortCode() {
+      return "tl";
+    }
+  })
+  ,NUM_THREADS(new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "threads", true, "The number of threads to use when extracting data.  The default is the number of cores of your machine.");
+      o.setArgName("NUM_THREADS");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      int numThreads = Runtime.getRuntime().availableProcessors();
+      if(option.has(cli)) {
+        Object res = option.get(cli);
+        if(res instanceof String && res.toString().toUpperCase().endsWith("C")) {
+          numThreads *= ConversionUtils.convert(res.toString().trim().replace("C", ""), Integer.class);
+        }
+        else {
+          numThreads = ConversionUtils.convert(res, Integer.class);
+        }
+      }
+      return Optional.of(numThreads);
+
+    }
+
+    @Override
+    public String getShortCode() {
+      return "p";
+    }
+  })
+  ,OUTPUT_TOPIC(new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "output_topic", true, "The kafka topic to write to");
+      o.setArgName("TOPIC");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      return Optional.ofNullable(option.get(cli));
+    }
+
+    @Override
+    public String getShortCode() {
+      return "ot";
+    }
+  }),
+  MONITOR_TOPIC(new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "monitor_topic", true, "The kafka topic to monitor.");
+      o.setArgName("TOPIC");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      return Optional.ofNullable(option.get(cli));
+    }
+
+    @Override
+    public String getShortCode() {
+      return "mt";
+    }
+  }),
+  ;
+  Option option;
+  String shortCode;
+  OptionHandler<LoadOptions> handler;
+  LoadOptions(OptionHandler<LoadOptions> optionHandler) {
+    this.shortCode = optionHandler.getShortCode();
+    this.handler = optionHandler;
+    this.option = optionHandler.apply(shortCode);
+  }
+
+  @Override
+  public Option getOption() {
+    return option;
+  }
+
+  public boolean has(CommandLine cli) {
+    return cli.hasOption(shortCode);
+  }
+
+  public String get(CommandLine cli) {
+    return cli.getOptionValue(shortCode);
+  }
+
+  @Override
+  public OptionHandler<LoadOptions> getHandler() {
+    return null;
+  }
+
+  public static CommandLine parse(CommandLineParser parser, String[] args) {
+    try {
+      CommandLine cli = parser.parse(getOptions(), args);
+      if(HELP.has(cli)) {
+        printHelp();
+        System.exit(0);
+      }
+      return cli;
+    } catch (ParseException e) {
+      System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+      e.printStackTrace(System.err);
+      printHelp();
+      System.exit(-1);
+      return null;
+    }
+  }
+
+  public static EnumMap<LoadOptions, Optional<Object> > createConfig(CommandLine cli) {
+    EnumMap<LoadOptions, Optional<Object> > ret = new EnumMap<>(LoadOptions.class);
+    for(LoadOptions option : values()) {
+      ret.put(option, option.handler.getValue(option, cli));
+    }
+    return ret;
+  }
+
+  public static void printHelp() {
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.printHelp( "Generator", getOptions());
+  }
+
+  public static Options getOptions() {
+    Options ret = new Options();
+    for(LoadOptions o : LoadOptions.values()) {
+      ret.addOption(o.option);
+    }
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/MessageGenerator.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/MessageGenerator.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/MessageGenerator.java
new file mode 100644
index 0000000..572d438
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/MessageGenerator.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+import org.apache.metron.performance.sampler.Sampler;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+public class MessageGenerator implements Supplier<String> {
+  private static ThreadLocal<Random> rng = ThreadLocal.withInitial(() -> new Random());
+  private static AtomicLong guidOffset = new AtomicLong(0);
+  private static String guidPrefix = "00000000-0000-0000-0000-";
+  private List<String> patterns;
+  private Sampler sampler;
+  public MessageGenerator(List<String> patterns, Sampler sampler) {
+    this.patterns = patterns;
+    this.sampler = sampler;
+  }
+
+  @Override
+  public String get() {
+    int sample = sampler.sample(rng.get(), patterns.size());
+    String pattern = patterns.get(sample);
+    long guidId = guidOffset.getAndIncrement();
+    String guid = guidPrefix + guidId;
+    String ts = "" + System.currentTimeMillis();
+    return pattern.replace("$METRON_TS", ts)
+                            .replace("$METRON_GUID", guid);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/SendToKafka.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/SendToKafka.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/SendToKafka.java
new file mode 100644
index 0000000..67bf469
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/SendToKafka.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+public class SendToKafka extends TimerTask {
+  private long numMessagesSent;
+  private long numSentLast = 0;
+  private long batchSize;
+  private int numBatches;
+  private Supplier<String> messageSupplier;
+  private String kafkaTopic;
+  private ExecutorService pool;
+  protected AtomicLong numSent;
+  private ThreadLocal<KafkaProducer<String, String>> kafkaProducer;
+  public SendToKafka( String kafkaTopic
+                    , long numMessagesSent
+                    , int numBatches
+                    , Supplier<String> messageSupplier
+                    , ExecutorService pool
+                    , AtomicLong numSent
+                    , ThreadLocal<KafkaProducer<String, String>> kafkaProducer
+                    )
+  {
+    this.numSent = numSent;
+    this.kafkaProducer = kafkaProducer;
+    this.pool = pool;
+    this.numMessagesSent = numMessagesSent;
+    this.messageSupplier = messageSupplier;
+    this.numBatches = numBatches;
+    this.batchSize = numMessagesSent/numBatches;
+    this.kafkaTopic = kafkaTopic;
+  }
+
+  @Override
+  public void run() {
+    long numSentCurrent = numSent.get();
+    long numSentSince = numSentCurrent - numSentLast;
+    boolean sendMessages = numSentLast == 0 || numSentSince >= numMessagesSent;
+    if(sendMessages) {
+      Collection<Future<Long>> futures = Collections.synchronizedList(new ArrayList<>());
+      for(int batch = 0;batch < numBatches;++batch) {
+        try {
+          futures.add(pool.submit(() -> {
+            KafkaProducer<String, String> producer = kafkaProducer.get();
+            Collection<Future<?>> b = Collections.synchronizedCollection(new ArrayList<>());
+            for (int i = 0; i < batchSize; ++i) {
+              b.add(sendToKafka(producer, kafkaTopic, messageSupplier.get()));
+            }
+            for(Future<?> f : b) {
+              f.get();
+            }
+            return batchSize;
+          }));
+
+        } catch (Exception e) {
+          e.printStackTrace(System.err);
+        }
+      }
+      for(Future<Long> f : futures) {
+        try {
+          f.get();
+        } catch (Exception e) {
+          e.printStackTrace(System.err);
+        }
+      }
+      numSentLast = numSentCurrent;
+    }
+  }
+
+  protected Future<?> sendToKafka(KafkaProducer<String, String> producer, String kafkaTopic, String message) {
+    return producer.send(new ProducerRecord<>(kafkaTopic, message),
+                      (recordMetadata, e) -> {
+                        if(e != null) {
+                          e.printStackTrace(System.err);
+                        }
+                        numSent.incrementAndGet();
+                      }
+              );
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/AbstractMonitor.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/AbstractMonitor.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/AbstractMonitor.java
new file mode 100644
index 0000000..80cb5cc
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/AbstractMonitor.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor;
+
+import java.util.Optional;
+import java.util.function.Supplier;
+
+public abstract class AbstractMonitor implements Supplier<Long>, MonitorNaming {
+  private static final double EPSILON = 1e-6;
+  protected Optional<?> kafkaTopic;
+  protected long timestampPrevious = 0;
+  public AbstractMonitor(Optional<?> kafkaTopic) {
+    this.kafkaTopic = kafkaTopic;
+  }
+
+  protected abstract Long monitor(double deltaTs);
+
+  @Override
+  public Long get() {
+    long timeStarted = System.currentTimeMillis();
+    Long ret = null;
+    if(timestampPrevious > 0) {
+      double deltaTs = (timeStarted - timestampPrevious) / 1000.0;
+      if (Math.abs(deltaTs) > EPSILON) {
+        ret = monitor(deltaTs);
+      }
+    }
+    timestampPrevious = timeStarted;
+    return ret;
+  }
+
+  public abstract String format();
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/EPSGeneratedMonitor.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/EPSGeneratedMonitor.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/EPSGeneratedMonitor.java
new file mode 100644
index 0000000..3e380bb
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/EPSGeneratedMonitor.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class EPSGeneratedMonitor extends AbstractMonitor {
+  private AtomicLong numSent;
+  private long numSentPrevious = 0;
+  public EPSGeneratedMonitor(Optional<?> kafkaTopic, AtomicLong numSent) {
+    super(kafkaTopic);
+    this.numSent = numSent;
+  }
+
+  @Override
+  protected Long monitor(double deltaTs) {
+    if(kafkaTopic.isPresent()) {
+      long totalProcessed = numSent.get();
+      long written = (totalProcessed - numSentPrevious);
+      long epsWritten = (long) (written / deltaTs);
+      numSentPrevious = totalProcessed;
+      return epsWritten;
+    }
+    return null;
+  }
+
+  @Override
+  public String format() {
+    return "%d eps generated to " + kafkaTopic.get();
+  }
+
+  @Override
+  public String name() {
+    return "generated";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/EPSThroughputWrittenMonitor.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/EPSThroughputWrittenMonitor.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/EPSThroughputWrittenMonitor.java
new file mode 100644
index 0000000..96efd1d
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/EPSThroughputWrittenMonitor.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.metron.performance.util.KafkaUtil;
+
+import java.util.Map;
+import java.util.Optional;
+
+public class EPSThroughputWrittenMonitor extends AbstractMonitor {
+  Map<Integer, Long> lastOffsetMap = null;
+  KafkaConsumer<String, String> consumer;
+  public EPSThroughputWrittenMonitor(Optional<?> kafkaTopic, Map<String, Object> kafkaProps) {
+    super(kafkaTopic);
+    consumer = new KafkaConsumer<>(kafkaProps);
+  }
+
+  private Long writtenSince(Map<Integer, Long> partitionOffsets, Map<Integer, Long> lastOffsetMap) {
+    if(partitionOffsets == null) {
+      return null;
+    }
+    long sum = 0;
+    for(Map.Entry<Integer, Long> partitionOffset : partitionOffsets.entrySet()) {
+      sum += partitionOffset.getValue() - lastOffsetMap.get(partitionOffset.getKey());
+    }
+    return sum;
+  }
+
+  @Override
+  protected Long monitor(double deltaTs) {
+    Optional<Long> epsWritten = Optional.empty();
+    if(kafkaTopic.isPresent()) {
+      if(lastOffsetMap != null) {
+        Map<Integer, Long> currentOffsets = KafkaUtil.INSTANCE.getKafkaOffsetMap(consumer, (String) kafkaTopic.get());
+        Long eventsWrittenSince = writtenSince(currentOffsets, lastOffsetMap);
+        if (eventsWrittenSince != null) {
+          epsWritten = Optional.of((long) (eventsWrittenSince / deltaTs));
+        }
+        lastOffsetMap = currentOffsets == null ? lastOffsetMap : currentOffsets;
+        if (epsWritten.isPresent()) {
+          return epsWritten.get();
+        }
+      }
+      else {
+        lastOffsetMap = KafkaUtil.INSTANCE.getKafkaOffsetMap(consumer, (String)kafkaTopic.get());
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public String format() {
+    return "%d eps throughput measured for " + kafkaTopic.get();
+  }
+
+  @Override
+  public String name() {
+    return "throughput measured";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/MonitorNaming.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/MonitorNaming.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/MonitorNaming.java
new file mode 100644
index 0000000..4833c17
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/MonitorNaming.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor;
+
+public interface MonitorNaming {
+  String format();
+  String name();
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/MonitorTask.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/MonitorTask.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/MonitorTask.java
new file mode 100644
index 0000000..1e02a00
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/MonitorTask.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.metron.performance.load.monitor.writers.Writer;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TimerTask;
+
+public class MonitorTask extends TimerTask {
+  private Writer writer;
+  public MonitorTask(Writer writer) {
+    this.writer = writer;
+  }
+
+  /**
+   * The action to be performed by this timer task.
+   */
+  @Override
+  public void run() {
+    writer.writeAll();
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/Results.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/Results.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/Results.java
new file mode 100644
index 0000000..e094b74
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/Results.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+
+import java.util.Optional;
+
+public class Results {
+  private String format;
+  private String name;
+  private Optional<DescriptiveStatistics> history;
+  private Long eps;
+  public Results(String format, String name, Long eps, Optional<DescriptiveStatistics> history) {
+    this.format = format;
+    this.name = name;
+    this.history = history;
+    this.eps = eps;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public Long getEps() {
+    return eps;
+  }
+
+  public String getFormat() {
+    return format;
+  }
+
+  public Optional<DescriptiveStatistics> getHistory() {
+    return history;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/CSVWriter.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/CSVWriter.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/CSVWriter.java
new file mode 100644
index 0000000..112206d
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/CSVWriter.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor.writers;
+
+import com.google.common.base.Joiner;
+import org.apache.metron.performance.load.monitor.Results;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+public class CSVWriter implements Consumer<Writable> {
+  private Optional<PrintWriter> pw = Optional.empty();
+
+  public CSVWriter(File outFile) throws IOException {
+    if(outFile != null) {
+      pw = Optional.of(new PrintWriter(new FileWriter(outFile)));
+    }
+  }
+
+  @Override
+  public void accept(Writable writable) {
+    if(pw.isPresent()) {
+      List<String> parts = new ArrayList<>();
+      parts.add("" + writable.getDate().getTime());
+      for (Results r : writable.getResults()) {
+        parts.add(r.getName());
+        parts.add(r.getEps() == null?"":(r.getEps() + ""));
+        if (r.getHistory().isPresent()) {
+          parts.add("" + (int) r.getHistory().get().getMean());
+          parts.add("" + (int) Math.sqrt(r.getHistory().get().getVariance()));
+        } else {
+          parts.add("");
+          parts.add("");
+        }
+      }
+      pw.get().println(Joiner.on(",").join(parts));
+      pw.get().flush();
+    }
+  }
+
+  public void close() {
+    if(pw.isPresent()) {
+      pw.get().close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/ConsoleWriter.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/ConsoleWriter.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/ConsoleWriter.java
new file mode 100644
index 0000000..efb2ad3
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/ConsoleWriter.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor.writers;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.metron.performance.load.monitor.Results;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Consumer;
+
+public class ConsoleWriter implements Consumer<Writable> {
+
+  private String getSummary(DescriptiveStatistics stats) {
+    return String.format("Mean: %d, Std Dev: %d", (int)stats.getMean(), (int)Math.sqrt(stats.getVariance()));
+  }
+
+  @Override
+  public void accept(Writable writable) {
+    List<String> parts = new ArrayList<>();
+    Date date = writable.getDate();
+    for(Results r : writable.getResults()) {
+      Long eps = r.getEps();
+      if(eps != null) {
+        String part = String.format(r.getFormat(), eps);
+        if (r.getHistory().isPresent()) {
+          part += " (" + getSummary(r.getHistory().get()) + ")";
+        }
+        parts.add(part);
+      }
+    }
+    if(date != null) {
+      DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+      String header = dateFormat.format(date) + " - ";
+      String emptyHeader = StringUtils.repeat(" ", header.length());
+      for (int i = 0; i < parts.size(); ++i) {
+        String part = parts.get(i);
+        if (i == 0) {
+          System.out.println(header + (part == null ? "" : part));
+        } else {
+          System.out.println(emptyHeader + (part == null ? "" : part));
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writable.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writable.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writable.java
new file mode 100644
index 0000000..3ed62bf
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writable.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor.writers;
+
+import org.apache.metron.performance.load.monitor.Results;
+
+import java.util.Date;
+import java.util.List;
+
+public class Writable {
+  private Date date;
+  private List<Results> results;
+  public Writable(Date date, List<Results> results) {
+    this.date = date;
+    this.results = results;
+  }
+
+  public Date getDate() {
+    return date;
+  }
+
+  public List<Results> getResults() {
+    return results;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writer.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writer.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writer.java
new file mode 100644
index 0000000..a9d915b
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writer.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor.writers;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.metron.performance.load.monitor.AbstractMonitor;
+import org.apache.metron.performance.load.monitor.Results;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+public class Writer {
+
+  private int summaryLookback;
+  private List<LinkedList<Double>> summaries = new ArrayList<>();
+  private List<Consumer<Writable>> writers;
+  private List<AbstractMonitor> monitors;
+
+  public Writer(List<AbstractMonitor> monitors, int summaryLookback, List<Consumer<Writable>> writers) {
+    this.summaryLookback = summaryLookback;
+    this.writers = writers;
+    this.monitors = monitors;
+    for(AbstractMonitor m : monitors) {
+      this.summaries.add(new LinkedList<>());
+    }
+  }
+
+  public void writeAll() {
+    int i = 0;
+    Date dateOf = new Date();
+    List<Results> results = new ArrayList<>();
+    for(AbstractMonitor m : monitors) {
+      Long eps = m.get();
+      if(eps != null && summaryLookback > 0) {
+          LinkedList<Double> summary = summaries.get(i);
+          addToLookback(eps.doubleValue(), summary);
+          results.add(new Results(m.format(), m.name(), eps, Optional.of(getStats(summary))));
+      }
+      else {
+        results.add(new Results(m.format(), m.name(), eps, Optional.empty()));
+      }
+      i++;
+    }
+    Writable writable = new Writable(dateOf, results);
+    for(Consumer<Writable> writer : writers) {
+      writer.accept(writable);
+    }
+  }
+
+  private void addToLookback(Double d, LinkedList<Double> lookback) {
+    if(lookback.size() >= summaryLookback) {
+      lookback.removeFirst();
+    }
+    lookback.addLast(d);
+  }
+
+  public DescriptiveStatistics getStats(List<Double> avg) {
+    DescriptiveStatistics stats = new DescriptiveStatistics();
+    for(Double d : avg) {
+      if(d == null || Double.isNaN(d)) {
+        continue;
+      }
+      stats.addValue(d);
+    }
+    return stats;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/BiasedSampler.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/BiasedSampler.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/BiasedSampler.java
new file mode 100644
index 0000000..f0a5b2c
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/BiasedSampler.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.sampler;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+public class BiasedSampler implements Sampler {
+  TreeMap<Double, Map.Entry<Integer, Integer>> discreteDistribution;
+  public BiasedSampler(List<Map.Entry<Integer, Integer>>  discreteDistribution, int max) {
+    this.discreteDistribution = createDistribution(discreteDistribution, max);
+  }
+
+  public static List<Map.Entry<Integer, Integer>> readDistribution(BufferedReader distrFile) throws IOException {
+    return readDistribution(distrFile, false);
+  }
+
+  public static List<Map.Entry<Integer, Integer>> readDistribution(BufferedReader distrFile, boolean quiet) throws IOException {
+    List<Map.Entry<Integer, Integer>> ret = new ArrayList<>();
+    if(!quiet) {
+      System.out.println("Using biased sampler with the following biases:");
+    }
+    int sumLeft = 0;
+    int sumRight = 0;
+    for(String line = null;(line = distrFile.readLine()) != null;) {
+      if(line.startsWith("#")) {
+        continue;
+      }
+      Iterable<String> it = Splitter.on(",").split(line.trim());
+      if(Iterables.size(it) != 2) {
+        throw new IllegalArgumentException(line + " should be a comma separated pair of integers, but was not.");
+      }
+      int left = Integer.parseInt(Iterables.getFirst(it, null));
+      int right = Integer.parseInt(Iterables.getLast(it, null));
+      if(left <= 0 || left > 100) {
+        throw new IllegalArgumentException(line + ": " + (left < 0?left:right) + " must a positive integer in (0, 100]");
+      }
+      if(right <= 0 || right > 100) {
+        throw new IllegalArgumentException(line + ": " + right + " must a positive integer in (0, 100]");
+      }
+      if(!quiet) {
+        System.out.println("\t" + left + "% of templates will comprise roughly " + right + "% of sample output");
+      }
+      ret.add(new AbstractMap.SimpleEntry<>(left, right));
+      sumLeft += left;
+      sumRight += right;
+    }
+    if(sumLeft > 100 || sumRight > 100 ) {
+      throw new IllegalStateException("Neither columns must sum to beyond 100.  " +
+              "The first column is the % of templates. " +
+              "The second column is the % of the sample that % of template occupies.");
+    }
+    else if(sumLeft < 100 && sumRight < 100) {
+      int left = 100 - sumLeft;
+      int right = 100 - sumRight;
+      if(!quiet) {
+        System.out.println("\t" + left + "% of templates will comprise roughly " + right + "% of sample output");
+      }
+      ret.add(new AbstractMap.SimpleEntry<>(left, right));
+    }
+    return ret;
+
+  }
+
+  private static TreeMap<Double, Map.Entry<Integer, Integer>>
+                 createDistribution(List<Map.Entry<Integer, Integer>>  discreteDistribution, int max) {
+    TreeMap<Double, Map.Entry<Integer, Integer>> ret = new TreeMap<>();
+    int from = 0;
+    double weight = 0.0d;
+    for(Map.Entry<Integer, Integer> kv : discreteDistribution) {
+      double pctVals = kv.getKey()/100.0;
+      int to = from + (int)(max*pctVals);
+      double pctWeight = kv.getValue()/100.0;
+      ret.put(weight, new AbstractMap.SimpleEntry<>(from, to));
+      weight += pctWeight;
+      from = to;
+    }
+    return ret;
+  }
+
+  @Override
+  public int sample(Random rng, int limit) {
+    double weight = rng.nextDouble();
+    Map.Entry<Integer, Integer> range = discreteDistribution.floorEntry(weight).getValue();
+    return rng.nextInt(range.getValue() - range.getKey()) + range.getKey();
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/Sampler.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/Sampler.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/Sampler.java
new file mode 100644
index 0000000..e5f03c8
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/Sampler.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.sampler;
+
+import java.util.Random;
+
+public interface Sampler {
+  int sample(Random rng, int limit);
+}