You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/11/07 15:50:44 UTC

[5/5] incubator-rya git commit: RYA-356 Added a Twill App for running the periodic service. Closes #248.

RYA-356 Added a Twill App for running the periodic service. Closes #248.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/8acd24b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/8acd24b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/8acd24b5

Branch: refs/heads/master
Commit: 8acd24b5ec477f7943453e74d753dab03be99352
Parents: b372ebc
Author: jdasch <hc...@gmail.com>
Authored: Thu Sep 7 16:57:13 2017 -0400
Committer: Caleb Meier <ca...@parsons.com>
Committed: Tue Nov 7 07:49:12 2017 -0800

----------------------------------------------------------------------
 common/rya.api/pom.xml                          |  10 +-
 .../apache/rya/api/client/LoadStatements.java   |  41 ++
 .../org/apache/rya/api/client/RyaClient.java    |  18 +-
 .../org/apache/rya/api/resolver/RyaContext.java |  65 +--
 extras/indexing/pom.xml                         |   5 +
 .../client/accumulo/AccumuloLoadStatements.java | 123 +++++
 .../accumulo/AccumuloRyaClientFactory.java      |   1 +
 .../pcj/matching/AccumuloIndexSetProvider.java  |  15 +-
 extras/periodic.notification/api/pom.xml        |  48 +-
 .../api/PeriodicNotificationClient.java         |  17 +-
 .../KafkaNotificationRegistrationClient.java    |  34 +-
 .../serialization/BindingSetSerDe.java          |   9 +-
 .../CommandNotificationSerializer.java          |  30 +-
 extras/periodic.notification/pom.xml            |  55 +--
 extras/periodic.notification/service/pom.xml    |  72 ++-
 .../PeriodicNotificationApplication.java        |  76 +++-
 ...dicNotificationApplicationConfiguration.java | 142 +++---
 .../PeriodicNotificationApplicationFactory.java |  81 ++--
 .../exporter/KafkaExporterExecutor.java         |  41 +-
 .../KafkaPeriodicBindingSetExporter.java        |  57 ++-
 .../NotificationProcessorExecutor.java          |  49 +-
 .../TimestampedNotificationProcessor.java       |  82 ++--
 .../notification/pruner/AccumuloBinPruner.java  |  28 +-
 .../notification/pruner/FluoBinPruner.java      |  26 +-
 .../pruner/PeriodicQueryPruner.java             |  64 +--
 .../pruner/PeriodicQueryPrunerExecutor.java     |  31 +-
 .../kafka/KafkaNotificationProvider.java        |  29 +-
 .../kafka/PeriodicNotificationConsumer.java     |  44 +-
 extras/periodic.notification/tests/pom.xml      |  30 +-
 .../PeriodicNotificationApplicationIT.java      |   2 +-
 .../src/test/resources/notification.properties  |  25 +-
 .../periodic.notification/twill.yarn/README.md  |  18 +
 extras/periodic.notification/twill.yarn/pom.xml |  98 ++++
 .../src/main/assembly/binary-release.xml        |  30 ++
 .../src/main/assembly/component-release.xml     | 104 +++++
 .../src/main/config/hadoop/core-site.xml        |  25 ++
 .../src/main/config/hadoop/yarn-site.xml        |  25 ++
 .../twill.yarn/src/main/config/logback.xml      |  57 +++
 .../src/main/config/notification.properties     |  67 +++
 .../twill.yarn/src/main/config/twill-env.sh     |  63 +++
 .../yarn/PeriodicNotificationTwillRunner.java   | 315 +++++++++++++
 .../scripts/periodicNotificationTwillApp.sh     |  32 ++
 extras/periodic.notification/twill/README.md    |  36 ++
 extras/periodic.notification/twill/pom.xml      | 177 ++++++++
 .../twill/PeriodicNotificationTwillApp.java     |  57 +++
 .../PeriodicNotificationTwillRunnable.java      | 119 +++++
 extras/rya.benchmark/README.md                  |  77 ++++
 extras/rya.benchmark/pom.xml                    |  25 +-
 .../src/main/assembly/binary-release.xml        |  33 ++
 .../src/main/assembly/component-release.xml     |  81 ++++
 .../src/main/config/common.options              |  44 ++
 .../src/main/config/log4j.properties            |  41 ++
 .../src/main/config/periodic.options            |  49 ++
 .../src/main/config/projection.options          |  36 ++
 .../benchmark/periodic/BenchmarkOptions.java    |  78 ++++
 .../periodic/BenchmarkStatementGenerator.java   |  90 ++++
 .../rya/benchmark/periodic/CommonOptions.java   | 117 +++++
 .../periodic/KafkaLatencyBenchmark.java         | 445 +++++++++++++++++++
 .../periodic/PeriodicQueryCommand.java          |  70 +++
 .../periodic/ProjectionQueryCommand.java        |  31 ++
 .../scripts/periodicNotificationBenchmark.sh    |  32 ++
 .../scripts/projectionNotificationBenchmark.sh  |  32 ++
 extras/rya.export/export.client/conf/config.xml |  18 +-
 .../AccumuloPeriodicQueryResultStorage.java     |  95 ++--
 .../rya.manual/src/site/markdown/pcj-updater.md |  18 +-
 extras/rya.pcj.fluo/pcj.fluo.app/pom.xml        |   5 +-
 .../pcj/fluo/app/FilterResultUpdater.java       |  14 +-
 .../app/batch/AbstractSpanBatchInformation.java |   4 +-
 .../fluo/app/batch/JoinBatchInformation.java    |   9 +-
 .../export/kafka/KafkaBindingSetExporter.java   |  14 +-
 .../kafka/KafkaBindingSetExporterFactory.java   |  14 +-
 .../export/kafka/KafkaExportParameterBase.java  |   5 +-
 .../export/kafka/KafkaRyaSubGraphExporter.java  |  23 +-
 .../kafka/KafkaRyaSubGraphExporterFactory.java  |  13 +-
 .../KryoVisibilityBindingSetSerializer.java     |  46 +-
 .../rya/RyaBindingSetExporterFactory.java       |   5 +-
 .../fluo/app/observers/QueryResultObserver.java |  37 +-
 .../pcj/fluo/app/observers/TripleObserver.java  |  15 +-
 .../indexing/pcj/fluo/app/query/FluoQuery.java  |  84 ++--
 .../fluo/app/query/FluoQueryMetadataDAO.java    |  68 +--
 .../pcj/fluo/integration/KafkaExportIT.java     |  25 +-
 .../org/apache/rya/shell/RyaAdminCommands.java  |   3 +-
 pom.xml                                         |  74 ++-
 83 files changed, 3619 insertions(+), 824 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/common/rya.api/pom.xml
----------------------------------------------------------------------
diff --git a/common/rya.api/pom.xml b/common/rya.api/pom.xml
index 65ef381..a683507 100644
--- a/common/rya.api/pom.xml
+++ b/common/rya.api/pom.xml
@@ -32,6 +32,10 @@ under the License.
 
     <dependencies>
         <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.calrissian.mango</groupId>
             <artifactId>mango-core</artifactId>
         </dependency>
@@ -60,7 +64,6 @@ under the License.
         <dependency>
             <groupId>com.google.code.gson</groupId>
             <artifactId>gson</artifactId>
-            <version>2.8.0</version>
         </dependency>
         <dependency>
             <groupId>com.github.stephenc.findbugs</groupId>
@@ -71,10 +74,9 @@ under the License.
             <artifactId>jcip-annotations</artifactId>
         </dependency>
         <dependency>
-            <groupId>com.esotericsoftware.kryo</groupId>
+            <groupId>com.esotericsoftware</groupId>
             <artifactId>kryo</artifactId>
-            <version>2.24.0</version>
-		</dependency>
+        </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/common/rya.api/src/main/java/org/apache/rya/api/client/LoadStatements.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/LoadStatements.java b/common/rya.api/src/main/java/org/apache/rya/api/client/LoadStatements.java
new file mode 100644
index 0000000..2fdb77b
--- /dev/null
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/LoadStatements.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client;
+
+import org.openrdf.model.Statement;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Loads a set of statements into an instance of Rya.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface LoadStatements {
+
+    /**
+     * Loads a set of RDF statements into an instance of Rya.
+     *
+     * @param ryaInstanceName - The name of the Rya instance the statements will be loaded into. (not null)
+     * @param statements - An iterable of Statement objects that should be added to Rya.  (not null)
+     * @throws InstanceDoesNotExistException No instance of Rya exists for the provided name.
+     * @throws RyaClientException Something caused the command to fail.
+     */
+    public void loadStatements(String ryaInstanceName, Iterable<? extends Statement> statements) throws InstanceDoesNotExistException, RyaClientException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java b/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
index c04bd86..1278193 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
@@ -44,6 +44,7 @@ public class RyaClient {
     private final AddUser addUser;
     private final RemoveUser removeUser;
     private final Uninstall uninstall;
+    private final LoadStatements loadStatements;
     private final LoadStatementsFile loadStatementsFile;
     private final ExecuteSparqlQuery executeSparqlQuery;
 
@@ -64,6 +65,7 @@ public class RyaClient {
             final AddUser addUser,
             final RemoveUser removeUser,
             final Uninstall uninstall,
+            final LoadStatements loadStatements,
             final LoadStatementsFile loadStatementsFile,
             final ExecuteSparqlQuery executeSparqlQuery) {
         this.install = requireNonNull(install);
@@ -79,6 +81,7 @@ public class RyaClient {
         this.addUser = requireNonNull(addUser);
         this.removeUser = requireNonNull(removeUser);
         this.uninstall = requireNonNull(uninstall);
+        this.loadStatements = requireNonNull(loadStatements);
         this.loadStatementsFile = requireNonNull(loadStatementsFile);
         this.executeSparqlQuery = requireNonNull(executeSparqlQuery);
     }
@@ -105,10 +108,10 @@ public class RyaClient {
     public DeletePCJ getDeletePCJ() {
         return deletePcj;
     }
-    
+
     /**
      * @return An instance of {@link CreatePeridodicPCJ} that is connected to a Rya Periodic Storage
-     */ 
+     */
     public CreatePeriodicPCJ getCreatePeriodicPCJ() {
         return createPeriodicPcj;
     }
@@ -119,7 +122,7 @@ public class RyaClient {
     public DeletePeriodicPCJ getDeletePeriodicPCJ() {
         return deletePeriodicPcj;
     }
-    
+
     /**
      * @return An instance of {@link ListIncrementalQueries} for displaying queries that are incrementallly
      * maintained by the Rya instance
@@ -127,7 +130,7 @@ public class RyaClient {
     public ListIncrementalQueries getListIncrementalQueries() {
         return listIncrementalQueries;
     }
-    
+
     /**
      * @return An instance of {@link BatchUpdatePCJ} that is connect to a Rya storage
      *   if the Rya instance supports PCJ indexing.
@@ -179,6 +182,13 @@ public class RyaClient {
     }
 
     /**
+     * @return An instance of {@link LoadStatements} that is connected to a Rya storage.
+     */
+    public LoadStatements getLoadStatements() {
+        return loadStatements;
+    }
+
+    /**
      * @return An instance of {@link LoadStatementsFile} that is connected to a Rya storage.
      */
     public LoadStatementsFile getLoadStatementsFile() {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/common/rya.api/src/main/java/org/apache/rya/api/resolver/RyaContext.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/resolver/RyaContext.java b/common/rya.api/src/main/java/org/apache/rya/api/resolver/RyaContext.java
index d19226e..49fc5d1 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/resolver/RyaContext.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/resolver/RyaContext.java
@@ -8,9 +8,9 @@ package org.apache.rya.api.resolver;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,11 +39,10 @@ import org.apache.rya.api.resolver.impl.RyaTypeResolverImpl;
 import org.apache.rya.api.resolver.impl.RyaURIResolver;
 import org.apache.rya.api.resolver.impl.ServiceBackedRyaTypeResolverMappings;
 import org.apache.rya.api.resolver.impl.ShortRyaTypeResolver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.openrdf.model.URI;
 import org.openrdf.model.vocabulary.XMLSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Date: 7/16/12
@@ -51,10 +50,10 @@ import org.openrdf.model.vocabulary.XMLSchema;
  */
 public class RyaContext {
 
-    public Log logger = LogFactory.getLog(RyaContext.class);
+    public Logger logger = LoggerFactory.getLogger(RyaContext.class);
 
-    private Map<URI, RyaTypeResolver> uriToResolver = new HashMap<URI, RyaTypeResolver>();
-    private Map<Byte, RyaTypeResolver> byteToResolver = new HashMap<Byte, RyaTypeResolver>();
+    private final Map<URI, RyaTypeResolver> uriToResolver = new HashMap<URI, RyaTypeResolver>();
+    private final Map<Byte, RyaTypeResolver> byteToResolver = new HashMap<Byte, RyaTypeResolver>();
     private RyaTypeResolver defaultResolver = new CustomDatatypeResolver();
 
     private RyaContext() {
@@ -91,47 +90,51 @@ public class RyaContext {
     public synchronized static RyaContext getInstance() {
         return RyaContextHolder.INSTANCE;
     }
-    
+
 
     //need to go from datatype->resolver
-    public RyaTypeResolver retrieveResolver(URI datatype) {
-        RyaTypeResolver ryaTypeResolver = uriToResolver.get(datatype);
-        if (ryaTypeResolver == null) return defaultResolver;
+    public RyaTypeResolver retrieveResolver(final URI datatype) {
+        final RyaTypeResolver ryaTypeResolver = uriToResolver.get(datatype);
+        if (ryaTypeResolver == null) {
+            return defaultResolver;
+        }
         return ryaTypeResolver;
     }
 
     //need to go from byte->resolver
-    public RyaTypeResolver retrieveResolver(byte markerByte) {
-        RyaTypeResolver ryaTypeResolver = byteToResolver.get(markerByte);
-        if (ryaTypeResolver == null) return defaultResolver;
+    public RyaTypeResolver retrieveResolver(final byte markerByte) {
+        final RyaTypeResolver ryaTypeResolver = byteToResolver.get(markerByte);
+        if (ryaTypeResolver == null) {
+            return defaultResolver;
+        }
         return ryaTypeResolver;
     }
 
-    public byte[] serialize(RyaType ryaType) throws RyaTypeResolverException {
-        RyaTypeResolver ryaTypeResolver = retrieveResolver(ryaType.getDataType());
+    public byte[] serialize(final RyaType ryaType) throws RyaTypeResolverException {
+        final RyaTypeResolver ryaTypeResolver = retrieveResolver(ryaType.getDataType());
         if (ryaTypeResolver != null) {
             return ryaTypeResolver.serialize(ryaType);
         }
         return null;
     }
 
-    public byte[][] serializeType(RyaType ryaType) throws RyaTypeResolverException {
-        RyaTypeResolver ryaTypeResolver = retrieveResolver(ryaType.getDataType());
+    public byte[][] serializeType(final RyaType ryaType) throws RyaTypeResolverException {
+        final RyaTypeResolver ryaTypeResolver = retrieveResolver(ryaType.getDataType());
         if (ryaTypeResolver != null) {
             return ryaTypeResolver.serializeType(ryaType);
         }
         return null;
     }
 
-    public RyaType deserialize(byte[] bytes) throws RyaTypeResolverException {
-        RyaTypeResolver ryaTypeResolver = retrieveResolver(bytes[bytes.length - 1]);
+    public RyaType deserialize(final byte[] bytes) throws RyaTypeResolverException {
+        final RyaTypeResolver ryaTypeResolver = retrieveResolver(bytes[bytes.length - 1]);
         if (ryaTypeResolver != null) {
             return ryaTypeResolver.deserialize(bytes);
         }
         return null;
     }
 
-    public void addRyaTypeResolverMapping(RyaTypeResolverMapping mapping) {
+    public void addRyaTypeResolverMapping(final RyaTypeResolverMapping mapping) {
         if (!uriToResolver.containsKey(mapping.getRyaDataType())) {
             if (logger.isDebugEnabled()) {
                 logger.debug("addRyaTypeResolverMapping uri:[" + mapping.getRyaDataType() + "] byte:[" + mapping.getMarkerByte() + "] for mapping[" + mapping + "]");
@@ -143,14 +146,14 @@ public class RyaContext {
         }
     }
 
-    public void addRyaTypeResolverMappings(List<RyaTypeResolverMapping> mappings) {
-        for (RyaTypeResolverMapping mapping : mappings) {
+    public void addRyaTypeResolverMappings(final List<RyaTypeResolverMapping> mappings) {
+        for (final RyaTypeResolverMapping mapping : mappings) {
             addRyaTypeResolverMapping(mapping);
         }
     }
 
-    public RyaTypeResolver removeRyaTypeResolver(URI dataType) {
-        RyaTypeResolver ryaTypeResolver = uriToResolver.remove(dataType);
+    public RyaTypeResolver removeRyaTypeResolver(final URI dataType) {
+        final RyaTypeResolver ryaTypeResolver = uriToResolver.remove(dataType);
         if (ryaTypeResolver != null) {
             if (logger.isDebugEnabled()) {
                 logger.debug("Removing ryaType Resolver uri[" + dataType + "] + [" + ryaTypeResolver + "]");
@@ -161,8 +164,8 @@ public class RyaContext {
         return null;
     }
 
-    public RyaTypeResolver removeRyaTypeResolver(byte markerByte) {
-        RyaTypeResolver ryaTypeResolver = byteToResolver.remove(markerByte);
+    public RyaTypeResolver removeRyaTypeResolver(final byte markerByte) {
+        final RyaTypeResolver ryaTypeResolver = byteToResolver.remove(markerByte);
         if (ryaTypeResolver != null) {
             if (logger.isDebugEnabled()) {
                 logger.debug("Removing ryaType Resolver byte[" + markerByte + "] + [" + ryaTypeResolver + "]");
@@ -174,8 +177,8 @@ public class RyaContext {
     }
 
     //transform range
-    public RyaRange transformRange(RyaRange range) throws RyaTypeResolverException {
-        RyaTypeResolver ryaTypeResolver = retrieveResolver(range.getStart().getDataType());
+    public RyaRange transformRange(final RyaRange range) throws RyaTypeResolverException {
+        final RyaTypeResolver ryaTypeResolver = retrieveResolver(range.getStart().getDataType());
         if (ryaTypeResolver != null) {
             return ryaTypeResolver.transformRange(range);
         }
@@ -186,7 +189,7 @@ public class RyaContext {
         return defaultResolver;
     }
 
-    public void setDefaultResolver(RyaTypeResolver defaultResolver) {
+    public void setDefaultResolver(final RyaTypeResolver defaultResolver) {
         this.defaultResolver = defaultResolver;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/indexing/pom.xml
----------------------------------------------------------------------
diff --git a/extras/indexing/pom.xml b/extras/indexing/pom.xml
index 6fe35e9..7f3901e 100644
--- a/extras/indexing/pom.xml
+++ b/extras/indexing/pom.xml
@@ -149,6 +149,11 @@
                 <artifactId>maven-shade-plugin</artifactId>
                 <executions>
                     <execution>
+                        <id>map-reduce</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
                         <configuration>
                             <shadedArtifactAttached>true</shadedArtifactAttached>
                             <shadedClassifierName>map-reduce</shadedClassifierName>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloLoadStatements.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloLoadStatements.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloLoadStatements.java
new file mode 100644
index 0000000..9556bcf
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloLoadStatements.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.InstanceExists;
+import org.apache.rya.api.client.LoadStatements;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.openrdf.model.Statement;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailRepository;
+import org.openrdf.repository.sail.SailRepositoryConnection;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * An Accumulo implementation of the {@link LoadStatements} command.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloLoadStatements extends AccumuloCommand implements LoadStatements {
+    private static final Logger log = Logger.getLogger(AccumuloLoadStatements.class);
+
+    private final InstanceExists instanceExists;
+
+    /**
+     * Constructs an instance of {@link AccumuloLoadStatements}.
+     *
+     * @param connectionDetails - Details about the values that were used to create
+     *   the connector to the cluster. (not null)
+     * @param connector - Provides programmatic access to the instance of Accumulo
+     *   that hosts the Rya instance. (not null)
+     */
+    public AccumuloLoadStatements(final AccumuloConnectionDetails connectionDetails, final Connector connector) {
+        super(connectionDetails, connector);
+        instanceExists = new AccumuloInstanceExists(connectionDetails, connector);
+    }
+
+    @Override
+    public void loadStatements(final String ryaInstanceName, final Iterable<? extends Statement> statements) throws InstanceDoesNotExistException, RyaClientException {
+        requireNonNull(ryaInstanceName);
+        requireNonNull(statements);
+
+        // Ensure the Rya Instance exists.
+        if(!instanceExists.exists(ryaInstanceName)) {
+            throw new InstanceDoesNotExistException(String.format("There is no Rya instance named '%s'.", ryaInstanceName));
+        }
+
+        Sail sail = null;
+        SailRepository sailRepo = null;
+        SailRepositoryConnection sailRepoConn = null;
+
+        try {
+            // Get a Sail object that is connected to the Rya instance.
+            final AccumuloRdfConfiguration ryaConf = getAccumuloConnectionDetails().buildAccumuloRdfConfiguration(ryaInstanceName);
+            ryaConf.setFlush(false); //RYA-327 should address this hardcoded value.
+            sail = RyaSailFactory.getInstance(ryaConf);
+
+            // Load the statements.
+            sailRepo = new SailRepository(sail);
+            sailRepoConn = sailRepo.getConnection();
+            sailRepoConn.add(statements);
+
+        } catch (final SailException | AccumuloException | AccumuloSecurityException | RyaDAOException | InferenceEngineException  e) {
+            log.warn("Exception while loading:", e);
+            throw new RyaClientException("A problem connecting to the Rya instance named '" + ryaInstanceName + "' has caused the load to fail.", e);
+        } catch (final Exception e) {
+            log.warn("Exception while loading:", e);
+            throw new RyaClientException("A problem processing the RDF statements has caused the load into Rya instance named " + ryaInstanceName + "to fail.", e);
+        } finally {
+            // Shut it all down.
+            if(sailRepoConn != null) {
+                try {
+                    sailRepoConn.close();
+                } catch (final RepositoryException e) {
+                    log.warn("Couldn't close the SailRepoConnection that is attached to the Rya instance.", e);
+                }
+            }
+            if(sailRepo != null) {
+                try {
+                    sailRepo.shutDown();
+                } catch (final RepositoryException e) {
+                    log.warn("Couldn't shut down the SailRepository that is attached to the Rya instance.", e);
+                }
+            }
+            if(sail != null) {
+                try {
+                    sail.shutDown();
+                } catch (final SailException e) {
+                    log.warn("Couldn't shut down the Sail that is attached to the Rya instance.", e);
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
index d9bf644..fcc712c 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
@@ -62,6 +62,7 @@ public class AccumuloRyaClientFactory {
                 new AccumuloAddUser(connectionDetails, connector),
                 new AccumuloRemoveUser(connectionDetails, connector),
                 new AccumuloUninstall(connectionDetails, connector),
+                new AccumuloLoadStatements(connectionDetails, connector),
                 new AccumuloLoadStatementsFile(connectionDetails, connector),
                 new AccumuloExecuteSparqlQuery(connectionDetails, connector));
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
index 40e2c77..4a15665 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
@@ -18,12 +18,11 @@
  */
 package org.apache.rya.indexing.pcj.matching;
 
-import static java.util.Objects.requireNonNull;
-
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -73,13 +72,11 @@ public class AccumuloIndexSetProvider implements ExternalSetProvider<ExternalTup
     private boolean init = false;
 
     public AccumuloIndexSetProvider(final Configuration conf) {
-        Preconditions.checkNotNull(conf);
-        this.conf = conf;
+        this.conf = Objects.requireNonNull(conf);
     }
 
     public AccumuloIndexSetProvider(final Configuration conf, final List<ExternalTupleSet> indices) {
-        Preconditions.checkNotNull(conf);
-        this.conf = conf;
+        this(conf);
         indexCache = indices;
         init = true;
     }
@@ -155,9 +152,9 @@ public class AccumuloIndexSetProvider implements ExternalSetProvider<ExternalTup
      */
     private List<ExternalTupleSet> getAccIndices() throws Exception {
 
-        requireNonNull(conf);
-        final String tablePrefix = requireNonNull(conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX));
-        final Connector conn = requireNonNull(ConfigUtils.getConnector(conf));
+        Objects.requireNonNull(conf);
+        final String tablePrefix = Objects.requireNonNull(conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX));
+        final Connector conn = Objects.requireNonNull(ConfigUtils.getConnector(conf));
         List<String> tables = null;
 
         if (conf instanceof RdfCloudTripleStoreConfiguration) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/api/pom.xml
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/pom.xml b/extras/periodic.notification/api/pom.xml
index 9f62e73..01d5c60 100644
--- a/extras/periodic.notification/api/pom.xml
+++ b/extras/periodic.notification/api/pom.xml
@@ -1,15 +1,23 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <!-- Licensed to the Apache Software Foundation (ASF) under one or more 
-        contributor license agreements. See the NOTICE file distributed with this 
-        work for additional information regarding copyright ownership. The ASF licenses 
-        this file to you under the Apache License, Version 2.0 (the "License"); you 
-        may not use this file except in compliance with the License. You may obtain 
-        a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
-        required by applicable law or agreed to in writing, software distributed 
-        under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
-        OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
-        the specific language governing permissions and limitations under the License. -->
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
 
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.rya</groupId>
@@ -23,16 +31,13 @@
     <description>API for Periodic Notification Applications</description>
 
     <dependencies>
-
         <dependency>
-            <groupId>com.google.code.gson</groupId>
-            <artifactId>gson</artifactId>
-            <version>2.8.0</version>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
         </dependency>
         <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
         </dependency>
         <dependency>
             <groupId>org.openrdf.sesame</groupId>
@@ -46,6 +51,13 @@
             <groupId>org.apache.rya</groupId>
             <artifactId>rya.indexing.pcj</artifactId>
         </dependency>
+        
+        <!--  testing dependencies  -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java
index ff08733..5a473d2 100644
--- a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java
@@ -36,20 +36,20 @@ public interface PeriodicNotificationClient extends AutoCloseable {
      * Adds a new notification to be registered with the {@link NotificationCoordinatorExecutor}
      * @param notification - notification to be added
      */
-    public void addNotification(PeriodicNotification notification);
-    
+    void addNotification(PeriodicNotification notification);
+
     /**
      * Deletes a notification from the {@link NotificationCoordinatorExecutor}.
      * @param notification - notification to be deleted
      */
-    public void deleteNotification(BasicNotification notification);
-    
+    void deleteNotification(BasicNotification notification);
+
     /**
      * Deletes a notification from the {@link NotificationCoordinatorExecutor}.
      * @param notificationId - id corresponding to the notification to be deleted
      */
-    public void deleteNotification(String notificationId);
-    
+    void deleteNotification(String notificationId);
+
     /**
      * Adds a new notification with the indicated id and period to the {@link NotificationCoordinatorExecutor}
      * @param id - Periodic Query id
@@ -57,8 +57,5 @@ public interface PeriodicNotificationClient extends AutoCloseable {
      * @param delay - initial delay for starting periodic notifications
      * @param unit - time unit of delay and period
      */
-    public void addNotification(String id, long period, long delay, TimeUnit unit);
-    
-    public void close();
-    
+    void addNotification(String id, long period, long delay, TimeUnit unit);
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java
index bb438be..b022d3e 100644
--- a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java
@@ -31,50 +31,50 @@ import org.apache.rya.periodic.notification.notification.PeriodicNotification;
 
 /**
  *  Implementation of {@link PeriodicNotificationClient} used to register new notification
- *  requests with the PeriodicQueryService. 
+ *  requests with the PeriodicQueryService.
  *
  */
 public class KafkaNotificationRegistrationClient implements PeriodicNotificationClient {
 
-    private KafkaProducer<String, CommandNotification> producer;
-    private String topic;
-    
-    public KafkaNotificationRegistrationClient(String topic, KafkaProducer<String, CommandNotification> producer) {
+    private final KafkaProducer<String, CommandNotification> producer;
+    private final String topic;
+
+    public KafkaNotificationRegistrationClient(final String topic, final KafkaProducer<String, CommandNotification> producer) {
         this.topic = topic;
         this.producer = producer;
     }
-    
+
     @Override
-    public void addNotification(PeriodicNotification notification) {
+    public void addNotification(final PeriodicNotification notification) {
         processNotification(new CommandNotification(Command.ADD, notification));
 
     }
 
     @Override
-    public void deleteNotification(BasicNotification notification) {
+    public void deleteNotification(final BasicNotification notification) {
         processNotification(new CommandNotification(Command.DELETE, notification));
     }
 
     @Override
-    public void deleteNotification(String notificationId) {
+    public void deleteNotification(final String notificationId) {
         processNotification(new CommandNotification(Command.DELETE, new BasicNotification(notificationId)));
     }
 
     @Override
-    public void addNotification(String id, long period, long delay, TimeUnit unit) {
-        Notification notification = PeriodicNotification.builder().id(id).period(period).initialDelay(delay).timeUnit(unit).build();
+    public void addNotification(final String id, final long period, final long delay, final TimeUnit unit) {
+        final Notification notification = PeriodicNotification.builder().id(id).period(period).initialDelay(delay).timeUnit(unit).build();
         processNotification(new CommandNotification(Command.ADD, notification));
     }
-    
-   
-    private void processNotification(CommandNotification notification) {
+
+
+    private void processNotification(final CommandNotification notification) {
         producer.send(new ProducerRecord<String, CommandNotification>(topic, notification.getId(), notification));
     }
-    
+
     @Override
     public void close() {
+        // TODO: ownership issue. This class closes a producer that it did not create, so other classes may still be using it.
+        // Either create the producer here, or do not implement AutoCloseable.
         producer.close();
     }
-    
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
index 129bd6d..6db7b18 100644
--- a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
@@ -25,12 +25,13 @@ import java.util.Map;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer;
 import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Joiner;
 import com.google.common.primitives.Bytes;
@@ -42,7 +43,7 @@ import com.google.common.primitives.Bytes;
  */
 public class BindingSetSerDe implements Serializer<BindingSet>, Deserializer<BindingSet> {
 
-    private static final Logger log = Logger.getLogger(BindingSetSerDe.class);
+    private static final Logger log = LoggerFactory.getLogger(BindingSetSerDe.class);
     private static final AccumuloPcjSerializer serializer =  new AccumuloPcjSerializer();
     private static final byte[] DELIM_BYTE = "\u0002".getBytes(StandardCharsets.UTF_8);
 
@@ -60,7 +61,7 @@ public class BindingSetSerDe implements Serializer<BindingSet>, Deserializer<Bin
             final int firstIndex = Bytes.indexOf(bsBytes, DELIM_BYTE);
             final byte[] varOrderBytes = Arrays.copyOf(bsBytes, firstIndex);
             final byte[] bsBytesNoVarOrder = Arrays.copyOfRange(bsBytes, firstIndex + 1, bsBytes.length);
-            final VariableOrder varOrder = new VariableOrder(new String(varOrderBytes,"UTF-8").split(";"));
+            final VariableOrder varOrder = new VariableOrder(new String(varOrderBytes, StandardCharsets.UTF_8).split(";"));
             return getBindingSet(varOrder, bsBytesNoVarOrder);
         } catch(final Exception e) {
             log.trace("Unable to deserialize BindingSet: " + bsBytes);
@@ -75,7 +76,7 @@ public class BindingSetSerDe implements Serializer<BindingSet>, Deserializer<Bin
     private byte[] getBytes(final VariableOrder varOrder, final BindingSet bs) throws UnsupportedEncodingException, BindingSetConversionException {
         final byte[] bsBytes = serializer.convert(bs, varOrder);
         final String varOrderString = Joiner.on(";").join(varOrder.getVariableOrders());
-        final byte[] varOrderBytes = varOrderString.getBytes("UTF-8");
+        final byte[] varOrderBytes = varOrderString.getBytes(StandardCharsets.UTF_8);
         return Bytes.concat(varOrderBytes, DELIM_BYTE, bsBytes);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java
index 302e1be..13c789f 100644
--- a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java
@@ -18,7 +18,7 @@
  */
 package org.apache.rya.periodic.notification.serialization;
 
-import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
 import org.apache.kafka.common.serialization.Deserializer;
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import com.google.gson.JsonParseException;
 
 /**
  * Kafka {@link Serializer} and {@link Deserializer} for producing and consuming {@link CommandNotification}s
@@ -43,22 +44,23 @@ public class CommandNotificationSerializer implements Serializer<CommandNotifica
     private static final Logger LOG = LoggerFactory.getLogger(CommandNotificationSerializer.class);
 
     @Override
-    public CommandNotification deserialize(String topic, byte[] bytes) {
-        String json = null;
+    public CommandNotification deserialize(final String topic, final byte[] bytes) {
         try {
-            json = new String(bytes, "UTF-8");
-        } catch (UnsupportedEncodingException e) {
-            LOG.info("Unable to deserialize notification for topic: " + topic);
+            final String json = new String(bytes, StandardCharsets.UTF_8);
+            return gson.fromJson(json, CommandNotification.class);
+        } catch (final JsonParseException e) {
+            LOG.warn("Unable to deserialize notification for topic: " + topic);
+            throw new RuntimeException(e);
         }
-        return gson.fromJson(json, CommandNotification.class);
+
     }
 
     @Override
-    public byte[] serialize(String topic, CommandNotification command) {
+    public byte[] serialize(final String topic, final CommandNotification command) {
         try {
-            return gson.toJson(command).getBytes("UTF-8");
-        } catch (UnsupportedEncodingException e) {
-            LOG.info("Unable to serialize notification: " + command  + "for topic: " + topic);
+            return gson.toJson(command).getBytes(StandardCharsets.UTF_8);
+        } catch (final JsonParseException e) {
+            LOG.warn("Unable to serialize notification: " + command  + "for topic: " + topic);
             throw new RuntimeException(e);
         }
     }
@@ -67,10 +69,10 @@ public class CommandNotificationSerializer implements Serializer<CommandNotifica
     public void close() {
         // Do nothing. Nothing to close
     }
-    
+
     @Override
-    public void configure(Map<String, ?> arg0, boolean arg1) {
+    public void configure(final Map<String, ?> arg0, final boolean arg1) {
         // Do nothing. Nothing to configure
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/pom.xml
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/pom.xml b/extras/periodic.notification/pom.xml
index c49db73..7610b88 100644
--- a/extras/periodic.notification/pom.xml
+++ b/extras/periodic.notification/pom.xml
@@ -1,40 +1,43 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
 
-  http://www.apache.org/licenses/LICENSE-2.0
+    http://www.apache.org/licenses/LICENSE-2.0
 
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
 -->
-  <modelVersion>4.0.0</modelVersion>
-  <artifactId>rya.periodic.notification.parent</artifactId>
-  
-   <parent>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
         <groupId>org.apache.rya</groupId>
         <artifactId>rya.extras</artifactId>
         <version>3.2.12-incubating-SNAPSHOT</version>
     </parent>
-  
-  <name>Apache Rya Periodic Notification Parent</name>
-  <description>Parent POM for Rya Periodic Notification Projects</description>
-  
-  <packaging>pom</packaging>
+
+    <artifactId>rya.periodic.notification.parent</artifactId>
+
+    <name>Apache Rya Periodic Notification Parent</name>
+    <description>Parent POM for Rya Periodic Notification Projects</description>
+
+    <packaging>pom</packaging>
 
     <modules>
         <module>api</module>
         <module>service</module>
+        <module>twill</module>
+        <module>twill.yarn</module>
         <module>tests</module>
     </modules>
-  
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/pom.xml
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/pom.xml b/extras/periodic.notification/service/pom.xml
index 2e61733..18aef13 100644
--- a/extras/periodic.notification/service/pom.xml
+++ b/extras/periodic.notification/service/pom.xml
@@ -1,15 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
-    <!-- Licensed to the Apache Software Foundation (ASF) under one or more 
-        contributor license agreements. See the NOTICE file distributed with this 
-        work for additional information regarding copyright ownership. The ASF licenses 
-        this file to you under the Apache License, Version 2.0 (the "License"); you 
-        may not use this file except in compliance with the License. You may obtain 
-        a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
-        required by applicable law or agreed to in writing, software distributed 
-        under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
-        OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
-        the specific language governing permissions and limitations under the License. -->
     <parent>
         <groupId>org.apache.rya</groupId>
         <artifactId>rya.periodic.notification.parent</artifactId>
@@ -22,42 +31,20 @@
     <description>Notifications for Rya Periodic Service</description>
 
     <dependencies>
-
+        <!--  compile dependencies -->
         <dependency>
-            <groupId>org.apache.twill</groupId>
-            <artifactId>twill-api</artifactId>
-            <version>0.11.0</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.twill</groupId>
-            <artifactId>twill-yarn</artifactId>
-            <version>0.11.0</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>kafka_2.10</artifactId>
-                    <groupId>org.apache.kafka</groupId>
-                </exclusion>
-            </exclusions>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
         </dependency>
         <dependency>
             <groupId>com.google.code.gson</groupId>
             <artifactId>gson</artifactId>
-            <version>2.8.0</version>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.fluo</groupId>
             <artifactId>fluo-api</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.fluo</groupId>
-            <artifactId>fluo-core</artifactId>
-        </dependency>
-        <dependency>
             <groupId>org.apache.rya</groupId>
             <artifactId>rya.indexing</artifactId>
         </dependency>
@@ -71,13 +58,22 @@
         </dependency>
         <dependency>
             <groupId>org.apache.rya</groupId>
-            <artifactId>rya.pcj.fluo.app</artifactId>
+            <artifactId>rya.periodic.notification.api</artifactId>
         </dependency>
+        
+        <!--  runtime dependencies -->
         <dependency>
-            <groupId>org.apache.rya</groupId>
-            <artifactId>rya.periodic.notification.api</artifactId>
+            <groupId>org.apache.fluo</groupId>
+            <artifactId>fluo-core</artifactId>
+            <scope>runtime</scope>
         </dependency>
 
+        <!--  testing dependencies -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java
index 92a7d18..79abe2f 100644
--- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java
+++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java
@@ -18,7 +18,10 @@
  */
 package org.apache.rya.periodic.notification.application;
 
-import org.apache.log4j.Logger;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
 import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
 import org.apache.rya.periodic.notification.api.BinPruner;
 import org.apache.rya.periodic.notification.api.BindingSetRecord;
@@ -30,6 +33,8 @@ import org.apache.rya.periodic.notification.processor.NotificationProcessorExecu
 import org.apache.rya.periodic.notification.pruner.PeriodicQueryPrunerExecutor;
 import org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationProvider;
 import org.openrdf.query.algebra.evaluation.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
@@ -68,36 +73,40 @@ import com.google.common.base.Preconditions;
  * the period at which the user wants to receive updates, and the time unit. The
  * following query requests all observations that occurred within the last
  * minute and requests updates every 15 seconds. It also performs a count on
- * those observations. <br>
- * <br>
- * <li>prefix function: http://org.apache.rya/function#
- * <li>"prefix time: http://www.w3.org/2006/time#
- * <li>"select (count(?obs) as ?total) where {
- * <li>"Filter(function:periodic(?time, 1, .25, time:minutes))
- * <li>"?obs uri:hasTime ?time.
- * <li>"?obs uri:hasId ?id }
- * <li>
+ * those observations.
+ * <p>
+ * <pre>
+ * PREFIX function: http://org.apache.rya/function#
+ * PREFIX time: http://www.w3.org/2006/time#
+ * SELECT (count(?obs) as ?total) WHERE {
+ *     FILTER (function:periodic(?time, 1, .25, time:minutes))
+ *     ?obs uri:hasTime ?time.
+ *     ?obs uri:hasId ?id
+ * }
+ * </pre>
  */
 public class PeriodicNotificationApplication implements LifeCycle {
 
-    private static final Logger log = Logger.getLogger(PeriodicNotificationApplication.class);
-    private NotificationCoordinatorExecutor coordinator;
-    private KafkaNotificationProvider provider;
-    private PeriodicQueryPrunerExecutor pruner;
-    private NotificationProcessorExecutor processor;
-    private KafkaExporterExecutor exporter;
+    private static final Logger log = LoggerFactory.getLogger(PeriodicNotificationApplication.class);
+    private final NotificationCoordinatorExecutor coordinator;
+    private final KafkaNotificationProvider provider;
+    private final PeriodicQueryPrunerExecutor pruner;
+    private final NotificationProcessorExecutor processor;
+    private final KafkaExporterExecutor exporter;
     private boolean running = false;
+    private Optional<CompletableFuture<Void>> finished = Optional.empty();
+
 
     /**
      * Creates a PeriodicNotificationApplication
-     * @param provider - {@link KafkaNotificationProvider} that retrieves new Notificaiton requests from Kafka
+     * @param provider - {@link KafkaNotificationProvider} that retrieves new Notification requests from Kafka
      * @param coordinator - {@link NotificationCoordinatorExecutor} that manages PeriodicNotifications.
      * @param processor - {@link NotificationProcessorExecutor} that processes PeriodicNotifications
      * @param exporter - {@link KafkaExporterExecutor} that exports periodic results
      * @param pruner - {@link PeriodicQueryPrunerExecutor} that cleans up old periodic bins
      */
-    public PeriodicNotificationApplication(KafkaNotificationProvider provider, NotificationCoordinatorExecutor coordinator,
-            NotificationProcessorExecutor processor, KafkaExporterExecutor exporter, PeriodicQueryPrunerExecutor pruner) {
+    public PeriodicNotificationApplication(final KafkaNotificationProvider provider, final NotificationCoordinatorExecutor coordinator,
+            final NotificationProcessorExecutor processor, final KafkaExporterExecutor exporter, final PeriodicQueryPrunerExecutor pruner) {
         this.provider = Preconditions.checkNotNull(provider);
         this.coordinator = Preconditions.checkNotNull(coordinator);
         this.processor = Preconditions.checkNotNull(processor);
@@ -115,18 +124,37 @@ public class PeriodicNotificationApplication implements LifeCycle {
             pruner.start();
             exporter.start();
             running = true;
+            finished = Optional.of(new CompletableFuture<>());
+        }
+    }
+
+    /**
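+     * Blocks the current thread until another thread has called {@link #stop()}.
+     * @throws ExecutionException if the future being waited on completes exceptionally
+     * @throws InterruptedException if the current thread is interrupted while waiting
+     * @throws IllegalStateException if the application has not been started yet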
+     */
+    public void blockUntilFinished() throws ExecutionException, InterruptedException, IllegalStateException {
+        if(finished.isPresent()) {
+            finished.get().get();
+        } else {
+            throw new IllegalStateException("Cannot block if the application has not been started yet");
         }
     }
 
     @Override
     public void stop() {
         log.info("Stopping PeriodicNotificationApplication.");
+        if(!finished.isPresent()) {
+            throw new IllegalStateException("Cannot stop if the application has not been started yet");
+        }
         provider.stop();
         coordinator.stop();
         processor.stop();
         pruner.stop();
         exporter.stop();
         running = false;
+        finished.get().complete(null);
     }
 
     /**
@@ -154,7 +182,7 @@ public class PeriodicNotificationApplication implements LifeCycle {
          * @param pruner - PeriodicQueryPrunerExecutor for cleaning up old periodic bins
          * @return this Builder for chaining method calls
          */
-        public Builder setPruner(PeriodicQueryPrunerExecutor pruner) {
+        public Builder setPruner(final PeriodicQueryPrunerExecutor pruner) {
             this.pruner = pruner;
             return this;
         }
@@ -164,12 +192,12 @@ public class PeriodicNotificationApplication implements LifeCycle {
          * @param provider - KafkaNotificationProvider for retrieving new periodic notification requests from Kafka
          * @return this Builder for chaining method calls
          */
-        public Builder setProvider(KafkaNotificationProvider provider) {
+        public Builder setProvider(final KafkaNotificationProvider provider) {
             this.provider = provider;
             return this;
         }
 
-        public Builder setProcessor(NotificationProcessorExecutor processor) {
+        public Builder setProcessor(final NotificationProcessorExecutor processor) {
             this.processor = processor;
             return this;
         }
@@ -179,7 +207,7 @@ public class PeriodicNotificationApplication implements LifeCycle {
          * @param exporter for exporting periodic query results to Kafka
          * @return this Builder for chaining method calls
          */
-        public Builder setExporter(KafkaExporterExecutor exporter) {
+        public Builder setExporter(final KafkaExporterExecutor exporter) {
             this.exporter = exporter;
             return this;
         }
@@ -189,7 +217,7 @@ public class PeriodicNotificationApplication implements LifeCycle {
          * @param coordinator for managing and generating periodic notifications
          * @return this Builder for chaining method calls
          */
-        public Builder setCoordinator(NotificationCoordinatorExecutor coordinator) {
+        public Builder setCoordinator(final NotificationCoordinatorExecutor coordinator) {
             this.coordinator = coordinator;
             return this;
         }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
index ff58979..9f0631d 100644
--- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
+++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
@@ -18,35 +18,36 @@
  */
 package org.apache.rya.periodic.notification.application;
 
+import java.util.Objects;
 import java.util.Properties;
 
 import org.apache.rya.accumulo.AccumuloRdfConfiguration;
 
-import com.google.common.base.Preconditions;
 
 /**
  * Configuration object for creating a {@link PeriodicNotificationApplication}.
  */
 public class PeriodicNotificationApplicationConfiguration extends AccumuloRdfConfiguration {
 
-    public static String FLUO_APP_NAME = "fluo.app.name";
-    public static String FLUO_TABLE_NAME = "fluo.table.name";
-    public static String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
-    public static String NOTIFICATION_TOPIC = "kafka.notification.topic";
-    public static String NOTIFICATION_GROUP_ID = "kafka.notification.group.id";
-    public static String NOTIFICATION_CLIENT_ID = "kafka.notification.client.id";
-    public static String COORDINATOR_THREADS = "cep.coordinator.threads";
-    public static String PRODUCER_THREADS = "cep.producer.threads";
-    public static String EXPORTER_THREADS = "cep.exporter.threads";
-    public static String PROCESSOR_THREADS = "cep.processor.threads";
-    public static String PRUNER_THREADS = "cep.pruner.threads";
-    
+    public static final String RYA_PERIODIC_PREFIX = "rya.periodic.notification.";
+    public static final String RYA_PCJ_PREFIX = "rya.pcj.";
+    public static final String FLUO_APP_NAME = RYA_PCJ_PREFIX +"fluo.app.name";
+    public static final String FLUO_TABLE_NAME = RYA_PCJ_PREFIX + "fluo.table.name";
+    public static final String KAFKA_BOOTSTRAP_SERVERS = RYA_PERIODIC_PREFIX + "kafka.bootstrap.servers";
+    public static final String NOTIFICATION_TOPIC = RYA_PERIODIC_PREFIX + "kafka.topic";
+    public static final String NOTIFICATION_GROUP_ID = RYA_PERIODIC_PREFIX + "kafka.group.id";
+    public static final String NOTIFICATION_CLIENT_ID = RYA_PERIODIC_PREFIX + "kafka.client.id";
+    public static final String COORDINATOR_THREADS = RYA_PERIODIC_PREFIX + "coordinator.threads";
+    public static final String PRODUCER_THREADS = RYA_PERIODIC_PREFIX + "producer.threads";
+    public static final String EXPORTER_THREADS = RYA_PERIODIC_PREFIX + "exporter.threads";
+    public static final String PROCESSOR_THREADS = RYA_PERIODIC_PREFIX + "processor.threads";
+    public static final String PRUNER_THREADS = RYA_PERIODIC_PREFIX + "pruner.threads";
+
     public PeriodicNotificationApplicationConfiguration() {}
-    
+
     /**
     * Creates a PeriodicNotificationApplicationConfiguration object from a Properties file.  This method assumes
      * that all values in the Properties file are Strings and that the Properties file uses the keys below.
-     * See rya.cep/cep.integration.tests/src/test/resources/properties/notification.properties for an example.
      * <br>
      * <ul>
      * <li>"accumulo.auths" - String of Accumulo authorizations. Default is empty String.
@@ -55,23 +56,22 @@ public class PeriodicNotificationApplicationConfiguration extends AccumuloRdfCon
      * <li>"accumulo.password" - Accumulo password (required)
      * <li>"accumulo.rya.prefix" - Prefix for Accumulo backed Rya instance.  Default is "rya_"
      * <li>"accumulo.zookeepers" - Zookeepers for underlying Accumulo instance (required)
-     * <li>"fluo.app.name" - Name of Fluo Application (required)
-     * <li>"fluo.table.name" - Name of Fluo Table (required)
-     * <li>"kafka.bootstrap.servers" - Kafka Bootstrap servers for Producers and Consumers (required)
-     * <li>"kafka.notification.topic" - Topic to which new Periodic Notifications are published. Default is "notifications".
-     * <li>"kafka.notification.client.id" - Client Id for notification topic.  Default is "consumer0"
-     * <li>"kafka.notification.group.id" - Group Id for notification topic.  Default is "group0"
-     * <li>"cep.coordinator.threads" - Number of threads used by coordinator. Default is 1.
-     * <li>"cep.producer.threads" - Number of threads used by producer.  Default is 1.
-     * <li>"cep.exporter.threads" - Number of threads used by exporter.  Default is 1.
-     * <li>"cep.processor.threads" - Number of threads used by processor.  Default is 1.
-     * <li>"cep.pruner.threads" - Number of threads used by pruner.  Default is 1.
+     * <li>"rya.pcj.fluo.app.name" - Name of Fluo Application (required)
+     * <li>"rya.pcj.fluo.table.name" - Name of Fluo Table (required)
+     * <li>"rya.periodic.notification.kafka.bootstrap.servers" - Kafka Bootstrap servers for Producers and Consumers (required)
+     * <li>"rya.periodic.notification.kafka.topic" - Topic to which new Periodic Notifications are published. Default is "notifications".
+     * <li>"rya.periodic.notification.kafka.client.id" - Client Id for notification topic.  Default is "consumer0"
+     * <li>"rya.periodic.notification.kafka.group.id" - Group Id for notification topic.  Default is "group0"
+     * <li>"rya.periodic.notification.coordinator.threads" - Number of threads used by coordinator. Default is 1.
+     * <li>"rya.periodic.notification.producer.threads" - Number of threads used by producer.  Default is 1.
+     * <li>"rya.periodic.notification.exporter.threads" - Number of threads used by exporter.  Default is 1.
+     * <li>"rya.periodic.notification.processor.threads" - Number of threads used by processor.  Default is 1.
+     * <li>"rya.periodic.notification.pruner.threads" - Number of threads used by pruner.  Default is 1.
      * </ul>
      * <br>
      * @param props - Properties file containing Accumulo specific configuration parameters
-     * @return AccumumuloRdfConfiguration with properties set
      */
-    public PeriodicNotificationApplicationConfiguration(Properties props) {
+    public PeriodicNotificationApplicationConfiguration(final Properties props) {
        super(fromProperties(props));
        setFluoAppName(props.getProperty(FLUO_APP_NAME));
        setFluoTableName(props.getProperty(FLUO_TABLE_NAME));
@@ -85,170 +85,170 @@ public class PeriodicNotificationApplicationConfiguration extends AccumuloRdfCon
        setPrunerThreads(Integer.parseInt(props.getProperty(PRUNER_THREADS, "1")));
        setCoordinatorThreads(Integer.parseInt(props.getProperty(COORDINATOR_THREADS, "1")));
     }
-    
+
     /**
      * Sets the name of the Fluo Application
-     * @param fluoAppName 
+     * @param fluoAppName
      */
-    public void setFluoAppName(String fluoAppName) {
-        set(FLUO_APP_NAME, Preconditions.checkNotNull(fluoAppName));
+    public void setFluoAppName(final String fluoAppName) {
+        set(FLUO_APP_NAME, Objects.requireNonNull(fluoAppName));
     }
-    
+
     /**
      * Sets the name of the Fluo table
      * @param fluoTableName
      */
-    public void setFluoTableName(String fluoTableName) {
-       set(FLUO_TABLE_NAME, Preconditions.checkNotNull(fluoTableName)); 
+    public void setFluoTableName(final String fluoTableName) {
+       set(FLUO_TABLE_NAME, Objects.requireNonNull(fluoTableName));
     }
-    
+
     /**
      * Sets the Kafka bootstrap servers
      * @param bootStrapServers
      */
-    public void setBootStrapServers(String bootStrapServers) {
-        set(KAFKA_BOOTSTRAP_SERVERS, Preconditions.checkNotNull(bootStrapServers)); 
+    public void setBootStrapServers(final String bootStrapServers) {
+        set(KAFKA_BOOTSTRAP_SERVERS, Objects.requireNonNull(bootStrapServers));
     }
-    
+
     /**
      * Sets the Kafka topic name for new notification requests
      * @param notificationTopic
      */
-    public void setNotificationTopic(String notificationTopic) {
-        set(NOTIFICATION_TOPIC, Preconditions.checkNotNull(notificationTopic));
+    public void setNotificationTopic(final String notificationTopic) {
+        set(NOTIFICATION_TOPIC, Objects.requireNonNull(notificationTopic));
     }
-    
+
     /**
      * Sets the GroupId for new notification request topic
      * @param notificationGroupId
      */
-    public void setNotificationGroupId(String notificationGroupId) {
-        set(NOTIFICATION_GROUP_ID, Preconditions.checkNotNull(notificationGroupId));
+    public void setNotificationGroupId(final String notificationGroupId) {
+        set(NOTIFICATION_GROUP_ID, Objects.requireNonNull(notificationGroupId));
     }
-    
+
     /**
      * Sets the ClientId for the Kafka notification topic
      * @param notificationClientId
      */
-    public void setNotificationClientId(String notificationClientId) {
-        set(NOTIFICATION_GROUP_ID, Preconditions.checkNotNull(notificationClientId));
+    public void setNotificationClientId(final String notificationClientId) {
+        set(NOTIFICATION_CLIENT_ID, Objects.requireNonNull(notificationClientId));
     }
-    
+
     /**
      * Sets the number of threads for the coordinator
      * @param threads
      */
-    public void setCoordinatorThreads(int threads) {
+    public void setCoordinatorThreads(final int threads) {
         setInt(COORDINATOR_THREADS, threads);
     }
-    
+
     /**
      * Sets the number of threads for the exporter
      * @param threads
      */
-    public void setExporterThreads(int threads) {
+    public void setExporterThreads(final int threads) {
         setInt(EXPORTER_THREADS, threads);
     }
-    
+
     /**
      * Sets the number of threads for the producer for reading new periodic notifications
      * @param threads
      */
-    public void setProducerThreads(int threads) {
+    public void setProducerThreads(final int threads) {
         setInt(PRODUCER_THREADS, threads);
     }
-    
+
     /**
      * Sets the number of threads for the bin pruner
      * @param threads
      */
-    public void setPrunerThreads(int threads) {
+    public void setPrunerThreads(final int threads) {
         setInt(PRUNER_THREADS, threads);
     }
-    
+
     /**
      * Sets the number of threads for the Notification processor
      * @param threads
      */
-    public void setProcessorThreads(int threads) {
+    public void setProcessorThreads(final int threads) {
         setInt(PROCESSOR_THREADS, threads);
     }
-    
+
     /**
      * @return name of the Fluo application
      */
     public String getFluoAppName() {
         return get(FLUO_APP_NAME);
     }
-    
+
     /**
      * @return name of the Fluo table
      */
     public String getFluoTableName() {
-       return get(FLUO_TABLE_NAME); 
+       return get(FLUO_TABLE_NAME);
     }
-    
+
     /**
      * @return Kafka bootstrap servers
      */
     public String getBootStrapServers() {
-        return get(KAFKA_BOOTSTRAP_SERVERS); 
+        return get(KAFKA_BOOTSTRAP_SERVERS);
     }
-    
+
     /**
      * @return notification topic
      */
     public String getNotificationTopic() {
         return get(NOTIFICATION_TOPIC, "notifications");
     }
-    
+
     /**
     * @return Kafka GroupId for the notification topic
      */
     public String getNotificationGroupId() {
         return get(NOTIFICATION_GROUP_ID, "group0");
     }
-    
+
     /**
      * @return Kafka ClientId for the notification topic
      */
     public String getNotificationClientId() {
         return get(NOTIFICATION_CLIENT_ID, "consumer0");
     }
-    
+
     /**
      * @return the number of threads for the coordinator
      */
     public int getCoordinatorThreads() {
         return getInt(COORDINATOR_THREADS, 1);
     }
-    
+
     /**
      * @return the number of threads for the exporter
      */
     public int getExporterThreads() {
         return getInt(EXPORTER_THREADS, 1);
     }
-    
+
     /**
      * @return the number of threads for the notification producer
      */
     public int getProducerThreads() {
         return getInt(PRODUCER_THREADS, 1);
     }
-    
+
     /**
      * @return the number of threads for the bin pruner
      */
     public int getPrunerThreads() {
         return getInt(PRUNER_THREADS, 1);
     }
-    
+
     /**
      * @return number of threads for the processor
      */
     public int getProcessorThreads() {
         return getInt(PROCESSOR_THREADS, 1);
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java
index 771a4ab..fbc03f3 100644
--- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java
+++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java
@@ -18,10 +18,13 @@
  */
 package org.apache.rya.periodic.notification.application;
 
+import java.io.File;
+import java.io.FileInputStream;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -33,6 +36,7 @@ import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.Snapshot;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory;
@@ -51,6 +55,9 @@ import org.apache.rya.periodic.notification.registration.kafka.KafkaNotification
 import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
 import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
 import org.openrdf.query.BindingSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  * Factory for creating a {@link PeriodicNotificationApplication}.
@@ -59,82 +66,88 @@ public class PeriodicNotificationApplicationFactory {
 
     /**
      * Create a PeriodicNotificationApplication.
-     * @param props - Properties file that specifies the parameters needed to create the application
+     * @param conf - Configuration object that specifies the parameters needed to create the application
      * @return PeriodicNotificationApplication to periodically poll Rya Fluo for new results
      * @throws PeriodicApplicationException
      */
-    public static PeriodicNotificationApplication getPeriodicApplication(Properties props) throws PeriodicApplicationException {
-        PeriodicNotificationApplicationConfiguration conf = new PeriodicNotificationApplicationConfiguration(props);
-        Properties kafkaProps = getKafkaProperties(conf);
+    public static PeriodicNotificationApplication getPeriodicApplication(final PeriodicNotificationApplicationConfiguration conf) throws PeriodicApplicationException {
+        final Properties kafkaConsumerProps = getKafkaConsumerProperties(conf);
+        final Properties kafkaProducerProps = getKafkaProducerProperties(conf);
 
-        BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
-        BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>();
-        BlockingQueue<BindingSetRecord> bindingSets = new LinkedBlockingQueue<>();
+        final BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
+        final BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>();
+        final BlockingQueue<BindingSetRecord> bindingSets = new LinkedBlockingQueue<>();
 
         FluoClient fluo = null;
         try {
-            PeriodicQueryResultStorage storage = getPeriodicQueryResultStorage(conf);
+            final PeriodicQueryResultStorage storage = getPeriodicQueryResultStorage(conf);
             fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf);
-            NotificationCoordinatorExecutor coordinator = getCoordinator(conf.getCoordinatorThreads(), notifications);
+            final NotificationCoordinatorExecutor coordinator = getCoordinator(conf.getCoordinatorThreads(), notifications);
             addRegisteredNotices(coordinator, fluo.newSnapshot());
-            KafkaExporterExecutor exporter = getExporter(conf.getExporterThreads(), kafkaProps, bindingSets);
-            PeriodicQueryPrunerExecutor pruner = getPruner(storage, fluo, conf.getPrunerThreads(), bins);
-            NotificationProcessorExecutor processor = getProcessor(storage, notifications, bins, bindingSets, conf.getProcessorThreads());
-            KafkaNotificationProvider provider = getProvider(conf.getProducerThreads(), conf.getNotificationTopic(), coordinator, kafkaProps);
+            final KafkaExporterExecutor exporter = getExporter(conf.getExporterThreads(), kafkaProducerProps, bindingSets);
+            final PeriodicQueryPrunerExecutor pruner = getPruner(storage, fluo, conf.getPrunerThreads(), bins);
+            final NotificationProcessorExecutor processor = getProcessor(storage, notifications, bins, bindingSets, conf.getProcessorThreads());
+            final KafkaNotificationProvider provider = getProvider(conf.getProducerThreads(), conf.getNotificationTopic(), coordinator, kafkaConsumerProps);
             return PeriodicNotificationApplication.builder().setCoordinator(coordinator).setProvider(provider).setExporter(exporter)
                     .setProcessor(processor).setPruner(pruner).build();
         } catch (AccumuloException | AccumuloSecurityException e) {
             throw new PeriodicApplicationException(e.getMessage());
-        } 
+        }
     }
-    
-    private static void addRegisteredNotices(NotificationCoordinatorExecutor coord, Snapshot sx) {
+
+    private static void addRegisteredNotices(final NotificationCoordinatorExecutor coord, final Snapshot sx) {
         coord.start();
-        PeriodicNotificationProvider provider = new PeriodicNotificationProvider();
+        final PeriodicNotificationProvider provider = new PeriodicNotificationProvider();
         provider.processRegisteredNotifications(coord, sx);
     }
 
-    private static NotificationCoordinatorExecutor getCoordinator(int numThreads, BlockingQueue<TimestampedNotification> notifications) {
+    private static NotificationCoordinatorExecutor getCoordinator(final int numThreads, final BlockingQueue<TimestampedNotification> notifications) {
         return new PeriodicNotificationCoordinatorExecutor(numThreads, notifications);
     }
 
-    private static KafkaExporterExecutor getExporter(int numThreads, Properties props, BlockingQueue<BindingSetRecord> bindingSets) {
-        KafkaProducer<String, BindingSet> producer = new KafkaProducer<>(props, new StringSerializer(), new BindingSetSerDe());
+    private static KafkaExporterExecutor getExporter(final int numThreads, final Properties props, final BlockingQueue<BindingSetRecord> bindingSets) {
+        final KafkaProducer<String, BindingSet> producer = new KafkaProducer<>(props, new StringSerializer(), new BindingSetSerDe());
         return new KafkaExporterExecutor(producer, numThreads, bindingSets);
     }
 
-    private static PeriodicQueryPrunerExecutor getPruner(PeriodicQueryResultStorage storage, FluoClient fluo, int numThreads,
-            BlockingQueue<NodeBin> bins) {
+    private static PeriodicQueryPrunerExecutor getPruner(final PeriodicQueryResultStorage storage, final FluoClient fluo, final int numThreads,
+            final BlockingQueue<NodeBin> bins) {
         return new PeriodicQueryPrunerExecutor(storage, fluo, numThreads, bins);
     }
 
-    private static NotificationProcessorExecutor getProcessor(PeriodicQueryResultStorage periodicStorage,
-            BlockingQueue<TimestampedNotification> notifications, BlockingQueue<NodeBin> bins, BlockingQueue<BindingSetRecord> bindingSets,
-            int numThreads) {
+    private static NotificationProcessorExecutor getProcessor(final PeriodicQueryResultStorage periodicStorage,
+            final BlockingQueue<TimestampedNotification> notifications, final BlockingQueue<NodeBin> bins, final BlockingQueue<BindingSetRecord> bindingSets,
+            final int numThreads) {
         return new NotificationProcessorExecutor(periodicStorage, notifications, bins, bindingSets, numThreads);
     }
 
-    private static KafkaNotificationProvider getProvider(int numThreads, String topic, NotificationCoordinatorExecutor coord,
-            Properties props) {
+    private static KafkaNotificationProvider getProvider(final int numThreads, final String topic, final NotificationCoordinatorExecutor coord,
+            final Properties props) {
         return new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord,
                 numThreads);
     }
 
-    private static PeriodicQueryResultStorage getPeriodicQueryResultStorage(PeriodicNotificationApplicationConfiguration conf)
+    private static PeriodicQueryResultStorage getPeriodicQueryResultStorage(final PeriodicNotificationApplicationConfiguration conf)
             throws AccumuloException, AccumuloSecurityException {
-        Instance instance = new ZooKeeperInstance(conf.getAccumuloInstance(), conf.getAccumuloZookeepers());
-        Connector conn = instance.getConnector(conf.getAccumuloUser(), new PasswordToken(conf.getAccumuloPassword()));
-        String ryaInstance = conf.getTablePrefix();
+        final Instance instance = new ZooKeeperInstance(conf.getAccumuloInstance(), conf.getAccumuloZookeepers());
+        final Connector conn = instance.getConnector(conf.getAccumuloUser(), new PasswordToken(conf.getAccumuloPassword()));
+        final String ryaInstance = conf.getTablePrefix();
         return new AccumuloPeriodicQueryResultStorage(conn, ryaInstance);
     }
-    
-    private static Properties getKafkaProperties(PeriodicNotificationApplicationConfiguration conf) { 
-        Properties kafkaProps = new Properties();
+
+    private static Properties getKafkaConsumerProperties(final PeriodicNotificationApplicationConfiguration conf) {
+        final Properties kafkaProps = new Properties();
         kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getBootStrapServers());
         kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, conf.getNotificationClientId());
         kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, conf.getNotificationGroupId());
         kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        kafkaProps.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "30000");  // reduce this value to 30 seconds for the scenario where we subscribe before the topic exists.
         return kafkaProps;
     }
 
+    private static Properties getKafkaProducerProperties(final PeriodicNotificationApplicationConfiguration conf) {
+        final Properties kafkaProps = new Properties();
+        kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getBootStrapServers());
+        return kafkaProps;
+    }
 }