You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/09/11 16:14:40 UTC

[7/7] incubator-rya git commit: RYA-355 Refactored the periodic notification service structure. Closes #221.

RYA-355 Refactored the periodic notification service structure. Closes #221.

New artifactIds align better with code packaging, semantics.
Light pom cleaning.
Eliminates redundant text in file paths.
Makes adding the twill app for RYA-356 a little cleaner.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/de365c17
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/de365c17
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/de365c17

Branch: refs/heads/master
Commit: de365c1795b700cc5383be1927c4e598c37d948a
Parents: 28b0a52
Author: jdasch <hc...@gmail.com>
Authored: Fri Sep 1 20:51:25 2017 -0400
Committer: Caleb Meier <ca...@parsons.com>
Committed: Mon Sep 11 09:13:53 2017 -0700

----------------------------------------------------------------------
 extras/indexing/pom.xml                         |   2 +-
 extras/periodic.notification/api/.gitignore     |   1 +
 extras/periodic.notification/api/pom.xml        |  52 ++
 .../periodic/notification/api/BinPruner.java    |  40 ++
 .../notification/api/BindingSetExporter.java    |  37 ++
 .../notification/api/BindingSetRecord.java      |  80 +++
 .../api/BindingSetRecordExportException.java    |  45 ++
 .../periodic/notification/api/LifeCycle.java    |  45 ++
 .../rya/periodic/notification/api/NodeBin.java  |  77 +++
 .../periodic/notification/api/Notification.java |  34 ++
 .../api/NotificationCoordinatorExecutor.java    |  41 ++
 .../notification/api/NotificationProcessor.java |  41 ++
 .../api/PeriodicNotificationClient.java         |  64 +++
 .../notification/BasicNotification.java         |  76 +++
 .../notification/CommandNotification.java       |  99 ++++
 .../notification/PeriodicNotification.java      | 178 +++++++
 .../notification/TimestampedNotification.java   |  69 +++
 .../KafkaNotificationRegistrationClient.java    |  80 +++
 .../BasicNotificationTypeAdapter.java           |  55 +++
 .../serialization/BindingSetSerDe.java          | 105 ++++
 .../CommandNotificationSerializer.java          |  76 +++
 .../CommandNotificationTypeAdapter.java         |  89 ++++
 .../PeriodicNotificationTypeAdapter.java        |  73 +++
 extras/periodic.notification/pom.xml            |  40 ++
 extras/periodic.notification/service/pom.xml    | 102 ++++
 .../PeriodicApplicationException.java           |  47 ++
 .../PeriodicNotificationApplication.java        | 207 ++++++++
 ...dicNotificationApplicationConfiguration.java | 254 ++++++++++
 .../PeriodicNotificationApplicationFactory.java | 140 ++++++
 ...PeriodicNotificationCoordinatorExecutor.java | 159 ++++++
 .../exporter/KafkaExporterExecutor.java         | 110 +++++
 .../KafkaPeriodicBindingSetExporter.java        |  99 ++++
 .../NotificationProcessorExecutor.java          | 114 +++++
 .../TimestampedNotificationProcessor.java       | 203 ++++++++
 .../notification/pruner/AccumuloBinPruner.java  |  66 +++
 .../notification/pruner/FluoBinPruner.java      |  76 +++
 .../pruner/PeriodicQueryPruner.java             | 107 ++++
 .../pruner/PeriodicQueryPrunerExecutor.java     | 104 ++++
 .../recovery/PeriodicNotificationProvider.java  | 142 ++++++
 .../kafka/KafkaNotificationProvider.java        | 123 +++++
 .../kafka/PeriodicNotificationConsumer.java     |  88 ++++
 .../CommandNotificationSerializerTest.java      |  60 +++
 extras/periodic.notification/tests/pom.xml      |  62 +++
 .../PeriodicNotificationApplicationIT.java      | 493 +++++++++++++++++++
 .../PeriodicNotificationProviderIT.java         |  71 +++
 .../PeriodicNotificationExporterIT.java         | 143 ++++++
 .../PeriodicNotificationProcessorIT.java        | 121 +++++
 .../pruner/PeriodicNotificationBinPrunerIT.java | 283 +++++++++++
 .../PeriodicCommandNotificationConsumerIT.java  | 139 ++++++
 .../tests/src/test/resources/log4j.properties   |  37 ++
 .../src/test/resources/notification.properties  |  35 ++
 extras/pom.xml                                  |   2 +-
 extras/rya.pcj.fluo/pcj.fluo.api/pom.xml        |  12 +-
 .../periodic.service.api/.gitignore             |   1 -
 .../periodic.service.api/pom.xml                |  52 --
 .../periodic/notification/api/BinPruner.java    |  40 --
 .../notification/api/BindingSetExporter.java    |  37 --
 .../notification/api/BindingSetRecord.java      |  80 ---
 .../api/BindingSetRecordExportException.java    |  45 --
 .../periodic/notification/api/LifeCycle.java    |  45 --
 .../rya/periodic/notification/api/NodeBin.java  |  77 ---
 .../periodic/notification/api/Notification.java |  34 --
 .../api/NotificationCoordinatorExecutor.java    |  41 --
 .../notification/api/NotificationProcessor.java |  41 --
 .../api/PeriodicNotificationClient.java         |  64 ---
 .../notification/BasicNotification.java         |  76 ---
 .../notification/CommandNotification.java       |  99 ----
 .../notification/PeriodicNotification.java      | 178 -------
 .../notification/TimestampedNotification.java   |  69 ---
 .../KafkaNotificationRegistrationClient.java    |  80 ---
 .../BasicNotificationTypeAdapter.java           |  55 ---
 .../serialization/BindingSetSerDe.java          | 105 ----
 .../CommandNotificationSerializer.java          |  76 ---
 .../CommandNotificationTypeAdapter.java         |  89 ----
 .../PeriodicNotificationTypeAdapter.java        |  73 ---
 .../periodic.service.integration.tests/pom.xml  |  62 ---
 .../PeriodicNotificationApplicationIT.java      | 493 -------------------
 .../PeriodicNotificationProviderIT.java         |  71 ---
 .../PeriodicNotificationExporterIT.java         | 143 ------
 .../PeriodicNotificationProcessorIT.java        | 121 -----
 .../pruner/PeriodicNotificationBinPrunerIT.java | 283 -----------
 .../PeriodicCommandNotificationConsumerIT.java  | 139 ------
 .../src/test/resources/log4j.properties         |  37 --
 .../src/test/resources/notification.properties  |  35 --
 .../periodic.service.notification/pom.xml       | 112 -----
 .../PeriodicApplicationException.java           |  47 --
 .../PeriodicNotificationApplication.java        | 207 --------
 ...dicNotificationApplicationConfiguration.java | 254 ----------
 .../PeriodicNotificationApplicationFactory.java | 140 ------
 ...PeriodicNotificationCoordinatorExecutor.java | 159 ------
 .../exporter/KafkaExporterExecutor.java         | 110 -----
 .../KafkaPeriodicBindingSetExporter.java        |  99 ----
 .../NotificationProcessorExecutor.java          | 114 -----
 .../TimestampedNotificationProcessor.java       | 203 --------
 .../notification/pruner/AccumuloBinPruner.java  |  66 ---
 .../notification/pruner/FluoBinPruner.java      |  76 ---
 .../pruner/PeriodicQueryPruner.java             | 107 ----
 .../pruner/PeriodicQueryPrunerExecutor.java     | 104 ----
 .../recovery/PeriodicNotificationProvider.java  | 142 ------
 .../kafka/KafkaNotificationProvider.java        | 123 -----
 .../kafka/PeriodicNotificationConsumer.java     |  88 ----
 .../CommandNotificationSerializerTest.java      |  60 ---
 extras/rya.periodic.service/pom.xml             |  40 --
 pom.xml                                         |  11 +-
 104 files changed, 5093 insertions(+), 5108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/indexing/pom.xml
