You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/04/17 22:41:03 UTC

samza git commit: SAMZA-1666: Benchmark SystemProducer and SystemConsumer

Repository: samza
Updated Branches:
  refs/heads/master 5b953ac3e -> 12968cfb6


SAMZA-1666: Benchmark SystemProducer and SystemConsumer

* Tests to benchmark the performance of the system consumers and producers.
* Config to test the benchmark for the event hub system producer and consumer.

SystemConsumerBench and SystemProducerBench provide a generic base implementation for benchmarking system consumers and producers. Any new system that needs a benchmark test needs a properties file.

The benchmark test itself is single-threaded in the way it consumes and produces events. For now, scaling the benchmark tests involves running multiple processes of these tests in parallel.

Right now we only calculate the event rate, but in the future we could create a logging metrics registry to hook up other metrics and log them to the console along with the event rate while the benchmark tests are running.

Author: Srinivasulu Punuru <sp...@linkedin.com>

Reviewers: Jagadish <ja...@apache.org>, Wei Song<ws...@linkedin.com>

Closes #473 from srinipunuru/benchmark.1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/12968cfb
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/12968cfb
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/12968cfb

Branch: refs/heads/master
Commit: 12968cfb6024705d6412b9b9917897f0eb794c3e
Parents: 5b953ac
Author: Srinivasulu Punuru <sp...@linkedin.com>
Authored: Tue Apr 17 15:40:59 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Apr 17 15:40:59 2018 -0700

----------------------------------------------------------------------
 samza-tools/config/bench-log4j.xml              |  35 +++++
 samza-tools/config/eh-bench.properties          |  26 ++++
 samza-tools/scripts/system-consumer-bench.sh    |  34 +++++
 .../scripts/system-consumer-with-samza-bench.sh |  34 +++++
 samza-tools/scripts/system-producer-bench.sh    |  34 +++++
 .../tools/benchmark/AbstractSamzaBench.java     | 153 +++++++++++++++++++
 .../benchmark/ConfigBasedSspGrouperFactory.java |  87 +++++++++++
 .../tools/benchmark/SystemConsumerBench.java    |  91 +++++++++++
 .../benchmark/SystemConsumerWithSamzaBench.java | 117 ++++++++++++++
 .../tools/benchmark/SystemProducerBench.java    | 124 +++++++++++++++
 10 files changed, 735 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/12968cfb/samza-tools/config/bench-log4j.xml
----------------------------------------------------------------------
diff --git a/samza-tools/config/bench-log4j.xml b/samza-tools/config/bench-log4j.xml
new file mode 100644
index 0000000..02f5ec8
--- /dev/null
+++ b/samza-tools/config/bench-log4j.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<!DOCTYPE log4j:configuration PUBLIC "-//APACHE//DTD LOG4J 1.2//EN" "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+  <appender name="fileAppender" class="org.apache.log4j.FileAppender">
+    <param name="File"   value="./eh-bench.log" />
+    <param name="Append" value="false" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %-5p [%c{1}:%L] - %m%n"/>
+    </layout>
+  </appender>
+  <root>
+    <priority value ="info" />
+    <appender-ref ref="fileAppender" />
+  </root>
+</log4j:configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/12968cfb/samza-tools/config/eh-bench.properties
----------------------------------------------------------------------
diff --git a/samza-tools/config/eh-bench.properties b/samza-tools/config/eh-bench.properties
new file mode 100644
index 0000000..13b96a8
--- /dev/null
+++ b/samza-tools/config/eh-bench.properties
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+systems.eventhub.samza.factory=org.apache.samza.system.eventhub.EventHubSystemFactory
+systems.eventhub.stream.list=eh
+streams.eh.eventhubs.namespace=
+streams.eh.eventhubs.entitypath=
+sensitive.streams.eh.eventhubs.sas.keyname=
+sensitive.streams.eh.eventhubs.sas.token=
+streams.eh.eventhubs.consumer.group=
+streams.eh.samza.physical.name=
+streams.eh.samza.system=eventhub
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/12968cfb/samza-tools/scripts/system-consumer-bench.sh
----------------------------------------------------------------------
diff --git a/samza-tools/scripts/system-consumer-bench.sh b/samza-tools/scripts/system-consumer-bench.sh
new file mode 100755
index 0000000..01240c3
--- /dev/null
+++ b/samza-tools/scripts/system-consumer-bench.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Locate the directory holding this script. On Linux, readlink -f turns it into an absolute path.
+# All expansions are quoted so that install paths containing spaces are not word-split.
+if [ "$(uname)" = 'Linux' ];
+then
+  base_dir=$(readlink -f "$(dirname "$0")")
+else
+  base_dir=$(dirname "$0")
+fi
+
+# Use the benchmark log4j configuration unless the caller already exported LOG4J_OPTS.
+if [ -z "$LOG4J_OPTS" ]; then
+    export LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/bench-log4j.xml"
+fi
+
+# Use a fixed 1G heap unless the caller already exported HEAP_OPTS.
+if [ -z "$HEAP_OPTS" ]; then
+    export HEAP_OPTS="-Xmx1G -Xms1G"
+fi
+
+# Replace this shell with run-class.sh, forwarding every command line argument unchanged.
+exec "$base_dir/run-class.sh" org.apache.samza.tools.benchmark.SystemConsumerBench "$@"

