You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/09/04 13:14:07 UTC

[3/4] incubator-rya git commit: RYA-319-Integration of Periodic Query with CLI. Closes #220.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueriesIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueriesIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueriesIT.java
new file mode 100644
index 0000000..9724704
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueriesIT.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.api;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
+import org.apache.rya.indexing.pcj.fluo.api.ListFluoQueries.FluoQueryStringBuilder;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+
+/**
+ * Integration test for {@link ListFluoQueries}: queries written to the Fluo table through
+ * {@link FluoQueryMetadataDAO} must be listed in the format produced by {@link FluoQueryStringBuilder}.
+ */
+public class ListFluoQueriesIT extends RyaExportITBase {
+
+    /**
+     * Stores two projection queries with different SPARQL and export strategies, then checks that
+     * listing the Fluo queries yields exactly one matching entry per stored query.
+     */
+    @Test
+    public void queryMetadataTest() throws Exception {
+        final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+        final String sparql1 = "select ?x ?y ?z where {?x <uri:p1> ?y; <uri:p2> 'literal1'. ?y <uri:p3> ?z }";
+        final String sparql2 = "select ?x ?y ?z where {{select ?x ?y ?z {?x <uri:p1> ?y; <uri:p2> ?z. ?y <uri:p3> ?z }}}";
+
+        // Build the metadata objects that will be written to Fluo.
+        final String queryId1 = NodeType.generateNewFluoIdForType(NodeType.QUERY);
+        final QueryMetadata meta1 = buildProjectionMetadata(queryId1, sparql1, ExportStrategy.KAFKA);
+
+        final String queryId2 = NodeType.generateNewFluoIdForType(NodeType.QUERY);
+        final QueryMetadata meta2 = buildProjectionMetadata(queryId2, sparql2, ExportStrategy.RYA);
+
+        try (FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Persist both queries in a single transaction.
+            try (Transaction tx = fluoClient.newTransaction()) {
+                dao.write(tx, meta1);
+                dao.write(tx, meta2);
+                tx.commit();
+            }
+
+            final List<String> queries = new ListFluoQueries().listFluoQueries(fluoClient);
+
+            // The listing is compared as a set, so the order of the returned entries is not checked.
+            final Set<String> expected = new HashSet<>();
+            expected.add(expectedEntry(queryId1, sparql1, ExportStrategy.KAFKA));
+            expected.add(expectedEntry(queryId2, sparql2, ExportStrategy.RYA));
+
+            Assert.assertEquals(expected, Sets.newHashSet(queries));
+        }
+    }
+
+    /** Builds projection-query metadata with the fixed variable order and child node id used by this test. */
+    private static QueryMetadata buildProjectionMetadata(final String queryId, final String sparql,
+            final ExportStrategy strategy) {
+        final QueryMetadata.Builder builder = QueryMetadata.builder(queryId);
+        builder.setQueryType(QueryType.PROJECTION);
+        builder.setVarOrder(new VariableOrder("y;s;d"));
+        builder.setSparql(sparql);
+        builder.setChildNodeId("childNodeId");
+        builder.setExportStrategies(new HashSet<>(Arrays.asList(strategy)));
+        return builder.build();
+    }
+
+    /** Renders the string expected in the listing for a projection query with a single export strategy. */
+    private static String expectedEntry(final String queryId, final String sparql, final ExportStrategy strategy) {
+        return new FluoQueryStringBuilder().setQueryId(queryId).setQueryType(QueryType.PROJECTION).setQuery(sparql)
+                .setExportStrategies(Sets.newHashSet(strategy)).build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
index 47a2f29..66aa04b 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
@@ -90,7 +90,7 @@ public class BatchIT extends RyaExportITBase {
 
             // Tell the Fluo app to maintain the PCJ.
             String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
-                    getRyaInstanceName());
+                    getRyaInstanceName()).getQueryId();
 
             List<String> ids = getNodeIdStrings(fluoClient, queryId);
             List<String> prefixes = Arrays.asList("urn:subject_1", "urn:subject_1", "urn:object", "urn:subject_1", "urn:subject_1");
@@ -130,7 +130,7 @@ public class BatchIT extends RyaExportITBase {
 
             // Tell the Fluo app to maintain the PCJ.
             String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
-                    getRyaInstanceName());
+                    getRyaInstanceName()).getQueryId();
 
             List<String> ids = getNodeIdStrings(fluoClient, queryId);
             String joinId = ids.get(2);
@@ -176,7 +176,7 @@ public class BatchIT extends RyaExportITBase {
 
             // Tell the Fluo app to maintain the PCJ.
             String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
-                    getRyaInstanceName());
+                    getRyaInstanceName()).getQueryId();
 
             List<String> ids = getNodeIdStrings(fluoClient, queryId);
             String joinId = ids.get(2);
@@ -225,7 +225,7 @@ public class BatchIT extends RyaExportITBase {
             // Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and
             // batch size of joins to 5.
             String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
-                    getRyaInstanceName());
+                    getRyaInstanceName()).getQueryId();
 
             List<String> ids = getNodeIdStrings(fluoClient, queryId);
 
@@ -264,7 +264,7 @@ public class BatchIT extends RyaExportITBase {
             // Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and
             // batch size of joins to 5.
             String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
-                    getRyaInstanceName());
+                    getRyaInstanceName()).getQueryId();
 
             List<String> ids = getNodeIdStrings(fluoClient, queryId);
 
@@ -305,7 +305,7 @@ public class BatchIT extends RyaExportITBase {
             // Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and
             // batch size of joins to 5.
             String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
-                    getRyaInstanceName());
+                    getRyaInstanceName()).getQueryId();
 
             List<String> ids = getNodeIdStrings(fluoClient, queryId);
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
index a1d76cb..27b8222 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
@@ -33,7 +33,6 @@ import org.apache.fluo.api.client.scanner.ColumnScanner;
 import org.apache.fluo.api.client.scanner.RowScanner;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Span;
