You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/11/07 15:50:42 UTC

[3/5] incubator-rya git commit: RYA-356 Added a Twill App for running the periodic service. Closes #248.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill.yarn/src/main/java/org/apache/rya/periodic/notification/twill/yarn/PeriodicNotificationTwillRunner.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/twill.yarn/src/main/java/org/apache/rya/periodic/notification/twill/yarn/PeriodicNotificationTwillRunner.java b/extras/periodic.notification/twill.yarn/src/main/java/org/apache/rya/periodic/notification/twill/yarn/PeriodicNotificationTwillRunner.java
new file mode 100644
index 0000000..552324e
--- /dev/null
+++ b/extras/periodic.notification/twill.yarn/src/main/java/org/apache/rya/periodic/notification/twill/yarn/PeriodicNotificationTwillRunner.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.twill.yarn;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration;
+import org.apache.rya.periodic.notification.twill.PeriodicNotificationTwillApp;
+import org.apache.rya.periodic.notification.twill.PeriodicNotificationTwillRunnable;
+import org.apache.twill.api.ClassAcceptor;
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunResources;
+import org.apache.twill.api.TwillRunnerService;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.yarn.YarnTwillRunnerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+
+/**
+ * This class is responsible for starting and stopping the {@link PeriodicNotificationTwillApp} on a Hadoop YARN cluster.
+ */
+public class PeriodicNotificationTwillRunner implements AutoCloseable {
+
+    public static final Logger LOG = LoggerFactory.getLogger(PeriodicNotificationTwillRunner.class);
+
+    private final YarnConfiguration yarnConfiguration;
+    private final TwillRunnerService twillRunner;
+    private final File configFile;
+
+    /**
+     * Creates the runner and starts a {@link YarnTwillRunnerService} that uses the supplied zookeepers.
+     * @param yarnZookeepers - The zookeeper connect string used by the Hadoop YARN cluster.
+     * @param configFile - The config file used by {@link PeriodicNotificationTwillApp}.  Typically notification.properties.
+     */
+    public PeriodicNotificationTwillRunner(final String yarnZookeepers, final File configFile) {
+        Objects.requireNonNull(configFile, "Config File must not be null.");
+        Preconditions.checkArgument(configFile.exists(), "Config File must exist");
+        Objects.requireNonNull(yarnZookeepers, "YARN Zookeepers must not be null.");
+        this.configFile = configFile;
+        yarnConfiguration = new YarnConfiguration();
+        twillRunner = new YarnTwillRunnerService(yarnConfiguration, yarnZookeepers);
+        twillRunner.start();
+        try {
+            Thread.sleep(1000); // give the YarnTwillRunnerService time to retrieve state from zookeeper
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * Start an instance of the {@link PeriodicNotificationTwillApp} and wait up to 5 minutes for its ResourceReport.
+     *
+     * @param interactive - If true, this method will block until the user terminates this JVM, at which point the
+     *            {@link PeriodicNotificationTwillApp} on the YARN cluster will also be terminated. If false, this
+     *            method will return after startup.
+     */
+    public void startApp(final boolean interactive) {
+        final String yarnClasspath = yarnConfiguration.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+                Joiner.on(",").join(YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH));
+        final List<String> applicationClassPaths = Lists.newArrayList();
+        Iterables.addAll(applicationClassPaths, Splitter.on(",").split(yarnClasspath));
+        final TwillController controller = twillRunner
+                .prepare(new PeriodicNotificationTwillApp(configFile))
+                .addLogHandler(new PrinterLogHandler(new PrintWriter(new OutputStreamWriter(System.out, StandardCharsets.UTF_8), true)))
+                .withApplicationClassPaths(applicationClassPaths)
+                //.withApplicationArguments(args)
+                //.withArguments(runnableName, args)
+                // .withBundlerClassAcceptor(new HadoopClassExcluder())
+                .start();
+
+        final ResourceReport r = getResourceReport(controller, 5, TimeUnit.MINUTES);
+        LOG.info("Received ResourceReport: {}", r);
+        LOG.info("{} started successfully!", PeriodicNotificationTwillApp.APPLICATION_NAME);
+
+        if(interactive) {
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        Futures.getUnchecked(controller.terminate());
+                    } finally {
+                        twillRunner.stop();
+                    }
+                }
+            });
+
+            try {
+                LOG.info("{} waiting termination by user.  Type ctrl-c to terminate.", PeriodicNotificationTwillApp.class.getSimpleName());
+                controller.awaitTerminated();
+            } catch (final ExecutionException e) {
+                LOG.error("{} terminated abnormally.", PeriodicNotificationTwillApp.APPLICATION_NAME, e);
+            }
+        }
+    }
+
+    /**
+     * Terminates all instances of the {@link PeriodicNotificationTwillApp} on the YARN cluster.
+     */
+    public void stopApp() {
+        LOG.info("Stopping any running instances...");
+
+        int counter = 0;
+        // It is possible that we have launched multiple instances of the app.  For now, stop them all, one at a time.
+        for(final TwillController c : twillRunner.lookup(PeriodicNotificationTwillApp.APPLICATION_NAME)) {
+            final ResourceReport report = c.getResourceReport();
+            final String applicationId = (report == null) ? "unknown" : report.getApplicationId(); // report may be null, see reportIsLoading
+            LOG.info("Attempting to stop {} with YARN ApplicationId: {} and Twill RunId: {}", PeriodicNotificationTwillApp.APPLICATION_NAME, applicationId, c.getRunId());
+            Futures.getUnchecked(c.terminate());
+            LOG.info("Stopped {} with YARN ApplicationId: {} and Twill RunId: {}", PeriodicNotificationTwillApp.APPLICATION_NAME, applicationId, c.getRunId());
+            counter++;
+        }
+        LOG.info("Stopped {} instance(s) of {}", counter, PeriodicNotificationTwillApp.APPLICATION_NAME);
+    }
+
+    /**
+     * Polls the controller once per second until {@link #reportIsLoading(ResourceReport)} reports a loaded report.
+     * @param controller - The controller to interrogate.
+     * @param timeout - The maximum time to poll {@code controller}.  Use -1 for infinite polling.
+     * @param timeoutUnits - The units of {@code timeout}.
+     * @return The ResourceReport for the application.
+     * @throws IllegalStateException If a timeout occurs before a ResourceReport is returned, or the wait is interrupted.
+     */
+    private ResourceReport getResourceReport(final TwillController controller, final long timeout, final TimeUnit timeoutUnits) {
+        Preconditions.checkArgument(timeout >= -1, "timeout cannot be less than -1");
+        final long timeoutMillis = TimeUnit.MILLISECONDS.convert(timeout, timeoutUnits);
+        final long sleepMillis = 1000; // how long to sleep between retrieval attempts.
+        long totalElapsedMillis = 0;
+        ResourceReport report;
+        while (reportIsLoading(report = controller.getResourceReport())) {
+            try {
+                Thread.sleep(sleepMillis);
+            } catch (final InterruptedException e) {
+                Thread.currentThread().interrupt(); // restore the flag so callers can still observe the interrupt
+                throw new IllegalStateException(e);
+            }
+            totalElapsedMillis += sleepMillis;
+            if ((timeout != -1) && (totalElapsedMillis >= timeoutMillis)) {
+                final String errorMessage = "Timeout while waiting for the Twill Application to start on YARN.  Total elapsed time: " + TimeUnit.SECONDS.convert(totalElapsedMillis, TimeUnit.MILLISECONDS) + "s.";
+                LOG.error(errorMessage);
+                throw new IllegalStateException(errorMessage);
+            }
+            if ((totalElapsedMillis % 5000) == 0) {
+                LOG.info("Waiting for the Twill Application to start on YARN... Total elapsed time: {}s.", TimeUnit.SECONDS.convert(totalElapsedMillis, TimeUnit.MILLISECONDS));
+            }
+        }
+        return report;
+    }
+
+    /**
+     * Determines whether the supplied report is still missing the resources of the Twill runnable.
+     * @param report - The {@link ResourceReport} for this Twill Application, possibly null.
+     * @return true when the report is null or lists no resources for the runnable; false once resources are listed.
+     */
+    private boolean reportIsLoading(@Nullable final ResourceReport report) {
+        if(report == null) {
+            return true;
+        }
+
+        final String appId = report.getApplicationId();
+        final Collection<TwillRunResources> resources = report.getResources().get(PeriodicNotificationTwillRunnable.TWILL_RUNNABLE_NAME);
+        final boolean loading = (resources == null) || resources.isEmpty();
+
+        if(loading) {
+            LOG.info("Received Resource Report for YARN ApplicationID: {}, runnable resources are still loading...", appId);
+        } else {
+            LOG.info("Received Resource Report for YARN ApplicationID: {}, runnable resources are loaded.", appId);
+        }
+        return loading;
+    }
+
+    /** Stops the {@link TwillRunnerService} that the constructor started. */
+    @Override
+    public void close() throws Exception {
+        // twillRunner is final and always assigned by the constructor, so it needs no null guard here.
+        twillRunner.stop();
+    }
+
+    /** Options parsed for every command: the required config file and an optional zookeeper connect string. */
+    private static class MainOptions {
+        @Parameter(names = { "-c", "--config-file" }, description = "PeriodicNotification Application config file", required = true)
+        private File configFile;
+
+        @Parameter(names = { "-z", "--yarn-zookeepers" }, description = "(Optional) YARN Zookeepers connect string.  If not specified, the value of 'accumulo.zookeepers' from the specified '--config-file' will be reused.", required = false)
+        private String zookeepers;
+    }
+
+    @Parameters(commandNames = { "start" }, separators = "=", commandDescription = "Start the PeriodicNotification Application on YARN")
+    private static class CommandStart { // options that only apply to the "start" command
+        @Parameter(names = { "-i", "--interactive" }, description = "(Optional) Interactive.  If specified, blocks the console until the user types ctrl-c.", required = false)
+        private boolean interactive;
+
+        //TODO future feature: let the user supply (or be prompted for) the 'accumulo.password'.
+        //@Parameter(names = { "-p", "--accumulo.password"}, description = "Leave value blank to be prompted interactively for the 'accumulo.password' of the 'accumulo.user' specified by the '--config-file'", password = true, required = true)
+        //private String password;
+    }
+
+    /** Marker for the "stop" command; it declares no options yet, and stopApp() stops every instance it finds. */
+    @Parameters(commandNames = { "stop" }, commandDescription = "Stops PeriodicNotification Applications on YARN")
+    private static class CommandStop {
+        //TODO future feature.
+        //@Parameter(names = { "-a", "--all" }, description = "Stops all PeriodicNotification Application instances.", required = false)
+        //private boolean all = true;
+
+        //@Parameter(names = { "-i", "--instances"}, description = "CSV List of application instances to be stopped", required = false)
+        //private List<String> instanceList;
+    }
+
+    /**
+     * Command line entry point.  Parses the "start" or "stop" command, loads the config file, and runs the command.
+     * The JVM exits with status 1 on invalid input, an unreadable config file, or a failed command.
+     * @param args - The command line arguments.
+     */
+    public static void main(final String[] args) {
+        final String startCommand = "start";
+        final String stopCommand = "stop";
+        final MainOptions mainOptions = new MainOptions();
+        final CommandStart startOptions = new CommandStart();
+        final CommandStop stopOptions = new CommandStop();
+        final JCommander commander = new JCommander(mainOptions);
+        commander.addCommand(startCommand, startOptions);
+        commander.addCommand(stopCommand, stopOptions);
+        commander.setProgramName(PeriodicNotificationTwillRunner.class.getName());
+        String command = null;
+        try {
+            commander.parse(args);
+            command = commander.getParsedCommand();
+            if(command == null) {
+                throw new ParameterException("A command must be specified.");
+            }
+        } catch (final ParameterException e) {
+            System.err.println("Error! Invalid input: " + e.getMessage());
+            commander.usage();
+            System.exit(1);
+        }
+
+        // read the properties file named on the command line
+        PeriodicNotificationApplicationConfiguration config = null;
+        try (FileInputStream in = new FileInputStream(mainOptions.configFile)) {
+            final Properties props = new Properties();
+            props.load(in);
+            config = new PeriodicNotificationApplicationConfiguration(props);
+        } catch (final Exception e) {
+            LOG.warn("Unable to load specified properties file", e);
+            System.exit(1);
+        }
+
+        // an explicit --yarn-zookeepers value wins; otherwise fall back to the accumulo zookeepers from the config
+        final boolean zookeepersGiven = mainOptions.zookeepers != null && !mainOptions.zookeepers.isEmpty();
+        final String zookeepers = zookeepersGiven ? mainOptions.zookeepers : config.getAccumuloZookeepers();
+
+        try (final PeriodicNotificationTwillRunner runner = new PeriodicNotificationTwillRunner(zookeepers, mainOptions.configFile)) {
+            if(startCommand.equals(command)) {
+                runner.startApp(startOptions.interactive);
+            } else if(stopCommand.equals(command)) {
+                runner.stopApp();
+            } else {
+                throw new IllegalStateException("Invalid Command."); // this state should be impossible.
+            }
+        } catch (final Exception e) {
+            LOG.warn("Error occurred.", e);
+            System.exit(1);
+        }
+    }
+
+    static class HadoopClassExcluder extends ClassAcceptor {
+        @Override
+        public boolean accept(final String className, final URL classUrl, final URL classPathUrl) {
+            // accept anything outside org.apache.hadoop, plus the hbase classes that live under that prefix
+            return !className.startsWith("org.apache.hadoop") || className.startsWith("org.apache.hadoop.hbase");
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill.yarn/src/main/scripts/periodicNotificationTwillApp.sh
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/twill.yarn/src/main/scripts/periodicNotificationTwillApp.sh b/extras/periodic.notification/twill.yarn/src/main/scripts/periodicNotificationTwillApp.sh
new file mode 100644
index 0000000..8fa3497
--- /dev/null
+++ b/extras/periodic.notification/twill.yarn/src/main/scripts/periodicNotificationTwillApp.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# navigate to the project directory (the parent of the directory that holds this script)
+PROJECT_HOME="$(dirname "$(cd "$(dirname "$0")" && pwd)")"
+cd "$PROJECT_HOME" || exit 1
+
+# setup the twill classpath
+. conf/twill-env.sh
+
+# echo "Using classpath: $TWILL_CP"
+
+# run the program (quoted so paths with spaces or glob characters reach java unchanged)
+"$JAVA_HOME/bin/java" -cp "$TWILL_CP" \
+  -Dlogback.configurationFile=conf/logback.xml \
+  org.apache.rya.periodic.notification.twill.yarn.PeriodicNotificationTwillRunner "$@"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill/README.md
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/twill/README.md b/extras/periodic.notification/twill/README.md
new file mode 100644
index 0000000..c87ad2e
--- /dev/null
+++ b/extras/periodic.notification/twill/README.md
@@ -0,0 +1,36 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+## rya.periodic.notification.twill
+
+This project serves two purposes:
+
+1) Store all `org.apache.twill:twill-api` specific code to decouple this execution 
+environment dependency from the rest of Rya's implementation logic.  This way it 
+should be easy to integrate Rya code with alternative execution environments.
+
+2) It can be tricky to shield Twill applications from the constraints of the Twill 
+runtime environment (specifically a Guava 13.0 dependency, among potentially others).  
+By controlling the packaging of this project and leveraging the maven-shade-plugin's 
+relocation capability, we can avoid current and future classpath conflicts and allow
+for a cleaner integration with Twill than we might get with the `BundledJarRunner`
+from `org.apache.twill:twill-ext`.
+
+Note, the distribution of this twill application can be found in 
+`rya.periodic.notification.twill.yarn`.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill/pom.xml
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/twill/pom.xml b/extras/periodic.notification/twill/pom.xml
new file mode 100644
index 0000000..e22dd45
--- /dev/null
+++ b/extras/periodic.notification/twill/pom.xml
@@ -0,0 +1,177 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.periodic.notification.parent</artifactId>
+        <version>3.2.12-incubating-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>rya.periodic.notification.twill</artifactId>
+
+    <name>Apache Rya Periodic Notification Service on Twill </name>
+    <description>Twill Application for executing the Apache Rya Periodic Notification Service</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+
+        <!--  redirect any other logging frameworks to slf4j within the Twill runtime -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jul-to-slf4j</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
+        <!-- 
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.periodic.notification.api</artifactId>
+        </dependency>
+        -->
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.periodic.notification.service</artifactId>
+            <exclusions>
+                <!--  exclude logging implementations -->
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.twill</groupId>
+            <artifactId>twill-api</artifactId>
+            <version>0.12.0</version>
+        </dependency>
+
+        <!-- Mark Accumulo Hadoop and Zookeeper as provided dependencies -->
+        <dependency>
+            <groupId>org.apache.accumulo</groupId>
+            <artifactId>accumulo-core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- 
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <version>2.3.7</version>
+                <extensions>true</extensions>
+                <executions>
+                    <execution>
+                        <id>bundle</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>bundle</goal>
+                        </goals>
+                        <configuration>
+                            <classifier>bundle</classifier>
+                            <instructions>
+                                <Embed-Dependency>*;inline=false</Embed-Dependency>
+                                <Embed-Transitive>true</Embed-Transitive>
+                                <Embed-Directory>lib</Embed-Directory>
+                            </instructions>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>twill-app</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <shadedClassifierName>twill-app</shadedClassifierName>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer" />
+                            </transformers>
+                            <relocations>
+                                <!--  relocate the more modern version of guava to avoid classpath conflicts -->
+                                <relocation>
+                                    <pattern>com.google.common</pattern>
+                                    <shadedPattern>org.apache.rya.shaded.com.google.common</shadedPattern>
+                                </relocation>
+                            </relocations>
+                            <artifactSet>
+                                <excludes>
+                                    <!--  exclude logging implementations if the above exclusions/scoping don't catch them -->
+                                    <exclude>commons-logging:commons-logging</exclude>
+                                    <exclude>org.slf4j:slf4j-log4j12</exclude>
+                                    <exclude>log4j:log4j</exclude>
+                                </excludes>
+                            </artifactSet>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill/src/main/java/org/apache/rya/periodic/notification/twill/PeriodicNotificationTwillApp.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/twill/src/main/java/org/apache/rya/periodic/notification/twill/PeriodicNotificationTwillApp.java b/extras/periodic.notification/twill/src/main/java/org/apache/rya/periodic/notification/twill/PeriodicNotificationTwillApp.java
new file mode 100644
index 0000000..d2a6125
--- /dev/null
+++ b/extras/periodic.notification/twill/src/main/java/org/apache/rya/periodic/notification/twill/PeriodicNotificationTwillApp.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.twill;
+
+import java.io.File;
+
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.ResourceSpecification.SizeUnit;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillSpecification;
+
+/** Twill application consisting of a single {@link PeriodicNotificationTwillRunnable} plus its config file. */
+public class PeriodicNotificationTwillApp implements TwillApplication {
+
+    public static final String APPLICATION_NAME = PeriodicNotificationTwillApp.class.getSimpleName();
+    private final File configFile;
+
+    /** @param configFile - The file registered as a local file of the runnable. */
+    public PeriodicNotificationTwillApp(final File configFile) {
+        this.configFile = configFile;
+    }
+
+    @Override
+    public TwillSpecification configure() {
+        // resources requested for the runnable: 2 virtual cores, 2 GB of memory, 1 instance
+        final ResourceSpecification resources = ResourceSpecification.Builder.with()
+                .setVirtualCores(2)
+                .setMemory(2, SizeUnit.GIGA)
+                .setInstances(1)
+                .build();
+        return TwillSpecification.Builder.with()
+                .setName(APPLICATION_NAME)
+                .withRunnable()
+                    .add(PeriodicNotificationTwillRunnable.TWILL_RUNNABLE_NAME, new PeriodicNotificationTwillRunnable(), resources)
+                    .withLocalFiles()
+                    .add(PeriodicNotificationTwillRunnable.CONFIG_FILE_NAME, configFile)
+                    .apply()
+                .anyOrder()
+                .build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill/src/main/java/org/apache/rya/periodic/notification/twill/PeriodicNotificationTwillRunnable.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/twill/src/main/java/org/apache/rya/periodic/notification/twill/PeriodicNotificationTwillRunnable.java b/extras/periodic.notification/twill/src/main/java/org/apache/rya/periodic/notification/twill/PeriodicNotificationTwillRunnable.java
new file mode 100644
index 0000000..8695655
--- /dev/null
+++ b/extras/periodic.notification/twill/src/main/java/org/apache/rya/periodic/notification/twill/PeriodicNotificationTwillRunnable.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.twill;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.rya.periodic.notification.application.PeriodicApplicationException;
+import org.apache.rya.periodic.notification.application.PeriodicNotificationApplication;
+import org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration;
+import org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationFactory;
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.Command;
+import org.apache.twill.api.TwillContext;
+import org.apache.twill.api.TwillRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An {@link AbstractTwillRunnable} that builds a {@link PeriodicNotificationApplication} from the
+ * {@value #CONFIG_FILE_NAME} properties file and keeps it running until it is stopped.
+ */
+public class PeriodicNotificationTwillRunnable extends AbstractTwillRunnable {
+
+    private static final Logger logger = LoggerFactory.getLogger(PeriodicNotificationTwillRunnable.class);
+
+    /** The name of this runnable; the simple name of this class. */
+    public static final String TWILL_RUNNABLE_NAME = PeriodicNotificationTwillRunnable.class.getSimpleName();
+    /** The relative path the notification properties are read from in {@link #initialize(TwillContext)}. */
+    public static final String CONFIG_FILE_NAME = "notification.properties";
+
+    /** Created by {@link #initialize(TwillContext)}; remains {@code null} until that method succeeds. */
+    private PeriodicNotificationApplication app;
+
+    /**
+     * Called when the container process starts. Executed in container machine. If any exception is thrown from this
+     * method, this runnable will not be retried.
+     *
+     * @param context Contains information about the runtime context.
+     * @throws RuntimeException If the properties file cannot be loaded or the application cannot be created.
+     */
+    @Override
+    public void initialize(final TwillContext context) {
+        logger.info("Initializing the PeriodicNotificationApplication.");
+
+        final File propsFile = new File(CONFIG_FILE_NAME);
+        final PeriodicNotificationApplicationConfiguration conf;
+        try (final FileInputStream fin = new FileInputStream(propsFile)) {
+            final Properties p = new Properties();
+            p.load(fin);
+            // Log the property names only: the values may include credentials (TODO confirm against the
+            // deployed notification.properties), and those must not end up in container logs.
+            logger.debug("Loaded properties: {}", p.stringPropertyNames());
+            conf = new PeriodicNotificationApplicationConfiguration(p);
+        } catch (final Exception e) {
+            logger.error("Error loading notification properties", e);
+            throw new RuntimeException(e); // kill the Runnable
+        }
+
+        try {
+            this.app = PeriodicNotificationApplicationFactory.getPeriodicApplication(conf);
+        } catch (final PeriodicApplicationException e) {
+            logger.error("Error occurred creating PeriodicNotificationApplication", e);
+            throw new RuntimeException(e);  // kill the Runnable
+        }
+    }
+
+    /**
+     * Called when a command is received. A normal return denotes the command has been processed successfully, otherwise
+     * {@link Exception} should be thrown. This implementation ignores every command.
+     *
+     * @param command Contains details of the command.
+     * @throws Exception Never thrown by this implementation.
+     */
+    @Override
+    public void handleCommand(final Command command) throws Exception {
+        // no-op
+    }
+
+    /**
+     * Starts the application and blocks until it has finished. Failures while blocking are logged and end the
+     * method normally.
+     */
+    @Override
+    public void run() {
+        logger.info("Starting up the PeriodicNotificationApplication.");
+        app.start();
+        try {
+            logger.info("Blocking thread termination until the PeriodicNotificationApplication is stopped.");
+            app.blockUntilFinished();
+        } catch (final InterruptedException e) {
+            // Restore the interrupt flag so that the code that invoked run() can still observe the interruption.
+            Thread.currentThread().interrupt();
+            logger.error("An error occurred while blocking on the PeriodicNotificationApplication", e);
+        } catch (final IllegalStateException | ExecutionException e) {
+            logger.error("An error occurred while blocking on the PeriodicNotificationApplication", e);
+        }
+        logger.info("Exiting the PeriodicNotificationApplication.");
+    }
+
+    /**
+     * Requests to stop the running service. Does nothing if the application was never created.
+     */
+    @Override
+    public void stop() {
+        logger.info("Stopping the PeriodicNotificationApplication...");
+        if (app == null) {
+            // initialize() has not completed successfully, so there is nothing to stop.
+            logger.warn("The PeriodicNotificationApplication was never created; nothing to stop.");
+            return;
+        }
+        app.stop();
+    }
+
+    /**
+     * Called when the {@link TwillRunnable#run()} completed. Useful for doing
+     * resource cleanup. This method would only get called if the call to {@link #initialize(TwillContext)} was
+     * successful.
+     */
+    @Override
+    public void destroy() {
+        // no-op
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/README.md
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/README.md b/extras/rya.benchmark/README.md
new file mode 100644
index 0000000..228dfaf
--- /dev/null
+++ b/extras/rya.benchmark/README.md
@@ -0,0 +1,77 @@
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License. 
+-->
+
+Benchmark Optimizations
+
+
+## KafkaLatencyBenchmark
+
+Below are several strategies for partitioning the rya_pcj_updater table.  If other tablets start hotspotting, they can be split further, similar to how `STATEMENT_PATTERN_` is split below.
+```
+addsplits AGGREGATION_ JOIN_ PROJECTION_ QUERY_ STATEMENT_PATTERN_ urn -t rya_pcj_updater
+
+# or
+addsplits AGGREGATION_ JOIN_ PROJECTION_ QUERY_ STATEMENT_PATTERN_0 STATEMENT_PATTERN_4 STATEMENT_PATTERN_8 STATEMENT_PATTERN_c urn -t rya_pcj_updater
+
+# or
+addsplits AGGREGATION_ JOIN_ PROJECTION_ QUERY_ STATEMENT_PATTERN_0 STATEMENT_PATTERN_1 STATEMENT_PATTERN_2 STATEMENT_PATTERN_3 STATEMENT_PATTERN_4 STATEMENT_PATTERN_5 STATEMENT_PATTERN_6 STATEMENT_PATTERN_7 STATEMENT_PATTERN_8 STATEMENT_PATTERN_9 STATEMENT_PATTERN_a STATEMENT_PATTERN_b STATEMENT_PATTERN_c STATEMENT_PATTERN_d STATEMENT_PATTERN_e STATEMENT_PATTERN_f urn -t rya_pcj_updater
+
+# then ensure the splits have been applied.
+compact -t rya_pcj_updater
+```
+
+It is also possible to lower the table's split threshold to generate more tablets.
+```
+root@accumulo> config -t rya_pcj_updater -s table.split.threshold=100M
+```
+
+Identify which tablets are on which hosts and which particular data you might be
+hotspotting on.  Note that the tablet splits are ordered lexicographically, and
+the split point is exclusive.  So the AGGREGATION_ data is actually stored on
+the tablet with the split point label: 4qn;JOIN_.
+```
+root@accumulo> tables -l
+...
+rya_osp       =>       4qr
+rya_pcj_updater =>       4qn
+rya_po        =>       4qq
+...
+
+root@accumulo> scan -t accumulo.metadata -b 4qn; -e 4qn< -c loc
+4qn;AGGREGATION_ loc:25e09c3a40b000e []    10.10.10.10:9997
+4qn;JOIN_ loc:45e09c3a2260012 []    10.10.10.11:9997
+4qn;PROJECTION_ loc:55e09c3a2cf0014 []    10.10.10.12:9997
+4qn;QUERY_ loc:35e09c3a2080021 []    10.10.10.13:9997
+4qn;STATEMENT_PATTERN_0 loc:16e09c3a436001d []    10.10.10.14:9997
+4qn;STATEMENT_PATTERN_4 loc:17e09c3a436001d []    10.10.10.15:9997
+4qn;STATEMENT_PATTERN_8 loc:18e09c3a436001d []    10.10.10.16:9997
+4qn;STATEMENT_PATTERN_c loc:19e09c3a436001d []    10.10.10.17:9997
+4qn;urn loc:15e09c3a4360019 []    10.10.10.18:9997
+4qn< loc:55e09c3a2cf0012 []    10.10.10.19:9997
+```
+
+Use the `RegexGroupBalancer` to ensure all STATEMENT_PATTERN_x tablets are evenly distributed between all available tablet servers.  This distribution strategy will also apply to other groups that are specified in the regex.
+```
+root@accumulo> config -t rya_pcj_updater -s table.custom.balancer.group.regex.pattern=(AGGREGATION_|JOIN_|PROJECTION_|QUERY_|STATEMENT_PATTERN_|urn).*
+#root@accumulo> config -t rya_pcj_updater -s table.custom.balancer.group.regex.default=AGGREGATION_
+root@accumulo> config -t rya_pcj_updater -s table.balancer=org.apache.accumulo.server.master.balancer.RegexGroupBalancer
+```
+References:  
+https://blogs.apache.org/accumulo/entry/balancing_groups_of_tablets  
+https://reviews.apache.org/r/29230/diff/2#index_header

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/pom.xml b/extras/rya.benchmark/pom.xml
index fce434b..8468290 100644
--- a/extras/rya.benchmark/pom.xml
+++ b/extras/rya.benchmark/pom.xml
@@ -55,6 +55,13 @@
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
+        
+        <!-- Fluo runtime dependency -->
+        <dependency>
+            <groupId>org.apache.fluo</groupId>
+            <artifactId>fluo-core</artifactId>
+            <scope>runtime</scope>
+        </dependency>
 
         <!-- Testing -->
         <dependency>
@@ -140,7 +147,6 @@
                             <goal>shade</goal>
                         </goals>
                         <configuration>
-                            <finalName>benchmarks</finalName>
                             <transformers>
                                 <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                     <mainClass>org.openjdk.jmh.Main</mainClass>
@@ -164,6 +170,23 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>create-binary-distribution</id>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <descriptors>
+                                <descriptor>src/main/assembly/binary-release.xml</descriptor>
+                            </descriptors>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/assembly/binary-release.xml
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/assembly/binary-release.xml b/extras/rya.benchmark/src/main/assembly/binary-release.xml
new file mode 100644
index 0000000..374213f
--- /dev/null
+++ b/extras/rya.benchmark/src/main/assembly/binary-release.xml
@@ -0,0 +1,33 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<assembly
+    xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
+    <id>bin</id>
+    <formats>
+        <format>tar.gz</format>
+    </formats>
+    <includeBaseDirectory>true</includeBaseDirectory>
+    <componentDescriptors>
+        <componentDescriptor>src/main/assembly/component-release.xml</componentDescriptor>
+    </componentDescriptors>
+</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/assembly/component-release.xml
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/assembly/component-release.xml b/extras/rya.benchmark/src/main/assembly/component-release.xml
new file mode 100644
index 0000000..0d99717
--- /dev/null
+++ b/extras/rya.benchmark/src/main/assembly/component-release.xml
@@ -0,0 +1,81 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<component
+    xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/component/1.1.3"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/component/1.1.3 http://maven.apache.org/xsd/component-1.1.3.xsd">
+    <fileSets>
+        <fileSet>
+            <directory>src/main/config</directory>
+            <outputDirectory>conf</outputDirectory>
+            <directoryMode>0755</directoryMode>
+            <fileMode>0644</fileMode>
+            <lineEnding>unix</lineEnding>
+            <filtered>false</filtered>
+            <includes>
+                <include>*.options</include>
+                <include>*.properties</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>src/main/scripts</directory>
+            <outputDirectory>bin</outputDirectory>
+            <directoryMode>0755</directoryMode>
+            <fileMode>0755</fileMode>
+            <includes>
+                <include>*.sh</include>
+            </includes>
+            <lineEnding>unix</lineEnding>
+            <filtered>true</filtered>
+        </fileSet>
+        <fileSet>
+            <directory>src/main/scripts</directory>
+            <outputDirectory>bin</outputDirectory>
+            <directoryMode>0755</directoryMode>
+            <fileMode>0644</fileMode>
+            <includes>
+                <include>*.bat</include>
+            </includes>
+            <lineEnding>dos</lineEnding>
+            <filtered>true</filtered>
+        </fileSet>
+
+        <!-- create an empty directory for log files -->
+        <fileSet>
+            <directory>src/main/assembly</directory>
+            <outputDirectory>logs</outputDirectory>
+            <directoryMode>755</directoryMode>
+            <excludes>
+                <exclude>*</exclude>
+            </excludes>
+        </fileSet>
+
+        <fileSet>
+            <directory>${project.build.directory}</directory>
+            <outputDirectory>lib</outputDirectory>
+            <directoryMode>755</directoryMode>
+            <fileMode>0644</fileMode>
+            <includes>
+                <include>${project.artifactId}-${project.version}-shaded.jar</include>
+            </includes>
+        </fileSet>
+    </fileSets>
+</component>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/config/common.options
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/config/common.options b/extras/rya.benchmark/src/main/config/common.options
new file mode 100644
index 0000000..9a0c7c1
--- /dev/null
+++ b/extras/rya.benchmark/src/main/config/common.options
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# This file contains example configuration values and should
+# be modified to target your Rya/Kafka environment.
+
+--zookeepers
+zoo1,zoo2,zoo3,zoo4,zoo5
+
+--kafka-bootstrap-servers
+kafka1:9092,kafka2:9092
+
+--accumulo-instance
+accumuloInstance
+
+--rya-instance
+rya_
+
+--username
+accumuloUser
+
+# specifying --password here prompts the user to enter 
+# the accumulo password interactively on the shell
+--password
+
+# local directory for storing benchmark output
+--output-directory
+results
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/config/log4j.properties
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/config/log4j.properties b/extras/rya.benchmark/src/main/config/log4j.properties
new file mode 100644
index 0000000..1101514
--- /dev/null
+++ b/extras/rya.benchmark/src/main/config/log4j.properties
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Valid levels:
+# TRACE, DEBUG, INFO, WARN, ERROR and FATAL
+log4j.rootCategory=INFO, CONSOLE, LOGFILE
+
+# CONSOLE is set to be a ConsoleAppender using a PatternLayout.
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+#log4j.appender.CONSOLE.layout.ConversionPattern=[%p] %m%n
+log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p %c %x - %m%n
+
+# LOGFILE is set to be a File appender using a PatternLayout.
+log4j.appender.LOGFILE=org.apache.log4j.FileAppender
+log4j.appender.LOGFILE.File=logs/benchmark.log
+#log4j.appender.LOGFILE.Threshold=DEBUG
+log4j.appender.LOGFILE.Append=true
+
+log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.LOGFILE.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
+
+#log4j.appender.LOGFILE.layout=org.apache.log4j.EnhancedPatternLayout
+#log4j.appender.LOGFILE.layout.ConversionPattern=%d [%t] %-5p %c{1.} - %m%n
+

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/config/periodic.options
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/config/periodic.options b/extras/rya.benchmark/src/main/config/periodic.options
new file mode 100644
index 0000000..51ac8ac
--- /dev/null
+++ b/extras/rya.benchmark/src/main/config/periodic.options
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+--ingest-iterations
+1000
+
+--ingest-observations-per-type
+10
+
+--ingest-types
+5
+
+--ingest-type-prefix
+car_
+
+--ingest-period-sec
+30
+
+--report-period-sec
+10
+
+--periodic-query-window
+15
+
+#every 30 seconds
+--periodic-query-period
+.5
+
+--periodic-query-time-units
+minutes
+
+--periodic-query-registration-topic
+notifications
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/config/projection.options
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/config/projection.options b/extras/rya.benchmark/src/main/config/projection.options
new file mode 100644
index 0000000..9bba2de
--- /dev/null
+++ b/extras/rya.benchmark/src/main/config/projection.options
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+--ingest-iterations
+1000
+
+--ingest-observations-per-type
+10
+
+--ingest-types
+5
+
+--ingest-type-prefix
+car_
+
+--ingest-period-sec
+30
+
+--report-period-sec
+10
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/BenchmarkOptions.java
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/BenchmarkOptions.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/BenchmarkOptions.java
new file mode 100644
index 0000000..a848ebb
--- /dev/null
+++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/BenchmarkOptions.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.benchmark.periodic;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.base.Objects;
+
+/**
+ * Command line options, declared with JCommander {@link Parameter} annotations, that describe how much data a
+ * benchmark ingests and how often it ingests and reports. Every option is required.
+ */
+public class BenchmarkOptions {
+    // The description names the flags exactly as they are declared below (-ii, -iobs, -it).
+    @Parameter(names = { "-ii", "--ingest-iterations" }, description = "Number of ingest iterations.  Total data published is -ii x -iobs x -it", required = true)
+    private int numIterations;
+
+    @Parameter(names = { "-iobs", "--ingest-observations-per-type" }, description = "Observations per Type per Iteration to generate.", required = true)
+    private int observationsPerTypePerIteration;
+
+    @Parameter(names = { "-it", "--ingest-types" }, description = "The number of unique types to generate.", required = true)
+    private int numTypes;
+
+    @Parameter(names = { "-itp", "--ingest-type-prefix" }, description = "The prefix to use for a type, for example 'car_'", required = true)
+    private String typePrefix;
+
+    @Parameter(names = { "-ip", "--ingest-period-sec" }, description = "The period, in seconds between ingests of the data generated for one iteration.", required = true)
+    private int ingestPeriodSeconds;
+
+    @Parameter(names = { "-rp", "--report-period-sec" }, description = "The period, in seconds between persisting reports of the current state.", required = true)
+    private int resultPeriodSeconds;
+
+    /**
+     * @return The value of {@code --ingest-iterations}.
+     */
+    public int getNumIterations() {
+        return numIterations;
+    }
+
+    /**
+     * @return The value of {@code --ingest-observations-per-type}.
+     */
+    public int getObservationsPerTypePerIteration() {
+        return observationsPerTypePerIteration;
+    }
+
+    /**
+     * @return The value of {@code --ingest-types}.
+     */
+    public int getNumTypes() {
+        return numTypes;
+    }
+
+    /**
+     * @return The value of {@code --ingest-type-prefix}.
+     */
+    public String getTypePrefix() {
+        return typePrefix;
+    }
+
+    /**
+     * @return The value of {@code --ingest-period-sec}.
+     */
+    public int getIngestPeriodSeconds() {
+        return ingestPeriodSeconds;
+    }
+
+    /**
+     * @return The value of {@code --report-period-sec}.
+     */
+    public int getResultPeriodSeconds() {
+        return resultPeriodSeconds;
+    }
+
+    @Override
+    public String toString() {
+        return Objects.toStringHelper(this)
+                .add("numIterations", numIterations)
+                .add("observationsPerTypePerIteration", observationsPerTypePerIteration)
+                .add("numTypes", numTypes)
+                .add("typePrefix", typePrefix)
+                .add("ingestPeriodSeconds", ingestPeriodSeconds)
+                .add("resultPeriodSeconds", resultPeriodSeconds)
+                .toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/BenchmarkStatementGenerator.java
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/BenchmarkStatementGenerator.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/BenchmarkStatementGenerator.java
new file mode 100644
index 0000000..fdd3b63
--- /dev/null
+++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/BenchmarkStatementGenerator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.benchmark.periodic;
+
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+
+import javax.xml.datatype.DatatypeConfigurationException;
+import javax.xml.datatype.DatatypeFactory;
+
+import org.openrdf.model.Literal;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Generates sets of statements used for benchmarking.
+ */
+public class BenchmarkStatementGenerator {
+
+    private static final Logger logger = LoggerFactory.getLogger(BenchmarkStatementGenerator.class);
+
+    private final ValueFactory vf;
+    private final DatatypeFactory dtf;
+
+    /**
+     * @throws DatatypeConfigurationException If a {@link DatatypeFactory} cannot be created.
+     */
+    public BenchmarkStatementGenerator() throws DatatypeConfigurationException {
+        vf = new ValueFactoryImpl();
+        dtf = DatatypeFactory.newInstance();
+    }
+
+    /**
+     * Generates two statements for each of the (numObservationsPerType x numTypes) observations, i.e.
+     * (2 x numObservationsPerType x numTypes) statements in total, of the form:
+     *
+     * <pre>
+     * urn:obs_n uri:hasTime zonedTime
+     * urn:obs_n uri:hasObsType typePrefix_m
+     * </pre>
+     *
+     * Where the n in urn:obs_n is the ith value in 0 to (numObservationsPerType x numTypes) with an offset specified by
+     * observationOffset, written as a zero padded 20 digit number, and where m in typePrefix_m is the jth value in 0
+     * to numTypes.
+     *
+     * @param numObservationsPerType - The quantity of observations per type to generate.
+     * @param numTypes - The number of types to generate observations for.
+     * @param typePrefix - The prefix to be used for the type literal in the statement.
+     * @param observationOffset - The offset to be used for determining the value of n in the above statements.
+     * @param zonedTime - The time to be used for all observations generated.
+     * @return A new list of all generated Statements.
+     */
+    public List<Statement> generate(final long numObservationsPerType, final int numTypes, final String typePrefix, final long observationOffset, final ZonedDateTime zonedTime) {
+        final String time = zonedTime.format(DateTimeFormatter.ISO_INSTANT);
+        final Literal litTime = vf.createLiteral(dtf.newXMLGregorianCalendar(time));
+        // The predicates are identical for every observation, so they are created once up front.
+        final URI hasTime = vf.createURI("uri:hasTime");
+        final URI hasObsType = vf.createURI("uri:hasObsType");
+        final List<Statement> statements = Lists.newArrayList();
+
+        for (long i = 0; i < numObservationsPerType; i++) {
+            for (int j = 0; j < numTypes; j++) {
+                final long observationId = observationOffset + i * numTypes + j;
+                // Fixed width, zero padded id; presumably so the ids sort lexicographically in numeric order.
+                final URI observation = vf.createURI("urn:obs_" + String.format("%020d", observationId));
+                final String type = typePrefix + j;
+                statements.add(vf.createStatement(observation, hasTime, litTime));
+                statements.add(vf.createStatement(observation, hasObsType, vf.createLiteral(type)));
+            }
+        }
+
+        return statements;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/CommonOptions.java
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/CommonOptions.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/CommonOptions.java
new file mode 100644
index 0000000..e87e6a9
--- /dev/null
+++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/CommonOptions.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.benchmark.periodic;
+
+import java.io.File;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.base.Objects;
+
+/**
+ * Command line options shared by the benchmark commands: the Accumulo and Rya connection
+ * settings, the Kafka bootstrap servers and the output directory.  The fields are populated
+ * by JCommander from the {@link Parameter} annotations.
+ */
+public class CommonOptions {
+
+    @Parameter(names = { "-u", "--username" }, description = "Accumulo Username", required = true)
+    private String username;
+
+    @Parameter(names = { "-p", "--password" }, description = "Accumulo Password", required = true, password=true)
+    private String password;
+
+    @Parameter(names = { "-ai", "--accumulo-instance" }, description = "Accumulo Instance", required = true)
+    private String accumuloInstance;
+
+    @Parameter(names = { "-ri", "--rya-instance" }, description = "Rya Instance", required = true)
+    private String ryaInstance;
+
+    @Parameter(names = { "-z", "--zookeepers" }, description = "Accumulo Zookeepers", required = true)
+    private String zookeepers;
+
+    @Parameter(names = { "-k", "--kafka-bootstrap-servers" }, description = "Kafka bootstrap server string, for example: kafka1:9092,kafka2:9092", required = true)
+    private String kafkaBootstrap;
+
+    @Parameter(names = { "-o", "--output-directory" }, description = "The directory that output should be persisted to.", required = true)
+    private File outputDirectory;
+
+    /**
+     * @return The Accumulo username.
+     */
+    public String getUsername() {
+        return this.username;
+    }
+
+    /**
+     * @return The Accumulo password.
+     */
+    public String getPassword() {
+        return this.password;
+    }
+
+    /**
+     * @return The name of the Accumulo instance.
+     */
+    public String getAccumuloInstance() {
+        return this.accumuloInstance;
+    }
+
+    /**
+     * @return The name of the Rya instance.
+     */
+    public String getRyaInstance() {
+        return this.ryaInstance;
+    }
+
+    /**
+     * @return The Zookeepers used to reach the Accumulo instance.
+     */
+    public String getZookeepers() {
+        return this.zookeepers;
+    }
+
+    /**
+     * @return The Kafka bootstrap server string.
+     */
+    public String getKafkaBootstrap() {
+        return this.kafkaBootstrap;
+    }
+
+    /**
+     * @return The directory that output should be persisted to.
+     */
+    public File getOutputDirectory() {
+        return this.outputDirectory;
+    }
+
+    /**
+     * Builds a {@link RyaClient} backed by an Accumulo connector created from these options.
+     *
+     * @return A new RyaClient.
+     * @throws AccumuloException - Propagated from obtaining the Accumulo connector.
+     * @throws AccumuloSecurityException - Propagated from obtaining the Accumulo connector.
+     */
+    public RyaClient buildRyaClient() throws AccumuloException, AccumuloSecurityException {
+        final Instance zkInstance = new ZooKeeperInstance(accumuloInstance, zookeepers);
+        final AccumuloConnectionDetails connectionDetails =
+                new AccumuloConnectionDetails(username, password.toCharArray(), accumuloInstance, zookeepers);
+        return AccumuloRyaClientFactory.build(
+                connectionDetails,
+                zkInstance.getConnector(username, new PasswordToken(password)));
+    }
+
+    /**
+     * Creates the properties for a Kafka consumer that talks to the configured bootstrap servers.
+     * A random client id and a random group id are generated on every call, and offsets are
+     * reset to the earliest available.
+     *
+     * @return A new set of Kafka consumer properties.
+     */
+    public Properties getKafkaConsumerProperties() {
+        final Properties props = new Properties();
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrap);
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+        // Refresh metadata every 5 seconds to cover the scenario where we subscribe before the topic exists.
+        props.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "5000");
+        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return props;
+    }
+
+    /**
+     * @return A description of every option; the password is never included.
+     */
+    @Override
+    public String toString() {
+        return Objects.toStringHelper(this)
+                .add("username", username)
+                .add("password", "[redacted]")
+                .add("accumuloInstance", accumuloInstance)
+                .add("ryaInstance", ryaInstance)
+                .add("zookeepers", zookeepers)
+                .add("kafkaBootstrap", kafkaBootstrap)
+                .add("outputDirectory", outputDirectory)
+                .toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/KafkaLatencyBenchmark.java
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/KafkaLatencyBenchmark.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/KafkaLatencyBenchmark.java
new file mode 100644
index 0000000..454f7e0
--- /dev/null
+++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/KafkaLatencyBenchmark.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.benchmark.periodic;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDateTime;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.List;
+import java.util.LongSummaryStatistics;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.xml.datatype.DatatypeConfigurationException;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.LoadStatements;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
+import org.openrdf.model.Statement;
+import org.openrdf.query.BindingSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.ParameterException;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * This benchmark is useful for determining the performance characteristics of a Rya Triplestore under continuous ingest
+ * that has a PCJ Query that is incrementally updated by the Rya PCJ Fluo App (aka PCJ Updater).
+ * <p>
+ * This benchmark periodically loads a batch of data into Rya and reports the delay of the following query
+ *
+ * <pre>
+ * PREFIX time: &lt;http://www.w3.org/2006/time#&gt;
+ * SELECT ?type (count(?obs) as ?total)
+ * WHERE {
+ *   ?obs &lt;uri:hasTime&gt; ?time .
+ *   ?obs &lt;uri:hasObsType&gt; ?type .
+ * }
+ * GROUP BY ?type
+ * </pre>
+ * <p>
+ * This benchmark is useful for characterizing any latency between Truth (data ingested to Rya) and Reported (query
+ * result published to Kafka).
+ * <p>
+ * This benchmark is also useful for stress testing a Fluo App configuration and Accumulo Tablet configuration.
+ * <p>
+ * This benchmark expects the provided RyaInstance to have already been constructed and an appropriately configured Rya
+ * PCJ Fluo App for that RyaInstance to be deployed on your YARN cluster.
+ */
+public class KafkaLatencyBenchmark implements AutoCloseable {
+
+    public static final Logger logger = LoggerFactory.getLogger(KafkaLatencyBenchmark.class);
+
+    /**
+     * Per-type statistics, keyed by the type name (type prefix followed by the type index) and
+     * kept in sorted key order.
+     */
+    private final Map<String, Stat> typeToStatMap = Maps.newTreeMap();
+
+    /**
+     * ThreadPool for publishing data and logging stats.
+     */
+    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(20);
+
+    private final CommonOptions options;
+    private final BenchmarkOptions benchmarkOptions;
+    private final RyaClient client;
+
+    /**
+     * Formats the benchmark start time for use in the name of the CSV output file.
+     */
+    final DateTimeFormatter fsFormatter = DateTimeFormatter.ofPattern( "uuuu-MM-dd'T'HH-mm-ss" );
+    private final LocalDateTime startTime;
+
+    /**
+     * Futures of every scheduled task that has not been cancelled yet.  The list is polled by the
+     * thread that consumes from Kafka while it is iterated and cleared from a scheduler thread
+     * when the loader finishes, so a thread-safe list is required.
+     */
+    final List<ScheduledFuture<?>> futureList = new CopyOnWriteArrayList<>();
+
+    public KafkaLatencyBenchmark(final CommonOptions options, final BenchmarkOptions benchmarkOptions) throws AccumuloException, AccumuloSecurityException {
+        this.options = Objects.requireNonNull(options);
+        this.benchmarkOptions = Objects.requireNonNull(benchmarkOptions);
+        this.client = Objects.requireNonNull(options.buildRyaClient());
+        this.startTime = LocalDateTime.now();
+
+        logger.info("Running {} with the following input parameters:\n{}\n{}", this.getClass(), options, benchmarkOptions);
+    }
+
+    /**
+     * Shuts the scheduler down, cancels every scheduled task and waits (up to one day) for the
+     * scheduler threads to terminate.
+     *
+     * @throws Exception - If the wait for termination is interrupted.
+     */
+    @Override
+    public void close() throws Exception {
+        logger.info("Stopping threads.");
+        scheduler.shutdown();
+
+        cancelAllScheduledTasks();
+        logger.info("Waiting for all threads to terminate...");
+        // awaitTermination returns false on timeout; only report success when it really happened.
+        if (scheduler.awaitTermination(1, TimeUnit.DAYS)) {
+            logger.info("All threads terminated.");
+        } else {
+            logger.warn("Timed out waiting for threads to terminate.");
+        }
+    }
+
+    /**
+     * Cancels every tracked task without interrupting a task that is currently running, then
+     * forgets all of them.
+     */
+    private void cancelAllScheduledTasks() {
+        logger.info("Canceling all tasks");
+        futureList.forEach(scheduled -> scheduled.cancel(false));
+        futureList.clear();
+    }
+
+
+    /**
+     * Issues the benchmark query, starts the ingest and reporting tasks, and then consumes query
+     * results from Kafka.  This call blocks while results are being consumed.
+     *
+     * @throws InstanceDoesNotExistException - Propagated from issuing the query.
+     * @throws RyaClientException - Propagated from issuing the query.
+     */
+    public void start() throws InstanceDoesNotExistException, RyaClientException {
+        logger.info("Issuing Query");
+        final boolean periodic = benchmarkOptions instanceof PeriodicQueryCommand;
+        final String topic = periodic
+                ? issuePeriodicQuery((PeriodicQueryCommand) benchmarkOptions)
+                : issueQuery();
+        logger.info("Query Issued. Received PCJ ID: {}", topic);
+
+        startDataIngestTask();
+        startStatsPrinterTask();
+        startCsvPrinterTask();
+
+        // Both consumers block until the scheduled tasks have been cancelled.
+        if (!periodic) {
+            updateStatsFromKafka(topic);
+            return;
+        }
+        updatePeriodicStatsFromKafka(topic);
+    }
+
+    /**
+     * Registers the per-type count query as a PCJ that is exported to Kafka.
+     *
+     * @return The value returned by the create PCJ command; the caller uses it as the Kafka topic.
+     * @throws InstanceDoesNotExistException - Propagated from the create PCJ command.
+     * @throws RyaClientException - Propagated from the create PCJ command.
+     */
+    private String issueQuery() throws InstanceDoesNotExistException, RyaClientException {
+        // The clauses are joined with a single space.
+        final String sparql = String.join(" ",
+                "prefix time: <http://www.w3.org/2006/time#>",
+                "select ?type (count(?obs) as ?total) where {",
+                "    ?obs <uri:hasTime> ?time.",
+                "    ?obs <uri:hasObsType> ?type",
+                "}",
+                "group by ?type");
+        logger.info("Query: {}", sparql);
+
+        final String ryaInstanceName = options.getRyaInstance();
+        return client.getCreatePCJ().createPCJ(ryaInstanceName, sparql, ImmutableSet.of(ExportStrategy.KAFKA));
+    }
+
+    /**
+     * Registers the per-type count query as a periodic PCJ, using the window, period and time
+     * units from the supplied options.
+     *
+     * @param periodicOptions - Supplies the periodic window, period, time units and registration topic.
+     * @return The id returned by the create periodic PCJ command with its leading "QUERY_" prefix removed.
+     * @throws InstanceDoesNotExistException - Propagated from the create periodic PCJ command.
+     * @throws RyaClientException - Propagated from the create periodic PCJ command.
+     */
+    private String issuePeriodicQuery(final PeriodicQueryCommand periodicOptions) throws InstanceDoesNotExistException, RyaClientException {
+        final String periodicFilter = "Filter(function:periodic(?time, "
+                + periodicOptions.getPeriodicQueryWindow() + ", "
+                + periodicOptions.getPeriodicQueryPeriod() + ", time:"
+                + periodicOptions.getPeriodicQueryTimeUnits() + ")) ";
+
+        final String sparql = "prefix function: <http://org.apache.rya/function#> "
+                + "prefix time: <http://www.w3.org/2006/time#> "
+                + "select ?type (count(?obs) as ?total) where {"
+                + periodicFilter
+                + "?obs <uri:hasTime> ?time. "
+                + "?obs <uri:hasObsType> ?type } "
+                + "group by ?type";
+        logger.info("Query: {}", sparql);
+
+        final String queryId = client.getCreatePeriodicPCJ().createPeriodicPCJ(
+                options.getRyaInstance(),
+                sparql,
+                periodicOptions.getPeriodicQueryRegistrationTopic(),
+                options.getKafkaBootstrap());
+        logger.info("Received query id: {}", queryId);
+
+        // remove the QUERY_ prefix.
+        final String idPrefix = "QUERY_";
+        return queryId.substring(idPrefix.length());
+    }
+
+
+    /**
+     * Initializes the per-type statistics and schedules the task that periodically loads a batch
+     * of generated statements into Rya.  Once the loader has run the configured number of
+     * iterations it cancels all scheduled tasks.
+     */
+    private void startDataIngestTask() {
+        final int initialPublishDelaySeconds = 1;
+
+        final LoadStatements loadCommand = client.getLoadStatements();
+
+        // initialize the stats map before any task that reads it is scheduled
+        for(int typeId = 0; typeId < benchmarkOptions.getNumTypes(); typeId++) {
+            final String type = benchmarkOptions.getTypePrefix() + typeId;
+            typeToStatMap.put(type, new Stat(type));
+        }
+
+        final LoaderTask loaderTask = new LoaderTask(benchmarkOptions.getNumIterations(),
+                benchmarkOptions.getObservationsPerTypePerIteration(), benchmarkOptions.getNumTypes(),
+                benchmarkOptions.getTypePrefix(), loadCommand, options.getRyaInstance());
+
+        // The shutdown operation must be in place before the task is handed to the scheduler;
+        // otherwise the task could observe it as unset when it runs.
+        loaderTask.setShutdownOperation(this::cancelAllScheduledTasks);
+
+        final ScheduledFuture<?> loaderTaskFuture = scheduler.scheduleAtFixedRate(loaderTask, initialPublishDelaySeconds, benchmarkOptions.getIngestPeriodSeconds(), TimeUnit.SECONDS);
+        futureList.add(loaderTaskFuture);
+    }
+
+    /**
+     * Schedules a task that periodically logs the statistics of every type.
+     */
+    private void startStatsPrinterTask() {
+        final int initialPrintDelaySeconds = 11;
+
+        final Runnable statLogger = () -> {
+            final StringBuilder report = new StringBuilder();
+            report.append("Results\n");
+            typeToStatMap.values().forEach(stat -> report.append(stat).append("\n"));
+            logger.info("{}", report);
+        };
+
+        futureList.add(
+                scheduler.scheduleAtFixedRate(statLogger, initialPrintDelaySeconds, benchmarkOptions.getResultPeriodSeconds(), TimeUnit.SECONDS));
+    }
+
+    /**
+     * Schedules a task that periodically appends one row of statistics to a CSV file in the
+     * output directory.  The file name contains the benchmark start time, and the first row
+     * written is preceded by a header line.
+     */
+    private void startCsvPrinterTask() {
+        final int initialPrintDelaySeconds = 11;
+        final long printPeriodSeconds = benchmarkOptions.getResultPeriodSeconds();
+
+        // State shared by every execution of the task.
+        final AtomicInteger printCounter = new AtomicInteger(0);
+        final File outFile = new File(options.getOutputDirectory(), "run-" + fsFormatter.format(startTime) + ".csv");
+
+        final Runnable csvPrinterTask = () -> {
+            final int count = printCounter.getAndIncrement();
+            final StringBuilder csv = new StringBuilder();
+
+            // The header is emitted only with the very first row.
+            if (count == 0) {
+                csv.append("elapsed-seconds");
+                typeToStatMap.values().forEach(stat -> csv.append(",").append(stat.getCsvStringHeader()));
+                csv.append("\n");
+            }
+
+            csv.append(count * printPeriodSeconds);
+            typeToStatMap.values().forEach(stat -> csv.append(",").append(stat.getCsvString()));
+            csv.append("\n");
+
+            try {
+                FileUtils.write(outFile, csv.toString(), StandardCharsets.UTF_8, true);
+            } catch (final IOException e) {
+                logger.warn("Error writing to file " + outFile, e);
+            }
+        };
+
+        futureList.add(
+                scheduler.scheduleAtFixedRate(csvPrinterTask, initialPrintDelaySeconds, printPeriodSeconds, TimeUnit.SECONDS));
+    }
+
+
+
+    /**
+     * Consumes {@link VisibilityBindingSet} results from the given topic and applies them to the
+     * statistics until no scheduled tasks remain.  Any exception ends the loop and is logged.
+     *
+     * @param topic - The Kafka topic to subscribe to.
+     */
+    private void updateStatsFromKafka(final String topic) {
+        try (KafkaConsumer<String, VisibilityBindingSet> kafka = new KafkaConsumer<>(options.getKafkaConsumerProperties(), new StringDeserializer(), new KryoVisibilityBindingSetSerializer())) {
+            kafka.subscribe(Arrays.asList(topic));
+
+            boolean tasksPending = !futureList.isEmpty();
+            while (tasksPending) {
+                // check kafka at most twice a second.
+                final ConsumerRecords<String, VisibilityBindingSet> batch = kafka.poll(500);
+                handle(batch);
+                tasksPending = !futureList.isEmpty();
+            }
+        } catch (final Exception e) {
+            logger.warn("Exception occurred", e);
+        }
+    }
+
+    /**
+     * Consumes {@link BindingSet} results of the periodic query from the given topic and applies
+     * them to the statistics until no scheduled tasks remain.  Any exception ends the loop and is
+     * logged.
+     *
+     * @param topic - The Kafka topic to subscribe to.
+     */
+    private void updatePeriodicStatsFromKafka(final String topic) {
+        try (KafkaConsumer<String, BindingSet> kafka = new KafkaConsumer<>(options.getKafkaConsumerProperties(), new StringDeserializer(), new BindingSetSerDe())) {
+            kafka.subscribe(Arrays.asList(topic));
+
+            boolean tasksPending = !futureList.isEmpty();
+            while (tasksPending) {
+                // check kafka at most twice a second.
+                final ConsumerRecords<String, BindingSet> batch = kafka.poll(500);
+                handle(batch);
+                tasksPending = !futureList.isEmpty();
+            }
+        } catch (final Exception e) {
+            logger.warn("Exception occurred", e);
+        }
+    }
+
+    /**
+     * Applies a batch of query results to the statistics.  Each result is expected to bind
+     * "type" and "total"; the total becomes the latest reported total of that type.  Results that
+     * cannot be interpreted are logged and skipped so that one bad record does not end the
+     * consumer loop.
+     *
+     * @param records - The records that were polled from Kafka.
+     */
+    private void handle(final ConsumerRecords<String, ? extends BindingSet> records) {
+        if(records.count() > 0) {
+            logger.debug("Received {} records", records.count());
+        }
+        for(final ConsumerRecord<String, ? extends BindingSet> record: records){
+            final BindingSet result = record.value();
+            logger.debug("Received BindingSet: {}", result);
+
+            // Guard against empty values and results that lack the expected bindings.
+            if(result == null || !result.hasBinding("type") || !result.hasBinding("total")) {
+                logger.warn("Ignoring result without the expected 'type' and 'total' bindings: {}", result);
+                continue;
+            }
+
+            final String type = result.getBinding("type").getValue().stringValue();
+            final String totalString = result.getBinding("total").getValue().stringValue();
+            final long total;
+            try {
+                total = Long.parseLong(totalString);
+            } catch (final NumberFormatException e) {
+                logger.warn("Ignoring result for type {} with non-numeric total: {}", type, totalString);
+                continue;
+            }
+
+            final Stat stat = typeToStatMap.get(type);
+            if(stat == null) {
+                logger.warn("Not expecting to receive type: {}", type);
+            } else {
+                stat.fluoTotal.set(total);
+            }
+        }
+    }
+
+    /**
+     * Periodic task that generates one batch of observations per execution, loads it into Rya
+     * and adds the batch size to the published total of every type.  Once the configured number
+     * of iterations has been reached it runs the shutdown operation instead of loading.
+     */
+    private class LoaderTask implements Runnable {
+        private final AtomicLong iterations = new AtomicLong(0);
+        private final int numIterations;
+        private final int numTypes;
+        private final String typePrefix;
+        private final long observationsPerTypePerIteration;
+
+        private final LoadStatements loadStatements;
+        private final String ryaInstanceName;
+        // Written by the thread that configures the task and read by the thread that runs it.
+        private volatile Runnable shutdownOperation;
+
+        /**
+         * @param numIterations - The number of batches to load before shutting down.
+         * @param observationsPerTypePerIteration - The number of observations per type in each batch.
+         * @param numTypes - The number of observation types.
+         * @param typePrefix - The prefix of every type name; the type index is appended to it.
+         * @param loadStatements - The command used to load the statements.
+         * @param ryaInstanceName - The Rya instance the statements are loaded into.
+         */
+        public LoaderTask(final int numIterations, final long observationsPerTypePerIteration, final int numTypes, final String typePrefix, final LoadStatements loadStatements, final String ryaInstanceName) {
+            this.numIterations = numIterations;
+            this.observationsPerTypePerIteration = observationsPerTypePerIteration;
+            this.numTypes = numTypes;
+            this.typePrefix = typePrefix;
+            this.loadStatements = loadStatements;
+            this.ryaInstanceName = ryaInstanceName;
+        }
+
+        @Override
+        public void run() {
+            try {
+                final long i = iterations.getAndIncrement();
+                logger.info("Publishing iteration [{} of {}]", i, numIterations);
+                if(i >= numIterations) {
+                    logger.info("Reached maximum iterations...");
+                    final Runnable onShutdown = shutdownOperation;
+                    if(onShutdown != null) {
+                        onShutdown.run();
+                    } else {
+                        logger.warn("No shutdown operation has been set.");
+                    }
+                    return;
+                }
+
+                // Only build the generator once we know there is a batch to produce.
+                final BenchmarkStatementGenerator gen = new BenchmarkStatementGenerator();
+                final long observationsPerIteration = observationsPerTypePerIteration * numTypes;
+                // Each iteration continues the observation ids where the previous one stopped.
+                final long iterationOffset = i * observationsPerIteration;
+                logger.info("Generating {} Observations", observationsPerIteration);
+                final Iterable<Statement> statements = gen.generate(observationsPerTypePerIteration, numTypes, typePrefix, iterationOffset, ZonedDateTime.now());
+                logger.info("Publishing {} Observations", observationsPerIteration);
+                final long t1 = System.currentTimeMillis();
+                loadStatements.loadStatements(ryaInstanceName, statements);
+                logger.info("Published {} observations in {}s", observationsPerIteration, ((System.currentTimeMillis() - t1)/1000.0));
+                logger.info("Updating published totals...");
+                for(int typeId = 0; typeId < numTypes; typeId++) {
+                    typeToStatMap.get(typePrefix + typeId).total.addAndGet(observationsPerTypePerIteration);
+                }
+                logger.info("Finished publishing.");
+            } catch (final RyaClientException e) {
+                logger.warn("Error while writing statements", e);
+            } catch (final DatatypeConfigurationException e) {
+                logger.warn("Error creating generator", e);
+            } catch (final RuntimeException e) {
+                // A periodically scheduled task whose execution throws is never run again, and
+                // nothing reports the failure.  Log it and let the next execution proceed so the
+                // task can still reach its iteration limit and trigger the shutdown operation.
+                logger.warn("Unexpected error while loading statements", e);
+            }
+
+        }
+
+        /**
+         * @param f - The operation to run once the maximum number of iterations has been reached.
+         */
+        public void setShutdownOperation(final Runnable f) {
+            this.shutdownOperation = f;
+        }
+    }
+
+
+    /**
+     * Holds the counters of a single Type: the total that has been published to Rya and the
+     * total most recently reported through Kafka, plus summary statistics of their difference.
+     */
+    private class Stat {
+        protected final AtomicLong fluoTotal = new AtomicLong(0);
+        protected final AtomicLong total = new AtomicLong(0);
+        private final String type;
+        private final LongSummaryStatistics diffStats = new LongSummaryStatistics();
+
+        /**
+         * @param type - The name of the Type these statistics belong to.
+         */
+        public Stat(final String type) {
+            this.type = type;
+        }
+
+        /**
+         * Describes the current counters.  Note that every call also records the current
+         * difference as a new sample in the difference statistics.
+         */
+        @Override
+        public String toString() {
+            final long published = total.get();
+            final long reported = fluoTotal.get();
+            final long difference = published - reported;
+            diffStats.accept(difference);
+
+            final StringBuilder description = new StringBuilder();
+            description.append(type);
+            description.append(" published total: ").append(published);
+            description.append(" fluo: ").append(reported);
+            description.append(" difference: ").append(difference);
+            description.append(" diffStats: ").append(diffStats);
+            return description.toString();
+        }
+
+        /**
+         * @return The type, published total, reported total and their difference as comma separated values.
+         */
+        public String getCsvString() {
+            final long published = total.get();
+            final long reported = fluoTotal.get();
+            return Joiner.on(",").join(type, published, reported, published - reported);
+        }
+
+        /**
+         * @return The comma separated column names matching {@link #getCsvString()}.
+         */
+        public String getCsvStringHeader() {
+            final StringBuilder header = new StringBuilder("type,published_total,fluo_total_");
+            header.append(type).append(",difference_").append(type);
+            return header.toString();
+        }
+    }
+
+    /**
+     * Command line entry point.  Parses the common options and one of the "periodic" or
+     * "projection" commands, then runs the benchmark.  Invalid input prints the usage and exits
+     * with status 1.
+     *
+     * @param args - The command line arguments.
+     */
+    public static void main(final String[] args) {
+        final CommonOptions options = new CommonOptions();
+        final ProjectionQueryCommand projectionCommand = new ProjectionQueryCommand();
+        final PeriodicQueryCommand periodicCommand = new PeriodicQueryCommand();
+
+        final JCommander cli = new JCommander();
+        cli.addObject(options);
+        cli.addCommand(projectionCommand);
+        cli.addCommand(periodicCommand);
+        cli.setProgramName(KafkaLatencyBenchmark.class.getName());
+
+        BenchmarkOptions parsedCommand = null;
+        try {
+            cli.parse(args);
+            final String parsedName = cli.getParsedCommand();
+            if ("projection".equals(parsedName)) {
+                parsedCommand = projectionCommand;
+            } else if ("periodic".equals(parsedName)) {
+                parsedCommand = periodicCommand;
+            } else {
+                throw new ParameterException("A command must be specified.");
+            }
+        } catch (final ParameterException e) {
+            System.err.println("Error! Invalid input: " + e.getMessage());
+            cli.usage();
+            System.exit(1);
+        }
+
+        // The benchmark is closed automatically once start() returns or fails.
+        try (KafkaLatencyBenchmark benchmark = new KafkaLatencyBenchmark(options, parsedCommand)) {
+            benchmark.start();
+        } catch (final Exception e) {
+          logger.warn("Exception occured.", e);
+        }
+    }
+}