http://git-wip-us.apache.org/repos/asf/samza/blob/12968cfb/samza-tools/scripts/system-consumer-with-samza-bench.sh
----------------------------------------------------------------------
diff --git a/samza-tools/scripts/system-consumer-with-samza-bench.sh b/samza-tools/scripts/system-consumer-with-samza-bench.sh
new file mode 100755
index 0000000..a544133
--- /dev/null
+++ b/samza-tools/scripts/system-consumer-with-samza-bench.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Locate the directory holding this script. On Linux, readlink -f turns it into an absolute path.
+# All expansions are quoted so that install paths containing spaces are not word-split.
+if [ "$(uname)" = 'Linux' ];
+then
+  base_dir=$(readlink -f "$(dirname "$0")")
+else
+  base_dir=$(dirname "$0")
+fi
+
+# Use the benchmark log4j configuration unless the caller already exported LOG4J_OPTS.
+if [ -z "$LOG4J_OPTS" ]; then
+    export LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/bench-log4j.xml"
+fi
+
+# Use a fixed 1G heap unless the caller already exported HEAP_OPTS.
+if [ -z "$HEAP_OPTS" ]; then
+    export HEAP_OPTS="-Xmx1G -Xms1G"
+fi
+
+# Replace this shell with run-class.sh, forwarding every command line argument unchanged.
+exec "$base_dir/run-class.sh" org.apache.samza.tools.benchmark.SystemConsumerWithSamzaBench "$@"

http://git-wip-us.apache.org/repos/asf/samza/blob/12968cfb/samza-tools/scripts/system-producer-bench.sh
----------------------------------------------------------------------
diff --git a/samza-tools/scripts/system-producer-bench.sh b/samza-tools/scripts/system-producer-bench.sh
new file mode 100755
index 0000000..3665820
--- /dev/null
+++ b/samza-tools/scripts/system-producer-bench.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Locate the directory holding this script. On Linux, readlink -f turns it into an absolute path.
+# All expansions are quoted so that install paths containing spaces are not word-split.
+if [ "$(uname)" = 'Linux' ];
+then
+  base_dir=$(readlink -f "$(dirname "$0")")
+else
+  base_dir=$(dirname "$0")
+fi
+
+# Use the benchmark log4j configuration unless the caller already exported LOG4J_OPTS.
+if [ -z "$LOG4J_OPTS" ]; then
+    export LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/bench-log4j.xml"
+fi
+
+# Use a fixed 1G heap unless the caller already exported HEAP_OPTS.
+if [ -z "$HEAP_OPTS" ]; then
+    export HEAP_OPTS="-Xmx1G -Xms1G"
+fi
+
+# Replace this shell with run-class.sh, forwarding every command line argument unchanged.
+exec "$base_dir/run-class.sh" org.apache.samza.tools.benchmark.SystemProducerBench "$@"