-import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
 import org.apache.rya.api.client.RyaClient;
 import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
 import org.apache.rya.indexing.pcj.fluo.api.DeleteFluoPcj;
@@ -131,7 +130,7 @@ public class CreateDeleteIT extends RyaExportITBase {
         // Register the PCJ with Rya.
         final RyaClient ryaClient = AccumuloRyaClientFactory.build(createConnectionDetails(), getAccumuloConnector());
 
-        final String pcjId = ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql, Sets.newHashSet(ExportStrategy.NO_OP_EXPORT));
+        final String pcjId = ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql, Sets.newHashSet());
 
         // Write the data to Rya.
         final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java
new file mode 100644
index 0000000..e61104a
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.integration;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.xml.datatype.DatatypeFactory;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Span;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery;
+import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
+import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase;
+import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.apache.rya.periodic.notification.notification.CommandNotification.Command;
+import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
+import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Integration test covering the life cycle of a periodic query: it is registered with Fluo, data is
+ * loaded, the query is deleted, and the resulting notification commands are read back from Kafka.
+ *
+ * NOTE(review): unlike the sibling integration tests (e.g. CreateDeleteIT, KafkaExportIT) this class
+ * name has no "IT" suffix - confirm that the build's integration-test includes still pick it up.
+ */
+public class CreateDeletePeriodicPCJ extends KafkaExportITBase {
+
+    /** Kafka broker used for both publishing and consuming notification commands. */
+    private static final String KAFKA_BROKER = "localhost:9092";
+
+    /** Topic the periodic notification commands are published to. */
+    private static final String NOTIFICATION_TOPIC = "notification_topic";
+
+    /** Upper bound, in milliseconds, on the wait for the notification commands to arrive. */
+    private static final int NOTIFICATION_TIMEOUT_MS = 7000;
+
+    /**
+     * Creates a periodic count query over four observations timestamped 30 minutes apart, deletes it,
+     * and verifies the clean-up through {@link #runTest(String, Collection, int)}.
+     */
+    @Test
+    public void deletePeriodicPCJ() throws Exception {
+        final String query = "prefix function: <http://org.apache.rya/function#> "
+                + "prefix time: <http://www.w3.org/2006/time#> "
+                + "select (count(?obs) as ?total) where {"
+                + "Filter(function:periodic(?time, 2, .5, time:hours)) "
+                + "?obs <uri:hasTime> ?time. "
+                + "?obs <uri:hasId> ?id }";
+
+        // Create the Statements that will be loaded into Rya: observation i carries a timestamp
+        // i * 30 minutes in the past and the id "id_i".
+        final ValueFactory vf = new ValueFactoryImpl();
+        final DatatypeFactory dtf = DatatypeFactory.newInstance();
+        final Collection<Statement> statements = Sets.newHashSet();
+        ZonedDateTime obsTime = ZonedDateTime.now();
+        for (int i = 1; i <= 4; i++) {
+            obsTime = obsTime.minusMinutes(30);
+            final String timestamp = obsTime.format(DateTimeFormatter.ISO_INSTANT);
+            statements.add(vf.createStatement(vf.createURI("urn:obs_" + i), vf.createURI("uri:hasTime"),
+                    vf.createLiteral(dtf.newXMLGregorianCalendar(timestamp))));
+            statements.add(vf.createStatement(vf.createURI("urn:obs_" + i), vf.createURI("uri:hasId"),
+                    vf.createLiteral("id_" + i)));
+        }
+
+        runTest(query, statements, 29);
+    }
+
+    /**
+     * Registers {@code query} as a periodic query, loads {@code statements}, deletes the query again and
+     * checks the Fluo table and the published notification commands.
+     *
+     * @param query the periodic SPARQL query to register
+     * @param statements the statements to load once the query is registered
+     * @param expectedEntries the number of Fluo table rows expected after the statements are loaded
+     * @throws Exception if any step of the test fails unexpectedly
+     */
+    private void runTest(final String query, final Collection<Statement> statements, final int expectedEntries)
+            throws Exception {
+        // The producer is owned by this method; try-with-resources closes it so that its network
+        // threads and buffers are released even when an assertion fails.
+        try (FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration());
+                KafkaProducer<String, CommandNotification> producer = getNotificationProducer(KAFKA_BROKER)) {
+
+            final PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(
+                    super.getAccumuloConnector(), RYA_INSTANCE_NAME);
+            final PeriodicNotificationClient notificationClient = new KafkaNotificationRegistrationClient(
+                    NOTIFICATION_TOPIC, producer);
+
+            final CreatePeriodicQuery periodicPCJ = new CreatePeriodicQuery(fluoClient, storage);
+            final String id = periodicPCJ.createPeriodicQuery(query, notificationClient).getQueryId();
+
+            loadData(statements);
+
+            // Ensure the data was loaded.
+            assertEquals(expectedEntries, getFluoTableEntries(fluoClient).size());
+
+            final DeletePeriodicQuery deletePeriodic = new DeletePeriodicQuery(fluoClient, storage);
+            deletePeriodic.deletePeriodicQuery(FluoQueryUtils.convertFluoQueryIdToPcjId(id), notificationClient);
+
+            // Ensure all data related to the query has been removed.
+            assertEquals(0, getFluoTableEntries(fluoClient).size());
+
+            // Ensure that the Periodic Service was notified to add and to delete the PeriodicNotification.
+            Set<CommandNotification> notifications;
+            try (KafkaConsumer<String, CommandNotification> consumer = makeNotificationConsumer(NOTIFICATION_TOPIC)) {
+                notifications = getKafkaNotifications(consumer, 2, NOTIFICATION_TIMEOUT_MS);
+            }
+            assertEquals(2, notifications.size());
+
+            // Both commands must refer to the same notification id.
+            final Set<String> notificationIds = new HashSet<>();
+            boolean addCalled = false;
+            boolean deleteCalled = false;
+            for (final CommandNotification notification : notifications) {
+                notificationIds.add(notification.getId());
+                addCalled |= notification.getCommand() == Command.ADD;
+                deleteCalled |= notification.getCommand() == Command.DELETE;
+            }
+
+            assertEquals("ADD and DELETE commands must share one notification id", 1, notificationIds.size());
+            assertTrue("No ADD command was published", addCalled);
+            assertTrue("No DELETE command was published", deleteCalled);
+        }
+    }
+
+    /**
+     * Collects the row of every entry currently visible in the Fluo table.
+     *
+     * @param fluoClient the client used to open a snapshot of the table
+     * @return one row per row-scanner entry, in scan order
+     */
+    private List<Bytes> getFluoTableEntries(final FluoClient fluoClient) {
+        try (Snapshot snapshot = fluoClient.newSnapshot()) {
+            final List<Bytes> rows = new ArrayList<>();
+            final RowScanner rscanner = snapshot.scanner().over(Span.prefix("")).byRow().build();
+
+            for (final ColumnScanner cscanner : rscanner) {
+                rows.add(cscanner.getRow());
+            }
+
+            return rows;
+        }
+    }
+
+    /**
+     * Creates a producer that publishes {@link CommandNotification}s with String keys.
+     *
+     * @param bootStrapServers the Kafka bootstrap servers to connect to
+     * @return a new producer; the caller is responsible for closing it
+     */
+    private KafkaProducer<String, CommandNotification> getNotificationProducer(final String bootStrapServers) {
+        final Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CommandNotificationSerializer.class.getName());
+        return new KafkaProducer<>(props);
+    }
+
+    /**
+     * Creates a consumer subscribed to {@code topic}, configured with auto.offset.reset=earliest.
+     *
+     * @param topic the topic to subscribe to
+     * @return a new, subscribed consumer; the caller is responsible for closing it
+     */
+    private KafkaConsumer<String, CommandNotification> makeNotificationConsumer(final String topic) {
+        // setup consumer
+        final Properties consumerProps = new Properties();
+        consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER);
+        consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
+        consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
+        consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CommandNotificationSerializer.class.getName());
+
+        // to make sure the consumer starts from the beginning of the topic
+        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        final KafkaConsumer<String, CommandNotification> consumer = new KafkaConsumer<>(consumerProps);
+        consumer.subscribe(Arrays.asList(topic));
+        return consumer;
+    }
+
+    /**
+     * Polls {@code consumer} until {@code expectedCount} distinct notifications have been read or
+     * {@code timeoutMs} has elapsed. A single poll may return as soon as any record is available, so
+     * polling is repeated rather than relying on one call to deliver every record.
+     *
+     * @param consumer the subscribed consumer to read from
+     * @param expectedCount the number of distinct notifications to wait for
+     * @param timeoutMs the maximum total time to wait, in milliseconds
+     * @return the distinct notifications read before the count was reached or the time ran out
+     */
+    private Set<CommandNotification> getKafkaNotifications(final KafkaConsumer<String, CommandNotification> consumer,
+            final int expectedCount, final long timeoutMs) {
+        requireNonNull(consumer);
+
+        final Set<CommandNotification> results = new HashSet<>();
+        final long deadline = System.currentTimeMillis() + timeoutMs;
+        long remaining = timeoutMs;
+
+        while (results.size() < expectedCount && remaining > 0) {
+            final ConsumerRecords<String, CommandNotification> records = consumer.poll(remaining);
+            final Iterator<ConsumerRecord<String, CommandNotification>> recordIterator = records.iterator();
+            while (recordIterator.hasNext()) {
+                results.add(recordIterator.next().value());
+            }
+            remaining = deadline - System.currentTimeMillis();
+        }
+
+        return results;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
index 8911f56..dbedfb3 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
@@ -90,7 +90,7 @@ public class KafkaExportIT extends KafkaExportITBase {
                         vf.createStatement(vf.createURI("http://Frank"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")));
 
         // Create the PCJ in Fluo and load the statements into Rya.
-        final String pcjId = loadData(sparql, statements);
+        final String pcjId = loadDataAndCreateQuery(sparql, statements);
 
         FluoITHelper.printFluoTable(super.getFluoConfiguration());
         
@@ -136,7 +136,7 @@ public class KafkaExportIT extends KafkaExportITBase {
                 vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(4.99)));
 
         // Create the PCJ in Fluo and load the statements into Rya.
-        final String pcjId = loadData(sparql, statements);
+        final String pcjId = loadDataAndCreateQuery(sparql, statements);
 
         // Create the expected results of the SPARQL query once the PCJ has been computed.
         final MapBindingSet expectedResult = new MapBindingSet();
@@ -163,7 +163,7 @@ public class KafkaExportIT extends KafkaExportITBase {
                 vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(4.99)));
 
         // Create the PCJ in Fluo and load the statements into Rya.
-        final String pcjId = loadData(sparql, statements);
+        final String pcjId = loadDataAndCreateQuery(sparql, statements);
 
         // Create the expected results of the SPARQL query once the PCJ has been computed.
         final MapBindingSet expectedResult = new MapBindingSet();
@@ -194,7 +194,7 @@ public class KafkaExportIT extends KafkaExportITBase {
                 vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(3.99)));
 
         // Create the PCJ in Fluo and load the statements into Rya.
-        final String pcjId = loadData(sparql, statements);
+        final String pcjId = loadDataAndCreateQuery(sparql, statements);
 
         // Create the expected results of the SPARQL query once the PCJ has been computed.
         final MapBindingSet expectedResult = new MapBindingSet();
@@ -221,7 +221,7 @@ public class KafkaExportIT extends KafkaExportITBase {
                 vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:count"), vf.createLiteral(2)));
 
         // Create the PCJ in Fluo and load the statements into Rya.
-        final String pcjId = loadData(sparql, statements);
+        final String pcjId = loadDataAndCreateQuery(sparql, statements);
 
         // Create the expected results of the SPARQL query once the PCJ has been computed.
         final MapBindingSet expectedResult = new MapBindingSet();
@@ -248,7 +248,7 @@ public class KafkaExportIT extends KafkaExportITBase {
                 vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(8)));
 
         // Create the PCJ in Fluo and load the statements into Rya.
-        final String pcjId = loadData(sparql, statements);
+        final String pcjId = loadDataAndCreateQuery(sparql, statements);
         
         try(FluoClient fluo = new FluoClientImpl(super.getFluoConfiguration())) {
             FluoITHelper.printFluoTable(fluo);
@@ -280,7 +280,7 @@ public class KafkaExportIT extends KafkaExportITBase {
                 vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(4.99)));
 
         // Create the PCJ in Fluo and load the statements into Rya.
-        final String pcjId = loadData(sparql, statements);
+        final String pcjId = loadDataAndCreateQuery(sparql, statements);
 
         // Create the expected results of the SPARQL query once the PCJ has been computed.
         final MapBindingSet expectedResult = new MapBindingSet();
@@ -307,7 +307,7 @@ public class KafkaExportIT extends KafkaExportITBase {
                 vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(2.75)));
 
         // Create the PCJ in Fluo and load the statements into Rya.
