You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/11/07 15:50:42 UTC
[3/5] incubator-rya git commit: RYA-356 Added a Twill App for running
the periodic service. Closes #248.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill.yarn/src/main/java/org/apache/rya/periodic/notification/twill/yarn/PeriodicNotificationTwillRunner.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/twill.yarn/src/main/java/org/apache/rya/periodic/notification/twill/yarn/PeriodicNotificationTwillRunner.java b/extras/periodic.notification/twill.yarn/src/main/java/org/apache/rya/periodic/notification/twill/yarn/PeriodicNotificationTwillRunner.java
new file mode 100644
index 0000000..552324e
--- /dev/null
+++ b/extras/periodic.notification/twill.yarn/src/main/java/org/apache/rya/periodic/notification/twill/yarn/PeriodicNotificationTwillRunner.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.twill.yarn;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration;
+import org.apache.rya.periodic.notification.twill.PeriodicNotificationTwillApp;
+import org.apache.rya.periodic.notification.twill.PeriodicNotificationTwillRunnable;
+import org.apache.twill.api.ClassAcceptor;
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunResources;
+import org.apache.twill.api.TwillRunnerService;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.yarn.YarnTwillRunnerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+
+/**
+ * This class is responsible for starting and stopping the {@link PeriodicNotificationTwillApp} on a Hadoop YARN cluster.
+ */
+public class PeriodicNotificationTwillRunner implements AutoCloseable {
+
+ public static final Logger LOG = LoggerFactory.getLogger(PeriodicNotificationTwillRunner.class);
+
+ private final YarnConfiguration yarnConfiguration;
+ private final TwillRunnerService twillRunner;
+ private final File configFile;
+
+ /**
+ *
+ * @param yarnZookeepers - The zookeeper connect string used by the Hadoop YARN cluster.
+ * @param configFile - The config file used by {@link PeriodicNotificationTwillApp}. Typically notification.properties.
+ */
+ public PeriodicNotificationTwillRunner(final String yarnZookeepers, final File configFile) {
+ Preconditions.checkArgument(configFile.exists(), "Config File must exist");
+ Objects.requireNonNull(yarnZookeepers, "YARN Zookeepers must not be null.");
+ this.configFile = configFile;
+ yarnConfiguration = new YarnConfiguration();
+ twillRunner = new YarnTwillRunnerService(yarnConfiguration, yarnZookeepers);
+ twillRunner.start();
+
+ // sleep to give the YarnTwillRunnerService time to retrieve state from zookeeper
+ try {
+ Thread.sleep(1000);
+ } catch (final InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /**
+ * Start an instance of the {@link PeriodicNotificationTwillApp}.
+ *
+ * @param interactive - If true, this method will block until the user terminates this JVM, at which point the
+ * {@link PeriodicNotificationTwillApp} on the YARN cluster will also be terminated. If false, this
+ * method will return after startup.
+ */
+ public void startApp(final boolean interactive) {
+ final String yarnClasspath = yarnConfiguration.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ Joiner.on(",").join(YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH));
+ final List<String> applicationClassPaths = Lists.newArrayList();
+ Iterables.addAll(applicationClassPaths, Splitter.on(",").split(yarnClasspath));
+ final TwillController controller = twillRunner
+ .prepare(new PeriodicNotificationTwillApp(configFile))
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(new OutputStreamWriter(System.out, StandardCharsets.UTF_8), true)))
+ .withApplicationClassPaths(applicationClassPaths)
+ //.withApplicationArguments(args)
+ //.withArguments(runnableName, args)
+ // .withBundlerClassAcceptor(new HadoopClassExcluder())
+ .start();
+
+ final ResourceReport r = getResourceReport(controller, 5, TimeUnit.MINUTES);
+ LOG.info("Received ResourceReport: {}", r);
+ LOG.info("{} started successfully!", PeriodicNotificationTwillApp.APPLICATION_NAME);
+
+ if(interactive) {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ try {
+ Futures.getUnchecked(controller.terminate());
+ } finally {
+ twillRunner.stop();
+ }
+ }
+ });
+
+ try {
+ LOG.info("{} waiting termination by user. Type ctrl-c to terminate.", PeriodicNotificationTwillApp.class.getSimpleName());
+ controller.awaitTerminated();
+ } catch (final ExecutionException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ /**
+ * Terminates all instances of the {@link PeriodicNotificationTwillApp} on the YARN cluster.
+ */
+ public void stopApp() {
+ LOG.info("Stopping any running instances...");
+
+ int counter = 0;
+ // It is possible that we have launched multiple instances of the app. For now, stop them all, one at a time.
+ for(final TwillController c : twillRunner.lookup(PeriodicNotificationTwillApp.APPLICATION_NAME)) {
+ final ResourceReport report = c.getResourceReport();
+ LOG.info("Attempting to stop {} with YARN ApplicationId: {} and Twill RunId: {}", PeriodicNotificationTwillApp.APPLICATION_NAME, report.getApplicationId(), c.getRunId());
+ Futures.getUnchecked(c.terminate());
+ LOG.info("Stopped {} with YARN ApplicationId: {} and Twill RunId: {}", PeriodicNotificationTwillApp.APPLICATION_NAME, report.getApplicationId(), c.getRunId());
+ counter++;
+ }
+
+ LOG.info("Stopped {} instance(s) of {}", counter, PeriodicNotificationTwillApp.APPLICATION_NAME);
+ }
+
+ /**
+ * Blocks until a non-null Resource report is returned.
+ * @param controller - The controller to interrogate.
+ * @param timeout - The maximum time to poll {@controller}. Use -1 for infinite polling.
+ * @param timeoutUnits - The units of {@code timeout}.
+ * @return The ResourceReport for the application.
+ * @throws IllegalStateException If a timeout occurs before a ResourceReport is returned.
+ */
+ private ResourceReport getResourceReport(final TwillController controller, final long timeout, final TimeUnit timeoutUnits) {
+ Preconditions.checkArgument(timeout >= -1, "timeout cannot be less than -1");
+ final long timeoutMillis = TimeUnit.MILLISECONDS.convert(timeout, timeoutUnits);
+ final long sleepMillis = 1000; // how long to sleep between retrieval attempts.
+ long totalElapsedMillis = 0;
+ ResourceReport report = controller.getResourceReport();
+ while (reportIsLoading(report)) {
+ try {
+ Thread.sleep(sleepMillis);
+ } catch (final InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ totalElapsedMillis += sleepMillis;
+ if ((timeout != -1) && (totalElapsedMillis >= timeoutMillis)) {
+ final String errorMessage = "Timeout while waiting for the Twill Application to start on YARN. Total elapsed time: " + TimeUnit.SECONDS.convert(totalElapsedMillis, TimeUnit.MILLISECONDS) + "s.";
+ LOG.error(errorMessage);
+ throw new IllegalStateException(errorMessage);
+ }
+ if ((totalElapsedMillis % 5000) == 0) {
+ LOG.info("Waiting for the Twill Application to start on YARN... Total elapsed time: {}s.", TimeUnit.SECONDS.convert(totalElapsedMillis, TimeUnit.MILLISECONDS));
+ }
+ report = controller.getResourceReport();
+ }
+ return report;
+ }
+
+ /**
+ * Checks to see if the report has loaded.
+ * @param report - The {@link ResourceReport} for this Twill Application.
+ * @return Return true if the report is null or incomplete. Return false if the report is completely loaded.
+ */
+ private boolean reportIsLoading(@Nullable final ResourceReport report) {
+ if(report == null) {
+ return true;
+ }
+
+ final String yarnApplicationID = report.getApplicationId();
+ final Collection<TwillRunResources> runnableResources = report.getResources().get(PeriodicNotificationTwillRunnable.TWILL_RUNNABLE_NAME);
+
+ if(runnableResources == null || runnableResources.isEmpty()) {
+ LOG.info("Received Resource Report for YARN ApplicationID: {}, runnable resources are still loading...", yarnApplicationID);
+ return true;
+ } else {
+ LOG.info("Received Resource Report for YARN ApplicationID: {}, runnable resources are loaded.", yarnApplicationID);
+ return false;
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if(twillRunner != null) {
+ twillRunner.stop();
+ }
+ }
+
+ private static class MainOptions {
+ @Parameter(names = { "-c", "--config-file" }, description = "PeriodicNotification Application config file", required = true)
+ private File configFile;
+
+ @Parameter(names = { "-z", "--yarn-zookeepers" }, description = "(Optional) YARN Zookeepers connect string. If not specified, the value of 'accumulo.zookeepers' from the specified '--config-file' will be reused.", required = false)
+ private String zookeepers;
+ }
+
+
+ @Parameters(commandNames = { "start" }, separators = "=", commandDescription = "Start the PeriodicNotification Application on YARN")
+ private static class CommandStart {
+ @Parameter(names = { "-i", "--interactive" }, description = "(Optional) Interactive. If specified, blocks the console until the user types ctrl-c.", required = false)
+ private boolean interactive;
+
+ //TODO future feature.
+ //@Parameter(names = { "-p", "--accumulo.password"}, description = "Leave value blank to be prompted interactively for the 'accumulo.password' of the 'accumulo.user' specified by the '--config-file'", password = true, required = true)
+ //private String password;
+ }
+
+ @Parameters(commandNames = { "stop" }, commandDescription = "Stops PeriodicNotification Applications on YARN")
+ private static class CommandStop {
+ //TODO future feature.
+ //@Parameter(names = { "-a", "--all" }, description = "Stops all PeriodicNotification Application instances.", required = false)
+ //private boolean all = true;
+
+ //@Parameter(names = { "-i", "--instances"}, description = "CSV List of application instances to be stopped", required = false)
+ //private List<String> instanceList;
+ }
+
+
+ public static void main(final String[] args) {
+
+ final MainOptions options = new MainOptions();
+ final String START = "start";
+ final String STOP = "stop";
+ String parsedCommand = null;
+ final CommandStart commandStart = new CommandStart();
+ final CommandStop commandStop = new CommandStop();
+ final JCommander cli = new JCommander(options);
+ cli.addCommand(START, commandStart);
+ cli.addCommand(STOP, commandStop);
+ cli.setProgramName(PeriodicNotificationTwillRunner.class.getName());
+ try {
+ cli.parse(args);
+ parsedCommand = cli.getParsedCommand();
+ if(parsedCommand == null) {
+ throw new ParameterException("A command must be specified.");
+ }
+ } catch (final ParameterException e) {
+ System.err.println("Error! Invalid input: " + e.getMessage());
+ cli.usage();
+ System.exit(1);
+ }
+
+ // load the config file
+ PeriodicNotificationApplicationConfiguration conf = null;
+ try (FileInputStream fin = new FileInputStream(options.configFile)) {
+ final Properties p = new Properties();
+ p.load(fin);
+ conf = new PeriodicNotificationApplicationConfiguration(p);
+ } catch (final Exception e) {
+ LOG.warn("Unable to load specified properties file", e);
+ System.exit(1);
+ }
+
+ // pick the correct zookeepers
+ String zookeepers;
+ if(options.zookeepers != null && !options.zookeepers.isEmpty()) {
+ zookeepers = options.zookeepers;
+ } else {
+ zookeepers = conf.getAccumuloZookeepers();
+ }
+
+ try (final PeriodicNotificationTwillRunner app = new PeriodicNotificationTwillRunner(zookeepers, options.configFile)) {
+ if(START.equals(parsedCommand)) {
+ app.startApp(commandStart.interactive);
+ } else if(STOP.equals(parsedCommand)) {
+ app.stopApp();
+ } else {
+ throw new IllegalStateException("Invalid Command."); // this state should be impossible.
+ }
+ } catch (final Exception e) {
+ LOG.warn("Error occurred.", e);
+ System.exit(1);
+ }
+ }
+
+ static class HadoopClassExcluder extends ClassAcceptor {
+ @Override
+ public boolean accept(final String className, final URL classUrl, final URL classPathUrl) {
+ // exclude hadoop but not hbase package
+ return !(className.startsWith("org.apache.hadoop") && !className.startsWith("org.apache.hadoop.hbase"));
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill.yarn/src/main/scripts/periodicNotificationTwillApp.sh
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/twill.yarn/src/main/scripts/periodicNotificationTwillApp.sh b/extras/periodic.notification/twill.yarn/src/main/scripts/periodicNotificationTwillApp.sh
new file mode 100644
index 0000000..8fa3497
--- /dev/null
+++ b/extras/periodic.notification/twill.yarn/src/main/scripts/periodicNotificationTwillApp.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# navigate to the project directory
+PROJECT_HOME=$(dirname $(cd $(dirname $0) && pwd))
+cd $PROJECT_HOME
+
+# setup the twill classpath
+. conf/twill-env.sh
+
+# echo "Using classpath: $TWILL_CP"
+
+# run the program
+$JAVA_HOME/bin/java -cp $TWILL_CP \
+ -Dlogback.configurationFile=conf/logback.xml \
+ org.apache.rya.periodic.notification.twill.yarn.PeriodicNotificationTwillRunner "$@"
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill/README.md
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/twill/README.md b/extras/periodic.notification/twill/README.md
new file mode 100644
index 0000000..c87ad2e
--- /dev/null
+++ b/extras/periodic.notification/twill/README.md
@@ -0,0 +1,36 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+## rya.periodic.notification.twill
+
+This project serves two purposes:
+
+1) Store all `org.apache.twill:twill-api`-specific code, decoupling this execution
+environment dependency from the rest of Rya's implementation logic. This way, it
+should be easy to integrate Rya code with alternative execution environments.
+
+2) It can be tricky to shield Twill applications from the constraints of the Twill
+runtime environment (specifically a Guava 13.0 dependency, among potentially others).
+By controlling the packaging of this project and leveraging the maven-shade-plugin's
+relocation capability, we can avoid current and future classpath conflicts and allow
+for a cleaner integration with Twill than we might get with the `BundledJarRunner`
+from `org.apache.twill:twill-ext`.
+
+Note: the distribution of this Twill application can be found in
+`rya.periodic.notification.twill.yarn`.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill/pom.xml
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/twill/pom.xml b/extras/periodic.notification/twill/pom.xml
new file mode 100644
index 0000000..e22dd45
--- /dev/null
+++ b/extras/periodic.notification/twill/pom.xml
@@ -0,0 +1,177 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.periodic.notification.parent</artifactId>
+ <version>3.2.12-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>rya.periodic.notification.twill</artifactId>
+
+ <name>Apache Rya Periodic Notification Service on Twill </name>
+ <description>Twill Application for executing the Apache Rya Periodic Notification Service</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <!-- redirect any other logging frameworks to slf4j within the Twill runtime -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+
+ <!--
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.periodic.notification.api</artifactId>
+ </dependency>
+ -->
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.periodic.notification.service</artifactId>
+ <exclusions>
+ <!-- exclude logging implementations -->
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.twill</groupId>
+ <artifactId>twill-api</artifactId>
+ <version>0.12.0</version>
+ </dependency>
+
+ <!-- Mark Accumulo, Hadoop, and ZooKeeper as provided dependencies -->
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!--
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <version>2.3.7</version>
+ <extensions>true</extensions>
+ <executions>
+ <execution>
+ <id>bundle</id>
+ <phase>package</phase>
+ <goals>
+ <goal>bundle</goal>
+ </goals>
+ <configuration>
+ <classifier>bundle</classifier>
+ <instructions>
+ <Embed-Dependency>*;inline=false</Embed-Dependency>
+ <Embed-Transitive>true</Embed-Transitive>
+ <Embed-Directory>lib</Embed-Directory>
+ </instructions>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>twill-app</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadedClassifierName>twill-app</shadedClassifierName>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer" />
+ </transformers>
+ <relocations>
+ <!-- relocate the more modern version of guava to avoid classpath conflicts -->
+ <relocation>
+ <pattern>com.google.common</pattern>
+ <shadedPattern>org.apache.rya.shaded.com.google.common</shadedPattern>
+ </relocation>
+ </relocations>
+ <artifactSet>
+ <excludes>
+ <!-- exclude logging implementations if the above exclusions/scoping don't catch them -->
+ <exclude>commons-logging:commons-logging</exclude>
+ <exclude>org.slf4j:slf4j-log4j12</exclude>
+ <exclude>log4j:log4j</exclude>
+ </excludes>
+ </artifactSet>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill/src/main/java/org/apache/rya/periodic/notification/twill/PeriodicNotificationTwillApp.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/twill/src/main/java/org/apache/rya/periodic/notification/twill/PeriodicNotificationTwillApp.java b/extras/periodic.notification/twill/src/main/java/org/apache/rya/periodic/notification/twill/PeriodicNotificationTwillApp.java
new file mode 100644
index 0000000..d2a6125
--- /dev/null
+++ b/extras/periodic.notification/twill/src/main/java/org/apache/rya/periodic/notification/twill/PeriodicNotificationTwillApp.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.twill;
+
+import java.io.File;
+
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.ResourceSpecification.SizeUnit;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillSpecification;
+
+/**
+ * Twill application definition for the Rya Periodic Notification Service.
+ * <p>
+ * It runs a single instance of {@link PeriodicNotificationTwillRunnable} (2 virtual cores, 2 GB of memory) and
+ * localizes the supplied config file into that runnable's container under the name
+ * {@link PeriodicNotificationTwillRunnable#CONFIG_FILE_NAME}.
+ */
+public class PeriodicNotificationTwillApp implements TwillApplication {
+
+    /** The name this application is registered under with Twill. */
+    public static final String APPLICATION_NAME = PeriodicNotificationTwillApp.class.getSimpleName();
+
+    private final File configFile;
+
+    /**
+     * @param configFile - The notification properties file to ship to the runnable's container. (not null)
+     * @throws NullPointerException If {@code configFile} is null.
+     */
+    public PeriodicNotificationTwillApp(final File configFile) {
+        // Fail fast here rather than later, when configure() tries to localize the file.
+        this.configFile = java.util.Objects.requireNonNull(configFile, "configFile must not be null.");
+    }
+
+    /**
+     * Builds the Twill specification: one runnable named {@link PeriodicNotificationTwillRunnable#TWILL_RUNNABLE_NAME}
+     * with the config file attached as a local file, started in any order.
+     *
+     * @return The {@link TwillSpecification} for this application.
+     */
+    @Override
+    public TwillSpecification configure() {
+        return TwillSpecification.Builder.with()
+                .setName(APPLICATION_NAME)
+                .withRunnable()
+                .add(PeriodicNotificationTwillRunnable.TWILL_RUNNABLE_NAME,
+                        new PeriodicNotificationTwillRunnable(),
+                        ResourceSpecification.Builder.with()
+                                .setVirtualCores(2)
+                                .setMemory(2, SizeUnit.GIGA)
+                                .setInstances(1)
+                                .build())
+                .withLocalFiles()
+                .add(PeriodicNotificationTwillRunnable.CONFIG_FILE_NAME, configFile)
+                .apply()
+                .anyOrder()
+                .build();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill/src/main/java/org/apache/rya/periodic/notification/twill/PeriodicNotificationTwillRunnable.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/twill/src/main/java/org/apache/rya/periodic/notification/twill/PeriodicNotificationTwillRunnable.java b/extras/periodic.notification/twill/src/main/java/org/apache/rya/periodic/notification/twill/PeriodicNotificationTwillRunnable.java
new file mode 100644
index 0000000..8695655
--- /dev/null
+++ b/extras/periodic.notification/twill/src/main/java/org/apache/rya/periodic/notification/twill/PeriodicNotificationTwillRunnable.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.twill;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.rya.periodic.notification.application.PeriodicApplicationException;
+import org.apache.rya.periodic.notification.application.PeriodicNotificationApplication;
+import org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration;
+import org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationFactory;
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.Command;
+import org.apache.twill.api.TwillContext;
+import org.apache.twill.api.TwillRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Twill runnable that hosts a {@link PeriodicNotificationApplication}.
+ * <p>
+ * The application configuration is read from {@link #CONFIG_FILE_NAME}, resolved relative to the process's current
+ * working directory ({@link PeriodicNotificationTwillApp} localizes the config file under that name).
+ */
+public class PeriodicNotificationTwillRunnable extends AbstractTwillRunnable {
+
+    private static final Logger logger = LoggerFactory.getLogger(PeriodicNotificationTwillRunnable.class);
+
+    public static final String TWILL_RUNNABLE_NAME = PeriodicNotificationTwillRunnable.class.getSimpleName();
+    public static final String CONFIG_FILE_NAME = "notification.properties";
+
+    // volatile: assigned in initialize()/read in run(), but also read by stop(), which the Twill runtime
+    // presumably invokes from a different thread than run() — NOTE(review): confirm against Twill's container code.
+    private volatile PeriodicNotificationApplication app;
+
+    /**
+     * Called when the container process starts. Executed in container machine. If any exception is thrown from this
+     * method, this runnable won't get retry.
+     * <p>
+     * Loads {@link #CONFIG_FILE_NAME} and creates (but does not start) the {@link PeriodicNotificationApplication}.
+     *
+     * @param context Contains information about the runtime context.
+     * @throws RuntimeException If the properties cannot be loaded or the application cannot be created.
+     */
+    @Override
+    public void initialize(final TwillContext context) {
+        logger.info("Initializing the PeriodicNotificationApplication.");
+
+        final File propsFile = new File(CONFIG_FILE_NAME);
+        final PeriodicNotificationApplicationConfiguration conf;
+        try (final FileInputStream fin = new FileInputStream(propsFile)) {
+            final Properties p = new Properties();
+            p.load(fin);
+            logger.debug("Loaded properties: {}", p);
+            conf = new PeriodicNotificationApplicationConfiguration(p);
+        } catch (final Exception e) {
+            logger.error("Error loading notification properties", e);
+            throw new RuntimeException(e); // kill the Runnable
+        }
+
+        try {
+            this.app = PeriodicNotificationApplicationFactory.getPeriodicApplication(conf);
+        } catch (final PeriodicApplicationException e) {
+            logger.error("Error occurred creating PeriodicNotificationApplication", e);
+            throw new RuntimeException(e); // kill the Runnable
+        }
+    }
+
+    /**
+     * Called when a command is received. A normal return denotes the command has been processed successfully, otherwise
+     * {@link Exception} should be thrown. Commands are currently ignored.
+     * @param command Contains details of the command.
+     * @throws Exception
+     */
+    @Override
+    public void handleCommand(final Command command) throws Exception {
+        // no-op
+    }
+
+    /**
+     * Starts the application and blocks until it finishes.
+     */
+    @Override
+    public void run() {
+        final PeriodicNotificationApplication current = app;
+        logger.info("Starting up the PeriodicNotificationApplication.");
+        current.start();
+        try {
+            logger.info("Blocking thread termination until the PeriodicNotificationApplication is stopped.");
+            current.blockUntilFinished();
+        } catch (final InterruptedException e) {
+            logger.error("An error occurred while blocking on the PeriodicNotificationApplication", e);
+            Thread.currentThread().interrupt(); // preserve the interrupt status for the Twill runtime
+        } catch (IllegalStateException | ExecutionException e) {
+            logger.error("An error occurred while blocking on the PeriodicNotificationApplication", e);
+        }
+        logger.info("Exiting the PeriodicNotificationApplication.");
+    }
+
+    /**
+     * Requests to stop the running service. Does nothing beyond logging if the application was never created.
+     */
+    @Override
+    public void stop() {
+        logger.info("Stopping the PeriodicNotificationApplication...");
+        final PeriodicNotificationApplication current = app;
+        // app is only assigned when initialize() succeeds; guard so stop() cannot throw an NPE otherwise.
+        if (current != null) {
+            current.stop();
+        }
+    }
+
+    /**
+     * Called when the {@link TwillRunnable#run()} completed. Useful for doing
+     * resource cleanup. This method would only get called if the call to {@link #initialize(TwillContext)} was
+     * successful.
+     */
+    @Override
+    public void destroy() {
+        // no-op
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/README.md
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/README.md b/extras/rya.benchmark/README.md
new file mode 100644
index 0000000..228dfaf
--- /dev/null
+++ b/extras/rya.benchmark/README.md
@@ -0,0 +1,77 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Benchmark Optimizations
+
+
+## KafkaLatencyBenchmark
+
+Several strategies for partitioning the `rya_pcj_updater` table are shown below. If other tablets start hotspotting, they can be split further, similar to how `STATEMENT_PATTERN_` is split.
+```
+addsplits AGGREGATION_ JOIN_ PROJECTION_ QUERY_ STATEMENT_PATTERN_ urn -t rya_pcj_updater
+
+# or
+addsplits AGGREGATION_ JOIN_ PROJECTION_ QUERY_ STATEMENT_PATTERN_0 STATEMENT_PATTERN_4 STATEMENT_PATTERN_8 STATEMENT_PATTERN_c urn -t rya_pcj_updater
+
+# or
+addsplits AGGREGATION_ JOIN_ PROJECTION_ QUERY_ STATEMENT_PATTERN_0 STATEMENT_PATTERN_1 STATEMENT_PATTERN_2 STATEMENT_PATTERN_3 STATEMENT_PATTERN_4 STATEMENT_PATTERN_5 STATEMENT_PATTERN_6 STATEMENT_PATTERN_7 STATEMENT_PATTERN_8 STATEMENT_PATTERN_9 STATEMENT_PATTERN_a STATEMENT_PATTERN_b STATEMENT_PATTERN_c STATEMENT_PATTERN_d STATEMENT_PATTERN_e STATEMENT_PATTERN_f urn -t rya_pcj_updater
+
+# then ensure the splits have been applied.
+compact -t rya_pcj_updater
+```
+
+It is also possible to lower the table's split threshold to generate more tablets.
+```
+root@accumulo> config -t rya_pcj_updater -s table.split.threshold=100M
+```
+
+Identify which tablets are on which hosts and which particular data you might be
+hotspotting on. Note that tablet splits are ordered lexicographically and the
+split point is exclusive, so the AGGREGATION_ data is actually contained in the
+tablet with the split point label `4qn;JOIN_`.
+```
+root@accumulo> tables -l
+...
+rya_osp => 4qr
+rya_pcj_updater => 4qn
+rya_po => 4qq
+...
+
+root@accumulo> scan -t accumulo.metadata -b 4qn; -e 4qn< -c loc
+4qn;AGGREGATION_ loc:25e09c3a40b000e [] 10.10.10.10:9997
+4qn;JOIN_ loc:45e09c3a2260012 [] 10.10.10.11:9997
+4qn;PROJECTION_ loc:55e09c3a2cf0014 [] 10.10.10.12:9997
+4qn;QUERY_ loc:35e09c3a2080021 [] 10.10.10.13:9997
+4qn;STATEMENT_PATTERN_0 loc:16e09c3a436001d [] 10.10.10.14:9997
+4qn;STATEMENT_PATTERN_4 loc:17e09c3a436001d [] 10.10.10.15:9997
+4qn;STATEMENT_PATTERN_8 loc:18e09c3a436001d [] 10.10.10.16:9997
+4qn;STATEMENT_PATTERN_c loc:19e09c3a436001d [] 10.10.10.17:9997
+4qn;urn loc:15e09c3a4360019 [] 10.10.10.18:9997
+4qn< loc:55e09c3a2cf0012 [] 10.10.10.19:9997
+```
+
+Use the `RegexGroupBalancer` to ensure all STATEMENT_PATTERN_x tablets are evenly distributed between all available tablet servers. This distribution strategy will also apply to other groups that are specified in the regex.
+```
+root@accumulo> config -t rya_pcj_updater -s table.custom.balancer.group.regex.pattern=(AGGREGATION_|JOIN_|PROJECTION_|QUERY_|STATEMENT_PATTERN_|urn).*
+#root@accumulo> config -t rya_pcj_updater -s table.custom.balancer.group.regex.default=AGGREGATION_
+root@accumulo> config -t rya_pcj_updater -s table.balancer=org.apache.accumulo.server.master.balancer.RegexGroupBalancer
+```
+References:
+- https://blogs.apache.org/accumulo/entry/balancing_groups_of_tablets
+- https://reviews.apache.org/r/29230/diff/2#index_header
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/pom.xml b/extras/rya.benchmark/pom.xml
index fce434b..8468290 100644
--- a/extras/rya.benchmark/pom.xml
+++ b/extras/rya.benchmark/pom.xml
@@ -55,6 +55,13 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
+
+ <!-- Fluo runtime dependency -->
+ <dependency>
+ <groupId>org.apache.fluo</groupId>
+ <artifactId>fluo-core</artifactId>
+ <scope>runtime</scope>
+ </dependency>
<!-- Testing -->
<dependency>
@@ -140,7 +147,6 @@
<goal>shade</goal>
</goals>
<configuration>
- <finalName>benchmarks</finalName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.openjdk.jmh.Main</mainClass>
@@ -164,6 +170,23 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>create-binary-distribution</id>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/binary-release.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/assembly/binary-release.xml
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/assembly/binary-release.xml b/extras/rya.benchmark/src/main/assembly/binary-release.xml
new file mode 100644
index 0000000..374213f
--- /dev/null
+++ b/extras/rya.benchmark/src/main/assembly/binary-release.xml
@@ -0,0 +1,33 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<assembly
+ xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
+ <id>bin</id>
+ <formats>
+ <format>tar.gz</format>
+ </formats>
+ <includeBaseDirectory>true</includeBaseDirectory>
+ <componentDescriptors>
+ <componentDescriptor>src/main/assembly/component-release.xml</componentDescriptor>
+ </componentDescriptors>
+</assembly>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/assembly/component-release.xml
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/assembly/component-release.xml b/extras/rya.benchmark/src/main/assembly/component-release.xml
new file mode 100644
index 0000000..0d99717
--- /dev/null
+++ b/extras/rya.benchmark/src/main/assembly/component-release.xml
@@ -0,0 +1,81 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<component
+ xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/component/1.1.3"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/component/1.1.3 http://maven.apache.org/xsd/component-1.1.3.xsd">
+ <fileSets>
+ <fileSet>
+ <directory>src/main/config</directory>
+ <outputDirectory>conf</outputDirectory>
+ <directoryMode>0755</directoryMode>
+ <fileMode>0644</fileMode>
+ <lineEnding>unix</lineEnding>
+ <filtered>false</filtered>
+ <includes>
+ <include>*.options</include>
+ <include>*.properties</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>src/main/scripts</directory>
+ <outputDirectory>bin</outputDirectory>
+ <directoryMode>0755</directoryMode>
+ <fileMode>0755</fileMode>
+ <includes>
+ <include>*.sh</include>
+ </includes>
+ <lineEnding>unix</lineEnding>
+ <filtered>true</filtered>
+ </fileSet>
+ <fileSet>
+ <directory>src/main/scripts</directory>
+ <outputDirectory>bin</outputDirectory>
+ <directoryMode>0755</directoryMode>
+ <fileMode>0644</fileMode>
+ <includes>
+ <include>*.bat</include>
+ </includes>
+ <lineEnding>dos</lineEnding>
+ <filtered>true</filtered>
+ </fileSet>
+
+ <!-- create an empty directory for log files -->
+ <fileSet>
+ <directory>src/main/assembly</directory>
+ <outputDirectory>logs</outputDirectory>
+ <directoryMode>755</directoryMode>
+ <excludes>
+ <exclude>*</exclude>
+ </excludes>
+ </fileSet>
+
+ <fileSet>
+ <directory>${project.build.directory}</directory>
+ <outputDirectory>lib</outputDirectory>
+ <directoryMode>755</directoryMode>
+ <fileMode>0644</fileMode>
+ <includes>
+ <include>${project.artifactId}-${project.version}-shaded.jar</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+</component>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/config/common.options
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/config/common.options b/extras/rya.benchmark/src/main/config/common.options
new file mode 100644
index 0000000..9a0c7c1
--- /dev/null
+++ b/extras/rya.benchmark/src/main/config/common.options
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# This file contains example configuration values and should
+# be modified to target your Rya/Kafka environment.
+
+--zookeepers
+zoo1,zoo2,zoo3,zoo4,zoo5
+
+--kafka-bootstrap-servers
+kafka1:9092,kafka2:9092
+
+--accumulo-instance
+accumuloInstance
+
+--rya-instance
+rya_
+
+--username
+accumuloUser
+
+# specifying --password here prompts the user to enter
+# the accumulo password interactively on the shell
+--password
+
+# local directory for storing benchmark output
+--output-directory
+results
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/config/log4j.properties
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/config/log4j.properties b/extras/rya.benchmark/src/main/config/log4j.properties
new file mode 100644
index 0000000..1101514
--- /dev/null
+++ b/extras/rya.benchmark/src/main/config/log4j.properties
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Valid levels:
+# TRACE, DEBUG, INFO, WARN, ERROR and FATAL
+log4j.rootCategory=INFO, CONSOLE, LOGFILE
+
+# CONSOLE is set to be a ConsoleAppender using a PatternLayout.
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+#log4j.appender.CONSOLE.layout.ConversionPattern=[%p] %m%n
+log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p %c %x - %m%n
+
+# LOGFILE is set to be a File appender using a PatternLayout.
+log4j.appender.LOGFILE=org.apache.log4j.FileAppender
+log4j.appender.LOGFILE.File=logs/benchmark.log
+#log4j.appender.LOGFILE.Threshold=DEBUG
+log4j.appender.LOGFILE.Append=true
+
+log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.LOGFILE.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
+
+#log4j.appender.LOGFILE.layout=org.apache.log4j.EnhancedPatternLayout
+#log4j.appender.LOGFILE.layout.ConversionPattern=%d [%t] %-5p %c{1.} - %m%n
+
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/config/periodic.options
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/config/periodic.options b/extras/rya.benchmark/src/main/config/periodic.options
new file mode 100644
index 0000000..51ac8ac
--- /dev/null
+++ b/extras/rya.benchmark/src/main/config/periodic.options
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+--ingest-iterations
+1000
+
+--ingest-observations-per-type
+10
+
+--ingest-types
+5
+
+--ingest-type-prefix
+car_
+
+--ingest-period-sec
+30
+
+--report-period-sec
+10
+
+--periodic-query-window
+15
+
+# 0.5 minutes, i.e. every 30 seconds
+--periodic-query-period
+.5
+
+--periodic-query-time-units
+minutes
+
+--periodic-query-registration-topic
+notifications
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/config/projection.options
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/config/projection.options b/extras/rya.benchmark/src/main/config/projection.options
new file mode 100644
index 0000000..9bba2de
--- /dev/null
+++ b/extras/rya.benchmark/src/main/config/projection.options
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+--ingest-iterations
+1000
+
+--ingest-observations-per-type
+10
+
+--ingest-types
+5
+
+--ingest-type-prefix
+car_
+
+--ingest-period-sec
+30
+
+--report-period-sec
+10
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/BenchmarkOptions.java
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/BenchmarkOptions.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/BenchmarkOptions.java
new file mode 100644
index 0000000..a848ebb
--- /dev/null
+++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/BenchmarkOptions.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.benchmark.periodic;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.base.Objects;
+
+/**
+ * Benchmark command line options describing the data that is generated and ingested, and how often reports are
+ * persisted. The fields are intended to be populated by JCommander from their {@link Parameter} annotations.
+ */
+public class BenchmarkOptions {
+    @Parameter(names = { "-ii", "--ingest-iterations" }, description = "Number of ingest iterations. Total data published is -ii x -iobs x -it", required = true)
+    private int numIterations;
+
+    @Parameter(names = { "-iobs", "--ingest-observations-per-type" }, description = "Observations per Type per Iteration to generate.", required = true)
+    private int observationsPerTypePerIteration;
+
+    @Parameter(names = { "-it", "--ingest-types" }, description = "The number of unique types to generate.", required = true)
+    private int numTypes;
+
+    @Parameter(names = { "-itp", "--ingest-type-prefix" }, description = "The prefix to use for a type, for example 'car_'", required = true)
+    private String typePrefix;
+
+    @Parameter(names = { "-ip", "--ingest-period-sec" }, description = "The period, in seconds between ingests of the data generated for one iteration.", required = true)
+    private int ingestPeriodSeconds;
+
+    @Parameter(names = { "-rp", "--report-period-sec" }, description = "The period, in seconds between persisting reports of the current state.", required = true)
+    private int resultPeriodSeconds;
+
+    /**
+     * @return The number of ingest iterations ({@code --ingest-iterations}).
+     */
+    public int getNumIterations() {
+        return numIterations;
+    }
+
+    /**
+     * @return The number of observations generated per type in each iteration ({@code --ingest-observations-per-type}).
+     */
+    public int getObservationsPerTypePerIteration() {
+        return observationsPerTypePerIteration;
+    }
+
+    /**
+     * @return The number of unique types to generate ({@code --ingest-types}).
+     */
+    public int getNumTypes() {
+        return numTypes;
+    }
+
+    /**
+     * @return The prefix used when naming a type ({@code --ingest-type-prefix}).
+     */
+    public String getTypePrefix() {
+        return typePrefix;
+    }
+
+    /**
+     * @return The period, in seconds, between ingests of one iteration's data ({@code --ingest-period-sec}).
+     */
+    public int getIngestPeriodSeconds() {
+        return ingestPeriodSeconds;
+    }
+
+    /**
+     * @return The period, in seconds, between persisted reports ({@code --report-period-sec}).
+     */
+    public int getResultPeriodSeconds() {
+        return resultPeriodSeconds;
+    }
+
+    @Override
+    public String toString() {
+        return Objects.toStringHelper(this)
+                .add("numIterations", numIterations)
+                .add("observationsPerTypePerIteration", observationsPerTypePerIteration)
+                .add("numTypes", numTypes)
+                .add("typePrefix", typePrefix)
+                .add("ingestPeriodSeconds", ingestPeriodSeconds)
+                .add("resultPeriodSeconds", resultPeriodSeconds)
+                .toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/BenchmarkStatementGenerator.java
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/BenchmarkStatementGenerator.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/BenchmarkStatementGenerator.java
new file mode 100644
index 0000000..fdd3b63
--- /dev/null
+++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/BenchmarkStatementGenerator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.benchmark.periodic;
+
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+
+import javax.xml.datatype.DatatypeConfigurationException;
+import javax.xml.datatype.DatatypeFactory;
+
+import org.openrdf.model.Literal;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Generates sets of statements used for benchmarking.
+ */
+public class BenchmarkStatementGenerator {
+
+    private static final Logger logger = LoggerFactory.getLogger(BenchmarkStatementGenerator.class);
+
+    private final ValueFactory vf;
+    private final DatatypeFactory dtf;
+
+    /**
+     * Creates a generator backed by a {@link ValueFactoryImpl} and a new {@link DatatypeFactory}.
+     *
+     * @throws DatatypeConfigurationException If a {@link DatatypeFactory} cannot be created.
+     */
+    public BenchmarkStatementGenerator() throws DatatypeConfigurationException {
+        vf = new ValueFactoryImpl();
+        dtf = DatatypeFactory.newInstance();
+    }
+
+    /**
+     * Generates (numObservationsPerType x numTypes) observations and returns two statements per observation,
+     * i.e. 2 x (numObservationsPerType x numTypes) statements in total, of the form:
+     *
+     * <pre>
+     * urn:obs_n uri:hasTime zonedTime
+     * urn:obs_n uri:hasObsType "typePrefix + j"
+     * </pre>
+     *
+     * For i in [0, numObservationsPerType) and j in [0, numTypes), n = observationOffset + i * numTypes + j,
+     * formatted as a zero-padded 20 digit number. The type is a plain literal made of typePrefix followed by j.
+     * Statements are emitted in observation order, with the hasTime statement immediately before the hasObsType
+     * statement for the same observation.
+     *
+     * @param numObservationsPerType - The quantity of observations per type to generate.
+     * @param numTypes - The number of types to generate observations for.
+     * @param typePrefix - The prefix to be used for the type literal in the statement.
+     * @param observationOffset - The offset to be used for determining the value of n in the above statements.
+     * @param zonedTime - The time to be used for all observations generated.
+     * @return A new list of all generated Statements.
+     */
+    public List<Statement> generate(final long numObservationsPerType, final int numTypes, final String typePrefix, final long observationOffset, final ZonedDateTime zonedTime) {
+        // Every observation produced by this call shares the same timestamp literal.
+        final String time = zonedTime.format(DateTimeFormatter.ISO_INSTANT);
+        final Literal litTime = vf.createLiteral(dtf.newXMLGregorianCalendar(time));
+
+        // The predicates never change, so create them once instead of once per statement.
+        final URI hasTime = vf.createURI("uri:hasTime");
+        final URI hasObsType = vf.createURI("uri:hasObsType");
+
+        final List<Statement> statements = Lists.newArrayList();
+        for (long i = 0; i < numObservationsPerType; i++) {
+            for (int j = 0; j < numTypes; j++) {
+                final long observationId = observationOffset + i * numTypes + j;
+                // A fixed 20 digit width makes non-negative ids sort lexicographically in numeric order.
+                final URI obsUri = vf.createURI("urn:obs_" + String.format("%020d", observationId));
+                final String type = typePrefix + j;
+                statements.add(vf.createStatement(obsUri, hasTime, litTime));
+                statements.add(vf.createStatement(obsUri, hasObsType, vf.createLiteral(type)));
+            }
+        }
+
+        return statements;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/CommonOptions.java
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/CommonOptions.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/CommonOptions.java
new file mode 100644
index 0000000..e87e6a9
--- /dev/null
+++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/CommonOptions.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.benchmark.periodic;
+
+import java.io.File;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.base.Objects;
+
+/**
+ * Command line options shared by the benchmarks: Accumulo/Rya connection details, the Kafka bootstrap servers,
+ * and the directory that output is written to.
+ */
+public class CommonOptions {
+
+    @Parameter(names = { "-u", "--username" }, description = "Accumulo Username", required = true)
+    private String username;
+
+    @Parameter(names = { "-p", "--password" }, description = "Accumulo Password", required = true, password=true)
+    private String password;
+
+    @Parameter(names = { "-ai", "--accumulo-instance" }, description = "Accumulo Instance", required = true)
+    private String accumuloInstance;
+
+    @Parameter(names = { "-ri", "--rya-instance" }, description = "Rya Instance", required = true)
+    private String ryaInstance;
+
+    @Parameter(names = { "-z", "--zookeepers" }, description = "Accumulo Zookeepers", required = true)
+    private String zookeepers;
+
+    @Parameter(names = { "-k", "--kafka-bootstrap-servers" }, description = "Kafka bootstrap server string, for example: kafka1:9092,kafka2:9092", required = true)
+    private String kafkaBootstrap;
+
+    @Parameter(names = { "-o", "--output-directory" }, description = "The directory that output should be persisted to.", required = true)
+    private File outputDirectory;
+
+    /** @return The Accumulo user name. */
+    public String getUsername() {
+        return this.username;
+    }
+
+    /** @return The Accumulo password. */
+    public String getPassword() {
+        return this.password;
+    }
+
+    /** @return The Accumulo instance name. */
+    public String getAccumuloInstance() {
+        return this.accumuloInstance;
+    }
+
+    /** @return The Rya instance name. */
+    public String getRyaInstance() {
+        return this.ryaInstance;
+    }
+
+    /** @return The Zookeeper connect string used to locate the Accumulo instance. */
+    public String getZookeepers() {
+        return this.zookeepers;
+    }
+
+    /** @return The Kafka bootstrap server string. */
+    public String getKafkaBootstrap() {
+        return this.kafkaBootstrap;
+    }
+
+    /** @return The directory that benchmark output is persisted to. */
+    public File getOutputDirectory() {
+        return this.outputDirectory;
+    }
+
+    /**
+     * Connects to Accumulo with the configured credentials and builds a {@link RyaClient} for it.
+     *
+     * @return A RyaClient backed by a connector to the configured Accumulo instance.
+     * @throws AccumuloException If the connector cannot be created.
+     * @throws AccumuloSecurityException If the credentials are rejected.
+     */
+    public RyaClient buildRyaClient() throws AccumuloException, AccumuloSecurityException {
+        final Instance zkInstance = new ZooKeeperInstance(accumuloInstance, zookeepers);
+        final char[] passwordChars = password.toCharArray();
+        final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(username, passwordChars, accumuloInstance, zookeepers);
+        final PasswordToken token = new PasswordToken(password);
+        return AccumuloRyaClientFactory.build(connectionDetails, zkInstance.getConnector(username, token));
+    }
+
+    /**
+     * Builds a fresh set of Kafka consumer properties pointed at the configured bootstrap servers. Each call uses
+     * newly generated random client and group ids. No key/value deserializers are set here.
+     *
+     * @return A new Properties instance for a Kafka consumer.
+     */
+    public Properties getKafkaConsumerProperties() {
+        final Properties props = new Properties();
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrap);
+        // Random ids so each consumer built from these properties is independent of any other.
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+        // Refresh metadata every 5 seconds so a subscription made before the topic exists picks it up quickly.
+        props.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "5000");
+        // A brand-new group has no committed offsets, so start reading from the earliest available record.
+        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return props;
+    }
+
+    @Override
+    public String toString() {
+        // The password is deliberately redacted so it never ends up in logs.
+        return Objects.toStringHelper(this)
+                .add("username", username)
+                .add("password", "[redacted]")
+                .add("accumuloInstance", accumuloInstance)
+                .add("ryaInstance", ryaInstance)
+                .add("zookeepers", zookeepers)
+                .add("kafkaBootstrap", kafkaBootstrap)
+                .add("outputDirectory", outputDirectory)
+                .toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/KafkaLatencyBenchmark.java
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/KafkaLatencyBenchmark.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/KafkaLatencyBenchmark.java
new file mode 100644
index 0000000..454f7e0
--- /dev/null
+++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/KafkaLatencyBenchmark.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.benchmark.periodic;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDateTime;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.List;
+import java.util.LongSummaryStatistics;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.xml.datatype.DatatypeConfigurationException;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.LoadStatements;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
+import org.openrdf.model.Statement;
+import org.openrdf.query.BindingSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.ParameterException;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * This benchmark is useful for determining the performance characteristics of a Rya Triplestore under continuous ingest
+ * that has a PCJ Query that is incrementally updated by the Rya PCJ Fluo App (aka PCJ Updater).
+ * <p>
+ * This benchmark periodically loads a batch of data into Rya and reports the delay of the following query
+ *
+ * <pre>
+ * PREFIX time: &lt;http://www.w3.org/2006/time#&gt;
+ * SELECT ?type (count(?obs) as ?total)
+ * WHERE {
+ *   ?obs &lt;uri:hasTime&gt; ?time .
+ *   ?obs &lt;uri:hasObsType&gt; ?type .
+ * }
+ * GROUP BY ?type
+ * </pre>
+ * <p>
+ * With the {@code periodic} command the same aggregation is registered as a periodic query instead: its WHERE clause
+ * additionally carries a {@code function:periodic} filter built from the command's window, period and time units.
+ * Results are then read from the Kafka topic named by the returned query id.
+ * <p>
+ * This benchmark is useful for characterizing any latency between Truth (data ingested to Rya) and Reported (query
+ * result published to Kafka).
+ * <p>
+ * This benchmark is also useful for stress testing a Fluo App configuration and Accumulo Tablet configuration.
+ * <p>
+ * This benchmark expects the provided RyaInstance to have already been constructed and an appropriately configured Rya
+ * PCJ Fluo App for that RyaInstance to be deployed on your YARN cluster.
+ */
+public class KafkaLatencyBenchmark implements AutoCloseable {
+
+    public static final Logger logger = LoggerFactory.getLogger(KafkaLatencyBenchmark.class);
+
+    /**
+     * Prefix that precedes the Kafka topic name in the id returned when a periodic PCJ is created.
+     */
+    private static final String QUERY_ID_PREFIX = "QUERY_";
+
+    /**
+     * Per-type statistics, keyed by type name. Populated in {@link #startDataIngestTask()} before any task that reads
+     * it is scheduled. After that, only the atomic counters inside each {@link Stat} are updated.
+     */
+    private final Map<String, Stat> typeToStatMap = Maps.newTreeMap();
+
+    /**
+     * ThreadPool for publishing data and logging stats.
+     */
+    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(20);
+
+    private final CommonOptions options;
+    private final BenchmarkOptions benchmarkOptions;
+    private final RyaClient client;
+    final DateTimeFormatter fsFormatter = DateTimeFormatter.ofPattern( "uuuu-MM-dd'T'HH-mm-ss" );
+    private final LocalDateTime startTime;
+
+    /**
+     * Futures of every scheduled task. Three threads touch this list:
+     * <ul>
+     * <li>the caller's thread adds to it;</li>
+     * <li>the loader task clears it from a scheduler thread once all iterations are published;</li>
+     * <li>the Kafka consume loop polls it to decide when to stop.</li>
+     * </ul>
+     * It must therefore be thread-safe. An empty list means the benchmark has finished.
+     */
+    final List<ScheduledFuture<?>> futureList = Lists.newCopyOnWriteArrayList();
+
+    /**
+     * @param options connection options for Rya, Accumulo and Kafka.
+     * @param benchmarkOptions the parsed benchmark command; a {@link PeriodicQueryCommand} selects the periodic query.
+     * @throws AccumuloException if the Rya client cannot connect to Accumulo.
+     * @throws AccumuloSecurityException if the configured credentials are rejected.
+     */
+    public KafkaLatencyBenchmark(final CommonOptions options, final BenchmarkOptions benchmarkOptions) throws AccumuloException, AccumuloSecurityException {
+        this.options = Objects.requireNonNull(options);
+        this.benchmarkOptions = Objects.requireNonNull(benchmarkOptions);
+        this.client = Objects.requireNonNull(options.buildRyaClient());
+        this.startTime = LocalDateTime.now();
+
+        logger.info("Running {} with the following input parameters:\n{}\n{}", this.getClass(), options, benchmarkOptions);
+    }
+
+    /**
+     * Stops the scheduler, cancels all scheduled tasks and waits (up to one day) for running tasks to finish.
+     */
+    @Override
+    public void close() throws Exception {
+        logger.info("Stopping threads.");
+        scheduler.shutdown();
+
+        cancelAllScheduledTasks();
+        logger.info("Waiting for all threads to terminate...");
+        scheduler.awaitTermination(1, TimeUnit.DAYS);
+        logger.info("All threads terminated.");
+    }
+
+    /**
+     * Cancels every scheduled task without interrupting one that is mid-execution, then empties {@link #futureList}.
+     * Emptying the list ends the Kafka consume loop.
+     */
+    private void cancelAllScheduledTasks() {
+        logger.info("Canceling all tasks");
+        for(final ScheduledFuture<?> task : futureList) {
+            task.cancel(false);
+        }
+        futureList.clear();
+    }
+
+    /**
+     * Registers the benchmark query, starts the ingest and reporting tasks, and then consumes query results from Kafka.
+     * The consume step blocks until the loader task has published every iteration.
+     *
+     * @throws InstanceDoesNotExistException if the configured Rya instance does not exist.
+     * @throws RyaClientException if the query cannot be registered.
+     */
+    public void start() throws InstanceDoesNotExistException, RyaClientException {
+        logger.info("Issuing Query");
+        final String topic;
+        final boolean periodic;
+        if(benchmarkOptions instanceof PeriodicQueryCommand) {
+            topic = issuePeriodicQuery((PeriodicQueryCommand) benchmarkOptions);
+            periodic = true;
+        } else {
+            topic = issueQuery();
+            periodic = false;
+        }
+        logger.info("Query Issued. Received PCJ ID: {}", topic);
+        startDataIngestTask();
+        startStatsPrinterTask();
+        startCsvPrinterTask();
+
+        if(periodic) {
+            updatePeriodicStatsFromKafka(topic); // blocking operation.
+        } else {
+            updateStatsFromKafka(topic); // blocking operation.
+        }
+
+    }
+
+    /**
+     * Registers the count-per-type PCJ, exported to Kafka.
+     *
+     * @return the PCJ id returned by Rya; it is used as the Kafka topic to read results from.
+     */
+    private String issueQuery() throws InstanceDoesNotExistException, RyaClientException {
+        final String sparql = "prefix time: <http://www.w3.org/2006/time#> "
+                + "select ?type (count(?obs) as ?total) where { "
+                + "    ?obs <uri:hasTime> ?time. "
+                + "    ?obs <uri:hasObsType> ?type "
+                + "} "
+                + "group by ?type";
+
+        logger.info("Query: {}", sparql);
+        return client.getCreatePCJ().createPCJ(options.getRyaInstance(), sparql, ImmutableSet.of(ExportStrategy.KAFKA));
+    }
+
+    /**
+     * Registers the periodic count-per-type query through the periodic registration topic.
+     *
+     * @return the Kafka topic the periodic results are published to: the returned query id without its
+     *         {@value #QUERY_ID_PREFIX} prefix.
+     */
+    private String issuePeriodicQuery(final PeriodicQueryCommand periodicOptions) throws InstanceDoesNotExistException, RyaClientException {
+        final String sparql = "prefix function: <http://org.apache.rya/function#> "
+                + "prefix time: <http://www.w3.org/2006/time#> "
+                + "select ?type (count(?obs) as ?total) where {"
+                + "Filter(function:periodic(?time, " + periodicOptions.getPeriodicQueryWindow() + ", " + periodicOptions.getPeriodicQueryPeriod() + ", time:" + periodicOptions.getPeriodicQueryTimeUnits() + ")) "
+                + "?obs <uri:hasTime> ?time. "
+                + "?obs <uri:hasObsType> ?type } "
+                + "group by ?type";
+        logger.info("Query: {}", sparql);
+        final String queryId = client.getCreatePeriodicPCJ().createPeriodicPCJ(options.getRyaInstance(), sparql, periodicOptions.getPeriodicQueryRegistrationTopic(), options.getKafkaBootstrap());
+        logger.info("Received query id: {}", queryId);
+        // Strip the prefix only when it is actually there, rather than blindly dropping the first characters.
+        if(queryId.startsWith(QUERY_ID_PREFIX)) {
+            return queryId.substring(QUERY_ID_PREFIX.length());
+        }
+        logger.warn("Query id {} does not start with {}; using it unchanged as the topic.", queryId, QUERY_ID_PREFIX);
+        return queryId;
+    }
+
+    /**
+     * Initializes the per-type stats and schedules the {@link LoaderTask}. When all iterations are published, the
+     * loader task cancels every scheduled task.
+     */
+    private void startDataIngestTask() {
+        final int initialPublishDelaySeconds = 1;
+
+        final LoadStatements loadCommand = client.getLoadStatements();
+
+        // initialize the stats map
+        for(int typeId = 0; typeId < benchmarkOptions.getNumTypes(); typeId++) {
+            final String type = benchmarkOptions.getTypePrefix() + typeId;
+            typeToStatMap.put(type, new Stat(type));
+        }
+
+        final LoaderTask loaderTask = new LoaderTask(benchmarkOptions.getNumIterations(),
+                benchmarkOptions.getObservationsPerTypePerIteration(), benchmarkOptions.getNumTypes(),
+                benchmarkOptions.getTypePrefix(), loadCommand, options.getRyaInstance());
+
+        // Install the shutdown operation BEFORE scheduling, so the task can never observe it as null.
+        loaderTask.setShutdownOperation(this::cancelAllScheduledTasks);
+
+        final ScheduledFuture<?> loaderTaskFuture = scheduler.scheduleAtFixedRate(loaderTask, initialPublishDelaySeconds, benchmarkOptions.getIngestPeriodSeconds(), TimeUnit.SECONDS);
+        futureList.add(loaderTaskFuture);
+    }
+
+    /**
+     * Schedules a task that logs every type's {@link Stat} once per result period.
+     */
+    private void startStatsPrinterTask() {
+        final Runnable statLogger = () -> {
+            final StringBuilder sb = new StringBuilder();
+            sb.append("Results\n");
+            for(final Stat s : typeToStatMap.values()) {
+                sb.append(s).append("\n");
+            }
+            logger.info("{}",sb);
+        };
+
+        final int initialPrintDelaySeconds = 11;
+
+        final ScheduledFuture<?> statLoggerFuture = scheduler.scheduleAtFixedRate(statLogger, initialPrintDelaySeconds, benchmarkOptions.getResultPeriodSeconds(), TimeUnit.SECONDS);
+        futureList.add(statLoggerFuture);
+    }
+
+    /**
+     * Schedules a task that appends one CSV row per result period to {@code run-<startTime>.csv} in the output
+     * directory. The header row is written first.
+     */
+    private void startCsvPrinterTask() {
+
+        final int initialPrintDelaySeconds = 11;
+        final long printPeriodSeconds = benchmarkOptions.getResultPeriodSeconds();
+
+        final Runnable csvPrinterTask = new Runnable() {
+            private final AtomicInteger printCounter = new AtomicInteger(0);
+            private final File outFile = new File(options.getOutputDirectory(), "run-" + fsFormatter.format(startTime) + ".csv");
+
+            @Override
+            public void run() {
+                final int count = printCounter.getAndIncrement();
+                final StringBuilder sb = new StringBuilder();
+                if(count == 0) {
+                    sb.append("elapsed-seconds");
+                    for(final Stat s : typeToStatMap.values()) {
+                        sb.append(",").append(s.getCsvStringHeader());
+                    }
+                    sb.append("\n");
+                }
+
+                sb.append(count*printPeriodSeconds);
+                for(final Stat s : typeToStatMap.values()) {
+                    sb.append(",").append(s.getCsvString());
+                }
+                sb.append("\n");
+                try {
+                    FileUtils.write(outFile, sb.toString(), StandardCharsets.UTF_8, true);
+                } catch (final IOException e) {
+                    logger.warn("Error writing to file " + outFile, e);
+                }
+            }
+        };
+
+        final ScheduledFuture<?> csvPrinterFuture = scheduler.scheduleAtFixedRate(csvPrinterTask, initialPrintDelaySeconds, printPeriodSeconds, TimeUnit.SECONDS);
+        futureList.add(csvPrinterFuture);
+    }
+
+    /**
+     * Consumes Kryo-serialized PCJ results from {@code topic} until all scheduled tasks have been cancelled.
+     */
+    private void updateStatsFromKafka(final String topic) {
+        try (KafkaConsumer<String, VisibilityBindingSet> consumer = new KafkaConsumer<>(options.getKafkaConsumerProperties(), new StringDeserializer(), new KryoVisibilityBindingSetSerializer())) {
+            consumer.subscribe(Arrays.asList(topic));
+            while (!futureList.isEmpty()) {
+                final ConsumerRecords<String, VisibilityBindingSet> records = consumer.poll(500); // check kafka at most twice a second.
+                handle(records);
+            }
+        } catch (final Exception e) {
+            logger.warn("Exception occurred", e);
+        }
+    }
+
+    /**
+     * Consumes periodic query results (via {@link BindingSetSerDe}) from {@code topic} until all scheduled tasks have
+     * been cancelled.
+     */
+    private void updatePeriodicStatsFromKafka(final String topic) {
+        try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(options.getKafkaConsumerProperties(), new StringDeserializer(), new BindingSetSerDe())) {
+            consumer.subscribe(Arrays.asList(topic));
+            while (!futureList.isEmpty()) {
+                final ConsumerRecords<String, BindingSet> records = consumer.poll(500); // check kafka at most twice a second.
+                handle(records);
+            }
+        } catch (final Exception e) {
+            logger.warn("Exception occurred", e);
+        }
+    }
+
+    /**
+     * Records the latest reported {@code total} of each result into the matching type's {@link Stat}. Results missing
+     * a {@code type} or {@code total} binding are skipped with a warning, so they cannot end the consume loop.
+     */
+    private void handle(final ConsumerRecords<String, ? extends BindingSet> records) {
+        if(records.count() > 0) {
+            logger.debug("Received {} records", records.count());
+        }
+        for(final ConsumerRecord<String, ? extends BindingSet> record: records){
+            final BindingSet result = record.value();
+            logger.debug("Received BindingSet: {}", result);
+
+            if(result == null || result.getBinding("type") == null || result.getBinding("total") == null) {
+                logger.warn("Ignoring result without both 'type' and 'total' bindings: {}", result);
+                continue;
+            }
+
+            final String type = result.getBinding("type").getValue().stringValue();
+            final long total = Long.parseLong(result.getBinding("total").getValue().stringValue());
+
+            final Stat stat = typeToStatMap.get(type);
+            if(stat == null) {
+                logger.warn("Not expecting to receive type: {}", type);
+            } else {
+                stat.fluoTotal.set(total);
+            }
+        }
+    }
+
+    /**
+     * Periodic task that loads one iteration of generated observations per run. Once {@code numIterations} runs have
+     * been published, the next run invokes the shutdown operation.
+     */
+    private class LoaderTask implements Runnable {
+        private final AtomicLong iterations = new AtomicLong(0);
+        private final int numIterations;
+        private final int numTypes;
+        private final String typePrefix;
+        private final long observationsPerTypePerIteration;
+
+        private final LoadStatements loadStatements;
+        private final String ryaInstanceName;
+        // volatile: set on the caller's thread, read on a scheduler thread.
+        private volatile Runnable shutdownOperation;
+
+        public LoaderTask(final int numIterations, final long observationsPerTypePerIteration, final int numTypes, final String typePrefix, final LoadStatements loadStatements, final String ryaInstanceName) {
+            this.numIterations = numIterations;
+            this.observationsPerTypePerIteration = observationsPerTypePerIteration;
+            this.numTypes = numTypes;
+            this.typePrefix = typePrefix;
+            this.loadStatements = loadStatements;
+            this.ryaInstanceName = ryaInstanceName;
+        }
+
+        @Override
+        public void run() {
+            try {
+                final long i = iterations.getAndIncrement();
+                if(i >= numIterations) {
+                    logger.info("Reached maximum iterations...");
+                    final Runnable shutdown = shutdownOperation;
+                    if(shutdown != null) {
+                        shutdown.run();
+                    }
+                    return;
+                }
+                logger.info("Publishing iteration [{} of {}]", i, numIterations);
+
+                final BenchmarkStatementGenerator gen = new BenchmarkStatementGenerator();
+                final long observationsPerIteration = observationsPerTypePerIteration * numTypes;
+                final long iterationOffset = i * observationsPerIteration;
+                logger.info("Generating {} Observations", observationsPerIteration);
+                final Iterable<Statement> statements = gen.generate(observationsPerTypePerIteration, numTypes, typePrefix, iterationOffset, ZonedDateTime.now());
+                logger.info("Publishing {} Observations", observationsPerIteration);
+                final long t1 = System.currentTimeMillis();
+                loadStatements.loadStatements(ryaInstanceName, statements);
+                logger.info("Published {} observations in {}s", observationsPerIteration, ((System.currentTimeMillis() - t1)/1000.0));
+                logger.info("Updating published totals...");
+                for(int typeId = 0; typeId < numTypes; typeId++) {
+                    typeToStatMap.get(typePrefix + typeId).total.addAndGet(observationsPerTypePerIteration);
+                }
+                logger.info("Finished publishing.");
+            } catch (final RyaClientException e) {
+                logger.warn("Error while writing statements", e);
+            } catch (final DatatypeConfigurationException e) {
+                logger.warn("Error creating generator", e);
+            } catch (final RuntimeException e) {
+                // An exception escaping run() silently suppresses every later execution of this periodic task. The
+                // shutdown operation would then never run and the Kafka consume loop would wait forever.
+                logger.warn("Unexpected error while publishing iteration", e);
+            }
+
+        }
+
+        /**
+         * @param f operation to run once all iterations have been published; must be set before scheduling.
+         */
+        public void setShutdownOperation(final Runnable f) {
+            this.shutdownOperation = f;
+        }
+    }
+
+
+    /**
+     * Simple data structure for storing and reporting statistics for a Type: the number of observations published to
+     * Rya ({@code total}) versus the latest count reported on Kafka ({@code fluoTotal}).
+     */
+    private static class Stat {
+        protected final AtomicLong fluoTotal = new AtomicLong(0);
+        protected final AtomicLong total = new AtomicLong(0);
+        private final String type;
+        // Not thread-safe; only updated from toString(), which only the stats logger task calls.
+        private final LongSummaryStatistics diffStats = new LongSummaryStatistics();
+        public Stat(final String type) {
+            this.type = type;
+        }
+
+        /**
+         * Note: this has a side effect. It records the current difference into the running {@code diffStats}.
+         */
+        @Override
+        public String toString() {
+            final long t = total.get();
+            final long ft = fluoTotal.get();
+            final long diff = t - ft;
+            diffStats.accept(diff);
+            return type + " published total: " + t + " fluo: " + ft + " difference: " + diff + " diffStats: " + diffStats;
+        }
+
+        public String getCsvString() {
+            final long t = total.get();
+            final long ft = fluoTotal.get();
+            final long diff = t - ft;
+            return Joiner.on(",").join(type, t, ft, diff);
+        }
+
+        public String getCsvStringHeader() {
+            return "type,published_total,fluo_total_" + type + ",difference_" + type;
+        }
+    }
+
+    /**
+     * Command line entry point. A {@code projection} or {@code periodic} command must be given; otherwise usage is
+     * printed and the JVM exits with status 1.
+     */
+    public static void main(final String[] args) {
+
+        final CommonOptions options = new CommonOptions();
+        final ProjectionQueryCommand projectionCommand = new ProjectionQueryCommand();
+        final PeriodicQueryCommand periodicCommand = new PeriodicQueryCommand();
+
+        BenchmarkOptions parsedCommand = null;
+        final JCommander cli = new JCommander();
+        cli.addObject(options);
+        cli.addCommand(projectionCommand);
+        cli.addCommand(periodicCommand);
+        cli.setProgramName(KafkaLatencyBenchmark.class.getName());
+
+        try {
+            cli.parse(args);
+            final String parsedName = cli.getParsedCommand();
+            if ("periodic".equals(parsedName)) {
+                parsedCommand = periodicCommand;
+            }
+            if ("projection".equals(parsedName)) {
+                parsedCommand = projectionCommand;
+            }
+            if (parsedCommand == null) {
+                throw new ParameterException("A command must be specified.");
+            }
+        } catch (final ParameterException e) {
+            System.err.println("Error! Invalid input: " + e.getMessage());
+            cli.usage();
+            System.exit(1);
+        }
+
+        try (KafkaLatencyBenchmark benchmark = new KafkaLatencyBenchmark(options, parsedCommand)) {
+            benchmark.start();
+        } catch (final Exception e) {
+            logger.warn("Exception occurred.", e);
+        }
+    }
+}