You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2018/04/27 19:29:49 UTC
[09/50] [abbrv] metron git commit: METRON-1483: Create a tool to
monitor performance of the topologies closes apache/incubator-metron#958
METRON-1483: Create a tool to monitor performance of the topologies closes apache/incubator-metron#958
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/46ad9d93
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/46ad9d93
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/46ad9d93
Branch: refs/heads/feature/METRON-1416-upgrade-solr
Commit: 46ad9d93b4385da0f8668f2ba84212d54d00ba4b
Parents: e3eeec3
Author: cstella <ce...@gmail.com>
Authored: Tue Mar 20 09:36:32 2018 -0400
Committer: cstella <ce...@gmail.com>
Committed: Tue Mar 20 09:36:32 2018 -0400
----------------------------------------------------------------------
metron-contrib/metron-performance/README.md | 205 ++++++++
.../performance_measurement.png | Bin 0 -> 5790 bytes
metron-contrib/metron-performance/pom.xml | 134 +++++
.../src/main/assembly/assembly.xml | 42 ++
.../metron/performance/load/LoadGenerator.java | 175 +++++++
.../metron/performance/load/LoadOptions.java | 499 +++++++++++++++++++
.../performance/load/MessageGenerator.java | 48 ++
.../metron/performance/load/SendToKafka.java | 107 ++++
.../load/monitor/AbstractMonitor.java | 49 ++
.../load/monitor/EPSGeneratedMonitor.java | 53 ++
.../monitor/EPSThroughputWrittenMonitor.java | 77 +++
.../performance/load/monitor/MonitorNaming.java | 23 +
.../performance/load/monitor/MonitorTask.java | 44 ++
.../performance/load/monitor/Results.java | 51 ++
.../load/monitor/writers/CSVWriter.java | 67 +++
.../load/monitor/writers/ConsoleWriter.java | 65 +++
.../load/monitor/writers/Writable.java | 40 ++
.../load/monitor/writers/Writer.java | 86 ++++
.../performance/sampler/BiasedSampler.java | 113 +++++
.../metron/performance/sampler/Sampler.java | 24 +
.../performance/sampler/UnbiasedSampler.java | 28 ++
.../metron/performance/util/KafkaUtil.java | 56 +++
.../src/main/scripts/load_tool.sh | 36 ++
.../performance/load/LoadOptionsTest.java | 93 ++++
.../performance/load/SendToKafkaTest.java | 49 ++
.../metron/performance/sampler/SamplerTest.java | 145 ++++++
metron-contrib/pom.xml | 15 +
.../common-services/METRON/CURRENT/metainfo.xml | 4 +
.../packaging/docker/deb-docker/pom.xml | 6 +
.../docker/rpm-docker/SPECS/metron.spec | 21 +
.../packaging/docker/rpm-docker/pom.xml | 6 +
31 files changed, 2361 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/README.md
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/README.md b/metron-contrib/metron-performance/README.md
new file mode 100644
index 0000000..8981349
--- /dev/null
+++ b/metron-contrib/metron-performance/README.md
@@ -0,0 +1,205 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+# Performance Utilities
+
+This project creates some useful performance monitoring and measurement
+utilities.
+
+## `load_tool.sh`
+
+The Load tool is intended to do the following:
+* Generate a load at a specific events per second into kafka
+ * The messages are taken from a template file, where there is a message template per line
+ * The load can be biased (e.g. 80% of the load can be comprised of 20% of the templates)
+* Monitor the kafka offsets for a topic to determine the events per second written
+ * This could be the topic that you are generating load on
+ * This could be another topic that represents the output of some topology (e.g. generate load on `enrichments` and monitor `indexing` to determine the throughput of the enrichment topology).
+
+```
+usage: Generator
+ -bs,--sample_bias <BIAS_FILE> The discrete distribution to bias
+ the sampling. This is a CSV of 2
+ columns. The first column is the %
+ of the templates and the 2nd column
+ is the probability (0-100) that
+ it's chosen. For instance:
+ 20,80
+ 80,20
+ implies that 20% of the templates
+ will comprise 80% of the output and
+ the remaining 80% of the templates
+ will comprise 20% of the output.
+ -c,--csv <CSV_FILE> A CSV file to emit monitoring data
+ to. The format is a CSV with the
+ following schema: timestamp, (name,
+ eps, historical_mean,
+ historical_stddev)+
+ -cg,--consumer_group <GROUP_ID> Consumer Group. The default is
+ load.group
+ -e,--eps <EPS> The target events per second
+ -h,--help Generate Help screen
+ -k,--kafka_config <CONFIG_FILE> The kafka config. This is a file
+ containing a JSON map with the
+ kafka config.
+ -l,--lookback <LOOKBACK> When summarizing, how many
+ monitoring periods should we
+ summarize over? If 0, then no
+ summary. Default: 5
+ -md,--monitor_delta_ms <TIME_IN_MS> The time (in ms) between monitoring
+ output. Default is 10000
+ -mt,--monitor_topic <TOPIC> The kafka topic to monitor.
+ -ot,--output_topic <TOPIC> The kafka topic to write to
+ -p,--threads <NUM_THREADS> The number of threads to use when
+ extracting data. The default is
+ the number of cores of your
+ machine.
+ -sd,--send_delta_ms <TIME_IN_MS> The time (in ms) between sending a
+ batch of messages. Default is 100
+ -t,--template <TEMPLATE_FILE> The template file to use for
+ generation. This should be a file
+ with a template per line with
+ $METRON_TS and $METRON_GUID in the
+ spots for timestamp and guid, if
+ you so desire them.
+ -tl,--time_limit_ms <MS> The total amount of time to run
+ this in milliseconds. By default,
+ it never stops.
+ -z,--zk_quorum <QUORUM> zookeeper quorum
+```
+
+## Templates
+Messages are drawn from a template file. A template file has a message template per line.
+For instance, let's say we want to generate JSON maps with fields: `source.type`, `ip_src_addr`
+and `ip_dst_addr`. We can generate a template file with a template like the following per line:
+```
+{ "source.type" : "asa", "ip_src_addr" : "127.0.0.1", "ip_dst_addr" : "191.168.1.1" }
+```
+
+When messages are generated, there are some special replacements that can be used: `$METRON_TS` and `$METRON_GUID`.
+We can adjust our previous template to use these like so:
+```
+{ "source.type" : "asa", "ip_src_addr" : "127.0.0.1", "ip_dst_addr" : "191.168.1.1", "timestamp" : $METRON_TS, "guid" : "$METRON_GUID" }
+```
+One note about the generated GUIDs: they are not globally unique UUIDs; they are unique only within the context of a given generator run.
+
+## Biased Sampling
+
+This load tool can be configured to use biased sampling. This is useful if, for instance, you are trying to model data which is not distributed
+uniformly, like many types of network data. Generating synthetic data with similar distribution to your regular data will enable the caches
+to be exercised in the same way, for instance, and yield a more realistic scenario.
+
+You specify the biases in a csv file with 2 columns:
+* The first column represents the % of the templates
+* The second column represents the % of the generated output.
+
+A simple example would be to generate samples based on Pareto's principle:
+```
+20,80
+80,20
+```
+This would yield biases such that the first 20% of the templates in the template file comprise 80% of the output, and the remaining 80% of the templates comprise the remaining 20% of the output.
+
+A more complex example might be:
+```
+20,80
+20,5
+50,1
+10,14
+```
+This would imply:
+* The first 20% of the templates would comprise 80% of the output
+* The next 20% of the templates would comprise 5% of the output
+* The next 50% of the templates would comprise 1% of the output
+* The next 10% of the templates would comprise 14% of the output.
+
+## CSV Output
+
+For those who would prefer a different visualization or wish to incorporate the output of this tool into an automated test,
+you can have the monitoring data written in CSV format to a file given via the `-c` or `--csv` option.
+
+The CSV columns are as follows:
+* timestamp in epoch millis
+
+If you are generating synthetic data, then:
+* "generated"
+* The events per second generated
+* The mean of the events per second generated for the last `k` runs, where `k` is the lookback (set via `-l` and defaulted to `5`)
+* The standard deviation of the events per second generated for the last `k` runs, where `k` is the lookback (set via `-l` and defaulted to `5`)
+
+If you are monitoring a topic, then:
+* "throughput measured"
+* The events per second measured
+* The mean of the events per second measured for the last `k` runs, where `k` is the lookback (set via `-l` and defaulted to `5`)
+* The standard deviation of the events per second measured for the last `k` runs, where `k` is the lookback (set via `-l` and defaulted to `5`)
+
+If you are both generating data and monitoring the throughput of a topic, then all of the columns are emitted.
+
+An example of CSV output is:
+```
+1520506955047,generated,,,,throughput measured,,,
+1520506964896,generated,1045,1045,0,throughput measured,,,
+1520506974896,generated,1000,1022,31,throughput measured,1002,1002,0
+1520506984904,generated,999,1014,26,throughput measured,999,1000,2
+1520506994896,generated,1000,1011,22,throughput measured,1000,1000,1
+1520507004896,generated,1000,1008,20,throughput measured,1000,1000,1
+```
+
+## Use-cases for the Load Tool
+
+### Measure Throughput of a Topology
+
+One can use the load tool to monitor performance of a kafka-to-kafka topology.
+For instance, we could monitor the throughput of the enrichment topology by monitoring the `enrichments` kafka topic:
+```
+$METRON_HOME/bin/load_tool.sh -mt enrichments -z $ZOOKEEPER
+```
+
+### Generate Synthetic Load and Measure Performance
+
+One can use the load tool to generate synthetic load and monitor performance of a kafka-to-kafka topology. For instance, we could
+monitor the performance of the enrichment topology. It is advised to start the enrichment topology against a new topic and write
+to a new topic so as to not pollute your downstream indices. So, for instance we could create a kafka topic called
+`enrichments_load` by generating load on it. We could also create a new kafka topic called `indexing_load` and configure the enrichment
+topology to output to it. We would then generate load on `enrichments_load` and monitor `indexing_load`.
+```
+#Threadpool of size 5, you want somewhere between 5 and 10 depending on the throughput numbers you're trying to drive
+#Messages drawn from ~/dummy.templates, which is a message template per line
+#Generate at a rate of 9000 messages per second
+#Emit the data to a CSV file ~/measurements.csv
+$METRON_HOME/bin/load_tool.sh -p 5 -ot enrichments_load -mt indexing_load -t ~/dummy.templates -e 9000 -z $ZOOKEEPER -c ~/measurements.csv
+```
+
+Now, with the help of a bash function and gnuplot we can generate a plot
+of the historical throughput measurements for `indexing_load`:
+```
+# Ensure that you have installed gnuplot and the liberation font package
+# via yum install -y gnuplot liberation-sans-fonts
+# We will define a plot function that will generate a png plot. It takes
+# one arg, the output file. It expects to have a 2 column CSV streamed
+# with the first dimension being the timestamp and the second dimension
+# being what you want plotted.
+plot() {
+ awk -F, '{printf "%d %d\n", $1/1000, $2} END { print "e" }' | gnuplot -e "reset;clear;set style fill solid 1.0 border -1; set nokey;set title 'Throughput Measured'; set xlabel 'Time'; set boxwidth 0.5; set xtics rotate; set ylabel 'events/sec';set xdata time; set timefmt '%s';set format x '%H:%M:%S';set term png enhanced font '/usr/share/fonts/liberation/LiberationSans-Regular.ttf' 12 size 900,400; set output '$1';plot '< cat -' using 1:2 with line lt -1 lw 2;"
+}
+
+# We want to transform the CSV file into a 2 column CSV with the timestamp
+# followed by column 8, the mean of the measured throughput over the lookback.
+cat ~/measurements.csv | awk -F, '{printf "%d,%d\n", $1, $8 }' | plot performance_measurement.png
+```
+This generates a plot like so to `performance_measurement.png`:
+![Performance Measurement](performance_measurement.png)
http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/performance_measurement.png
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/performance_measurement.png b/metron-contrib/metron-performance/performance_measurement.png
new file mode 100644
index 0000000..c4dcfb1
Binary files /dev/null and b/metron-contrib/metron-performance/performance_measurement.png differ
http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/pom.xml
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/pom.xml b/metron-contrib/metron-performance/pom.xml
new file mode 100644
index 0000000..4242110
--- /dev/null
+++ b/metron-contrib/metron-performance/pom.xml
@@ -0,0 +1,134 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software
+ Foundation (ASF) under one or more contributor license agreements. See the
+ NOTICE file distributed with this work for additional information regarding
+ copyright ownership. The ASF licenses this file to You under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License.
+ -->
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <name>metron-performance</name>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-performance</artifactId>
+ <packaging>jar</packaging>
+ <parent>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-contrib</artifactId>
+ <version>0.4.3</version>
+ </parent>
+ <description>Performance Testing Utilities</description>
+ <url>https://metron.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${global_guava_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-common</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.10</artifactId>
+ <version>${global_kafka_version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>${global_shade_version}</version>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ <artifactSet>
+ <excludes>
+ <exclude>*slf4j*</exclude>
+ </excludes>
+ </artifactSet>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <relocations>
+ <relocation>
+ <pattern>com.google.common</pattern>
+ <shadedPattern>org.apache.metron.perf.guava</shadedPattern>
+ </relocation>
+ </relocations>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+ <resources>
+ <resource>.yaml</resource>
+ <resource>LICENSE.txt</resource>
+ <resource>ASL2.0</resource>
+ <resource>NOTICE.txt</resource>
+ </resources>
+ </transformer>
+ <!-- UNCOMMENT THIS IF YOU NEED TO REGENERATE THE BEST GUESS NOTICES FILE WHICH REQUIRES PRUNING EVERY RELEASE -->
+ <!--transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
+ <addHeader>false</addHeader>
+ <projectName>${project.name}</projectName>
+ </transformer-->
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass></mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptor>src/main/assembly/assembly.xml</descriptor>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id> <!-- this is used for inheritance merges -->
+ <phase>package</phase> <!-- bind to the packaging phase -->
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/assembly/assembly.xml b/metron-contrib/metron-performance/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..3595284
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/assembly/assembly.xml
@@ -0,0 +1,42 @@
+<!--
+ Licensed to the Apache Software
+ Foundation (ASF) under one or more contributor license agreements. See the
+ NOTICE file distributed with this work for additional information regarding
+ copyright ownership. The ASF licenses this file to You under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License.
+ -->
+
+<assembly>
+ <id>archive</id>
+ <formats>
+ <format>tar.gz</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${project.basedir}/src/main/scripts</directory>
+ <outputDirectory>bin</outputDirectory>
+ <useDefaultExcludes>true</useDefaultExcludes>
+ <excludes>
+ <exclude>**/*.formatted</exclude>
+ <exclude>**/*.filtered</exclude>
+ </excludes>
+ <fileMode>0755</fileMode>
+ <lineEnding>unix</lineEnding>
+ <filtered>true</filtered>
+ </fileSet>
+ <fileSet>
+ <directory>${project.basedir}/target</directory>
+ <includes>
+ <include>${project.artifactId}-${project.version}.jar</include>
+ </includes>
+ <outputDirectory>lib</outputDirectory>
+ <useDefaultExcludes>true</useDefaultExcludes>
+ </fileSet>
+ </fileSets>
+</assembly>
http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java
new file mode 100644
index 0000000..33f777b
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadGenerator.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.PosixParser;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.metron.performance.load.monitor.AbstractMonitor;
+import org.apache.metron.performance.load.monitor.EPSGeneratedMonitor;
+import org.apache.metron.performance.load.monitor.EPSThroughputWrittenMonitor;
+import org.apache.metron.performance.load.monitor.MonitorTask;
+import org.apache.metron.performance.load.monitor.writers.CSVWriter;
+import org.apache.metron.performance.load.monitor.writers.ConsoleWriter;
+import org.apache.metron.performance.load.monitor.writers.Writable;
+import org.apache.metron.performance.load.monitor.writers.Writer;
+import org.apache.metron.performance.sampler.BiasedSampler;
+import org.apache.metron.performance.sampler.Sampler;
+import org.apache.metron.performance.sampler.UnbiasedSampler;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+/**
+ * Command-line entry point of the load tool. Depending on the options it
+ * generates templated messages into a Kafka topic at a target rate and/or
+ * periodically reports the events per second generated and measured on a topic.
+ */
+public class LoadGenerator
+{
+  public static String CONSUMER_GROUP = "metron.load.group";
+  public static long SEND_PERIOD_MS = 100;
+  public static long MONITOR_PERIOD_MS = 1000*10;
+  private static ExecutorService pool;
+  private static ThreadLocal<KafkaProducer<String, String>> kafkaProducer;
+  public static AtomicLong numSent = new AtomicLong(0);
+
+  /**
+   * Parses the command line, schedules the generation and monitoring tasks on a
+   * non-daemon timer and, if a time limit was given, schedules the process exit.
+   * All argument validation happens before the timer is created, so returning
+   * early (or failing on bad input) never leaves an idle timer thread behind.
+   *
+   * @param args the command-line arguments described by {@link LoadOptions}
+   * @throws IllegalArgumentException if the monitor period, or the send period
+   *         while generating, is not positive
+   * @throws Exception propagated from option parsing or Kafka/Zookeeper setup
+   */
+  @SuppressWarnings("unchecked")
+  public static void main( String[] args ) throws Exception {
+    CommandLine cli = LoadOptions.parse(new PosixParser(), args);
+    EnumMap<LoadOptions, Optional<Object>> evaluatedArgs = LoadOptions.createConfig(cli);
+    Map<String, Object> kafkaConfig = createKafkaConfig(cli, evaluatedArgs);
+    kafkaProducer = ThreadLocal.withInitial(() -> new KafkaProducer<>(kafkaConfig));
+    int numThreads = (int)evaluatedArgs.get(LoadOptions.NUM_THREADS).get();
+    System.out.println("Thread pool size: " + numThreads);
+    pool = Executors.newFixedThreadPool(numThreads);
+    Optional<Object> eps = evaluatedArgs.get(LoadOptions.EPS);
+    Optional<Object> outputTopic = evaluatedArgs.get(LoadOptions.OUTPUT_TOPIC);
+    Optional<Object> monitorTopic = evaluatedArgs.get(LoadOptions.MONITOR_TOPIC);
+    long sendDelta = (long) evaluatedArgs.get(LoadOptions.SEND_DELTA).get();
+    long monitorDelta = (long) evaluatedArgs.get(LoadOptions.MONITOR_DELTA).get();
+
+    // Data is only generated when we know both where to write and at which rate.
+    boolean generating = outputTopic.isPresent() && eps.isPresent();
+    if(!generating && !monitorTopic.isPresent()) {
+      System.out.println("You have not specified an output topic (with a target events per second) or a monitoring topic, so I have nothing to do here.");
+      return;
+    }
+    if(monitorDelta <= 0 || (generating && sendDelta <= 0)) {
+      // Timer.scheduleAtFixedRate rejects non-positive periods; fail before any timer thread exists.
+      throw new IllegalArgumentException("Monitor and send periods must be positive, got monitor="
+              + monitorDelta + "ms, send=" + sendDelta + "ms");
+    }
+
+    SendToKafka sendTask = null;
+    if(generating) {
+      List<String> templates = (List<String>) evaluatedArgs.get(LoadOptions.TEMPLATE).orElse(new ArrayList<String>());
+      if(templates.isEmpty()) {
+        System.out.println("Empty templates, so nothing to do.");
+        return;
+      }
+      Optional<Object> biases = evaluatedArgs.get(LoadOptions.BIASED_SAMPLE);
+      Sampler sampler = new UnbiasedSampler();
+      if(biases.isPresent()) {
+        sampler = new BiasedSampler((List<Map.Entry<Integer, Integer>>) biases.get(), templates.size());
+      }
+      MessageGenerator generator = new MessageGenerator(templates, sampler);
+      long targetLoad = (Long) eps.get();
+      // Scale by the send period directly: an integer "periods per second" count would be 0
+      // (division by zero) for periods above one second and would overshoot the target
+      // whenever sendDelta does not evenly divide 1000ms.
+      long messagesPerPeriod = targetLoad * sendDelta / 1000;
+      String outputTopicStr = (String) outputTopic.get();
+      System.out.println("Generating data to " + outputTopicStr + " at " + targetLoad + " events per second");
+      System.out.println("Sending " + messagesPerPeriod + " messages to " + outputTopicStr + " every " + sendDelta + "ms");
+      if(messagesPerPeriod == 0) {
+        System.out.println("WARNING: " + targetLoad + " events per second is less than one message per " + sendDelta + "ms send period.");
+      }
+      sendTask = new SendToKafka( outputTopicStr
+                                , messagesPerPeriod
+                                , numThreads
+                                , generator
+                                , pool
+                                , numSent
+                                , kafkaProducer
+                                );
+    }
+
+    List<AbstractMonitor> monitors = new ArrayList<>();
+    if(generating) {
+      monitors.add(new EPSGeneratedMonitor(outputTopic, numSent));
+    }
+    // Measure the monitor topic if one was given, otherwise the topic we generate onto.
+    Optional<Object> measuredTopic = monitorTopic.isPresent() ? monitorTopic : outputTopic;
+    System.out.println("Monitoring " + measuredTopic.get() + " every " + monitorDelta + " ms");
+    monitors.add(new EPSThroughputWrittenMonitor(measuredTopic, kafkaConfig));
+
+    int lookback = (int) evaluatedArgs.get(LoadOptions.SUMMARY_LOOKBACK).get();
+    if(lookback > 0) {
+      System.out.println("Summarizing over the last " + lookback + " monitoring periods (" + lookback*monitorDelta + "ms)");
+    }
+    else {
+      System.out.println("Turning off summarization.");
+    }
+    final CSVWriter csvWriter = new CSVWriter((File) evaluatedArgs.get(LoadOptions.CSV).orElse(null));
+    List<Consumer<Writable>> writers = new ArrayList<>();
+    writers.add(new ConsoleWriter());
+    writers.add(csvWriter);
+    Writer writer = new Writer(monitors, lookback, writers);
+
+    // Non-daemon timer: its thread keeps the JVM running until the process is exited.
+    Timer timer = new Timer(false);
+    long startTimeMs = System.currentTimeMillis();
+    if(sendTask != null) {
+      timer.scheduleAtFixedRate(sendTask, 0, sendDelta);
+    }
+    timer.scheduleAtFixedRate(new MonitorTask(writer), 0, monitorDelta);
+    Optional<Object> timeLimit = evaluatedArgs.get(LoadOptions.TIME_LIMIT);
+    if(timeLimit.isPresent()) {
+      System.out.println("Ending in " + timeLimit.get() + " ms.");
+      timer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          timer.cancel();
+          long durationS = (System.currentTimeMillis() - startTimeMs)/1000;
+          System.out.println("\nGenerated " + numSent.get() + " in " + durationS + " seconds." );
+          csvWriter.close();
+          System.exit(0);
+        }
+      }, (Long) timeLimit.get());
+    }
+  }
+
+  /**
+   * Builds the Kafka client configuration shared by the producers and the
+   * monitoring consumers: String (de)serializers, the bootstrap servers looked
+   * up from Zookeeper when {@code -z} is given, the consumer group, and finally
+   * any user-supplied kafka config, which overrides the preceding entries.
+   *
+   * @param cli the parsed command line
+   * @param evaluatedArgs the evaluated option values
+   * @return a mutable map holding the combined Kafka configuration
+   * @throws Exception propagated from the Zookeeper broker lookup
+   */
+  @SuppressWarnings("unchecked")
+  private static Map<String, Object> createKafkaConfig( CommandLine cli
+                                                      , EnumMap<LoadOptions, Optional<Object>> evaluatedArgs
+                                                      ) throws Exception {
+    Map<String, Object> kafkaConfig = new HashMap<>();
+    kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    kafkaConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+    kafkaConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+    if(LoadOptions.ZK.has(cli)) {
+      String zkQuorum = (String) evaluatedArgs.get(LoadOptions.ZK).get();
+      kafkaConfig.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
+                     , Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(zkQuorum))
+                     );
+    }
+    String groupId = evaluatedArgs.get(LoadOptions.CONSUMER_GROUP).get().toString();
+    System.out.println("Consumer Group: " + groupId);
+    kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+    if(LoadOptions.KAFKA_CONFIG.has(cli)) {
+      kafkaConfig.putAll((Map<String, Object>) evaluatedArgs.get(LoadOptions.KAFKA_CONFIG).get());
+    }
+    return kafkaConfig;
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadOptions.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadOptions.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadOptions.java
new file mode 100644
index 0000000..b4d217d
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/LoadOptions.java
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.*;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.utils.cli.CLIOptions;
+import org.apache.metron.performance.sampler.BiasedSampler;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.common.utils.cli.OptionHandler;
+
+import javax.annotation.Nullable;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Optional;
+
+public enum LoadOptions implements CLIOptions<LoadOptions> {
+ HELP(new OptionHandler<LoadOptions>() {
+
+ @Override
+ public String getShortCode() {
+ return "h";
+ }
+
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ return new Option(s, "help", false, "Generate Help screen");
+ }
+ }),
+ ZK(new OptionHandler<LoadOptions>() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "zk_quorum", true, "zookeeper quorum");
+ o.setArgName("QUORUM");
+ o.setRequired(false);
+ return o;
+ }
+
+ @Override
+ public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+ if(option.has(cli)) {
+ return Optional.ofNullable(option.get(cli));
+ }
+ else {
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public String getShortCode() {
+ return "z";
+ }
+ }),
+ CONSUMER_GROUP(new OptionHandler<LoadOptions>() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "consumer_group", true, "Consumer Group. The default is " + LoadGenerator.CONSUMER_GROUP);
+ o.setArgName("GROUP_ID");
+ o.setRequired(false);
+ return o;
+ }
+
+ @Override
+ public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+ if(option.has(cli)) {
+ return Optional.ofNullable(option.get(cli));
+ }
+ else {
+ return Optional.of(LoadGenerator.CONSUMER_GROUP);
+ }
+ }
+
+ @Override
+ public String getShortCode() {
+ return "cg";
+ }
+ }),
+ BIASED_SAMPLE(new OptionHandler<LoadOptions>() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "sample_bias", true, "The discrete distribution to bias the sampling. " +
+ "This is a CSV of 2 columns. The first column is the % of the templates " +
+ "and the 2nd column is the probability (0-100) that it's chosen. For instance:\n" +
+ " 20,80\n" +
+ " 80,20\n" +
+ "implies that 20% of the templates will comprise 80% of the output and the remaining 80% of the templates will comprise 20% of the output.");
+ o.setArgName("BIAS_FILE");
+ o.setRequired(false);
+ return o;
+ }
+
+ @Override
+ public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+ if(!option.has(cli)) {
+ return Optional.empty();
+ }
+ File discreteDistributionFile = new File(option.get(cli));
+ if(discreteDistributionFile.exists()) {
+ try (BufferedReader br = new BufferedReader(new FileReader(discreteDistributionFile))){
+ return Optional.ofNullable(BiasedSampler.readDistribution(br));
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to read distribution file: " + option.get(cli), e);
+ }
+ }
+ else {
+ throw new IllegalStateException("Unable to read distribution file: " + option.get(cli) + " file doesn't exist.");
+ }
+ }
+
+ @Override
+ public String getShortCode() {
+ return "bs";
+ }
+ })
+ ,CSV(new OptionHandler<LoadOptions>() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "csv", true, "A CSV file to emit monitoring data to. " +
+ "The format is a CSV with the following schema: timestamp, (name, eps, historical_mean, historical_stddev)+");
+ o.setArgName("CSV_FILE");
+ o.setRequired(false);
+ return o;
+ }
+
+ @Override
+ public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+ if(!option.has(cli)) {
+ return Optional.empty();
+ }
+ return Optional.of(new File(option.get(cli)));
+ }
+
+ @Override
+ public String getShortCode() {
+ return "c";
+ }
+ })
+ ,TEMPLATE(new OptionHandler<LoadOptions>() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "template", true, "The template file to use for generation. This should be a file with a template per line with $METRON_TS and $METRON_GUID in the spots for timestamp and guid, if you so desire them.");
+ o.setArgName("TEMPLATE_FILE");
+ o.setRequired(false);
+ return o;
+ }
+
+ @Override
+ public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+ if(!option.has(cli)) {
+ return Optional.empty();
+ }
+ File templateFile = new File(option.get(cli));
+ if(templateFile.exists()) {
+ List<String> templates = new ArrayList<>();
+ try(BufferedReader br = new BufferedReader(new FileReader(templateFile))) {
+ for(String line = null;(line = br.readLine()) != null;) {
+ templates.add(line);
+ }
+ return Optional.of(templates);
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to read template file: " + option.get(cli), e);
+ }
+ }
+ else {
+ throw new IllegalStateException("Unable to read template file: " + option.get(cli) + " file doesn't exist.");
+ }
+ }
+
+ @Override
+ public String getShortCode() {
+ return "t";
+ }
+ })
+ ,SUMMARY_LOOKBACK(new OptionHandler<LoadOptions>() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "lookback", true, "When summarizing, how many monitoring periods should we summarize over? If 0, then no summary. Default: 5");
+ o.setArgName("LOOKBACK");
+ o.setRequired(false);
+ return o;
+ }
+
+ @Override
+ public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+ if(option.has(cli)) {
+ return Optional.of(ConversionUtils.convert(option.get(cli), Integer.class));
+ }
+ else {
+ return Optional.of(5);
+ }
+ }
+
+ @Override
+ public String getShortCode() {
+ return "l";
+ }
+ })
+ ,EPS(new OptionHandler<LoadOptions>() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "eps", true, "The target events per second");
+ o.setArgName("EPS");
+ o.setRequired(false);
+ return o;
+ }
+
+ @Override
+ public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+ if(option.has(cli)) {
+ return Optional.of(ConversionUtils.convert(option.get(cli), Long.class));
+ }
+ else {
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public String getShortCode() {
+ return "e";
+ }
+ })
+ ,KAFKA_CONFIG(new OptionHandler<LoadOptions>() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "kafka_config", true, "The kafka config. This is a file containing a JSON map with the kafka config.");
+ o.setArgName("CONFIG_FILE");
+ o.setRequired(false);
+ return o;
+ }
+
+ @Override
+ public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+ if(!option.has(cli)) {
+ return Optional.empty();
+ }
+ File configFile = new File(option.get(cli));
+ if(configFile.exists()) {
+ try {
+ return Optional.ofNullable(JSONUtils.INSTANCE.load(configFile, JSONUtils.MAP_SUPPLIER));
+ } catch (FileNotFoundException e) {
+ throw new IllegalStateException("Unable to read file: " + option.get(cli), e);
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to read file: " + option.get(cli), e);
+ }
+ }
+ else {
+ throw new IllegalStateException("Unable to read file: " + option.get(cli) + " file doesn't exist.");
+ }
+ }
+
+ @Override
+ public String getShortCode() {
+ return "k";
+ }
+ }),
+ SEND_DELTA(new OptionHandler<LoadOptions>() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "send_delta_ms", true, "The time (in ms) between sending a batch of messages. Default is " + LoadGenerator.SEND_PERIOD_MS);
+ o.setArgName("TIME_IN_MS");
+ o.setRequired(false);
+ return o;
+ }
+
+ @Override
+ public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+ if(option.has(cli)) {
+ Object res = option.get(cli);
+ return Optional.ofNullable(ConversionUtils.convert(res, Long.class));
+ }
+ return Optional.of(LoadGenerator.SEND_PERIOD_MS);
+
+ }
+
+ @Override
+ public String getShortCode() {
+ return "sd";
+ }
+ }),
+ MONITOR_DELTA(new OptionHandler<LoadOptions>() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "monitor_delta_ms", true, "The time (in ms) between monitoring output. Default is " + LoadGenerator.MONITOR_PERIOD_MS);
+ o.setArgName("TIME_IN_MS");
+ o.setRequired(false);
+ return o;
+ }
+
+ @Override
+ public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+ if(option.has(cli)) {
+ Object res = option.get(cli);
+ return Optional.ofNullable(ConversionUtils.convert(res, Long.class));
+ }
+ return Optional.of(LoadGenerator.MONITOR_PERIOD_MS);
+
+ }
+
+ @Override
+ public String getShortCode() {
+ return "md";
+ }
+ })
+ ,TIME_LIMIT(new OptionHandler<LoadOptions>() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "time_limit_ms", true, "The total amount of time to run this in milliseconds. By default, it never stops.");
+ o.setArgName("MS");
+ o.setRequired(false);
+ return o;
+ }
+
+ @Override
+ public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+ if(option.has(cli)) {
+ Object res = option.get(cli);
+ Long timeMs = ConversionUtils.convert(res, Long.class);
+ return Optional.ofNullable(timeMs);
+ }
+ return Optional.empty();
+
+ }
+
+ @Override
+ public String getShortCode() {
+ return "tl";
+ }
+ })
+ ,NUM_THREADS(new OptionHandler<LoadOptions>() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "threads", true, "The number of threads to use when extracting data. The default is the number of cores of your machine.");
+ o.setArgName("NUM_THREADS");
+ o.setRequired(false);
+ return o;
+ }
+
+ @Override
+ public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+ int numThreads = Runtime.getRuntime().availableProcessors();
+ if(option.has(cli)) {
+ Object res = option.get(cli);
+ if(res instanceof String && res.toString().toUpperCase().endsWith("C")) {
+ numThreads *= ConversionUtils.convert(res.toString().trim().replace("C", ""), Integer.class);
+ }
+ else {
+ numThreads = ConversionUtils.convert(res, Integer.class);
+ }
+ }
+ return Optional.of(numThreads);
+
+ }
+
+ @Override
+ public String getShortCode() {
+ return "p";
+ }
+ })
+ ,OUTPUT_TOPIC(new OptionHandler<LoadOptions>() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "output_topic", true, "The kafka topic to write to");
+ o.setArgName("TOPIC");
+ o.setRequired(false);
+ return o;
+ }
+
+ @Override
+ public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+ return Optional.ofNullable(option.get(cli));
+ }
+
+ @Override
+ public String getShortCode() {
+ return "ot";
+ }
+ }),
+ MONITOR_TOPIC(new OptionHandler<LoadOptions>() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "monitor_topic", true, "The kafka topic to monitor.");
+ o.setArgName("TOPIC");
+ o.setRequired(false);
+ return o;
+ }
+
+ @Override
+ public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+ return Optional.ofNullable(option.get(cli));
+ }
+
+ @Override
+ public String getShortCode() {
+ return "mt";
+ }
+ }),
+ ;
+ Option option;
+ String shortCode;
+ OptionHandler<LoadOptions> handler;
+ LoadOptions(OptionHandler<LoadOptions> optionHandler) {
+ this.shortCode = optionHandler.getShortCode();
+ this.handler = optionHandler;
+ this.option = optionHandler.apply(shortCode);
+ }
+
+ @Override
+ public Option getOption() {
+ return option;
+ }
+
+ public boolean has(CommandLine cli) {
+ return cli.hasOption(shortCode);
+ }
+
+ public String get(CommandLine cli) {
+ return cli.getOptionValue(shortCode);
+ }
+
+ @Override
+ public OptionHandler<LoadOptions> getHandler() {
+ return null;
+ }
+
+ public static CommandLine parse(CommandLineParser parser, String[] args) {
+ try {
+ CommandLine cli = parser.parse(getOptions(), args);
+ if(HELP.has(cli)) {
+ printHelp();
+ System.exit(0);
+ }
+ return cli;
+ } catch (ParseException e) {
+ System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+ e.printStackTrace(System.err);
+ printHelp();
+ System.exit(-1);
+ return null;
+ }
+ }
+
+ public static EnumMap<LoadOptions, Optional<Object> > createConfig(CommandLine cli) {
+ EnumMap<LoadOptions, Optional<Object> > ret = new EnumMap<>(LoadOptions.class);
+ for(LoadOptions option : values()) {
+ ret.put(option, option.handler.getValue(option, cli));
+ }
+ return ret;
+ }
+
+ public static void printHelp() {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp( "Generator", getOptions());
+ }
+
+ public static Options getOptions() {
+ Options ret = new Options();
+ for(LoadOptions o : LoadOptions.values()) {
+ ret.addOption(o.option);
+ }
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/MessageGenerator.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/MessageGenerator.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/MessageGenerator.java
new file mode 100644
index 0000000..572d438
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/MessageGenerator.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+import org.apache.metron.performance.sampler.Sampler;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+public class MessageGenerator implements Supplier<String> {
+ private static ThreadLocal<Random> rng = ThreadLocal.withInitial(() -> new Random());
+ private static AtomicLong guidOffset = new AtomicLong(0);
+ private static String guidPrefix = "00000000-0000-0000-0000-";
+ private List<String> patterns;
+ private Sampler sampler;
+ public MessageGenerator(List<String> patterns, Sampler sampler) {
+ this.patterns = patterns;
+ this.sampler = sampler;
+ }
+
+ @Override
+ public String get() {
+ int sample = sampler.sample(rng.get(), patterns.size());
+ String pattern = patterns.get(sample);
+ long guidId = guidOffset.getAndIncrement();
+ String guid = guidPrefix + guidId;
+ String ts = "" + System.currentTimeMillis();
+ return pattern.replace("$METRON_TS", ts)
+ .replace("$METRON_GUID", guid);
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/SendToKafka.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/SendToKafka.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/SendToKafka.java
new file mode 100644
index 0000000..67bf469
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/SendToKafka.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+public class SendToKafka extends TimerTask {
+ private long numMessagesSent;
+ private long numSentLast = 0;
+ private long batchSize;
+ private int numBatches;
+ private Supplier<String> messageSupplier;
+ private String kafkaTopic;
+ private ExecutorService pool;
+ protected AtomicLong numSent;
+ private ThreadLocal<KafkaProducer<String, String>> kafkaProducer;
+ public SendToKafka( String kafkaTopic
+ , long numMessagesSent
+ , int numBatches
+ , Supplier<String> messageSupplier
+ , ExecutorService pool
+ , AtomicLong numSent
+ , ThreadLocal<KafkaProducer<String, String>> kafkaProducer
+ )
+ {
+ this.numSent = numSent;
+ this.kafkaProducer = kafkaProducer;
+ this.pool = pool;
+ this.numMessagesSent = numMessagesSent;
+ this.messageSupplier = messageSupplier;
+ this.numBatches = numBatches;
+ this.batchSize = numMessagesSent/numBatches;
+ this.kafkaTopic = kafkaTopic;
+ }
+
+ @Override
+ public void run() {
+ long numSentCurrent = numSent.get();
+ long numSentSince = numSentCurrent - numSentLast;
+ boolean sendMessages = numSentLast == 0 || numSentSince >= numMessagesSent;
+ if(sendMessages) {
+ Collection<Future<Long>> futures = Collections.synchronizedList(new ArrayList<>());
+ for(int batch = 0;batch < numBatches;++batch) {
+ try {
+ futures.add(pool.submit(() -> {
+ KafkaProducer<String, String> producer = kafkaProducer.get();
+ Collection<Future<?>> b = Collections.synchronizedCollection(new ArrayList<>());
+ for (int i = 0; i < batchSize; ++i) {
+ b.add(sendToKafka(producer, kafkaTopic, messageSupplier.get()));
+ }
+ for(Future<?> f : b) {
+ f.get();
+ }
+ return batchSize;
+ }));
+
+ } catch (Exception e) {
+ e.printStackTrace(System.err);
+ }
+ }
+ for(Future<Long> f : futures) {
+ try {
+ f.get();
+ } catch (Exception e) {
+ e.printStackTrace(System.err);
+ }
+ }
+ numSentLast = numSentCurrent;
+ }
+ }
+
+ protected Future<?> sendToKafka(KafkaProducer<String, String> producer, String kafkaTopic, String message) {
+ return producer.send(new ProducerRecord<>(kafkaTopic, message),
+ (recordMetadata, e) -> {
+ if(e != null) {
+ e.printStackTrace(System.err);
+ }
+ numSent.incrementAndGet();
+ }
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/AbstractMonitor.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/AbstractMonitor.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/AbstractMonitor.java
new file mode 100644
index 0000000..80cb5cc
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/AbstractMonitor.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor;
+
+import java.util.Optional;
+import java.util.function.Supplier;
+
+public abstract class AbstractMonitor implements Supplier<Long>, MonitorNaming {
+ private static final double EPSILON = 1e-6;
+ protected Optional<?> kafkaTopic;
+ protected long timestampPrevious = 0;
+ public AbstractMonitor(Optional<?> kafkaTopic) {
+ this.kafkaTopic = kafkaTopic;
+ }
+
+ protected abstract Long monitor(double deltaTs);
+
+ @Override
+ public Long get() {
+ long timeStarted = System.currentTimeMillis();
+ Long ret = null;
+ if(timestampPrevious > 0) {
+ double deltaTs = (timeStarted - timestampPrevious) / 1000.0;
+ if (Math.abs(deltaTs) > EPSILON) {
+ ret = monitor(deltaTs);
+ }
+ }
+ timestampPrevious = timeStarted;
+ return ret;
+ }
+
+ public abstract String format();
+
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/EPSGeneratedMonitor.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/EPSGeneratedMonitor.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/EPSGeneratedMonitor.java
new file mode 100644
index 0000000..3e380bb
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/EPSGeneratedMonitor.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class EPSGeneratedMonitor extends AbstractMonitor {
+ private AtomicLong numSent;
+ private long numSentPrevious = 0;
+ public EPSGeneratedMonitor(Optional<?> kafkaTopic, AtomicLong numSent) {
+ super(kafkaTopic);
+ this.numSent = numSent;
+ }
+
+ @Override
+ protected Long monitor(double deltaTs) {
+ if(kafkaTopic.isPresent()) {
+ long totalProcessed = numSent.get();
+ long written = (totalProcessed - numSentPrevious);
+ long epsWritten = (long) (written / deltaTs);
+ numSentPrevious = totalProcessed;
+ return epsWritten;
+ }
+ return null;
+ }
+
+ @Override
+ public String format() {
+ return "%d eps generated to " + kafkaTopic.get();
+ }
+
+ @Override
+ public String name() {
+ return "generated";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/EPSThroughputWrittenMonitor.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/EPSThroughputWrittenMonitor.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/EPSThroughputWrittenMonitor.java
new file mode 100644
index 0000000..96efd1d
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/EPSThroughputWrittenMonitor.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.metron.performance.util.KafkaUtil;
+
+import java.util.Map;
+import java.util.Optional;
+
+public class EPSThroughputWrittenMonitor extends AbstractMonitor {
+ Map<Integer, Long> lastOffsetMap = null;
+ KafkaConsumer<String, String> consumer;
+ public EPSThroughputWrittenMonitor(Optional<?> kafkaTopic, Map<String, Object> kafkaProps) {
+ super(kafkaTopic);
+ consumer = new KafkaConsumer<>(kafkaProps);
+ }
+
+ private Long writtenSince(Map<Integer, Long> partitionOffsets, Map<Integer, Long> lastOffsetMap) {
+ if(partitionOffsets == null) {
+ return null;
+ }
+ long sum = 0;
+ for(Map.Entry<Integer, Long> partitionOffset : partitionOffsets.entrySet()) {
+ sum += partitionOffset.getValue() - lastOffsetMap.get(partitionOffset.getKey());
+ }
+ return sum;
+ }
+
+ @Override
+ protected Long monitor(double deltaTs) {
+ Optional<Long> epsWritten = Optional.empty();
+ if(kafkaTopic.isPresent()) {
+ if(lastOffsetMap != null) {
+ Map<Integer, Long> currentOffsets = KafkaUtil.INSTANCE.getKafkaOffsetMap(consumer, (String) kafkaTopic.get());
+ Long eventsWrittenSince = writtenSince(currentOffsets, lastOffsetMap);
+ if (eventsWrittenSince != null) {
+ epsWritten = Optional.of((long) (eventsWrittenSince / deltaTs));
+ }
+ lastOffsetMap = currentOffsets == null ? lastOffsetMap : currentOffsets;
+ if (epsWritten.isPresent()) {
+ return epsWritten.get();
+ }
+ }
+ else {
+ lastOffsetMap = KafkaUtil.INSTANCE.getKafkaOffsetMap(consumer, (String)kafkaTopic.get());
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public String format() {
+ return "%d eps throughput measured for " + kafkaTopic.get();
+ }
+
+ @Override
+ public String name() {
+ return "throughput measured";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/MonitorNaming.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/MonitorNaming.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/MonitorNaming.java
new file mode 100644
index 0000000..4833c17
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/MonitorNaming.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor;
+
/**
 * Labels a monitor's output.
 */
public interface MonitorNaming {

  /**
   * @return the format pattern for this monitor's output line. The implementations in this module
   *     return a pattern with a single {@code %d} placeholder, e.g.
   *     {@code "%d eps generated to <topic>"}.
   */
  String format();

  /**
   * @return a short, human-readable name for the monitor
   */
  String name();
}
http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/MonitorTask.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/MonitorTask.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/MonitorTask.java
new file mode 100644
index 0000000..1e02a00
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/MonitorTask.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.metron.performance.load.monitor.writers.Writer;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TimerTask;
+
+public class MonitorTask extends TimerTask {
+ private Writer writer;
+ public MonitorTask(Writer writer) {
+ this.writer = writer;
+ }
+
+ /**
+ * The action to be performed by this timer task.
+ */
+ @Override
+ public void run() {
+ writer.writeAll();
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/Results.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/Results.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/Results.java
new file mode 100644
index 0000000..e094b74
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/Results.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+
+import java.util.Optional;
+
+/**
+ * One monitor's output for a single reporting round.
+ *
+ * <p>A round consists of the following values:
+ * <ul>
+ *   <li>the {@code String.format} pattern used to render the reading;</li>
+ *   <li>the monitor's name;</li>
+ *   <li>the latest reading, which may be {@code null} when the monitor had nothing to report;</li>
+ *   <li>optional summary statistics over recent readings.</li>
+ * </ul>
+ */
+public class Results {
+  private final String format;
+  private final String name;
+  private final Optional<DescriptiveStatistics> history;
+  private final Long eps;
+
+  public Results(String format, String name, Long eps, Optional<DescriptiveStatistics> history) {
+    this.eps = eps;
+    this.history = history;
+    this.name = name;
+    this.format = format;
+  }
+
+  /** @return the pattern used to format {@link #getEps()} for display */
+  public String getFormat() {
+    return format;
+  }
+
+  /** @return the monitor's name */
+  public String getName() {
+    return name;
+  }
+
+  /** @return the latest reading, possibly {@code null} */
+  public Long getEps() {
+    return eps;
+  }
+
+  /** @return statistics over recent readings, or empty when none were collected */
+  public Optional<DescriptiveStatistics> getHistory() {
+    return history;
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/CSVWriter.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/CSVWriter.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/CSVWriter.java
new file mode 100644
index 0000000..112206d
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/CSVWriter.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor.writers;
+
+import com.google.common.base.Joiner;
+import org.apache.metron.performance.load.monitor.Results;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+public class CSVWriter implements Consumer<Writable> {
+ private Optional<PrintWriter> pw = Optional.empty();
+
+ public CSVWriter(File outFile) throws IOException {
+ if(outFile != null) {
+ pw = Optional.of(new PrintWriter(new FileWriter(outFile)));
+ }
+ }
+
+ @Override
+ public void accept(Writable writable) {
+ if(pw.isPresent()) {
+ List<String> parts = new ArrayList<>();
+ parts.add("" + writable.getDate().getTime());
+ for (Results r : writable.getResults()) {
+ parts.add(r.getName());
+ parts.add(r.getEps() == null?"":(r.getEps() + ""));
+ if (r.getHistory().isPresent()) {
+ parts.add("" + (int) r.getHistory().get().getMean());
+ parts.add("" + (int) Math.sqrt(r.getHistory().get().getVariance()));
+ } else {
+ parts.add("");
+ parts.add("");
+ }
+ }
+ pw.get().println(Joiner.on(",").join(parts));
+ pw.get().flush();
+ }
+ }
+
+ public void close() {
+ if(pw.isPresent()) {
+ pw.get().close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/ConsoleWriter.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/ConsoleWriter.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/ConsoleWriter.java
new file mode 100644
index 0000000..efb2ad3
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/ConsoleWriter.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor.writers;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.metron.performance.load.monitor.Results;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Consumer;
+
+public class ConsoleWriter implements Consumer<Writable> {
+
+ private String getSummary(DescriptiveStatistics stats) {
+ return String.format("Mean: %d, Std Dev: %d", (int)stats.getMean(), (int)Math.sqrt(stats.getVariance()));
+ }
+
+ @Override
+ public void accept(Writable writable) {
+ List<String> parts = new ArrayList<>();
+ Date date = writable.getDate();
+ for(Results r : writable.getResults()) {
+ Long eps = r.getEps();
+ if(eps != null) {
+ String part = String.format(r.getFormat(), eps);
+ if (r.getHistory().isPresent()) {
+ part += " (" + getSummary(r.getHistory().get()) + ")";
+ }
+ parts.add(part);
+ }
+ }
+ if(date != null) {
+ DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+ String header = dateFormat.format(date) + " - ";
+ String emptyHeader = StringUtils.repeat(" ", header.length());
+ for (int i = 0; i < parts.size(); ++i) {
+ String part = parts.get(i);
+ if (i == 0) {
+ System.out.println(header + (part == null ? "" : part));
+ } else {
+ System.out.println(emptyHeader + (part == null ? "" : part));
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writable.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writable.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writable.java
new file mode 100644
index 0000000..3ed62bf
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writable.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor.writers;
+
+import org.apache.metron.performance.load.monitor.Results;
+
+import java.util.Date;
+import java.util.List;
+
+/**
+ * One reporting round handed to every output consumer: when it was taken and the per-monitor
+ * results gathered in it.
+ */
+public class Writable {
+  private final Date date;
+  private final List<Results> results;
+
+  public Writable(Date date, List<Results> results) {
+    this.results = results;
+    this.date = date;
+  }
+
+  /** @return the timestamp of this round */
+  public Date getDate() {
+    return date;
+  }
+
+  /** @return the results gathered in this round, one per monitor */
+  public List<Results> getResults() {
+    return results;
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writer.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writer.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writer.java
new file mode 100644
index 0000000..a9d915b
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/load/monitor/writers/Writer.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.load.monitor.writers;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.metron.performance.load.monitor.AbstractMonitor;
+import org.apache.metron.performance.load.monitor.Results;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+/**
+ * Polls a fixed list of monitors and fans the combined readings out to a set of consumers.
+ *
+ * <p>When {@code summaryLookback} is positive, the last {@code summaryLookback} non-null
+ * readings of each monitor are retained. They are summarized as {@link DescriptiveStatistics}
+ * alongside the current reading.
+ */
+public class Writer {
+
+  private final int summaryLookback;
+  /** One sliding window per monitor, matched to {@code monitors} by position. */
+  private final List<LinkedList<Double>> summaries = new ArrayList<>();
+  private final List<Consumer<Writable>> writers;
+  private final List<AbstractMonitor> monitors;
+
+  public Writer(List<AbstractMonitor> monitors, int summaryLookback, List<Consumer<Writable>> writers) {
+    this.summaryLookback = summaryLookback;
+    this.writers = writers;
+    this.monitors = monitors;
+    for (int k = 0; k < monitors.size(); k++) {
+      summaries.add(new LinkedList<>());
+    }
+  }
+
+  /**
+   * Reads every monitor once, updates its lookback window, and passes the resulting
+   * {@link Writable} to each consumer in order.
+   */
+  public void writeAll() {
+    Date dateOf = new Date();
+    List<Results> results = new ArrayList<>();
+    int idx = 0;
+    for (AbstractMonitor monitor : monitors) {
+      Long eps = monitor.get();
+      Optional<DescriptiveStatistics> history = Optional.empty();
+      if (eps != null && summaryLookback > 0) {
+        LinkedList<Double> window = summaries.get(idx);
+        addToLookback(eps.doubleValue(), window);
+        history = Optional.of(getStats(window));
+      }
+      results.add(new Results(monitor.format(), monitor.name(), eps, history));
+      idx++;
+    }
+    Writable writable = new Writable(dateOf, results);
+    writers.forEach(w -> w.accept(writable));
+  }
+
+  /** Appends {@code d}, evicting the oldest entry first once the window is full. */
+  private void addToLookback(Double d, LinkedList<Double> lookback) {
+    boolean full = lookback.size() >= summaryLookback;
+    if (full) {
+      lookback.removeFirst();
+    }
+    lookback.addLast(d);
+  }
+
+  /**
+   * Builds statistics over the given values, ignoring {@code null} and NaN entries.
+   *
+   * @param avg the values to summarize
+   * @return a fresh {@link DescriptiveStatistics} holding the accepted values
+   */
+  public DescriptiveStatistics getStats(List<Double> avg) {
+    DescriptiveStatistics stats = new DescriptiveStatistics();
+    for (Double value : avg) {
+      if (value != null && !Double.isNaN(value)) {
+        stats.addValue(value);
+      }
+    }
+    return stats;
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/BiasedSampler.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/BiasedSampler.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/BiasedSampler.java
new file mode 100644
index 0000000..f0a5b2c
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/BiasedSampler.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.sampler;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+public class BiasedSampler implements Sampler {
+ TreeMap<Double, Map.Entry<Integer, Integer>> discreteDistribution;
+ public BiasedSampler(List<Map.Entry<Integer, Integer>> discreteDistribution, int max) {
+ this.discreteDistribution = createDistribution(discreteDistribution, max);
+ }
+
+ public static List<Map.Entry<Integer, Integer>> readDistribution(BufferedReader distrFile) throws IOException {
+ return readDistribution(distrFile, false);
+ }
+
+ public static List<Map.Entry<Integer, Integer>> readDistribution(BufferedReader distrFile, boolean quiet) throws IOException {
+ List<Map.Entry<Integer, Integer>> ret = new ArrayList<>();
+ if(!quiet) {
+ System.out.println("Using biased sampler with the following biases:");
+ }
+ int sumLeft = 0;
+ int sumRight = 0;
+ for(String line = null;(line = distrFile.readLine()) != null;) {
+ if(line.startsWith("#")) {
+ continue;
+ }
+ Iterable<String> it = Splitter.on(",").split(line.trim());
+ if(Iterables.size(it) != 2) {
+ throw new IllegalArgumentException(line + " should be a comma separated pair of integers, but was not.");
+ }
+ int left = Integer.parseInt(Iterables.getFirst(it, null));
+ int right = Integer.parseInt(Iterables.getLast(it, null));
+ if(left <= 0 || left > 100) {
+ throw new IllegalArgumentException(line + ": " + (left < 0?left:right) + " must a positive integer in (0, 100]");
+ }
+ if(right <= 0 || right > 100) {
+ throw new IllegalArgumentException(line + ": " + right + " must a positive integer in (0, 100]");
+ }
+ if(!quiet) {
+ System.out.println("\t" + left + "% of templates will comprise roughly " + right + "% of sample output");
+ }
+ ret.add(new AbstractMap.SimpleEntry<>(left, right));
+ sumLeft += left;
+ sumRight += right;
+ }
+ if(sumLeft > 100 || sumRight > 100 ) {
+ throw new IllegalStateException("Neither columns must sum to beyond 100. " +
+ "The first column is the % of templates. " +
+ "The second column is the % of the sample that % of template occupies.");
+ }
+ else if(sumLeft < 100 && sumRight < 100) {
+ int left = 100 - sumLeft;
+ int right = 100 - sumRight;
+ if(!quiet) {
+ System.out.println("\t" + left + "% of templates will comprise roughly " + right + "% of sample output");
+ }
+ ret.add(new AbstractMap.SimpleEntry<>(left, right));
+ }
+ return ret;
+
+ }
+
+ private static TreeMap<Double, Map.Entry<Integer, Integer>>
+ createDistribution(List<Map.Entry<Integer, Integer>> discreteDistribution, int max) {
+ TreeMap<Double, Map.Entry<Integer, Integer>> ret = new TreeMap<>();
+ int from = 0;
+ double weight = 0.0d;
+ for(Map.Entry<Integer, Integer> kv : discreteDistribution) {
+ double pctVals = kv.getKey()/100.0;
+ int to = from + (int)(max*pctVals);
+ double pctWeight = kv.getValue()/100.0;
+ ret.put(weight, new AbstractMap.SimpleEntry<>(from, to));
+ weight += pctWeight;
+ from = to;
+ }
+ return ret;
+ }
+
+ @Override
+ public int sample(Random rng, int limit) {
+ double weight = rng.nextDouble();
+ Map.Entry<Integer, Integer> range = discreteDistribution.floorEntry(weight).getValue();
+ return rng.nextInt(range.getValue() - range.getKey()) + range.getKey();
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/46ad9d93/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/Sampler.java
----------------------------------------------------------------------
diff --git a/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/Sampler.java b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/Sampler.java
new file mode 100644
index 0000000..e5f03c8
--- /dev/null
+++ b/metron-contrib/metron-performance/src/main/java/org/apache/metron/performance/sampler/Sampler.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.performance.sampler;
+
+import java.util.Random;
+
+/**
+ * Chooses the index of the next candidate (e.g. a message template) to draw.
+ */
+@FunctionalInterface
+public interface Sampler {
+  /**
+   * @param rng the source of randomness
+   * @param limit presumably the exclusive upper bound of the index space; an implementation may
+   *     instead rely on a bound fixed at construction
+   * @return the sampled index
+   */
+  int sample(Random rng, int limit);
+}