-        final String pcjId = loadData(sparql, statements);
+        final String pcjId = loadDataAndCreateQuery(sparql, statements);
 
         // Create the expected results of the SPARQL query once the PCJ has been computed.
         final MapBindingSet expectedResult = new MapBindingSet();
@@ -338,7 +338,7 @@ public class KafkaExportIT extends KafkaExportITBase {
                 vf.createStatement(vf.createURI("urn:banana"), vf.createURI("urn:price"), vf.createLiteral(1.99)));
 
         // Create the PCJ in Fluo and load the statements into Rya.
-        final String pcjId = loadData(sparql, statements);
+        final String pcjId = loadDataAndCreateQuery(sparql, statements);
 
         // Create the expected results of the SPARQL query once the PCJ has been computed.
         final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
@@ -399,7 +399,7 @@ public class KafkaExportIT extends KafkaExportITBase {
                 vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:price"), vf.createLiteral(4.99)));
 
         // Create the PCJ in Fluo and load the statements into Rya.
-        final String pcjId = loadData(sparql, statements);
+        final String pcjId = loadDataAndCreateQuery(sparql, statements);
 
         // Create the expected results of the SPARQL query once the PCJ has been computed.
         final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
@@ -477,7 +477,7 @@ public class KafkaExportIT extends KafkaExportITBase {
                 vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:price"), vf.createLiteral(4.99)));
 
         // Create the PCJ in Fluo and load the statements into Rya.
-        final String pcjId = loadData(sparql, statements);
+        final String pcjId = loadDataAndCreateQuery(sparql, statements);
 
         // Create the expected results of the SPARQL query once the PCJ has been computed.
         final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
@@ -554,7 +554,7 @@ public class KafkaExportIT extends KafkaExportITBase {
                 vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:price"), vf.createLiteral(4.99)));
 
         // Create the PCJ in Fluo and load the statements into Rya.
-        final String pcjId = loadData(sparql, statements);
+        final String pcjId = loadDataAndCreateQuery(sparql, statements);
 
         // Create the expected results of the SPARQL query once the PCJ has been computed.
         final Set<VisibilityBindingSet> expectedResults = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
index 0aefaca..4974aee 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
@@ -70,10 +70,6 @@ import com.google.common.collect.Sets;
  */
 public class QueryIT extends RyaExportITBase {
 
-    private enum ExporterType {
-        Pcj, Periodic
-    };
-
     @Test
     public void optionalStatements() throws Exception {
         // A query that has optional statement patterns. This query is looking for all
@@ -112,7 +108,7 @@ public class QueryIT extends RyaExportITBase {
         expectedResults.add(bs);
 
         // Verify the end results of the query match the expected results.
-        runTest(sparql, statements, expectedResults, ExporterType.Pcj);
+        runTest(sparql, statements, expectedResults, ExportStrategy.RYA);
     }
 
     /**
@@ -187,7 +183,7 @@ public class QueryIT extends RyaExportITBase {
         expectedResults.add(bs);
 
         // Verify the end results of the query match the expected results.
-        runTest(sparql, statements, expectedResults, ExporterType.Pcj);
+        runTest(sparql, statements, expectedResults, ExportStrategy.RYA);
     }
 
     @Test
@@ -241,7 +237,7 @@ public class QueryIT extends RyaExportITBase {
         expectedResults.add(bs);
 
         // Verify the end results of the query match the expected results.
-        runTest(sparql, statements, expectedResults, ExporterType.Pcj);
+        runTest(sparql, statements, expectedResults, ExportStrategy.RYA);
     }
 
     @Test
@@ -278,7 +274,7 @@ public class QueryIT extends RyaExportITBase {
         expectedResults.add(bs);
 
         // Verify the end results of the query match the expected results.
-        runTest(sparql, statements, expectedResults, ExporterType.Pcj);
+        runTest(sparql, statements, expectedResults, ExportStrategy.RYA);
     }
 
     @Test
@@ -359,7 +355,7 @@ public class QueryIT extends RyaExportITBase {
         expectedResults.add(bs);
 
         // Verify the end results of the query match the expected results.
-        runTest(sparql, statements, expectedResults, ExporterType.Pcj);
+        runTest(sparql, statements, expectedResults, ExportStrategy.RYA);
     }
 
     @Test
@@ -424,7 +420,7 @@ public class QueryIT extends RyaExportITBase {
         expectedResults.add(bs);
 
         // Verify the end results of the query match the expected results.
-        runTest(sparql, statements, expectedResults, ExporterType.Pcj);
+        runTest(sparql, statements, expectedResults, ExportStrategy.RYA);
     }
 
     @Test
@@ -525,7 +521,7 @@ public class QueryIT extends RyaExportITBase {
         expectedResults.add(bs);
 
         // Verify the end results of the query match the expected results.
-        runTest(query, statements, expectedResults, ExporterType.Periodic);
+        runTest(query, statements, expectedResults, ExportStrategy.PERIODIC);
     }
 
     @Test
@@ -596,7 +592,7 @@ public class QueryIT extends RyaExportITBase {
         expectedResults.add(bs);
 
         // Verify the end results of the query match the expected results.
-        runTest(query, statements, expectedResults, ExporterType.Periodic);
+        runTest(query, statements, expectedResults, ExportStrategy.PERIODIC);
     }
 
     @Test
@@ -713,7 +709,7 @@ public class QueryIT extends RyaExportITBase {
         expectedResults.add(bs);
 
         // Verify the end results of the query match the expected results.
-        runTest(query, statements, expectedResults, ExporterType.Periodic);
+        runTest(query, statements, expectedResults, ExportStrategy.PERIODIC);
     }
     
     
@@ -792,7 +788,7 @@ public class QueryIT extends RyaExportITBase {
         expectedResults.add(bs);
 
         // Verify the end results of the query match the expected results.
-        runTest(query, statements, expectedResults, ExporterType.Periodic);
+        runTest(query, statements, expectedResults, ExportStrategy.PERIODIC);
     }
     
     @Test
@@ -876,7 +872,7 @@ public class QueryIT extends RyaExportITBase {
         expectedResults.add(bs);
 
         // Verify the end results of the query match the expected results.
-        runTest(query, statements, expectedResults, ExporterType.Periodic);
+        runTest(query, statements, expectedResults, ExportStrategy.PERIODIC);
     }
 
     @Test(expected= UnsupportedQueryException.class)
@@ -896,11 +892,11 @@ public class QueryIT extends RyaExportITBase {
         final Set<BindingSet> expectedResults = new HashSet<>();
 
         // Verify the end results of the query match the expected results.
-        runTest(query, statements, expectedResults, ExporterType.Periodic);
+        runTest(query, statements, expectedResults, ExportStrategy.PERIODIC);
     }
 
     public void runTest(final String sparql, final Collection<Statement> statements, final Collection<BindingSet> expectedResults,
-            ExporterType type) throws Exception {
+            ExportStrategy strategy) throws Exception {
         requireNonNull(sparql);
         requireNonNull(statements);
         requireNonNull(expectedResults);
@@ -910,8 +906,8 @@ public class QueryIT extends RyaExportITBase {
 
         final RyaClient ryaClient = AccumuloRyaClientFactory.build(createConnectionDetails(), accumuloConn);
 
-        switch (type) {
-        case Pcj:
+        switch (strategy) {
+        case RYA:
             ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql);
             addStatementsAndWait(statements);
             // Fetch the value that is stored within the PCJ table.
@@ -922,11 +918,11 @@ public class QueryIT extends RyaExportITBase {
                 assertEquals(expectedResults, results);
             }
             break;
-        case Periodic:
+        case PERIODIC:
             PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(accumuloConn, getRyaInstanceName());
             String periodicId = periodicStorage.createPeriodicQuery(sparql);
             try (FluoClient fluo = new FluoClientImpl(super.getFluoConfiguration())) {
-                new CreateFluoPcj().createPcj(periodicId, sparql, Sets.newHashSet(ExportStrategy.RYA), fluo);
+                new CreateFluoPcj().createPcj(periodicId, sparql, Sets.newHashSet(ExportStrategy.PERIODIC), fluo);
             }
             addStatementsAndWait(statements);
             
@@ -938,6 +934,8 @@ public class QueryIT extends RyaExportITBase {
             }
             assertEquals(expectedResults, results);
             break;
+        default:
+            throw new RuntimeException("Invalid export option");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
index ed9ce60..59fe54f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
@@ -321,7 +321,7 @@ public class KafkaExportITBase extends AccumuloExportITBase {
         return consumer;
     }
 
-    protected String loadData(final String sparql, final Collection<Statement> statements) throws Exception {
+    protected String loadDataAndCreateQuery(final String sparql, final Collection<Statement> statements) throws Exception {
         requireNonNull(sparql);
         requireNonNull(statements);
 
@@ -334,7 +334,16 @@ public class KafkaExportITBase extends AccumuloExportITBase {
 
         final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql, Sets.newHashSet(ExportStrategy.KAFKA));
 
-        // Write the data to Rya.
+        loadData(statements);
+
+        // The PCJ Id is the topic name the results will be written to.
+        return pcjId;
+    }
+    
+    protected void loadData(final Collection<Statement> statements) throws Exception {
+        
+        requireNonNull(statements);
+
         final SailRepositoryConnection ryaConn = getRyaSailRepository().getConnection();
         ryaConn.begin();
         ryaConn.add(statements);
@@ -343,9 +352,7 @@ public class KafkaExportITBase extends AccumuloExportITBase {
 
         // Wait for the Fluo application to finish computing the end result.
         super.getMiniFluo().waitForObservers();
-
-        // The PCJ Id is the topic name the results will be written to.
-        return pcjId;
+        
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/.gitignore
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/.gitignore b/extras/rya.periodic.service/periodic.service.api/.gitignore
new file mode 100644
index 0000000..b83d222
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/.gitignore
@@ -0,0 +1 @@
+/target/

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/pom.xml b/extras/rya.periodic.service/periodic.service.api/pom.xml
new file mode 100644
index 0000000..b57beaf
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/pom.xml
@@ -0,0 +1,52 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!-- Licensed to the Apache Software Foundation (ASF) under one or more 
+        contributor license agreements. See the NOTICE file distributed with this 
+        work for additional information regarding copyright ownership. The ASF licenses 
+        this file to you under the Apache License, Version 2.0 (the "License"); you 
+        may not use this file except in compliance with the License. You may obtain 
+        a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
+        required by applicable law or agreed to in writing, software distributed 
+        under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+        OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+        the specific language governing permissions and limitations under the License. -->
+
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.periodic.service</artifactId>
+        <version>3.2.11-incubating-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>rya.periodic.service.api</artifactId>
+
+    <name>Apache Rya Periodic Service API</name>
+    <description>API for Periodic Service Application</description>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>2.8.0</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-query</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.indexing.pcj</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java
new file mode 100644
index 0000000..f4a083c
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+
+/**
+ * Object that cleans up old {@link BindingSet}s corresponding to the specified
+ * {@link NodeBin}. Implementations delete all BindingSets with the bin
+ * indicated by {@link NodeBin#getBin()}.  A BindingSet corresponds to a given
+ * bin if it contains a {@link Binding} with name {@link IncrementalUpdateConstants#PERIODIC_BIN_ID}
+ * and value equal to the given bin.
+ *
+ */
+public interface BinPruner {
+    
+    /**
+     * Cleans up all {@link BindingSet}s associated with the indicated {@link NodeBin}.
+     * @param bin - NodeBin that indicates which BindingSets to delete.
+     */
+    public void pruneBindingSetBin(NodeBin bin);
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java
new file mode 100644
index 0000000..491576b
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import org.openrdf.query.BindingSet;
+
+/**
+ * An Object that is used to export {@link BindingSet}s to an external repository or queuing system.
+ *
+ */
+public interface BindingSetExporter {
+
+    /**
+     * This method exports the BindingSet to the external repository or queuing system
+     * that this BindingSetExporter is configured to export to.
+     * @param bindingSet - {@link BindingSetRecord} to be exported
+     * @throws BindingSetRecordExportException if the record could not be exported
+     */
+    public void exportNotification(BindingSetRecord bindingSet) throws BindingSetRecordExportException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecord.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecord.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecord.java
new file mode 100644
index 0000000..c3f70f1
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecord.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import org.openrdf.query.BindingSet;
+
+import com.google.common.base.Objects;
+
+/**
+ * Object that associates a {@link BindingSet} with a given Kafka topic.
+ * This ensures that the {@link KafkaPeriodicBindingSetExporter} can export
+ * each BindingSet to its appropriate topic.
+ */
+public class BindingSetRecord {
+
+    private final BindingSet bs;
+    private final String topic;
+
+    /**
+     * @param bs - BindingSet to be exported
+     * @param topic - Kafka topic that the BindingSet is exported to
+     */
+    public BindingSetRecord(BindingSet bs, String topic) {
+        this.bs = bs;
+        this.topic = topic;
+    }
+
+    /**
+     * @return BindingSet in this BindingSetRecord
+     */
+    public BindingSet getBindingSet() {
+        return bs;
+    }
+
+    /**
+     * @return Kafka topic for this BindingSetRecord
+     */
+    public String getTopic() {
+        return topic;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof BindingSetRecord)) {
+            return false;
+        }
+        final BindingSetRecord that = (BindingSetRecord) o;
+        return Objects.equal(bs, that.bs) && Objects.equal(topic, that.topic);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(bs, topic);
+    }
+
+    @Override
+    public String toString() {
+        return "Binding Set Record \n" + "  Topic: " + topic + "\n" + "  BindingSet: " + bs + "\n";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecordExportException.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecordExportException.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecordExportException.java
new file mode 100644
index 0000000..94e4980
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecordExportException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+/**
+ * Thrown when a {@link BindingSetRecord} could not be exported.
+ */
+public class BindingSetRecordExportException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Constructs an instance of {@link BindingSetRecordExportException}.
+     *
+     * @param message - Explains why the exception was thrown.
+     */
+    public BindingSetRecordExportException(final String message) {
+        super(message);
+    }
+
+    /**
+     * Constructs an instance of {@link BindingSetRecordExportException}.
+     *
+     * @param message - Explains why the exception was thrown.
+     * @param cause - The exception that caused this one to be thrown.
+     */
+    public BindingSetRecordExportException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java
new file mode 100644
index 0000000..b1e8bad
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+/**
+ * Interface providing basic life cycle functionality,
+ * including stopping and starting any class implementing this
+ * interface and checking whether it is running.
+ *
+ */
+public interface LifeCycle {
+
+    /**
+     * Starts the application.
+     */
+    public void start();
+
+    /**
+     * Stops a running application.
+     */
+    public void stop();
+    
+    /**
+     * Determine if application is currently running.
+     * @return true if application is running and false otherwise.
+     */
+    public boolean currentlyRunning();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java
new file mode 100644
index 0000000..3ed7979
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import java.util.Objects;
+
+/**
+ * Object used to indicate the id of a given Periodic Query
+ * along with a particular bin of results.  This Object is used
+ * by the {@link BinPruner} to clean up old query results after
+ * they have been processed.
+ *
+ */
+public class NodeBin {
+
+    private final long bin;
+    private final String nodeId;
+
+    /**
+     * @param nodeId - id of the Periodic Query
+     * @param bin - bin id of the results for that Periodic Query
+     */
+    public NodeBin(String nodeId, long bin) {
+        this.bin = bin;
+        this.nodeId = nodeId;
+    }
+
+    /** @return id of Periodic Query */
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    /** @return bin id of results for a given Periodic Query */
+    public long getBin() {
+        return bin;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof NodeBin)) {
+            return false;
+        }
+        final NodeBin that = (NodeBin) other;
+        // Null-safe comparison: hashCode() already accepts a null nodeId via Objects.hash.
+        return bin == that.bin && Objects.equals(nodeId, that.nodeId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(bin, nodeId);
+    }
+
+    @Override
+    public String toString() {
+        return "Node Bin \n" + "   QueryId: " + nodeId + "\n" + "   Bin: " + bin + "\n";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/Notification.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/Notification.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/Notification.java
new file mode 100644
index 0000000..3e9e0d1
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/Notification.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+/**
+ * Notification Object used by the Periodic Query Service
+ * to inform workers to process results for a given Periodic
+ * Query with the indicated id.
+ *
+ */
+public interface Notification {
+
+    /**
+     * @return id of the Periodic Query whose results are to be processed
+     */
+    public String getId();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java
new file mode 100644
index 0000000..d53dc17
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+
+/**
+ * Object that manages the periodic notifications for the Periodic Query Service.
+ * This Object processes new requests for periodic updates by registering them with
+ * some sort of service that generates periodic updates (such as a {@link ScheduledExecutorService}).
+ *
+ */
+public interface NotificationCoordinatorExecutor extends LifeCycle {
+
+    /**
+     * Registers a {@link CommandNotification} with, or deletes it from, the periodic service
+     * that generates notifications at the regular interval indicated by the CommandNotification.
+     * @param notification - CommandNotification to be registered or deleted from the periodic update
+     * service.
+     */
+    public void processNextCommandNotification(CommandNotification notification);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java
new file mode 100644
index 0000000..4ac9089
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java
@@ -0,0 +1,41 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import org.apache.rya.periodic.notification.notification.TimestampedNotification;
+
+/**
+ * Object that processes new {@link TimestampedNotification}s generated by {@link NotificationCoordinatorExecutor}.
+ * It is expected that the NotificationCoordinatorExecutor will provide this Object with notifications to perform work
+ * via some sort of queuing service such as a BlockingQueue or Kafka.  This Object processes the notifications by retrieving
+ * query results associated with the Periodic Query id given by {@link TimestampedNotification#getId()}, parsing them
+ * and then providing them to another service to be exported.
+ *
+ */
+public interface NotificationProcessor {
+
+    /**
+     * Processes {@link TimestampedNotification}s by retrieving the Periodic Query results
+     * associated with the query id given by {@link TimestampedNotification#getId()}.
+     * @param notification - contains information about which query results to retrieve
+     */
+    public void processNotification(TimestampedNotification notification);
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java
new file mode 100644
index 0000000..ff08733
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.periodic.notification.notification.BasicNotification;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+
+/**
+ * Object to register {@link PeriodicNotification}s with an external queuing
+ * service to be handled by a {@link NotificationCoordinatorExecutor} service.
+ * The service will generate notifications to process Periodic Query results at regular
+ * intervals corresponding to the period of the PeriodicNotification.
+ *
+ */
+public interface PeriodicNotificationClient extends AutoCloseable {
+
+    /**
+     * Adds a new notification to be registered with the {@link NotificationCoordinatorExecutor}.
+     * @param notification - notification to be added
+     */
+    public void addNotification(PeriodicNotification notification);
+    
+    /**
+     * Deletes a notification from the {@link NotificationCoordinatorExecutor}.
+     * @param notification - notification to be deleted
+     */
+    public void deleteNotification(BasicNotification notification);
+    
+    /**
+     * Deletes a notification from the {@link NotificationCoordinatorExecutor}.
+     * @param notificationId - id corresponding to the notification to be deleted
+     */
+    public void deleteNotification(String notificationId);
+    
+    /**
+     * Adds a new notification with the indicated id and period to the {@link NotificationCoordinatorExecutor}.
+     * @param id - Periodic Query id
+     * @param period - period indicating frequency at which notifications will be generated
+     * @param delay - initial delay for starting periodic notifications
+     * @param unit - time unit of delay and period
+     */
+    public void addNotification(String id, long period, long delay, TimeUnit unit);
+    /** Redeclares {@link AutoCloseable#close()} without a throws clause, so callers need not handle a checked exception. */
+    public void close();
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java
new file mode 100644
index 0000000..c31a5c0
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Objects;
+
+/**
+ * Notification Object used by the Periodic Query Service
+ * to inform workers to process results for a given Periodic
+ * Query with the indicated id.
+ *
+ */
+public class BasicNotification implements Notification {
+
+    private String id;
+
+    /**
+     * Creates a BasicNotification
+     * @param id - Fluo query id associated with this Notification (stored as given; no null check is performed here)
+     */
+    public BasicNotification(String id) {
+        this.id = id;
+    }
+
+    /**
+     * @return the Fluo Query Id that this notification will generate results for
+     */
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        // Equality is decided by the id alone; Objects.equal tolerates a null id on either side.
+        if (other instanceof BasicNotification) {
+            BasicNotification not = (BasicNotification) other;
+            return Objects.equal(this.id, not.id);
+        }
+
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(id);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        return builder.append("id").append("=").append(id).toString(); // renders as "id=<id>"
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java
new file mode 100644
index 0000000..597b228
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+/**
+ * This Object contains a Notification Object used by the Periodic Query Service
+ * to inform workers to process results for a given Periodic Query with the
+ * indicated id. Additionally, the CommandNotification contains a
+ * {@link Command} about which action the
+ * {@link NotificationCoordinatorExecutor} should take (adding or deleting).
+ * CommandNotifications are meant to be added to an external work queue (such as
+ * Kafka) to be processed by the NotificationCoordinatorExecutor.
+ *
+ */
+public class CommandNotification implements Notification {
+
+    private Notification notification;
+    private Command command;
+
+    public enum Command {
+        ADD, DELETE
+    };
+
+    /**
+     * Creates a new CommandNotification. Both arguments are required to be non-null.
+     * @param command - the command associated with this notification (either add or delete)
+     * @param notification - the underlying notification associated with this command
+     */
+    public CommandNotification(Command command, Notification notification) {
+        this.notification = Preconditions.checkNotNull(notification);
+        this.command = Preconditions.checkNotNull(command);
+    }
+
+    @Override
+    public String getId() {
+        return notification.getId(); // delegates to the wrapped notification's id
+    }
+
+    /**
+     * Returns {@link Notification} contained by this CommandNotification.
+     * @return Notification contained by this Object
+     */
+    public Notification getNotification() {
+        return this.notification;
+    }
+
+    /**
+     * @return Command contained by this Object (either add or delete)
+     */
+    public Command getCommand() {
+        return this.command;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (other instanceof CommandNotification) {
+            CommandNotification cn = (CommandNotification) other;
+            return Objects.equal(this.command, cn.command) && Objects.equal(this.notification, cn.notification);
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(command, notification);
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder().append("command").append("=").append(command.toString()).append(";")
+                .append(notification.toString()).toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java
new file mode 100644
index 0000000..aa9e581
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Notification Object used by the Periodic Query Service to inform workers to
+ * process results for a given Periodic Query with the indicated id.
+ * Additionally, this Object contains a period that indicates a frequency at
+ * which regular updates are generated.
+ *
+ */
+public class PeriodicNotification implements Notification {
+
+    private String id;
+    private long period;
+    private TimeUnit periodTimeUnit;
+    private long initialDelay;
+
+    /**
+     * Creates a PeriodicNotification.
+     * @param id - Fluo Query Id that this notification is associated with (must not be null)
+     * @param period - period at which notifications are generated (must be greater than 0)
+     * @param periodTimeUnit - time unit associated with the period and delay (must not be null)
+     * @param initialDelay - amount of time to wait before generating the first notification (must be 0 or greater)
+     */
+    public PeriodicNotification(String id, long period, TimeUnit periodTimeUnit, long initialDelay) {
+        this.id = Preconditions.checkNotNull(id);
+        this.periodTimeUnit = Preconditions.checkNotNull(periodTimeUnit);
+        Preconditions.checkArgument(period > 0 && initialDelay >= 0);
+        this.period = period;
+        this.initialDelay = initialDelay;
+    }
+    
+
+    /**
+     * Creates a PeriodicNotification with the same id, period, time unit and initial delay as another one.
+     * @param other - PeriodicNotification whose field values are copied
+     */
+    public PeriodicNotification(PeriodicNotification other) {
+        this(other.id, other.period, other.periodTimeUnit, other.initialDelay);
+    }
+
+    public String getId() { // id this notification was constructed with
+        return id;
+    }
+
+    /**
+     * @return - period at which regular notifications are generated
+     */
+    public long getPeriod() {
+        return period;
+    }
+
+    /**
+     * @return time unit of period and initial delay
+     */
+    public TimeUnit getTimeUnit() {
+        return periodTimeUnit;
+    }
+
+    /**
+     * @return amount of time to delay before beginning to generate notifications
+     */
+    public long getInitialDelay() {
+        return initialDelay;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        String delim = "=";
+        String delim2 = ";";
+        return builder.append("id").append(delim).append(id).append(delim2).append("period").append(delim).append(period).append(delim2)
+                .append("periodTimeUnit").append(delim).append(periodTimeUnit).append(delim2).append("initialDelay").append(delim)
+                .append(initialDelay).toString();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (!(other instanceof PeriodicNotification)) {
+            return false;
+        }
+
+        PeriodicNotification notification = (PeriodicNotification) other;
+        return Objects.equals(this.id, notification.id) && (this.period == notification.period) 
+                && Objects.equals(this.periodTimeUnit, notification.periodTimeUnit) && (this.initialDelay == notification.initialDelay);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, period, periodTimeUnit, initialDelay);
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+
+        private String id;
+        private long period;
+        private TimeUnit periodTimeUnit;
+        private long initialDelay = 0;
+
+        /**
+         * @param id - periodic query id
+         * @return - builder to chain method calls
+         */
+        public Builder id(String id) {
+            this.id = id;
+            return this;
+        }
+
+        /**
+         * @param period - period at which regular notifications are generated
+         * @return - builder to chain method calls
+         */
+        public Builder period(long period) {
+            this.period = period;
+            return this;
+        }
+
+        /**
+         * @param timeUnit - time unit of period and initial delay
+         * @return - builder to chain method calls
+         */
+        public Builder timeUnit(TimeUnit timeUnit) {
+            this.periodTimeUnit = timeUnit;
+            return this;
+        }
+
+        /**
+         * @param initialDelay - amount of time to wait before generating notifications (0 unless set)
+         * @return - builder to chain method calls
+         */
+        public Builder initialDelay(long initialDelay) {
+            this.initialDelay = initialDelay;
+            return this;
+        }
+
+        /**
+         * Builds PeriodicNotification; the constructor's precondition checks apply to the values supplied so far.
+         * @return PeriodicNotification constructed from Builder specified parameters
+         */
+        public PeriodicNotification build() {
+            return new PeriodicNotification(id, period, periodTimeUnit, initialDelay);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java
new file mode 100644
index 0000000..38073ce
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link PeriodicNotification} Object used by the Periodic Query Service to inform workers to
+ * process results for a given Periodic Query with the indicated id.  Additionally
+ * this Object contains a {@link Date} object to indicate the date time at which this
+ * notification was generated.
+ *
+ */
+public class TimestampedNotification extends PeriodicNotification {
+
+    private Date date;
+
+    /**
+     * Constructs a TimestampedNotification whose time stamp is the moment of construction.
+     * @param id - Fluo Query Id associated with this Notification
+     * @param period - period at which notifications are generated
+     * @param periodTimeUnit - time unit associated with period and initial delay
+     * @param initialDelay - amount of time to wait before generating first notification
+     */
+    public TimestampedNotification(String id, long period, TimeUnit periodTimeUnit, long initialDelay) {
+        super(id, period, periodTimeUnit, initialDelay);
+        date = new Date();
+    }
+    
+    /**
+     * Creates a TimestampedNotification
+     * @param notification - PeriodicNotification used to create this TimestampedNotification.  
+     * This constructor creates a new time stamp (the moment of construction) for the TimestampedNotification.
+     */
+    public TimestampedNotification(PeriodicNotification notification) {
+        super(notification);
+        date = new Date();
+    }
+
+    /**
+     * @return timestamp at which this notification was generated (the internal {@link Date} instance, not a copy)
+     */
+    public Date getTimestamp() {
+        return date;
+    }
+
+    @Override
+    public String toString() {
+        return super.toString() + ";date=" + date;
+    }
+
+}