http://git-wip-us.apache.org/repos/asf/samza/blob/12968cfb/samza-tools/src/main/java/org/apache/samza/tools/benchmark/AbstractSamzaBench.java
----------------------------------------------------------------------
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/AbstractSamzaBench.java b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/AbstractSamzaBench.java
new file mode 100644
index 0000000..9392aab
--- /dev/null
+++ b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/AbstractSamzaBench.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.tools.benchmark;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.testutil.ReflectionUtils;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.tools.CommandLineHelper;
+
+
+/**
+ * Base class for the samza benchmark tests.
+ *
+ * <p>The constructor registers and parses the command line options shared by every benchmark. {@link #start()}
+ * then reads those options, loads the properties file and instantiates the {@link SystemFactory} configured for
+ * the requested stream.
+ */
+public abstract class AbstractSamzaBench {
+  protected static final String OPT_SHORT_PROPERTIES_FILE = "p";
+  protected static final String OPT_LONG_PROPERTIES_FILE = "props";
+  protected static final String OPT_ARG_PROPERTIES_FILE = "PROPERTIES_FILE";
+  protected static final String OPT_DESC_PROPERTIES_FILE = "Path to the properties file.";
+
+  protected static final String OPT_SHORT_NUM_EVENTS = "n";
+  protected static final String OPT_LONG_NUM_EVENTS = "numEvents";
+  protected static final String OPT_ARG_NUM_EVENTS = "NUMBER_EVENTS";
+  protected static final String OPT_DESC_NUM_EVENTS = "Total number of events to consume.";
+
+  protected static final String OPT_SHORT_START_PARTITION = "sp";
+  protected static final String OPT_LONG_START_PARTITION = "startPartition";
+  protected static final String OPT_ARG_START_PARTITION = "START_PARTITION";
+  protected static final String OPT_DESC_START_PARTITION = "Start partition.";
+
+  protected static final String OPT_SHORT_END_PARTITION = "ep";
+  protected static final String OPT_LONG_END_PARTITION = "endPartition";
+  protected static final String OPT_ARG_END_PARTITION = "END_PARTITION";
+  protected static final String OPT_DESC_END_PARTITION = "End partition.";
+
+  protected static final String OPT_SHORT_STREAM = "s";
+  protected static final String OPT_LONG_STREAM = "streamId";
+  protected static final String OPT_ARG_STREAM = "STREAM_ID";
+  protected static final String OPT_DESC_STREAM = "STREAM ID.";
+
+  // Config key templates; each is formatted with a stream id or a system name before lookup.
+  protected static final String CFG_STREAM_SYSTEM_NAME = "streams.%s.samza.system";
+  protected static final String CFG_SYSTEM_FACTORY = "systems.%s.samza.factory";
+  protected static final String CFG_PHYSICAL_STREAM_NAME = "streams.%s.samza.physical.name";
+
+  protected final Options options;
+  protected final CommandLine cmd;
+
+  // Everything below is populated by start(), not by the constructor.
+  protected SystemFactory factory;
+  protected Config config;
+  protected String systemName;
+  protected String physicalStreamName;
+  protected int startPartition;
+  protected int endPartition;
+  protected int totalEvents;
+  protected String streamId;
+
+  /**
+   * Registers the common options plus any added through {@link #addOptions(Options)} and parses {@code args}.
+   * Usage help is printed before the failure is rethrown when parsing fails.
+   *
+   * @param scriptName name of the launcher script (without the ".sh" suffix), used in the usage message.
+   * @param args command line arguments to parse.
+   * @throws ParseException if the arguments cannot be parsed against the registered options.
+   */
+  public AbstractSamzaBench(String scriptName, String[] args) throws ParseException {
+    options = new Options();
+    options.addOption(
+        CommandLineHelper.createOption(OPT_SHORT_PROPERTIES_FILE, OPT_LONG_PROPERTIES_FILE, OPT_ARG_PROPERTIES_FILE,
+            true, OPT_DESC_PROPERTIES_FILE));
+    options.addOption(
+        CommandLineHelper.createOption(OPT_SHORT_NUM_EVENTS, OPT_LONG_NUM_EVENTS, OPT_ARG_NUM_EVENTS, true,
+            OPT_DESC_NUM_EVENTS));
+
+    options.addOption(
+        CommandLineHelper.createOption(OPT_SHORT_START_PARTITION, OPT_LONG_START_PARTITION, OPT_ARG_START_PARTITION,
+            true, OPT_DESC_START_PARTITION));
+    options.addOption(
+        CommandLineHelper.createOption(OPT_SHORT_END_PARTITION, OPT_LONG_END_PARTITION, OPT_ARG_END_PARTITION, true,
+            OPT_DESC_END_PARTITION));
+    options.addOption(
+        CommandLineHelper.createOption(OPT_SHORT_STREAM, OPT_LONG_STREAM, OPT_ARG_STREAM, true, OPT_DESC_STREAM));
+
+    // NOTE: this hook runs before the subclass constructor body, so overrides must not rely on subclass fields.
+    addOptions(options);
+
+    CommandLineParser parser = new BasicParser();
+    try {
+      cmd = parser.parse(options, args);
+    } catch (Exception e) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp(String.format("Error: %s.sh", scriptName), options);
+      throw e;
+    }
+  }
+
+  /**
+   * Reads the parsed options, loads the properties file into {@link #config} and creates the system factory
+   * configured for {@link #streamId}.
+   *
+   * @throws IOException if the properties file cannot be opened or read.
+   * @throws InterruptedException not thrown here; declared for overriding benchmarks that block.
+   */
+  public void start() throws IOException, InterruptedException {
+    startPartition = Integer.parseInt(cmd.getOptionValue(OPT_SHORT_START_PARTITION));
+    endPartition = Integer.parseInt(cmd.getOptionValue(OPT_SHORT_END_PARTITION));
+    totalEvents = Integer.parseInt(cmd.getOptionValue(OPT_SHORT_NUM_EVENTS));
+    String propsFile = cmd.getOptionValue(OPT_SHORT_PROPERTIES_FILE);
+    streamId = cmd.getOptionValue(OPT_SHORT_STREAM);
+
+    Properties props = new Properties();
+    // try-with-resources closes the file handle even when load() fails.
+    try (FileInputStream propsStream = new FileInputStream(propsFile)) {
+      props.load(propsStream);
+    }
+
+    // Subclasses add their configs after the partition range and stream id above have been set.
+    addMoreSystemConfigs(props);
+    config = convertToSamzaConfig(props);
+    systemName = config.get(String.format(CFG_STREAM_SYSTEM_NAME, streamId));
+    String systemFactory = config.get(String.format(CFG_SYSTEM_FACTORY, systemName));
+    physicalStreamName = config.get(String.format(CFG_PHYSICAL_STREAM_NAME, streamId));
+
+    factory = ReflectionUtils.createInstance(systemFactory);
+    if (factory == null) {
+      throw new RuntimeException("Cannot instantiate systemfactory " + systemFactory);
+    }
+  }
+
+  /**
+   * Derived classes can override this method to add any additional properties needed to create the System
+   * @param props Properties to which system configs can be added.
+   */
+  protected void addMoreSystemConfigs(Properties props) {
+  }
+
+  /**
+   * Derived classes can override this method to add any additional options that benchmark test may need.
+   * @param options Options to which additional command line options can be added.
+   */
+  protected void addOptions(Options options) {
+  }
+
+  /**
+   * Copies every string-valued property into a Samza {@link Config}.
+   *
+   * @param props properties to convert.
+   * @return a {@link MapConfig} holding the same key/value pairs.
+   */
+  Config convertToSamzaConfig(Properties props) {
+    Map<String, String> propsValue =
+        props.stringPropertyNames().stream().collect(Collectors.toMap(Function.identity(), props::getProperty));
+    return new MapConfig(propsValue);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/12968cfb/samza-tools/src/main/java/org/apache/samza/tools/benchmark/ConfigBasedSspGrouperFactory.java
----------------------------------------------------------------------
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/ConfigBasedSspGrouperFactory.java b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/ConfigBasedSspGrouperFactory.java
new file mode 100644
index 0000000..073fbb0
--- /dev/null
+++ b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/ConfigBasedSspGrouperFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.tools.benchmark;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper;
+import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * Grouper that assigns only the subset of partitions configured to the task. This can be used only
+ * with {@link org.apache.samza.standalone.PassthroughJobCoordinator}.
+ *
+ * <p>NOTE(review): declared public because the class is handed to the framework by name (grouper factory config)
+ * and is presumably instantiated reflectively from another package; confirm against the framework's loader.
+ */
+public class ConfigBasedSspGrouperFactory implements SystemStreamPartitionGrouperFactory {
+
+  /**
+   * Config key template, formatted with the stream name, whose value is the comma separated list of partitions
+   * that needs to be assigned to this task.
+   */
+  public static final String CONFIG_STREAM_PARTITIONS = "streams.%s.partitions";
+  private static final String CFG_PARTITIONS_DELIMITER = ",";
+
+  @Override
+  public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config) {
+    return new ConfigBasedSspGrouper(config);
+  }
+
+  /**
+   * Grouper that keeps only the partitions listed in the config and places all of them in one task.
+   * Static because it never touches the enclosing factory instance.
+   */
+  private static final class ConfigBasedSspGrouper implements SystemStreamPartitionGrouper {
+    private final Config config;
+    // Parsed partition ids per stream name, filled lazily on first lookup.
+    private final Map<String, Set<Integer>> streamPartitionsMap = new HashMap<>();
+
+    ConfigBasedSspGrouper(Config config) {
+      this.config = config;
+    }
+
+    /**
+     * Filters {@code ssps} down to the configured partitions and assigns them to a single task named "TestTask".
+     *
+     * @param ssps all candidate system stream partitions.
+     * @return a single-entry map from the task name to the retained partitions.
+     */
+    @Override
+    public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) {
+      Set<SystemStreamPartition> filteredSsps = new HashSet<>();
+      for (SystemStreamPartition ssp : ssps) {
+        Set<Integer> partitions = getPartitions(ssp.getSystemStream());
+        if (partitions.contains(ssp.getPartition().getPartitionId())) {
+          filteredSsps.add(ssp);
+        }
+      }
+      Map<TaskName, Set<SystemStreamPartition>> group = new HashMap<>();
+      group.put(new TaskName("TestTask"), filteredSsps);
+      return group;
+    }
+
+    /**
+     * Returns the partition ids configured for the stream, parsing the config value on first use.
+     */
+    private Set<Integer> getPartitions(SystemStream systemStream) {
+      return streamPartitionsMap.computeIfAbsent(systemStream.getStream(), this::parsePartitions);
+    }
+
+    /**
+     * Parses the comma separated partition list configured for {@code streamName}.
+     *
+     * @throws IllegalStateException if no partition list is configured for the stream.
+     * @throws NumberFormatException if an entry is not an integer.
+     */
+    private Set<Integer> parsePartitions(String streamName) {
+      String configKey = String.format(CONFIG_STREAM_PARTITIONS, streamName);
+      String partitions = config.get(configKey);
+      if (partitions == null) {
+        // Fail with the offending key instead of a bare NullPointerException from split().
+        throw new IllegalStateException("Missing partition list config " + configKey);
+      }
+      return Arrays.stream(partitions.split(CFG_PARTITIONS_DELIMITER))
+          .map(String::trim)
+          .map(Integer::parseInt)
+          .collect(Collectors.toSet());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/12968cfb/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerBench.java
----------------------------------------------------------------------
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerBench.java b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerBench.java
new file mode 100644
index 0000000..cbfc865
--- /dev/null
+++ b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerBench.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.tools.benchmark;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.commons.cli.ParseException;
+import org.apache.samza.Partition;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.NoOpMetricsRegistry;
+
+
+/**
+ * Generic benchmark test for {@link SystemConsumer}.
+ *
+ * <p>Registers the requested partitions at their oldest offset, polls until the requested number of events has
+ * been read and prints the resulting event rate.
+ */
+public class SystemConsumerBench extends AbstractSamzaBench {
+
+  public static void main(String[] args) throws Exception {
+    SystemConsumerBench bench = new SystemConsumerBench(args);
+    bench.start();
+  }
+
+  public SystemConsumerBench(String[] args) throws ParseException {
+    super("system-consumer-bench", args);
+  }
+
+  /**
+   * Runs the benchmark and terminates the JVM with exit code 0 once the requested number of events is consumed.
+   *
+   * @throws IOException if the properties file cannot be read.
+   * @throws InterruptedException if interrupted while polling the consumer.
+   */
+  @Override
+  public void start() throws IOException, InterruptedException {
+    super.start();
+    SystemAdmin systemAdmin = factory.getAdmin(systemName, config);
+    SystemStreamMetadata ssm =
+        systemAdmin.getSystemStreamMetadata(Collections.singleton(physicalStreamName)).get(physicalStreamName);
+
+    NoOpMetricsRegistry metricsRegistry = new NoOpMetricsRegistry();
+    Set<SystemStreamPartition> ssps = createSSPs(systemName, physicalStreamName, startPartition, endPartition);
+    SystemConsumer consumer = factory.getConsumer(systemName, config, metricsRegistry);
+    for (SystemStreamPartition ssp : ssps) {
+      // Start every partition from its oldest available offset.
+      consumer.register(ssp, ssm.getSystemStreamPartitionMetadata().get(ssp.getPartition()).getOldestOffset());
+    }
+
+    consumer.start();
+
+    System.out.println("starting consumption at " + Instant.now());
+    Instant startTime = Instant.now();
+    // long keeps both the counter and the "* 1000" below from overflowing int on large runs.
+    long numEvents = 0;
+    while (numEvents < totalEvents) {
+      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> pollResult = consumer.poll(ssps, 2000);
+      numEvents += pollResult.values().stream().mapToInt(List::size).sum();
+    }
+
+    Instant endTime = Instant.now();
+    System.out.println("Ending consumption at " + endTime);
+    // Clamp to 1 ms so a run that finishes within the same millisecond does not divide by zero.
+    long elapsedMillis = Math.max(1L, Duration.between(startTime, endTime).toMillis());
+    System.out.println(String.format("Event Rate is %s Messages/Sec ", numEvents * 1000 / elapsedMillis));
+    consumer.stop();
+    System.exit(0);
+  }
+
+  /**
+   * Builds one {@link SystemStreamPartition} per partition id in {@code [startPartition, endPartition)}.
+   *
+   * @param systemName name of the system the stream belongs to.
+   * @param physicalStreamName physical name of the stream.
+   * @param startPartition first partition id, inclusive.
+   * @param endPartition last partition id, exclusive.
+   * @return the set of system stream partitions for the range; empty when the range is empty.
+   */
+  Set<SystemStreamPartition> createSSPs(String systemName, String physicalStreamName, int startPartition,
+      int endPartition) {
+    return IntStream.range(startPartition, endPartition)
+        .mapToObj(x -> new SystemStreamPartition(systemName, physicalStreamName, new Partition(x)))
+        .collect(Collectors.toSet());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/12968cfb/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
----------------------------------------------------------------------
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
new file mode 100644
index 0000000..5456db6
--- /dev/null
+++ b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.tools.benchmark;
+
+import com.google.common.base.Joiner;
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.commons.cli.ParseException;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
+
+
+/**
+ * Generic benchmark test for a test consumer but with samza framework.
+ *
+ * <p>Runs a stream application through {@link LocalApplicationRunner} that counts the messages of the input
+ * stream, then prints the event rate once the requested number of events has been seen.
+ */
+public class SystemConsumerWithSamzaBench extends AbstractSamzaBench {
+  public SystemConsumerWithSamzaBench(String[] args) throws ParseException {
+    super("system-consumer-with-samza-bench", args);
+  }
+
+  public static void main(String[] args) throws Exception {
+    // Must instantiate this class; creating SystemConsumerBench here would bypass the Samza runner entirely.
+    SystemConsumerWithSamzaBench bench = new SystemConsumerWithSamzaBench(args);
+    bench.start();
+  }
+
+  /**
+   * Adds the configs needed to run the benchmark as a single local processor restricted to the requested
+   * partition range. Reads startPartition, endPartition and streamId, so those must already be set.
+   *
+   * @param props properties to which the job configs are added.
+   */
+  @Override
+  public void addMoreSystemConfigs(Properties props) {
+    props.put("app.runner.class", LocalApplicationRunner.class.getName());
+    List<Integer> partitions = IntStream.range(startPartition, endPartition).boxed().collect(Collectors.toList());
+    props.put(JobConfig.JOB_NAME(), "SamzaBench");
+    props.put(JobConfig.PROCESSOR_ID(), "1");
+    props.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
+    props.put(String.format(ConfigBasedSspGrouperFactory.CONFIG_STREAM_PARTITIONS, streamId),
+        Joiner.on(",").join(partitions));
+    props.put(TaskConfig.GROUPER_FACTORY(), ConfigBasedSspGrouperFactory.class.getName());
+  }
+
+  /**
+   * Runs the application, waits until the requested number of events has been consumed, prints the event rate
+   * and terminates the JVM with exit code 0.
+   *
+   * @throws IOException if the properties file cannot be read.
+   * @throws InterruptedException if interrupted while waiting for the events to be consumed.
+   */
+  @Override
+  public void start() throws IOException, InterruptedException {
+    // super.start() is what populates config, so the runner can only be created afterwards.
+    super.start();
+    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+    MessageConsumer consumeFn = new MessageConsumer();
+    StreamApplication app = (graph, config) -> {
+      MessageStream<Object> stream = graph.getInputStream(streamId);
+      stream.map(consumeFn);
+    };
+
+    runner.run(app);
+
+    while (consumeFn.getEventsConsumed() < totalEvents) {
+      Thread.sleep(10);
+    }
+
+    // Capture the end time before killing the app so shutdown time is not counted in the rate.
+    Instant endTime = Instant.now();
+
+    runner.kill(app);
+
+    // startTime stays null if no event was consumed (e.g. zero events requested); fall back to endTime.
+    Instant startTime = consumeFn.startTime != null ? consumeFn.startTime : endTime;
+    // Clamp to 1 ms to avoid dividing by zero; multiply as long to avoid int overflow on large runs.
+    long elapsedMillis = Math.max(1L, Duration.between(startTime, endTime).toMillis());
+    long eventRate = consumeFn.getEventsConsumed() * 1000L / elapsedMillis;
+
+    System.out.println("\n*******************");
+    System.out.println(String.format("Started at %s Ending at %s ", startTime, endTime));
+    System.out.println(String.format("Event Rate is %s Messages/Sec ", eventRate));
+
+    System.out.println("Event Rate is " + eventRate);
+    System.out.println("*******************\n");
+
+    System.exit(0);
+  }
+
+  /**
+   * Pass-through map function that counts messages and records when the first one arrived.
+   * Static because it does not use any state of the enclosing benchmark.
+   */
+  private static class MessageConsumer implements MapFunction<Object, Object> {
+    AtomicInteger eventsConsumed = new AtomicInteger(0);
+    volatile Instant startTime;
+
+    @Override
+    public Object apply(Object message) {
+      // Test the value returned by the increment itself; a separate get() could miss or repeat the first event.
+      if (eventsConsumed.incrementAndGet() == 1) {
+        startTime = Instant.now();
+      }
+      return message;
+    }
+
+    public int getEventsConsumed() {
+      return eventsConsumed.get();
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/12968cfb/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemProducerBench.java
----------------------------------------------------------------------
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemProducerBench.java b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemProducerBench.java
new file mode 100644
index 0000000..6c2a5f2
--- /dev/null
+++ b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemProducerBench.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.tools.benchmark;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.tools.CommandLineHelper;
+import org.apache.samza.tools.RandomValueGenerator;
+import org.apache.samza.util.NoOpMetricsRegistry;
+
+
+/**
+ * Generic benchmark test for a {@link SystemProducer}.
+ */
+public class SystemProducerBench extends AbstractSamzaBench {
+
+  private static final String OPT_SHORT_MESSAGE_SIZE = "sz";
+  private static final String OPT_LONG_MESSAGE_SIZE = "size";
+  private static final String OPT_ARG_MESSAGE_SIZE = "MESSAGE_SIZE";
+  private static final String OPT_DESC_MESSAGE_SIZE = "Size of the message in bytes.";
+
+  private byte[] value;
+
+  public static void main(String args[]) throws Exception {
+    SystemProducerBench bench = new SystemProducerBench(args);
+    bench.start();
+  }
+
+  public SystemProducerBench(String args[]) throws ParseException {
+    super("system-producer", args);
+  }
+
+  public void addOptions(Options options) {
+    options.addOption(
+        CommandLineHelper.createOption(OPT_SHORT_MESSAGE_SIZE, OPT_LONG_MESSAGE_SIZE, OPT_ARG_MESSAGE_SIZE, true,
+            OPT_DESC_MESSAGE_SIZE));
+  }
+
+  public void start() throws IOException, InterruptedException {
+
+    super.start();
+    String source = "SystemProducerBench";
+
+    int size = Integer.parseInt(cmd.getOptionValue(OPT_SHORT_MESSAGE_SIZE));
+    RandomValueGenerator randGenerator = new RandomValueGenerator(System.currentTimeMillis());
+    value = randGenerator.getNextString(size, size).getBytes();
+
+    NoOpMetricsRegistry metricsRegistry = new NoOpMetricsRegistry();
+    List<SystemStreamPartition> ssps = createSSPs(systemName, physicalStreamName, startPartition, endPartition);
+    SystemProducer producer = factory.getProducer(systemName, config, metricsRegistry);
+    producer.register(source);
+    producer.start();
+
+    System.out.println("starting production at " + Instant.now());
+    Instant startTime = Instant.now();
+    for (int index = 0; index < totalEvents; index++) {
+      SystemStreamPartition ssp = ssps.get(index % ssps.size());
+      OutgoingMessageEnvelope messageEnvelope = createMessageEnvelope(ssp, index);
+      producer.send(source, messageEnvelope);
+    }
+
+    System.out.println("Ending production at " + Instant.now());
+    System.out.println(String.format("Event Rate is %s Messages/Sec",
+        (totalEvents * 1000 / Duration.between(startTime, Instant.now()).toMillis())));
+
+    producer.flush(source);
+
+    System.out.println("Ending flush at " + Instant.now());
+    System.out.println(String.format("Event Rate with flush is %s Messages/Sec",
+        (totalEvents * 1000 / Duration.between(startTime, Instant.now()).toMillis())));
+    producer.stop();
+    System.exit(0);
+  }
+
+  /**
+   * Naive create message implementation that uses the same random string for each of the message.
+   * If a system producer wants to test with a specific type of messages, It needs to override this method.
+   */
+  OutgoingMessageEnvelope createMessageEnvelope(SystemStreamPartition ssp, int index) {
+    return new OutgoingMessageEnvelope(ssp.getSystemStream(), String.valueOf(index), value);
+  }
+
+  /**
+   * Simple implementation to create SSPs that assumes that the partitions are ordered list of integers.
+   */
+  List<SystemStreamPartition> createSSPs(String systemName, String physicalStreamName, int startPartition,
+      int endPartition) {
+    return IntStream.range(startPartition, endPartition)
+        .mapToObj(x -> new SystemStreamPartition(systemName, physicalStreamName, new Partition(x)))
+        .collect(Collectors.toList());
+  }
+}