----------------------------------------------------------------------
diff --git a/extras/indexing/pom.xml b/extras/indexing/pom.xml
index 16a205f..1d3a32b 100644
--- a/extras/indexing/pom.xml
+++ b/extras/indexing/pom.xml
@@ -83,7 +83,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.rya</groupId>
-            <artifactId>rya.periodic.service.api</artifactId>
+            <artifactId>rya.periodic.notification.api</artifactId>
         </dependency>
         <!-- OpenRDF -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/.gitignore
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/.gitignore b/extras/periodic.notification/api/.gitignore
new file mode 100644
index 0000000..b83d222
--- /dev/null
+++ b/extras/periodic.notification/api/.gitignore
@@ -0,0 +1 @@
+/target/

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/pom.xml
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/pom.xml b/extras/periodic.notification/api/pom.xml
new file mode 100644
index 0000000..aecd723
--- /dev/null
+++ b/extras/periodic.notification/api/pom.xml
@@ -0,0 +1,52 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!-- Licensed to the Apache Software Foundation (ASF) under one or more 
+        contributor license agreements. See the NOTICE file distributed with this 
+        work for additional information regarding copyright ownership. The ASF licenses 
+        this file to you under the Apache License, Version 2.0 (the "License"); you 
+        may not use this file except in compliance with the License. You may obtain 
+        a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
+        required by applicable law or agreed to in writing, software distributed 
+        under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+        OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+        the specific language governing permissions and limitations under the License. -->
+
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.periodic.notification.parent</artifactId>
+        <version>3.2.11-incubating-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>rya.periodic.notification.api</artifactId>
+
+    <name>Apache Rya Periodic Notification API</name>
+    <description>API for Periodic Notification Applications</description>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>2.8.0</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-query</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.indexing.pcj</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java
new file mode 100644
index 0000000..f4a083c
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+
+/**
+ * Object that cleans up old {@link BindingSet}s corresponding to the specified
+ * {@link NodeBin}. This class deletes all BindingSets with the bin 
+ * indicated by {@link NodeBin#getBin()}.  A BindingSet corresponds to a given
+ * bin if it contains a {@link Binding} with name {@link IncrementalUpdateConstants#PERIODIC_BIN_ID}
+ * and value equal to the given bin.
+ *
+ */
+public interface BinPruner {
+    
+    /**
+     * Cleans up all {@link BindingSet}s associated with the indicated {@link NodeBin}.
+     * @param bin - NodeBin that indicates which BindingSets to delete.
+     */
+    public void pruneBindingSetBin(NodeBin bin);
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java
new file mode 100644
index 0000000..491576b
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import org.openrdf.query.BindingSet;
+
+/**
+ * An Object that is used to export {@link BindingSet}s to an external repository or queuing system.
+ *
+ */
+public interface BindingSetExporter {
+
+    /**
+     * This method exports the BindingSet to the external repository or queuing system
+     * that this BindingSetExporter is configured to export to.
+     * @param bindingSet - {@link BindingSetRecord} to be exported
+     * @throws BindingSetRecordExportException if the record could not be exported
+     */
+    public void exportNotification(BindingSetRecord bindingSet) throws BindingSetRecordExportException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecord.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecord.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecord.java
new file mode 100644
index 0000000..c3f70f1
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecord.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import org.openrdf.query.BindingSet;
+
+import com.google.common.base.Objects;
+
+/**
+ * Object that associates a {@link BindingSet} with a given Kafka topic.
+ * This ensures that the {@link KafkaPeriodicBindingSetExporter} can export
+ * each BindingSet to its appropriate topic.
+ *
+ */
+public class BindingSetRecord {
+
+    private BindingSet bs;
+    private String topic;
+    
+    public BindingSetRecord(BindingSet bs, String topic) {
+        this.bs = bs;
+        this.topic = topic;
+    }
+    
+    /**
+     * @return BindingSet in this BindingSetRecord
+     */
+    public BindingSet getBindingSet() {
+        return bs;
+    }
+    
+    /**
+     * @return Kafka topic for this BindingSetRecord
+     */
+    public String getTopic() {
+        return topic;
+    }
+    
+    @Override 
+    public boolean equals(Object o) {
+        if(this == o) {
+            return true;
+        }
+        
+        if(o instanceof BindingSetRecord) {
+            BindingSetRecord record = (BindingSetRecord) o;
+            return Objects.equal(this.bs, record.bs)&&Objects.equal(this.topic,record.topic);
+        }
+        
+        return false;
+    }
+    
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(bs, topic);
+    }
+    
+    @Override
+    public String toString() {
+        return new StringBuilder().append("Binding Set Record \n").append("  Topic: " + topic + "\n").append("  BindingSet: " + bs + "\n")
+                .toString();
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecordExportException.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecordExportException.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecordExportException.java
new file mode 100644
index 0000000..94e4980
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecordExportException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+/**
+ * A result could not be exported.
+ */
+public class BindingSetRecordExportException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Constructs an instance of {@link BindingSetRecordExportException}.
+     *
+     * @param message - Explains why the exception was thrown.
+     */
+    public BindingSetRecordExportException(final String message) {
+        super(message);
+    }
+
+    /**
+     * Constructs an instance of {@link BindingSetRecordExportException}.
+     *
+     * @param message - Explains why the exception was thrown.
+     * @param cause - The exception that caused this one to be thrown.
+     */
+    public BindingSetRecordExportException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java
new file mode 100644
index 0000000..b1e8bad
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+/**
+ * Interface providing basic life cycle functionality,
+ * including stopping and starting any class implementing this
+ * interface and checking whether it is running.
+ *
+ */
+public interface LifeCycle {
+
+    /**
+     * Starts the application.
+     */
+    public void start();
+
+    /**
+     * Stops a running application.
+     */
+    public void stop();
+    
+    /**
+     * Determine if application is currently running.
+     * @return true if application is running and false otherwise.
+     */
+    public boolean currentlyRunning();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java
new file mode 100644
index 0000000..3ed7979
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import java.util.Objects;
+
+/**
+ * Object used to indicate the id of a given Periodic Query
+ * along with a particular bin of results.  This Object is used
+ * by the {@link BinPruner} to clean up old query results after
+ * they have been processed.
+ *
+ */
+public class NodeBin {
+
+    private long bin;
+    private String nodeId;
+
+    public NodeBin(String nodeId, long bin) {
+        this.bin = bin;
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * @return id of Periodic Query
+     */
+    public String getNodeId() {
+        return nodeId;
+    }
+/**
+ * @return bin id of results for a given Periodic Query 
+ */
+    public long getBin() {
+        return bin;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (other instanceof NodeBin) {
+            NodeBin bin = (NodeBin) other;
+            return this.bin == bin.bin && this.nodeId.equals(bin.nodeId);
+        }
+
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(bin, nodeId);
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder().append("Node Bin \n").append("   QueryId: " + nodeId + "\n").append("   Bin: " + bin + "\n").toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/Notification.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/Notification.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/Notification.java
new file mode 100644
index 0000000..3e9e0d1
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/Notification.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+/**
+ * Notification Object used by the Periodic Query Service
+ * to inform workers to process results for a given Periodic
+ * Query with the indicated id.
+ *
+ */
+public interface Notification {
+
+    /**
+     * @return id of a Periodic Query
+     */
+    public String getId();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java
new file mode 100644
index 0000000..d53dc17
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+
+/**
+ * Object that manages the periodic notifications for the Periodic Query Service.
+ * This Object processes new requests for periodic updates by registering them with
+ * some sort of service that generates periodic updates (such as a {@link ScheduledExecutorService}).
+ *
+ */
+public interface NotificationCoordinatorExecutor extends LifeCycle {
+
+    /**
+     * Registers or deletes a {@link CommandNotification} with the periodic service to
+     * generate notifications at a regular interval indicated by the CommandNotification.
+     * @param notification - CommandNotification to be registered or deleted from the periodic update
+     * service.
+     */
+    public void processNextCommandNotification(CommandNotification notification);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java
new file mode 100644
index 0000000..4ac9089
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java
@@ -0,0 +1,41 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import org.apache.rya.periodic.notification.notification.TimestampedNotification;
+
+/**
+ * Object that processes new {@link TimestampedNotification}s generated by {@link NotificationCoordinatorExecutor}.
+ * It is expected that the NotificationCoordinatorExecutor will provide this Object with notifications to perform
+ * work via some sort of queuing service such as a BlockingQueue or Kafka.  This Object processes the notifications by
+ * retrieving query results associated with the Periodic Query id given by {@link TimestampedNotification#getId()},
+ * parsing them and then providing them to another service to be exported.
+ *
+ */
+public interface NotificationProcessor {
+
+    /**
+     * Processes {@link TimestampedNotification}s by retrieving the Periodic Query results
+     * associated with the query id given by {@link TimestampedNotification#getId()}.
+     * @param notification - contains information about which query results to retrieve
+     */
+    public void processNotification(TimestampedNotification notification);
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java
new file mode 100644
index 0000000..ff08733
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.periodic.notification.notification.BasicNotification;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+
+/**
+ * Object to register {@link PeriodicNotification}s with an external queuing
+ * service to be handled by a {@link NotificationCoordinatorExecutor} service.
+ * The service will generate notifications to process Periodic Query results at regular
+ * intervals corresponding to the period of the PeriodicNotification.
+ */
+public interface PeriodicNotificationClient extends AutoCloseable {
+
+    /**
+     * Adds a new notification to be registered with the {@link NotificationCoordinatorExecutor}
+     * @param notification - notification to be added
+     */
+    public void addNotification(PeriodicNotification notification);
+    
+    /**
+     * Deletes a notification from the {@link NotificationCoordinatorExecutor}.
+     * @param notification - notification to be deleted
+     */
+    public void deleteNotification(BasicNotification notification);
+    
+    /**
+     * Deletes a notification from the {@link NotificationCoordinatorExecutor}.
+     * @param notificationId - id corresponding to the notification to be deleted
+     */
+    public void deleteNotification(String notificationId);
+    
+    /**
+     * Adds a new notification with the indicated id and period to the {@link NotificationCoordinatorExecutor}
+     * @param id - Periodic Query id
+     * @param period - period indicating frequency at which notifications will be generated
+     * @param delay - initial delay for starting periodic notifications
+     * @param unit - time unit of delay and period
+     */
+    public void addNotification(String id, long period, long delay, TimeUnit unit);
+    
+    /** Closes this client; unlike {@link AutoCloseable#close()}, declares no checked exception. */
+    public void close();
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java
new file mode 100644
index 0000000..c31a5c0
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Objects;
+
+/**
+ * Notification Object used by the Periodic Query Service
+ * to inform workers to process results for a given Periodic
+ * Query with the indicated id.  The id is set once at construction;
+ * equality and hash code are based solely on it.
+ */
+public class BasicNotification implements Notification {
+
+    private final String id;
+
+    /**
+     * Creates a BasicNotification
+     * @param id - Fluo query id associated with this Notification
+     */
+    public BasicNotification(String id) {
+        this.id = id;
+    }
+
+    /**
+     * @return the Fluo Query Id that this notification will generate results for
+     */
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    /** Two BasicNotifications are equal when their ids are equal. */
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof BasicNotification)) {
+            return false;
+        }
+        // Objects.equal is null-safe; the constructor does not reject a null id.
+        return Objects.equal(this.id, ((BasicNotification) other).id);
+    }
+
+    /** Hash code derived from the id only, consistent with {@link #equals(Object)}. */
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(id);
+    }
+
+    /** Renders this notification as {@code id=<id>}. */
+    @Override
+    public String toString() {
+        return "id=" + id;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java
new file mode 100644
index 0000000..597b228
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+/**
+ * This Object contains a Notification Object used by the Periodic Query Service
+ * to inform workers to process results for a given Periodic Query with the
+ * indicated id. Additionally, the CommandNotification contains a
+ * {@link Command} about which action the
+ * {@link org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor}
+ * should take (adding or deleting). CommandNotifications are meant to be added
+ * to an external work queue (such as Kafka) to be processed by the
+ * NotificationCoordinatorExecutor.
+ */
+public class CommandNotification implements Notification {
+
+    private final Notification notification;
+    private final Command command;
+
+    /** Action to take for the wrapped notification. */
+    public enum Command {
+        ADD, DELETE
+    }
+
+    /**
+     * Creates a new CommandNotification
+     * @param command - the command associated with this notification (either add or delete); must not be null
+     * @param notification - the underlying notification associated with this command; must not be null
+     */
+    public CommandNotification(Command command, Notification notification) {
+        this.notification = Preconditions.checkNotNull(notification);
+        this.command = Preconditions.checkNotNull(command);
+    }
+
+    /** @return id of the {@link Notification} contained by this CommandNotification */
+    @Override
+    public String getId() {
+        return notification.getId();
+    }
+
+    /**
+     * Returns {@link Notification} contained by this CommandNotification.
+     * @return - Notification contained by this Object
+     */
+    public Notification getNotification() {
+        return this.notification;
+    }
+
+    /**
+     * @return Command contained by this Object (either add or delete)
+     */
+    public Command getCommand() {
+        return this.command;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof CommandNotification)) {
+            return false;
+        }
+        CommandNotification cn = (CommandNotification) other;
+        return Objects.equal(this.command, cn.command) && Objects.equal(this.notification, cn.notification);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(command, notification);
+    }
+
+    @Override
+    public String toString() {
+        return "command=" + command + ";" + notification;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java
new file mode 100644
index 0000000..aa9e581
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Notification Object used by the Periodic Query Service to inform workers to
+ * process results for a given Periodic Query with the indicated id.
+ * Additionally, this Object contains a period that indicates a frequency at
+ * which regular updates are generated.  All fields are set once at construction.
+ *
+ */
+public class PeriodicNotification implements Notification {
+
+    private final String id;
+    private final long period;
+    private final TimeUnit periodTimeUnit;
+    private final long initialDelay;
+
+    /**
+     * Creates a PeriodicNotification.
+     * @param id - Fluo Query Id that this notification is associated with; must not be null
+     * @param period - period at which notifications are generated; must be greater than 0
+     * @param periodTimeUnit - time unit associated with the period and delay; must not be null
+     * @param initialDelay - amount of time to wait before generating the first notification; must not be negative
+     */
+    public PeriodicNotification(String id, long period, TimeUnit periodTimeUnit, long initialDelay) {
+        this.id = Preconditions.checkNotNull(id);
+        this.periodTimeUnit = Preconditions.checkNotNull(periodTimeUnit);
+        Preconditions.checkArgument(period > 0, "period must be greater than 0");
+        Preconditions.checkArgument(initialDelay >= 0, "initialDelay must not be negative");
+        this.period = period;
+        this.initialDelay = initialDelay;
+    }
+
+    /**
+     * Create a PeriodicNotification
+     * @param other - other PeriodicNotification used in copy constructor
+     */
+    public PeriodicNotification(PeriodicNotification other) {
+        this(other.id, other.period, other.periodTimeUnit, other.initialDelay);
+    }
+
+    /**
+     * @return the Fluo Query Id that this notification is associated with
+     */
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    /**
+     * @return - period at which regular notifications are generated
+     */
+    public long getPeriod() {
+        return period;
+    }
+
+    /**
+     * @return time unit of period and initial delay
+     */
+    public TimeUnit getTimeUnit() {
+        return periodTimeUnit;
+    }
+
+    /**
+     * @return amount of time to delay before beginning to generate notifications
+     */
+    public long getInitialDelay() {
+        return initialDelay;
+    }
+
+    @Override
+    public String toString() {
+        return "id=" + id + ";period=" + period + ";periodTimeUnit=" + periodTimeUnit
+                + ";initialDelay=" + initialDelay;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof PeriodicNotification)) {
+            return false;
+        }
+        PeriodicNotification notification = (PeriodicNotification) other;
+        return Objects.equals(this.id, notification.id) && (this.period == notification.period)
+                && Objects.equals(this.periodTimeUnit, notification.periodTimeUnit) && (this.initialDelay == notification.initialDelay);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, period, periodTimeUnit, initialDelay);
+    }
+
+    /** @return a new {@link Builder} with no id, period or time unit set */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /** Builder for {@link PeriodicNotification}; the initial delay defaults to 0. */
+    public static class Builder {
+
+        private String id;
+        private long period;
+        private TimeUnit periodTimeUnit;
+        private long initialDelay = 0;
+
+        /**
+         * @param id - periodic query id
+         * @return - builder to chain method calls
+         */
+        public Builder id(String id) {
+            this.id = id;
+            return this;
+        }
+
+        /**
+         * @param period of the periodic notification for generating regular notifications
+         * @return - builder to chain method calls
+         */
+        public Builder period(long period) {
+            this.period = period;
+            return this;
+        }
+
+        /**
+         * @param timeUnit of period and initial delay
+         * @return - builder to chain method calls
+         */
+        public Builder timeUnit(TimeUnit timeUnit) {
+            this.periodTimeUnit = timeUnit;
+            return this;
+        }
+
+        /**
+         * @param initialDelay - amount of time to wait before generating notifications
+         * @return - builder to chain method calls
+         */
+        public Builder initialDelay(long initialDelay) {
+            this.initialDelay = initialDelay;
+            return this;
+        }
+
+        /**
+         * Builds PeriodicNotification; the constructor's argument checks apply to the values set on this Builder
+         * @return PeriodicNotification constructed from Builder specified parameters
+         */
+        public PeriodicNotification build() {
+            return new PeriodicNotification(id, period, periodTimeUnit, initialDelay);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java
new file mode 100644
index 0000000..38073ce
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link PeriodicNotification} Object used by the Periodic Query Service to inform workers to
+ * process results for a given Periodic Query with the indicated id.  Additionally
+ * this Object contains a {@link Date} object to indicate the date time at which this
+ * notification was generated.
+ * Note: equals and hashCode are inherited from PeriodicNotification and ignore the timestamp.
+ */
+public class TimestampedNotification extends PeriodicNotification {
+
+    private final Date date;
+
+    /**
+     * Constructs a TimestampedNotification stamped with the time of construction
+     * @param id - Fluo Query Id associated with this Notification
+     * @param period - period at which notifications are generated
+     * @param periodTimeUnit - time unit associated with period and initial delay
+     * @param initialDelay - amount of time to wait before generating first notification
+     */
+    public TimestampedNotification(String id, long period, TimeUnit periodTimeUnit, long initialDelay) {
+        super(id, period, periodTimeUnit, initialDelay);
+        date = new Date();
+    }
+    
+    /**
+     * Creates a TimestampedNotification
+     * @param notification - PeriodicNotification used to create this TimestampedNotification.  
+     * This constructor creates a time stamp for the TimestampedNotification.
+     */
+    public TimestampedNotification(PeriodicNotification notification) {
+        super(notification);
+        date = new Date();
+    }
+
+    /**
+     * @return copy of the timestamp at which this notification was generated; mutating it does not affect this Object
+     */
+    public Date getTimestamp() {
+        return new Date(date.getTime());
+    }
+
+    @Override
+    public String toString() {
+        return super.toString() + ";date=" + date;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java
new file mode 100644
index 0000000..bb438be
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.registration;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.rya.periodic.notification.api.Notification;
+import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
+import org.apache.rya.periodic.notification.notification.BasicNotification;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.apache.rya.periodic.notification.notification.CommandNotification.Command;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+
+/**
+ *  Implementation of {@link PeriodicNotificationClient} used to register new notification
+ *  requests with the PeriodicQueryService.  Each request is wrapped in a
+ *  {@link CommandNotification} and sent to a Kafka topic, keyed by the notification id.
+ */
+public class KafkaNotificationRegistrationClient implements PeriodicNotificationClient {
+
+    private final KafkaProducer<String, CommandNotification> producer;
+    private final String topic;
+
+    /**
+     * @param topic - Kafka topic that the CommandNotifications are sent to
+     * @param producer - producer used to send them; it is closed by {@link #close()}
+     */
+    public KafkaNotificationRegistrationClient(String topic, KafkaProducer<String, CommandNotification> producer) {
+        this.topic = topic;
+        this.producer = producer;
+    }
+
+    @Override
+    public void addNotification(PeriodicNotification notification) {
+        processNotification(new CommandNotification(Command.ADD, notification));
+    }
+
+    @Override
+    public void deleteNotification(BasicNotification notification) {
+        processNotification(new CommandNotification(Command.DELETE, notification));
+    }
+
+    @Override
+    public void deleteNotification(String notificationId) {
+        processNotification(new CommandNotification(Command.DELETE, new BasicNotification(notificationId)));
+    }
+
+    @Override
+    public void addNotification(String id, long period, long delay, TimeUnit unit) {
+        Notification notification = PeriodicNotification.builder().id(id).period(period).initialDelay(delay).timeUnit(unit).build();
+        processNotification(new CommandNotification(Command.ADD, notification));
+    }
+
+    private void processNotification(CommandNotification notification) {
+        producer.send(new ProducerRecord<>(topic, notification.getId(), notification));
+    }
+
+    @Override
+    public void close() {
+        producer.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java
new file mode 100644
index 0000000..bd29d29
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.lang.reflect.Type;
+
+import org.apache.rya.periodic.notification.notification.BasicNotification;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * Gson serializer and deserializer for {@link BasicNotification}s.  A BasicNotification is written
+ * as, and read from, a JSON object whose "id" property holds the notification id.
+ * Used in {@link CommandNotificationTypeAdapter} to serialize CommandNotifications.
+ */
+public class BasicNotificationTypeAdapter implements JsonDeserializer<BasicNotification>, JsonSerializer<BasicNotification> {
+
+    @Override
+    public JsonElement serialize(BasicNotification arg0, Type arg1, JsonSerializationContext arg2) {
+        final JsonObject json = new JsonObject();
+        final JsonPrimitive idElement = new JsonPrimitive(arg0.getId());
+        json.add("id", idElement);
+        return json;
+    }
+
+    @Override
+    public BasicNotification deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2) throws JsonParseException {
+        final JsonElement idElement = arg0.getAsJsonObject().get("id");
+        return new BasicNotification(idElement.getAsString());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
new file mode 100644
index 0000000..50180ad
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.base.Joiner;
+import com.google.common.primitives.Bytes;
+
+/**
+ * Kafka {@link Serializer} and {@link Deserializer} for producing and consuming {@link BindingSet}
+ * messages to and from Kafka.
+ * <p>
+ * A serialized message consists of the binding names joined with ';' (UTF-8 encoded), one delimiter
+ * byte (0x02), and then the bytes {@link AccumuloPcjSerializer} produces for that variable order.
+ * <p>
+ * Failures are never propagated to Kafka: a BindingSet that cannot be serialized becomes an empty
+ * byte array and a message that cannot be deserialized becomes an empty {@link QueryBindingSet}.
+ */
+public class BindingSetSerDe implements Serializer<BindingSet>, Deserializer<BindingSet> {
+
+    private static final Logger log = Logger.getLogger(BindingSetSerDe.class);
+    private static final AccumuloPcjSerializer serializer =  new AccumuloPcjSerializer();
+    // Spelled out as a byte literal so the delimiter never depends on the platform default charset.
+    private static final byte[] DELIM_BYTE = new byte[] { 0x02 };
+
+    /**
+     * Serializes the BindingSet, falling back to an empty array when conversion fails.
+     *
+     * @param bindingSet - BindingSet to serialize
+     * @return serialized bytes, or an empty array if the BindingSet could not be converted
+     */
+    private byte[] toBytes(BindingSet bindingSet) {
+        try {
+            return getBytes(getVarOrder(bindingSet), bindingSet);
+        } catch(Exception e) {
+            // Best effort by design; keep the cause so a dropped result can still be diagnosed.
+            log.trace("Unable to serialize BindingSet: " + bindingSet, e);
+            return new byte[0];
+        }
+    }
+
+    /**
+     * Deserializes a message written by {@link #toBytes(BindingSet)}.
+     *
+     * @param bsBytes - variable order, delimiter and serialized bindings
+     * @return the decoded BindingSet, or an empty {@link QueryBindingSet} if the bytes are null,
+     *         contain no delimiter, or cannot be converted
+     */
+    private BindingSet fromBytes(byte[] bsBytes) {
+        try {
+            int firstIndex = Bytes.indexOf(bsBytes, DELIM_BYTE);
+            if (firstIndex < 0) {
+                // Without the delimiter there is no way to split the variable order from the values.
+                log.trace("Unable to deserialize BindingSet: " + Arrays.toString(bsBytes));
+                return new QueryBindingSet();
+            }
+            byte[] varOrderBytes = Arrays.copyOf(bsBytes, firstIndex);
+            // Skip the whole delimiter rather than assuming it is exactly one byte long.
+            byte[] bsBytesNoVarOrder = Arrays.copyOfRange(bsBytes, firstIndex + DELIM_BYTE.length, bsBytes.length);
+            VariableOrder varOrder = new VariableOrder(new String(varOrderBytes,"UTF-8").split(";"));
+            return getBindingSet(varOrder, bsBytesNoVarOrder);
+        } catch(Exception e) {
+            // Arrays.toString prints the payload (and "null" for a null array) instead of its identity hash.
+            log.trace("Unable to deserialize BindingSet: " + Arrays.toString(bsBytes), e);
+            return new QueryBindingSet();
+        }
+    }
+
+    /**
+     * @param bs - BindingSet whose binding names define the order
+     * @return VariableOrder built from the binding names of the BindingSet
+     */
+    private VariableOrder getVarOrder(BindingSet bs) {
+        return new VariableOrder(bs.getBindingNames());
+    }
+
+    /**
+     * Builds the wire format: ';'-joined variable order, delimiter, serialized bindings.
+     *
+     * @param varOrder - order in which the bindings are written
+     * @param bs - BindingSet to serialize
+     * @return concatenated message bytes
+     * @throws UnsupportedEncodingException if UTF-8 is not available
+     * @throws BindingSetConversionException if the BindingSet cannot be converted
+     */
+    private byte[] getBytes(VariableOrder varOrder, BindingSet bs) throws UnsupportedEncodingException, BindingSetConversionException {
+        byte[] bsBytes = serializer.convert(bs, varOrder);
+        String varOrderString = Joiner.on(";").join(varOrder.getVariableOrders());
+        byte[] varOrderBytes = varOrderString.getBytes("UTF-8");
+        return Bytes.concat(varOrderBytes, DELIM_BYTE, bsBytes);
+    }
+
+    /**
+     * @param varOrder - order in which the bindings were written
+     * @param bsBytes - serialized bindings without the variable order prefix
+     * @return BindingSet decoded from the bytes
+     * @throws BindingSetConversionException if the bytes cannot be converted
+     */
+    private BindingSet getBindingSet(VariableOrder varOrder, byte[] bsBytes) throws BindingSetConversionException {
+        return serializer.convert(bsBytes, varOrder);
+    }
+
+    @Override
+    public BindingSet deserialize(String topic, byte[] bytes) {
+        return fromBytes(bytes);
+    }
+
+    @Override
+    public void close() {
+        // Do nothing. Nothing to close.
+    }
+
+    @Override
+    public void configure(Map<String, ?> arg0, boolean arg1) {
+        // Do nothing.  Nothing to configure.
+    }
+
+    @Override
+    public byte[] serialize(String topic, BindingSet bs) {
+        return toBytes(bs);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java
new file mode 100644
index 0000000..302e1be
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.rya.periodic.notification.api.Notification;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * Kafka {@link Serializer} and {@link Deserializer} for producing and consuming {@link CommandNotification}s
+ * to and from Kafka. Notifications travel as UTF-8 encoded JSON produced by a
+ * {@link CommandNotificationTypeAdapter}.
+ *
+ */
+public class CommandNotificationSerializer implements Serializer<CommandNotification>, Deserializer<CommandNotification> {
+
+    // Shared by every instance and never reassigned, hence final.
+    private static final Gson gson = new GsonBuilder()
+            .registerTypeHierarchyAdapter(Notification.class, new CommandNotificationTypeAdapter()).create();
+    private static final Logger LOG = LoggerFactory.getLogger(CommandNotificationSerializer.class);
+
+    /**
+     * Decodes a UTF-8 JSON payload into a CommandNotification.
+     *
+     * @param topic - topic the record was read from
+     * @param bytes - UTF-8 encoded JSON, may be null
+     * @return the decoded notification, or null when the payload is null or cannot be decoded as UTF-8
+     */
+    @Override
+    public CommandNotification deserialize(String topic, byte[] bytes) {
+        if (bytes == null) {
+            // Kafka hands null to deserializers for records without a value; answer with null instead of an NPE.
+            return null;
+        }
+        try {
+            return gson.fromJson(new String(bytes, "UTF-8"), CommandNotification.class);
+        } catch (UnsupportedEncodingException e) {
+            LOG.info("Unable to deserialize notification for topic: " + topic, e);
+            return null;
+        }
+    }
+
+    /**
+     * Encodes a CommandNotification as UTF-8 JSON.
+     *
+     * @param topic - topic the record is written to
+     * @param command - notification to encode
+     * @return UTF-8 encoded JSON form of the notification
+     * @throws RuntimeException if UTF-8 is not available
+     */
+    @Override
+    public byte[] serialize(String topic, CommandNotification command) {
+        try {
+            return gson.toJson(command).getBytes("UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            LOG.info("Unable to serialize notification: " + command  + " for topic: " + topic, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close() {
+        // Do nothing. Nothing to close
+    }
+
+    @Override
+    public void configure(Map<String, ?> arg0, boolean arg1) {
+        // Do nothing. Nothing to configure
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java
new file mode 100644
index 0000000..a9fb7e1
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.lang.reflect.Type;
+
+import org.apache.rya.periodic.notification.api.Notification;
+import org.apache.rya.periodic.notification.notification.BasicNotification;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+import org.apache.rya.periodic.notification.notification.CommandNotification.Command;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * Gson serializer and deserializer for {@link CommandNotification}s.
+ * This adapter is used in {@link CommandNotificationSerializer} for producing and
+ * consuming messages to and from Kafka.
+ * <p>
+ * The JSON form has three members: {@code "command"} (the {@link Command} name), {@code "type"}
+ * (simple class name of the wrapped notification) and {@code "notification"} (the wrapped
+ * notification as written by {@link PeriodicNotificationTypeAdapter} or
+ * {@link BasicNotificationTypeAdapter}).
+ *
+ */
+public class CommandNotificationTypeAdapter
+        implements JsonDeserializer<CommandNotification>, JsonSerializer<CommandNotification> {
+
+    /**
+     * Converts a CommandNotification into its JSON form.
+     *
+     * @param arg0 - notification to serialize
+     * @param arg1 - declared type of the notification (not used)
+     * @param arg2 - Gson serialization context, handed on to the nested adapter
+     * @return JSON object with the command, the notification type and the notification itself
+     * @throws IllegalArgumentException if the wrapped notification is neither a
+     *             PeriodicNotification nor a BasicNotification
+     */
+    @Override
+    public JsonElement serialize(CommandNotification arg0, Type arg1, JsonSerializationContext arg2) {
+        JsonObject result = new JsonObject();
+        result.add("command", new JsonPrimitive(arg0.getCommand().name()));
+        Notification notification = arg0.getNotification();
+        // PeriodicNotification is tested first, exactly as before, so the branch order is unchanged.
+        if (notification instanceof PeriodicNotification) {
+            result.add("type", new JsonPrimitive(PeriodicNotification.class.getSimpleName()));
+            PeriodicNotificationTypeAdapter adapter = new PeriodicNotificationTypeAdapter();
+            result.add("notification",
+                    adapter.serialize((PeriodicNotification) notification, PeriodicNotification.class, arg2));
+        } else if (notification instanceof BasicNotification) {
+            result.add("type", new JsonPrimitive(BasicNotification.class.getSimpleName()));
+            BasicNotificationTypeAdapter adapter = new BasicNotificationTypeAdapter();
+            result.add("notification",
+                    adapter.serialize((BasicNotification) notification, BasicNotification.class, arg2));
+        } else {
+            // Name the offending class so the failure can be traced back to its producer.
+            throw new IllegalArgumentException("Invalid notification type: "
+                    + (notification == null ? "null" : notification.getClass().getName()));
+        }
+        return result;
+    }
+
+    /**
+     * Rebuilds a CommandNotification from its JSON form.
+     *
+     * @param arg0 - JSON object with {@code "command"}, {@code "type"} and {@code "notification"} members
+     * @param arg1 - type being deserialized (not used)
+     * @param arg2 - Gson deserialization context, handed on to the nested adapter
+     * @return the decoded CommandNotification
+     * @throws JsonParseException if a required member is missing, the command name is unknown,
+     *             or the notification type is not supported
+     */
+    @Override
+    public CommandNotification deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2)
+            throws JsonParseException {
+
+        JsonObject json = arg0.getAsJsonObject();
+        String commandName = requiredMember(json, "command").getAsString();
+        Command command;
+        try {
+            command = Command.valueOf(commandName);
+        } catch (IllegalArgumentException e) {
+            // Report malformed input through the exception type this method declares.
+            throw new JsonParseException("Cannot deserialize Json: unknown command " + commandName, e);
+        }
+
+        String type = requiredMember(json, "type").getAsString();
+        Notification notification;
+        if (type.equals(PeriodicNotification.class.getSimpleName())) {
+            notification = (new PeriodicNotificationTypeAdapter()).deserialize(requiredMember(json, "notification"),
+                    PeriodicNotification.class, arg2);
+        } else if (type.equals(BasicNotification.class.getSimpleName())) {
+            notification = (new BasicNotificationTypeAdapter()).deserialize(requiredMember(json, "notification"),
+                    BasicNotification.class, arg2);
+        } else {
+            throw new JsonParseException("Cannot deserialize Json: unknown notification type " + type);
+        }
+
+        return new CommandNotification(command, notification);
+    }
+
+    /**
+     * Looks up a member that must be present and non-null.
+     *
+     * @param json - object to read from
+     * @param name - name of the required member
+     * @return the member
+     * @throws JsonParseException if the member is absent or JSON null
+     */
+    private static JsonElement requiredMember(JsonObject json, String name) {
+        JsonElement member = json.get(name);
+        if (member == null || member.isJsonNull()) {
+            throw new JsonParseException("Cannot deserialize Json: missing member '" + name + "' in " + json);
+        }
+        return member;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java
new file mode 100644
index 0000000..fcc0ba2
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.lang.reflect.Type;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification.Builder;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * Gson serializer and deserializer for {@link PeriodicNotification}s.
+ * {@link CommandNotificationTypeAdapter} delegates to this class, and that adapter is in turn
+ * used by {@link CommandNotificationSerializer} for producing and consuming messages to and
+ * from Kafka.
+ * <p>
+ * The JSON form has four members: {@code "id"}, {@code "period"}, {@code "initialDelay"} and
+ * {@code "timeUnit"} (the {@link TimeUnit} constant name).
+ */
+public class PeriodicNotificationTypeAdapter
+        implements JsonSerializer<PeriodicNotification>, JsonDeserializer<PeriodicNotification> {
+
+    /**
+     * Rebuilds a PeriodicNotification from its JSON form.
+     *
+     * @param arg0 - JSON object holding the four notification members
+     * @param arg1 - type being deserialized (not used)
+     * @param arg2 - Gson deserialization context (not used)
+     * @return notification built from the JSON members
+     */
+    @Override
+    public PeriodicNotification deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2)
+            throws JsonParseException {
+
+        final JsonObject notificationJson = arg0.getAsJsonObject();
+
+        // Members are read in the same order as before: id, period, timeUnit, initialDelay.
+        final String notificationId = notificationJson.get("id").getAsString();
+        final long periodLength = notificationJson.get("period").getAsLong();
+        final String unitName = notificationJson.get("timeUnit").getAsString();
+        final TimeUnit unit = TimeUnit.valueOf(unitName);
+        final long delay = notificationJson.get("initialDelay").getAsLong();
+
+        final Builder notificationBuilder = PeriodicNotification.builder()
+                .id(notificationId)
+                .period(periodLength)
+                .initialDelay(delay)
+                .timeUnit(unit);
+        return notificationBuilder.build();
+    }
+
+    /**
+     * Converts a PeriodicNotification into its JSON form.
+     *
+     * @param arg0 - notification to serialize
+     * @param arg1 - declared type of the notification (not used)
+     * @param arg2 - Gson serialization context (not used)
+     * @return JSON object holding id, period, initial delay and time unit
+     */
+    @Override
+    public JsonElement serialize(PeriodicNotification arg0, Type arg1, JsonSerializationContext arg2) {
+        final JsonObject notificationJson = new JsonObject();
+
+        notificationJson.add("id", new JsonPrimitive(arg0.getId()));
+        notificationJson.add("period", new JsonPrimitive(arg0.getPeriod()));
+        notificationJson.add("initialDelay", new JsonPrimitive(arg0.getInitialDelay()));
+        notificationJson.add("timeUnit", new JsonPrimitive(arg0.getTimeUnit().name()));
+
+        return notificationJson;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/pom.xml
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/pom.xml b/extras/periodic.notification/pom.xml
new file mode 100644
index 0000000..814f14f
--- /dev/null
+++ b/extras/periodic.notification/pom.xml
@@ -0,0 +1,40 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>rya.periodic.notification.parent</artifactId>
+  
+   <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.extras</artifactId>
+        <version>3.2.11-incubating-SNAPSHOT</version>
+    </parent>
+  
+  <name>Apache Rya Periodic Notification Parent</name>
+  <description>Parent POM for Rya Periodic Notification Projects</description>
+  
+  <packaging>pom</packaging>
+
+    <modules> <!-- Sub-modules of this parent project; each entry is a directory relative to this POM -->
+        <module>api</module>
+        <module>service</module>
+        <module>tests</module>
+    </modules>
+  
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/pom.xml
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/pom.xml b/extras/periodic.notification/service/pom.xml
new file mode 100644
index 0000000..cc78646
--- /dev/null
+++ b/extras/periodic.notification/service/pom.xml
@@ -0,0 +1,102 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <!-- Licensed to the Apache Software Foundation (ASF) under one or more 
+        contributor license agreements. See the NOTICE file distributed with this 
+        work for additional information regarding copyright ownership. The ASF licenses 
+        this file to you under the Apache License, Version 2.0 (the "License"); you 
+        may not use this file except in compliance with the License. You may obtain 
+        a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
+        required by applicable law or agreed to in writing, software distributed 
+        under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+        OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+        the specific language governing permissions and limitations under the License. -->
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.periodic.notification.parent</artifactId>
+        <version>3.2.11-incubating-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>rya.periodic.notification.service</artifactId>
+
+    <name>Apache Rya Periodic Notification Service</name>
+    <description>Notifications for Rya Periodic Service</description>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.twill</groupId>
+            <artifactId>twill-api</artifactId>
+            <version>0.11.0</version> <!-- Pinned in this POM; same version as twill-yarn below -->
+        </dependency>
+        <dependency>
+            <groupId>org.apache.twill</groupId>
+            <artifactId>twill-yarn</artifactId>
+            <version>0.11.0</version> <!-- Pinned in this POM; same version as twill-api above -->
+            <exclusions> <!-- Keeps the kafka_2.10 artifact out of twill-yarn's transitive dependencies. NOTE(review): presumably to avoid a clash with the Kafka client used by the notification code; confirm -->
+                <exclusion>
+                    <artifactId>kafka_2.10</artifactId>
+                    <groupId>org.apache.kafka</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>2.8.0</version> <!-- Pinned in this POM -->
+        </dependency>
+        <dependency> <!-- From here on no version is declared, so each one must be supplied by dependencyManagement inherited through the parent POMs -->
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.fluo</groupId>
+            <artifactId>fluo-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.fluo</groupId>
+            <artifactId>fluo-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.indexing</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-query</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.indexing.pcj</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.pcj.fluo.app</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.periodic.notification.api</artifactId>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId> <!-- No version or configuration is given here. NOTE(review): presumably inherited from a parent POM; confirm -->
+                <executions>
+                    <execution>
+                        <phase>package</phase> <!-- Binds the shade goal below to the package phase -->
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicApplicationException.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicApplicationException.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicApplicationException.java
new file mode 100644
index 0000000..b2c3709
--- /dev/null
+++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicApplicationException.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.application;
+
+/**
+ * Signals that a {@link PeriodicNotificationApplication} could not be created.
+ * A factory throws this when some component of the application cannot be built
+ * because of an incorrect configuration.
+ */
+public class PeriodicApplicationException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates an exception that carries only a message.
+     *
+     * @param message - description of what went wrong
+     */
+    public PeriodicApplicationException(final String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception that carries a message and the underlying cause.
+     *
+     * @param message - description of what went wrong
+     * @param t - the Throwable that led to this exception
+     */
+    public PeriodicApplicationException(final String message, final Throwable t) {
+        super(message, t);
+    }
+}