You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/02 21:01:56 UTC

[3/9] incubator-rya git commit: RYA-280-Periodic Query Service. Closes #177.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
new file mode 100644
index 0000000..cb7557c
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.application;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.xml.datatype.DatatypeConfigurationException;
+import javax.xml.datatype.DatatypeFactory;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
+import org.apache.rya.periodic.notification.api.CreatePeriodicQuery;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationRegistrationClient;
+import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
+import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.Time;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+
+public class PeriodicNotificationApplicationIT extends RyaExportITBase {
+
+    // Addresses of the embedded ZooKeeper instance and Kafka broker.
+    private static final String ZKHOST = "127.0.0.1";
+    private static final String BROKERHOST = "127.0.0.1";
+    private static final String BROKERPORT = "9092";
+
+    // Embedded Kafka/ZooKeeper infrastructure, created in setUpKafka().
+    private EmbeddedZookeeper zkServer;
+    private ZkClient zkClient;
+    private ZkUtils zkUtils;
+    private KafkaServer kafkaServer;
+
+    // Configuration assembled in init().
+    private Properties props;
+    private Properties kafkaProps;
+    PeriodicNotificationApplicationConfiguration conf;
+
+    // Application under test and the Kafka client used to register queries with it.
+    private PeriodicNotificationApplication app;
+    private KafkaProducer<String, CommandNotification> producer;
+    private KafkaNotificationRegistrationClient registrar;
+    
+    /**
+     * Starts the embedded Kafka broker, loads the test configuration and builds
+     * the periodic notification application together with the Kafka producer
+     * and registration client used by the tests.
+     */
+    @Before
+    public void init() throws Exception {
+        setUpKafka();
+
+        // Configuration shared by the application and the Kafka clients.
+        props = getProps();
+        conf = new PeriodicNotificationApplicationConfiguration(props);
+        kafkaProps = getKafkaProperties(conf);
+
+        app = PeriodicNotificationApplicationFactory.getPeriodicApplication(props);
+
+        // Producer and client used to register periodic queries on the notification topic.
+        final StringSerializer keySerializer = new StringSerializer();
+        final CommandNotificationSerializer valueSerializer = new CommandNotificationSerializer();
+        producer = new KafkaProducer<>(kafkaProps, keySerializer, valueSerializer);
+        final String notificationTopic = conf.getNotificationTopic();
+        registrar = new KafkaNotificationRegistrationClient(notificationTopic, producer);
+    }
+    
+    /**
+     * Brings up an embedded ZooKeeper instance and a single Kafka broker that
+     * listens on {@code BROKERHOST:BROKERPORT} and keeps its logs in a newly
+     * created temporary directory.
+     */
+    private void setUpKafka() throws Exception {
+        // Start ZooKeeper and connect the ZooKeeper clients.
+        zkServer = new EmbeddedZookeeper();
+        final String zkAddress = ZKHOST + ":" + zkServer.port();
+        zkClient = new ZkClient(zkAddress, 30000, 30000, ZKStringSerializer$.MODULE$);
+        zkUtils = ZkUtils.apply(zkClient, false);
+
+        // Set up the broker.
+        final Properties brokerSettings = new Properties();
+        brokerSettings.setProperty("zookeeper.connect", zkAddress);
+        brokerSettings.setProperty("broker.id", "0");
+        final String logDir = Files.createTempDirectory("kafka-").toAbsolutePath().toString();
+        brokerSettings.setProperty("log.dirs", logDir);
+        final String listener = "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT;
+        brokerSettings.setProperty("listeners", listener);
+
+        final KafkaConfig brokerConfig = new KafkaConfig(brokerSettings);
+        final Time clock = new MockTime();
+        kafkaServer = TestUtils.createServer(brokerConfig, clock);
+    }
+    
+    @Test
+    public void periodicApplicationWithAggAndGroupByTest() throws Exception {
+
+        String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+                + "prefix time: <http://www.w3.org/2006/time#> " // n
+                + "select ?type (count(?obs) as ?total) where {" // n
+                + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n
+                + "?obs <uri:hasTime> ?time. " // n
+                + "?obs <uri:hasObsType> ?type } group by ?type"; // n
+        
+        //make data
+        int periodMult = 15;
+        final ValueFactory vf = new ValueFactoryImpl();
+        final DatatypeFactory dtf = DatatypeFactory.newInstance();
+        //Sleep until current time aligns nicely with period to makell
+        //results more predictable
+        while(System.currentTimeMillis() % (periodMult*1000) > 500);
+        ZonedDateTime time = ZonedDateTime.now();
+
+        ZonedDateTime zTime1 = time.minusSeconds(2*periodMult);
+        String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
+
+        ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult);
+        String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
+
+        ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult);
+        String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
+
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasObsType"), vf.createLiteral("ship")),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasObsType"), vf.createLiteral("airplane")),
+                vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
+                vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasObsType"), vf.createLiteral("ship")),
+                vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
+                vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasObsType"), vf.createLiteral("airplane")),
+                vf.createStatement(vf.createURI("urn:obs_5"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+                vf.createStatement(vf.createURI("urn:obs_5"), vf.createURI("uri:hasObsType"), vf.createLiteral("automobile")));
+        
+        try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) {
+            Connector connector = ConfigUtils.getConnector(conf);
+            PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
+            CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
+            String id = periodicQuery.createQueryAndRegisterWithKafka(sparql, registrar);
+            addData(statements);
+            app.start();
+//            
+            Multimap<Long, BindingSet> actual = HashMultimap.create();
+            try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) {
+                consumer.subscribe(Arrays.asList(id));
+                long end = System.currentTimeMillis() + 4*periodMult*1000;
+                long lastBinId = 0L;
+                long binId = 0L;
+                List<Long> ids = new ArrayList<>();
+                while (System.currentTimeMillis() < end) {
+                    ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000);
+                    for(ConsumerRecord<String, BindingSet> record: records){
+                        BindingSet result = record.value();
+                        binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue());
+                        if(lastBinId != binId) {
+                            lastBinId = binId;
+                            ids.add(binId);
+                        }
+                        actual.put(binId, result);
+                    }
+                }
+                
+                Map<Long, Set<BindingSet>> expected = new HashMap<>();
+                
+                Set<BindingSet> expected1 = new HashSet<>();
+                QueryBindingSet bs1 = new QueryBindingSet();
+                bs1.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0)));
+                bs1.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER));
+                bs1.addBinding("type", vf.createLiteral("airplane"));
+                
+                QueryBindingSet bs2 = new QueryBindingSet();
+                bs2.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0)));
+                bs2.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER));
+                bs2.addBinding("type", vf.createLiteral("ship"));
+                
+                QueryBindingSet bs3 = new QueryBindingSet();
+                bs3.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0)));
+                bs3.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER));
+                bs3.addBinding("type", vf.createLiteral("automobile"));
+                
+                expected1.add(bs1);
+                expected1.add(bs2);
+                expected1.add(bs3);
+                
+                Set<BindingSet> expected2 = new HashSet<>();
+                QueryBindingSet bs4 = new QueryBindingSet();
+                bs4.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(1)));
+                bs4.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER));
+                bs4.addBinding("type", vf.createLiteral("airplane"));
+                
+                QueryBindingSet bs5 = new QueryBindingSet();
+                bs5.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(1)));
+                bs5.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER));
+                bs5.addBinding("type", vf.createLiteral("ship"));
+                
+                expected2.add(bs4);
+                expected2.add(bs5);
+                
+                Set<BindingSet> expected3 = new HashSet<>();
+                QueryBindingSet bs6 = new QueryBindingSet();
+                bs6.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(2)));
+                bs6.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER));
+                bs6.addBinding("type", vf.createLiteral("ship"));
+                
+                QueryBindingSet bs7 = new QueryBindingSet();
+                bs7.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(2)));
+                bs7.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER));
+                bs7.addBinding("type", vf.createLiteral("airplane"));
+                
+                expected3.add(bs6);
+                expected3.add(bs7);
+                
+                expected.put(ids.get(0), expected1);
+                expected.put(ids.get(1), expected2);
+                expected.put(ids.get(2), expected3);
+                
+                Assert.assertEquals(3, actual.asMap().size());
+                for(Long ident: ids) {
+                    Assert.assertEquals(expected.get(ident), actual.get(ident));
+                }
+            }
+            
+            Set<BindingSet> expectedResults = new HashSet<>();
+            try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) {
+                results.forEachRemaining(x -> expectedResults.add(x));
+                Assert.assertEquals(0, expectedResults.size());
+            }
+        }
+    }
+    
+    
+    @Test
+    public void periodicApplicationWithAggTest() throws Exception {
+
+        String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+                + "prefix time: <http://www.w3.org/2006/time#> " // n
+                + "select (count(?obs) as ?total) where {" // n
+                + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n
+                + "?obs <uri:hasTime> ?time. " // n
+                + "?obs <uri:hasId> ?id } "; // n
+        
+        //make data
+        int periodMult = 15;
+        final ValueFactory vf = new ValueFactoryImpl();
+        final DatatypeFactory dtf = DatatypeFactory.newInstance();
+        //Sleep until current time aligns nicely with period to make
+        //results more predictable
+        while(System.currentTimeMillis() % (periodMult*1000) > 500);
+        ZonedDateTime time = ZonedDateTime.now();
+
+        ZonedDateTime zTime1 = time.minusSeconds(2*periodMult);
+        String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
+
+        ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult);
+        String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
+
+        ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult);
+        String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
+
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")),
+                vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+                vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")));
+        
+        try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) {
+            Connector connector = ConfigUtils.getConnector(conf);
+            PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
+            CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
+            String id = periodicQuery.createQueryAndRegisterWithKafka(sparql, registrar);
+            addData(statements);
+            app.start();
+//            
+            Multimap<Long, BindingSet> expected = HashMultimap.create();
+            try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) {
+                consumer.subscribe(Arrays.asList(id));
+                long end = System.currentTimeMillis() + 4*periodMult*1000;
+                long lastBinId = 0L;
+                long binId = 0L;
+                List<Long> ids = new ArrayList<>();
+                while (System.currentTimeMillis() < end) {
+                    ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000);
+                    for(ConsumerRecord<String, BindingSet> record: records){
+                        BindingSet result = record.value();
+                        binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue());
+                        if(lastBinId != binId) {
+                            lastBinId = binId;
+                            ids.add(binId);
+                        }
+                        expected.put(binId, result);
+                    }
+                }
+                
+                Assert.assertEquals(3, expected.asMap().size());
+                int i = 0;
+                for(Long ident: ids) {
+                    Assert.assertEquals(1, expected.get(ident).size());
+                    BindingSet bs = expected.get(ident).iterator().next();
+                    Value val = bs.getValue("total");
+                    int total = Integer.parseInt(val.stringValue());
+                    Assert.assertEquals(3-i, total);
+                    i++;
+                }
+            }
+            
+            
+            Set<BindingSet> expectedResults = new HashSet<>();
+            try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) {
+                results.forEachRemaining(x -> expectedResults.add(x));
+                Assert.assertEquals(0, expectedResults.size());
+            }
+        }
+
+    }
+    
+    
+    /**
+     * Registers a periodic query without aggregation, loads six statements
+     * describing three observations, starts the application and checks that
+     * results for three distinct bin ids are published to the Kafka topic named
+     * after the query id, with 3, 2 and 1 results in the order in which the
+     * bins were first received. Finally verifies that the periodic result
+     * storage lists no results for the query.
+     */
+    @Test
+    public void periodicApplicationTest() throws Exception {
+
+        String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+                + "prefix time: <http://www.w3.org/2006/time#> " // n
+                + "select ?obs ?id where {" // n
+                + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n
+                + "?obs <uri:hasTime> ?time. " // n
+                + "?obs <uri:hasId> ?id } "; // n
+
+        //make data
+        int periodMult = 15;
+        final ValueFactory vf = new ValueFactoryImpl();
+        final DatatypeFactory dtf = DatatypeFactory.newInstance();
+        //Wait until the current time is at most 500 ms past a multiple of the
+        //period to make results more predictable. Sleep between checks rather
+        //than spinning; 50 ms steps cannot jump over the 500 ms window.
+        while (System.currentTimeMillis() % (periodMult * 1000) > 500) {
+            Thread.sleep(50);
+        }
+        ZonedDateTime time = ZonedDateTime.now();
+
+        ZonedDateTime zTime1 = time.minusSeconds(2*periodMult);
+        String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
+
+        ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult);
+        String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
+
+        ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult);
+        String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
+
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")),
+                vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+                vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")));
+
+        try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) {
+            Connector connector = ConfigUtils.getConnector(conf);
+            PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
+            CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
+            String id = periodicQuery.createQueryAndRegisterWithKafka(sparql, registrar);
+            addData(statements);
+            app.start();
+
+            // Results received from Kafka, keyed by bin id.
+            Multimap<Long, BindingSet> actual = HashMultimap.create();
+            try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) {
+                consumer.subscribe(Arrays.asList(id));
+                long end = System.currentTimeMillis() + 4*periodMult*1000;
+                // Distinct bin ids in the order in which they were first received.
+                List<Long> ids = new ArrayList<>();
+                while (System.currentTimeMillis() < end) {
+                    ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000);
+                    for(ConsumerRecord<String, BindingSet> record: records){
+                        BindingSet result = record.value();
+                        long binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue());
+                        if(!ids.contains(binId)) {
+                            ids.add(binId);
+                        }
+                        actual.put(binId, result);
+                    }
+                }
+
+                Assert.assertEquals(3, actual.asMap().size());
+                int i = 0;
+                for(Long ident: ids) {
+                    // The expected number of results drops by one with each later bin.
+                    Assert.assertEquals(3-i, actual.get(ident).size());
+                    i++;
+                }
+            }
+
+            // Collect whatever the result storage still lists for the query; nothing is expected.
+            Set<BindingSet> remainingResults = new HashSet<>();
+            try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) {
+                results.forEachRemaining(x -> remainingResults.add(x));
+                Assert.assertEquals(0, remainingResults.size());
+            }
+        }
+
+    }
+    
+    
+    /**
+     * Closes the registration client, stops the application and tears down the
+     * embedded Kafka broker and ZooKeeper.
+     *
+     * JUnit 4 runs {@code @After} methods even when the {@code @Before} method
+     * failed, so fields may still be null here. Each step is null-guarded and
+     * chained with {@code finally} so that a failure in one step does not skip
+     * the remaining cleanup.
+     */
+    @After
+    public void shutdown() {
+        try {
+            if (registrar != null) {
+                registrar.close();
+            }
+        } finally {
+            try {
+                if (app != null) {
+                    app.stop();
+                }
+            } finally {
+                teardownKafka();
+            }
+        }
+    }
+    
+    /**
+     * Shuts down the Kafka broker, closes the ZooKeeper client and shuts down
+     * the embedded ZooKeeper server, in that order.
+     *
+     * Each component is skipped when it was never created, and the steps are
+     * chained with {@code finally} so that an exception from one does not
+     * leave the others running.
+     */
+    private void teardownKafka() {
+        try {
+            if (kafkaServer != null) {
+                kafkaServer.shutdown();
+            }
+        } finally {
+            try {
+                if (zkClient != null) {
+                    zkClient.close();
+                }
+            } finally {
+                if (zkServer != null) {
+                    zkServer.shutdown();
+                }
+            }
+        }
+    }
+    
+    private void addData(Collection<Statement> statements) throws DatatypeConfigurationException {
+        // add statements to Fluo
+        try (FluoClient fluo = new FluoClientImpl(getFluoConfiguration())) {
+            InsertTriples inserter = new InsertTriples();
+            statements.forEach(x -> inserter.insert(fluo, RdfToRyaConversions.convertStatement(x)));
+            getMiniFluo().waitForObservers();
+//            FluoITHelper.printFluoTable(fluo);
+        }
+
+    }
+
+    private Properties getKafkaProperties(PeriodicNotificationApplicationConfiguration conf) { 
+        Properties kafkaProps = new Properties();
+        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getBootStrapServers());
+        kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, conf.getNotificationClientId());
+        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, conf.getNotificationGroupId());
+        kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return kafkaProps;
+    }
+
+    
+    private Properties getProps() throws IOException {
+        
+        Properties props = new Properties();
+        try(InputStream in = new FileInputStream("src/test/resources/notification.properties")) {
+            props.load(in);
+        } 
+        
+        FluoConfiguration fluoConf = getFluoConfiguration();
+        props.setProperty("accumulo.user", getUsername());
+        props.setProperty("accumulo.password", getPassword());
+        props.setProperty("accumulo.instance", getMiniAccumuloCluster().getInstanceName());
+        props.setProperty("accumulo.zookeepers", getMiniAccumuloCluster().getZooKeepers());
+        props.setProperty("accumulo.rya.prefix", getRyaInstanceName());
+        props.setProperty(PeriodicNotificationApplicationConfiguration.FLUO_APP_NAME, fluoConf.getApplicationName());
+        props.setProperty(PeriodicNotificationApplicationConfiguration.FLUO_TABLE_NAME, fluoConf.getAccumuloTable());
+        return props;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
new file mode 100644
index 0000000..1902248
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.application;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.fluo.recipes.test.AccumuloExportITBase;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
+import org.apache.rya.periodic.notification.notification.TimestampedNotification;
+import org.apache.rya.periodic.notification.recovery.PeriodicNotificationProvider;
+import org.junit.Assert;
+import org.junit.Test;
+import org.openrdf.query.MalformedQueryException;
+
+/**
+ * Integration test that creates a periodic PCJ in Fluo and checks the
+ * notification that {@link PeriodicNotificationProvider} hands to the
+ * coordinator for it.
+ */
+public class PeriodicNotificationProviderIT extends AccumuloExportITBase {
+
+    /** Upper bound, in seconds, on how long the test waits for the notification. */
+    private static final long NOTIFICATION_TIMEOUT_SECONDS = 60;
+
+    /**
+     * Creates a PCJ for a periodic query, lets the provider process the
+     * registered notifications and verifies the initial delay, period, time
+     * unit and id of the first notification placed on the queue.
+     */
+    @Test
+    public void testProvider() throws MalformedQueryException, InterruptedException {
+
+        String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+                + "prefix time: <http://www.w3.org/2006/time#> " // n
+                + "select ?id (count(?obs) as ?total) where {" // n
+                + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n
+                + "?obs <uri:hasTime> ?time. " // n
+                + "?obs <uri:hasId> ?id } group by ?id"; // n
+
+        BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
+        PeriodicNotificationCoordinatorExecutor coord = new PeriodicNotificationCoordinatorExecutor(2, notifications);
+        PeriodicNotificationProvider provider = new PeriodicNotificationProvider();
+        CreatePcj pcj = new CreatePcj();
+
+        String id = null;
+        try(FluoClient fluo = new FluoClientImpl(getFluoConfiguration())) {
+            id = pcj.createPcj(sparql, fluo);
+            // Open the snapshot only after the PCJ has been created, as the original
+            // code did, and close it once the provider is done with it.
+            try(Snapshot snapshot = fluo.newSnapshot()) {
+                provider.processRegisteredNotifications(coord, snapshot);
+            }
+        }
+
+        // Bounded wait: fail the test instead of hanging when no notification arrives.
+        TimestampedNotification notification = notifications.poll(NOTIFICATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        Assert.assertNotNull("No notification received within " + NOTIFICATION_TIMEOUT_SECONDS + " seconds", notification);
+        Assert.assertEquals(5000, notification.getInitialDelay());
+        Assert.assertEquals(15000, notification.getPeriod());
+        Assert.assertEquals(TimeUnit.MILLISECONDS, notification.getTimeUnit());
+        Assert.assertEquals(id, notification.getId());
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
new file mode 100644
index 0000000..c0efc4f
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.exporter;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.kafka.base.KafkaITBase;
+import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
+import org.junit.Assert;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+/**
+ * Integration test for {@link KafkaExporterExecutor}: records placed on the executor's work
+ * queue are expected to show up on the Kafka topic named in each record.
+ */
+public class PeriodicNotificationExporterIT extends KafkaITBase {
+
+    private static final ValueFactory vf = new ValueFactoryImpl();
+
+    /**
+     * Queues one record for "topic1" and one for "topic2", then reads each topic back and
+     * checks that it holds exactly the binding set that was queued for it.
+     *
+     * @throws InterruptedException declared by the original test signature
+     */
+    @Test
+    public void testExporter() throws InterruptedException {
+
+        BlockingQueue<BindingSetRecord> records = new LinkedBlockingQueue<>();
+        Properties props = createKafkaConfig();
+
+        KafkaExporterExecutor exporter = new KafkaExporterExecutor(new KafkaProducer<String, BindingSet>(props), 1, records);
+        exporter.start();
+
+        // Stop the exporter even when an assertion fails, so a failing run does not leave the
+        // executor running behind the test.
+        try {
+            QueryBindingSet bs1 = new QueryBindingSet();
+            bs1.addBinding(PeriodicQueryResultStorage.PeriodicBinId, vf.createLiteral(1L));
+            bs1.addBinding("name", vf.createURI("uri:Bob"));
+            BindingSetRecord record1 = new BindingSetRecord(bs1, "topic1");
+
+            QueryBindingSet bs2 = new QueryBindingSet();
+            bs2.addBinding(PeriodicQueryResultStorage.PeriodicBinId, vf.createLiteral(2L));
+            bs2.addBinding("name", vf.createURI("uri:Joe"));
+            BindingSetRecord record2 = new BindingSetRecord(bs2, "topic2");
+
+            records.add(record1);
+            records.add(record2);
+
+            Set<BindingSet> expected1 = new HashSet<>();
+            expected1.add(bs1);
+            Set<BindingSet> expected2 = new HashSet<>();
+            expected2.add(bs2);
+
+            Set<BindingSet> actual1 = getBindingSetsFromKafka("topic1");
+            Set<BindingSet> actual2 = getBindingSetsFromKafka("topic2");
+
+            Assert.assertEquals(expected1, actual1);
+            Assert.assertEquals(expected2, actual2);
+        } finally {
+            exporter.stop();
+        }
+    }
+
+    /**
+     * Builds a single Properties object carrying both the producer and the consumer settings
+     * used by this test, so the same configuration can back either client.
+     *
+     * @return Kafka client configuration pointing at 127.0.0.1:9092
+     */
+    private Properties createKafkaConfig() {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BindingSetSerDe.class.getName());
+        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BindingSetSerDe.class.getName());
+
+        return props;
+    }
+
+    /**
+     * Creates a consumer subscribed to a single topic.
+     *
+     * @param topicName the topic to subscribe to
+     * @return a new consumer; the caller is responsible for closing it
+     */
+    private KafkaConsumer<String, BindingSet> makeBindingSetConsumer(final String topicName) {
+        final Properties consumerProps = createKafkaConfig();
+        final KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(consumerProps);
+        consumer.subscribe(Arrays.asList(topicName));
+        return consumer;
+    }
+
+    /**
+     * Polls the given topic once (5000 ms timeout) and collects the record values.
+     *
+     * @param topic the topic to read
+     * @return the distinct binding sets returned by the poll
+     * @throws RuntimeException wrapping any exception raised while consuming
+     */
+    private Set<BindingSet> getBindingSetsFromKafka(String topic) {
+        // try-with-resources closes the consumer on every path, replacing the manual
+        // null-check/finally bookkeeping.
+        try (KafkaConsumer<String, BindingSet> consumer = makeBindingSetConsumer(topic)) {
+            ConsumerRecords<String, BindingSet> records = consumer.poll(5000);
+
+            Set<BindingSet> bindingSets = new HashSet<>();
+            records.forEach(x -> bindingSets.add(x.value()));
+
+            return bindingSets;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java
new file mode 100644
index 0000000..fa60e48
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.processor;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.fluo.recipes.test.AccumuloExportITBase;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.periodic.notification.api.NodeBin;
+import org.apache.rya.periodic.notification.exporter.BindingSetRecord;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+import org.apache.rya.periodic.notification.notification.TimestampedNotification;
+import org.junit.Assert;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+/**
+ * Integration test for {@link NotificationProcessorExecutor}: notifications placed on the
+ * input queue should produce one {@link NodeBin} per notification and emit the stored
+ * results for the corresponding bins as {@link BindingSetRecord}s.
+ */
+public class PeriodicNotificationProcessorIT extends AccumuloExportITBase {
+
+    private static final ValueFactory vf = new ValueFactoryImpl();
+    private static final String RYA_INSTANCE_NAME = "rya_";
+
+    /**
+     * Stores two results in each of two bins, feeds the processor one notification per bin,
+     * and checks the bins and binding sets that come out of the processor's output queues.
+     *
+     * @throws Exception if storage setup or the sleeps fail
+     */
+    @Test
+    public void periodicProcessorTest() throws Exception {
+
+        String id = UUID.randomUUID().toString().replace("-", "");
+        BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
+        BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>();
+        BlockingQueue<BindingSetRecord> bindingSets = new LinkedBlockingQueue<>();
+
+        // NOTE(review): the period is declared with TimeUnit.SECONDS, yet the bin arithmetic
+        // divides getTimestamp().getTime() by the raw period and the sleep below is 2000 ms,
+        // which looks like a millisecond period - confirm the intended unit.
+        TimestampedNotification ts1 = new TimestampedNotification(
+                PeriodicNotification.builder().id(id).initialDelay(0).period(2000).timeUnit(TimeUnit.SECONDS).build());
+        long binId1 = (ts1.getTimestamp().getTime()/ts1.getPeriod())*ts1.getPeriod();
+
+        // Wait one full period so the second notification lands in a different bin.
+        Thread.sleep(2000);
+
+        TimestampedNotification ts2 = new TimestampedNotification(
+                PeriodicNotification.builder().id(id).initialDelay(0).period(2000).timeUnit(TimeUnit.SECONDS).build());
+        long binId2 = (ts2.getTimestamp().getTime()/ts2.getPeriod())*ts2.getPeriod();
+
+        Set<NodeBin> expectedBins = new HashSet<>();
+        expectedBins.add(new NodeBin(id, binId1));
+        expectedBins.add(new NodeBin(id, binId2));
+
+        Set<BindingSet> expected = new HashSet<>();
+        Set<VisibilityBindingSet> storageResults = new HashSet<>();
+
+        addResult(binId1, 1, expected, storageResults);
+        addResult(binId1, 2, expected, storageResults);
+        addResult(binId2, 3, expected, storageResults);
+        addResult(binId2, 4, expected, storageResults);
+
+        PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(super.getAccumuloConnector(),
+                RYA_INSTANCE_NAME);
+        periodicStorage.createPeriodicQuery(id, "select ?id where {?obs <urn:hasId> ?id.}", new VariableOrder("periodicBinId", "id"));
+        periodicStorage.addPeriodicQueryResults(id, storageResults);
+
+        NotificationProcessorExecutor processor = new NotificationProcessorExecutor(periodicStorage, notifications, bins, bindingSets, 1);
+        processor.start();
+
+        // Stop the processor even when an assertion fails so its worker does not outlive
+        // the test.
+        try {
+            notifications.add(ts1);
+            notifications.add(ts2);
+
+            // Give the processor time to drain both notifications.
+            Thread.sleep(5000);
+
+            Assert.assertEquals(expectedBins.size(), bins.size());
+            Assert.assertTrue(bins.containsAll(expectedBins));
+
+            Set<BindingSet> actual = new HashSet<>();
+            bindingSets.forEach(x -> actual.add(x.getBindingSet()));
+            Assert.assertEquals(expected, actual);
+        } finally {
+            processor.stop();
+        }
+    }
+
+    /**
+     * Builds a binding set with "periodicBinId" and "id" bindings and registers it both as
+     * an expected output and as a result to be written to storage.
+     *
+     * @param binId value for the "periodicBinId" binding
+     * @param id value for the "id" binding
+     * @param expected set of binding sets the processor is expected to emit
+     * @param storageResults set of results to be written to periodic storage
+     */
+    private static void addResult(long binId, int id, Set<BindingSet> expected, Set<VisibilityBindingSet> storageResults) {
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("periodicBinId", vf.createLiteral(binId));
+        bs.addBinding("id", vf.createLiteral(id));
+        expected.add(bs);
+        storageResults.add(new VisibilityBindingSet(bs));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
new file mode 100644
index 0000000..27acc9c
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.pruner;
+
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.xml.datatype.DatatypeFactory;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.fluo.recipes.test.FluoITHelper;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
+import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
+import org.apache.rya.periodic.notification.api.CreatePeriodicQuery;
+import org.apache.rya.periodic.notification.api.NodeBin;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+import org.junit.Assert;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Integration test for {@link PeriodicQueryPrunerExecutor}: after bins are handed to the
+ * pruner, neither the periodic result storage nor the Fluo table should still hold results
+ * for those bins.
+ */
+public class PeriodicNotificationBinPrunerIT extends RyaExportITBase {
+
+    /**
+     * Registers a periodic query, ingests observations, verifies the per-bin results, then
+     * submits every bin to the pruner and verifies that the results are gone.
+     *
+     * @throws Exception if query creation, ingest, or result lookup fails
+     */
+    @Test
+    public void periodicPrunerTest() throws Exception {
+
+        String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+                + "prefix time: <http://www.w3.org/2006/time#> " // n
+                + "select ?id (count(?obs) as ?total) where {" // n
+                + "Filter(function:periodic(?time, 2, .5, time:hours)) " // n
+                + "?obs <uri:hasTime> ?time. " // n
+                + "?obs <uri:hasId> ?id } group by ?id"; // n
+
+        // NOTE(review): this client is never closed in the test. Confirm whether
+        // pruner.stop() takes ownership of it before adding an explicit close here.
+        FluoClient fluo = new FluoClientImpl(super.getFluoConfiguration());
+
+        // initialize resources and create pcj
+        PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(super.getAccumuloConnector(),
+                getRyaInstanceName());
+        CreatePeriodicQuery createPeriodicQuery = new CreatePeriodicQuery(fluo, periodicStorage);
+        PeriodicNotification notification = createPeriodicQuery.createPeriodicQuery(sparql);
+        String queryId = notification.getId();
+
+        // create statements to ingest into Fluo; each timestamp is 30 minutes older than
+        // the previous one
+        final ValueFactory vf = new ValueFactoryImpl();
+        final DatatypeFactory dtf = DatatypeFactory.newInstance();
+        ZonedDateTime time = ZonedDateTime.now();
+        long currentTime = time.toInstant().toEpochMilli();
+
+        ZonedDateTime zTime1 = time.minusMinutes(30);
+        String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
+
+        ZonedDateTime zTime2 = zTime1.minusMinutes(30);
+        String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
+
+        ZonedDateTime zTime3 = zTime2.minusMinutes(30);
+        String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
+
+        ZonedDateTime zTime4 = zTime3.minusMinutes(30);
+        String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT);
+
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")),
+                vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+                vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")),
+                vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
+                vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4")),
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")));
+
+        // add statements to Fluo
+        InsertTriples inserter = new InsertTriples();
+        statements.forEach(x -> inserter.insert(fluo, RdfToRyaConversions.convertStatement(x)));
+
+        super.getMiniFluo().waitForObservers();
+
+        // FluoITHelper.printFluoTable(fluo);
+
+        // Create the expected results of the SPARQL query once the PCJ has been
+        // computed.
+        final Set<BindingSet> expected1 = new HashSet<>();
+        final Set<BindingSet> expected2 = new HashSet<>();
+        final Set<BindingSet> expected3 = new HashSet<>();
+        final Set<BindingSet> expected4 = new HashSet<>();
+
+        // 1800000 ms = 30 minutes; presumably the ".5 hours" period declared in the query.
+        long period = 1800000;
+        long binId = (currentTime / period) * period;
+
+        long bin1 = binId;
+        long bin2 = binId + period;
+        long bin3 = binId + 2 * period;
+        long bin4 = binId + 3 * period;
+
+        expected1.add(createResult(vf, "2", "id_1", bin1));
+        expected1.add(createResult(vf, "2", "id_2", bin1));
+        expected1.add(createResult(vf, "1", "id_3", bin1));
+        expected1.add(createResult(vf, "1", "id_4", bin1));
+
+        expected2.add(createResult(vf, "1", "id_1", bin2));
+        expected2.add(createResult(vf, "2", "id_2", bin2));
+        expected2.add(createResult(vf, "1", "id_3", bin2));
+
+        expected3.add(createResult(vf, "1", "id_1", bin3));
+        expected3.add(createResult(vf, "1", "id_2", bin3));
+
+        expected4.add(createResult(vf, "1", "id_1", bin4));
+
+        // make sure that expected and actual results align after ingest
+        compareResults(periodicStorage, queryId, bin1, expected1);
+        compareResults(periodicStorage, queryId, bin2, expected2);
+        compareResults(periodicStorage, queryId, bin3, expected3);
+        compareResults(periodicStorage, queryId, bin4, expected4);
+
+        BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>();
+        PeriodicQueryPrunerExecutor pruner = new PeriodicQueryPrunerExecutor(periodicStorage, fluo, 1, bins);
+        pruner.start();
+
+        // Stop the pruner even when an assertion fails so its worker does not outlive the
+        // test.
+        try {
+            bins.add(new NodeBin(queryId, bin1));
+            bins.add(new NodeBin(queryId, bin2));
+            bins.add(new NodeBin(queryId, bin3));
+            bins.add(new NodeBin(queryId, bin4));
+
+            // Give the pruner time to work through all four bins.
+            Thread.sleep(10000);
+
+            compareResults(periodicStorage, queryId, bin1, new HashSet<>());
+            compareResults(periodicStorage, queryId, bin2, new HashSet<>());
+            compareResults(periodicStorage, queryId, bin3, new HashSet<>());
+            compareResults(periodicStorage, queryId, bin4, new HashSet<>());
+
+            compareFluoCounts(fluo, queryId, bin1);
+            compareFluoCounts(fluo, queryId, bin2);
+            compareFluoCounts(fluo, queryId, bin3);
+            compareFluoCounts(fluo, queryId, bin4);
+        } finally {
+            pruner.stop();
+        }
+
+    }
+
+    /**
+     * Builds one expected query result.
+     *
+     * @param vf factory used to create the literals
+     * @param total lexical value of the "total" binding, typed as xsd:integer
+     * @param id lexical value of the "id" binding, typed as xsd:string
+     * @param bin value of the "periodicBinId" binding
+     * @return a binding set holding the three bindings
+     */
+    private static BindingSet createResult(ValueFactory vf, String total, String id, long bin) {
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral(total, XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral(id, XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(bin));
+        return bs;
+    }
+
+    /**
+     * Asserts that the results stored for the given query and bin equal the expected set.
+     *
+     * @param periodicStorage storage to read the results from
+     * @param queryId id of the periodic query
+     * @param bin bin whose results are listed
+     * @param expected binding sets the bin is expected to contain
+     * @throws Exception if listing or closing the result iterator fails
+     */
+    private void compareResults(PeriodicQueryResultStorage periodicStorage, String queryId, long bin, Set<BindingSet> expected) throws PeriodicQueryStorageException, Exception {
+        try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(bin))) {
+            Set<BindingSet> actual = new HashSet<>();
+            while(iter.hasNext()) {
+                actual.add(iter.next());
+            }
+            Assert.assertEquals(expected, actual);
+        }
+    }
+
+    /**
+     * Asserts that, for every node id returned by
+     * {@link PeriodicQueryUtil#getPeriodicQueryNodeAncestorIds}, the Fluo table has no
+     * result-column entries under the row prefix built from that node id and the bin.
+     *
+     * @param client Fluo client used to open a snapshot
+     * @param queryId PCJ id used to look up the Fluo query id
+     * @param bin bin id whose rows should have been removed
+     */
+    private void compareFluoCounts(FluoClient client, String queryId, long bin) {
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, new LiteralImpl(Long.toString(bin), XMLSchema.LONG));
+
+        VariableOrder varOrder = new VariableOrder(IncrementalUpdateConstants.PERIODIC_BIN_ID);
+
+        try(Snapshot sx = client.newSnapshot()) {
+            String fluoQueryId = sx.get(Bytes.of(queryId), FluoQueryColumns.PCJ_ID_QUERY_ID).toString();
+            Set<String> ids = new HashSet<>();
+            PeriodicQueryUtil.getPeriodicQueryNodeAncestorIds(sx, fluoQueryId, ids);
+            for(String id: ids) {
+                NodeType optNode = NodeType.fromNodeId(id).orNull();
+                if(optNode == null) throw new RuntimeException("Invalid NodeType.");
+                Bytes prefix = RowKeyUtil.makeRowKey(id,varOrder, bs);
+                RowScanner scanner = sx.scanner().fetch(optNode.getResultColumn()).over(Span.prefix(prefix)).byRow().build();
+                // Count every column value found under the prefix; none should remain.
+                int count = 0;
+                Iterator<ColumnScanner> colScannerIter = scanner.iterator();
+                while(colScannerIter.hasNext()) {
+                    ColumnScanner colScanner = colScannerIter.next();
+                    Iterator<ColumnValue> values = colScanner.iterator();
+                    while(values.hasNext()) {
+                        values.next();
+                        count++;
+                    }
+                }
+                Assert.assertEquals(0, count);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
new file mode 100644
index 0000000..bde406f
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */package org.apache.rya.periodic.notification.registration.kafka;
+
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase;
+import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.apache.rya.periodic.notification.notification.TimestampedNotification;
+import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Integration test for {@link KafkaNotificationProvider}: add/delete commands sent through
+ * a {@link KafkaNotificationRegistrationClient} should start and stop the flow of
+ * notifications into the coordinator's queue.
+ */
+public class PeriodicCommandNotificationConsumerIT extends KafkaExportITBase {
+
+    private static final String topic = "topic";
+    private KafkaNotificationRegistrationClient registration;
+    private PeriodicNotificationCoordinatorExecutor coord;
+    private KafkaNotificationProvider provider;
+
+    /**
+     * Registers a notification with a period of 1 second.
+     */
+    @Test
+    public void kafkaNotificationProviderTest() throws InterruptedException {
+        verifyAddThenDelete(1, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Registers a notification with a period of 1000 milliseconds.
+     */
+    @Test
+    public void kafkaNotificationMillisProviderTest() throws InterruptedException {
+        verifyAddThenDelete(1000, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Starts the provider, registers notification "1" with the given period, checks that
+     * notifications arrive, then deletes it and checks that the queue stops growing.
+     *
+     * @param period period passed to the registration client
+     * @param unit time unit of the period
+     * @throws InterruptedException if one of the waits is interrupted
+     */
+    private void verifyAddThenDelete(int period, TimeUnit unit) throws InterruptedException {
+
+        BasicConfigurator.configure();
+
+        BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
+        Properties props = createKafkaConfig();
+        KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props);
+        registration = new KafkaNotificationRegistrationClient(topic, producer);
+        coord = new PeriodicNotificationCoordinatorExecutor(1, notifications);
+        provider = new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1);
+        provider.start();
+
+        registration.addNotification("1", period, 0, unit);
+        Thread.sleep(4000);
+        // check that notifications are being added to the blocking queue
+        Assert.assertTrue(notifications.size() > 0);
+
+        registration.deleteNotification("1");
+        Thread.sleep(2000);
+        int size = notifications.size();
+        // sleep for 2 seconds to ensure no more messages being produced
+        Thread.sleep(2000);
+        Assert.assertEquals(size, notifications.size());
+    }
+
+    /**
+     * Releases whatever the test managed to create. Running this as an {@code @After} hook
+     * (instead of calling it at the end of each test) means it also runs when an assertion
+     * fails; the null checks cover a test that failed part-way through setup.
+     */
+    @After
+    public void tearDown() {
+        if (registration != null) {
+            registration.close();
+        }
+        if (provider != null) {
+            provider.stop();
+        }
+        if (coord != null) {
+            coord.stop();
+        }
+    }
+
+    /**
+     * Builds the Kafka client configuration shared by the producer and the provider.
+     *
+     * @return Kafka client configuration pointing at 127.0.0.1:9092
+     */
+    private Properties createKafkaConfig() {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CommandNotificationSerializer.class.getName());
+
+        return props;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/notification.properties
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/notification.properties b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/notification.properties
new file mode 100644
index 0000000..4b25b93
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/notification.properties
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+accumulo.auths=
+accumulo.instance="instance"
+accumulo.user="root"
+accumulo.password="secret"
+accumulo.rya.prefix="rya_"
+accumulo.zookeepers=
+fluo.app.name="fluo_app"
+fluo.table.name="fluo_table"
+kafka.bootstrap.servers=127.0.0.1:9092
+kafka.notification.topic=notifications
+kafka.notification.client.id=consumer0
+kafka.notification.group.id=group0
+cep.coordinator.threads=1
+cep.producer.threads=1
+cep.exporter.threads=1
+cep.processor.threads=1
+cep.pruner.threads=1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/pom.xml b/extras/rya.periodic.service/periodic.service.notification/pom.xml
new file mode 100644
index 0000000..2173888
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/pom.xml
@@ -0,0 +1,107 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
+		contributor license agreements. See the NOTICE file distributed with this 
+		work for additional information regarding copyright ownership. The ASF licenses 
+		this file to you under the Apache License, Version 2.0 (the "License"); you 
+		may not use this file except in compliance with the License. You may obtain 
+		a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
+		required by applicable law or agreed to in writing, software distributed 
+		under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+		OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+		the specific language governing permissions and limitations under the License. -->
+	<parent>
+		<groupId>org.apache.rya</groupId>
+		<artifactId>rya.periodic.service</artifactId>
+		<version>3.2.11-incubating-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>rya.periodic.service.notification</artifactId>
+	
+	<name>Apache Rya Periodic Service Notification</name>
+    <description>Notifications for Rya Periodic Service</description>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.twill</groupId>
+			<artifactId>twill-api</artifactId>
+			<version>0.11.0</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.twill</groupId>
+			<artifactId>twill-yarn</artifactId>
+			<version>0.11.0</version>
+			<exclusions>
+				<exclusion>
+					<artifactId>kafka_2.10</artifactId>
+					<groupId>org.apache.kafka</groupId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>com.google.code.gson</groupId>
+			<artifactId>gson</artifactId>
+			<version>2.8.0</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.fluo</groupId>
+			<artifactId>fluo-api</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.fluo</groupId>
+			<artifactId>fluo-core</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.rya</groupId>
+			<artifactId>rya.indexing</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.openrdf.sesame</groupId>
+			<artifactId>sesame-query</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.rya</groupId>
+			<artifactId>rya.indexing.pcj</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.rya</groupId>
+			<artifactId>rya.pcj.fluo.app</artifactId>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<configuration>
+					<encoding>UTF-8</encoding>
+					<source>1.8</source>
+					<target>1.8</target>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.0.0</version>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java
new file mode 100644
index 0000000..571ee1c
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.openrdf.query.Binding;
+
+/**
+ * Removes old query results: implementations delete every
+ * {@link org.openrdf.query.BindingSet} that belongs to the bin identified by
+ * {@link NodeBin#getBin()}.  A BindingSet belongs to a bin when it contains a
+ * {@link Binding} named {@link IncrementalUpdateConstants#PERIODIC_BIN_ID}
+ * whose value equals that bin.
+ *
+ */
+public interface BinPruner {
+
+    /**
+     * Deletes all BindingSets associated with the given {@link NodeBin}.
+     * @param bin - NodeBin that indicates which BindingSets to delete.
+     */
+    void pruneBindingSetBin(NodeBin bin);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java
new file mode 100644
index 0000000..500a435
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
+import org.apache.rya.periodic.notification.exporter.BindingSetRecord;
+
+/**
+ * Exports query results to an external repository or queuing system.
+ *
+ */
+public interface BindingSetExporter {
+
+    /**
+     * Exports the given record to the external repository or queuing system
+     * that this BindingSetExporter is configured to export to.
+     * @param bindingSet - {@link BindingSetRecord} to be exported
+     * @throws ResultExportException presumably when the export fails (confirm against implementations)
+     */
+    void exportNotification(BindingSetRecord bindingSet) throws ResultExportException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java
new file mode 100644
index 0000000..7f71b52
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import java.util.Optional;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryNode;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.periodic.notification.application.PeriodicNotificationApplication;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.evaluation.function.Function;
+
+/**
+ * Creates Periodic Queries.  A Periodic Query is any query requesting
+ * periodic updates about events that occurred within a given window of
+ * time of this instant; this is also known as a rolling window query.
+ * Periodic Queries can be expressed in SPARQL by including the
+ * {@link Function} indicated by the URI {@link PeriodicQueryUtil#PeriodicQueryURI}
+ * in the query.  The user must provide this Function with the following arguments:
+ * the temporal variable in the query that will be filtered on, the window of time
+ * that events must occur within, the period at which the user wants to receive updates,
+ * and the time unit.  The following query requests all observations that occurred
+ * within the last minute and requests updates every 15 seconds.  It also performs
+ * a count on those observations.
+ * <pre>
+ * prefix function: http://org.apache.rya/function#
+ * prefix time: http://www.w3.org/2006/time#
+ * select (count(?obs) as ?total) where {
+ *     Filter(function:periodic(?time, 1, .25, time:minutes))
+ *     ?obs uri:hasTime ?time.
+ *     ?obs uri:hasId ?id }
+ * </pre>
+ *
+ * This class is responsible for taking a Periodic Query expressed as a SPARQL query
+ * and adding it to Fluo and Kafka so that it can be processed by the {@link PeriodicNotificationApplication}.
+ */
+public class CreatePeriodicQuery {
+
+    private final FluoClient fluoClient;
+    private final PeriodicQueryResultStorage periodicStorage;
+    Function funciton; // NOTE(review): never read or assigned in this class; the name is misspelled
+    PeriodicQueryUtil util; // NOTE(review): never read or assigned in this class
+
+    /** Stores the Fluo client and the result storage that the create methods operate on. */
+    public CreatePeriodicQuery(FluoClient fluoClient, PeriodicQueryResultStorage periodicStorage) {
+        this.periodicStorage = periodicStorage;
+        this.fluoClient = fluoClient;
+    }
+
+    /**
+     * Creates a Periodic Query by adding the query to Fluo and using the resulting
+     * Fluo id to create a {@link PeriodicQueryResultStorage} table.  Nothing is added
+     * to Fluo when the query contains no periodic filter, because that check runs first.
+     * @param sparql - sparql query registered to Fluo whose results are stored in PeriodicQueryResultStorage table
+     * @return PeriodicNotification that can be used to register this query with the {@link PeriodicNotificationApplication}.
+     * @throws RuntimeException if the query has no PeriodicQuery Filter, or wrapping the
+     *             {@link MalformedQueryException} or {@link PeriodicQueryStorageException}
+     *             raised while the query is being created
+     */
+    public PeriodicNotification createPeriodicQuery(String sparql) {
+        try {
+            Optional<PeriodicQueryNode> periodicFilter = PeriodicQueryUtil.getPeriodicNode(sparql);
+            PeriodicQueryNode node = periodicFilter.orElseThrow(
+                    () -> new RuntimeException("Invalid PeriodicQuery.  Query must possess a PeriodicQuery Filter."));
+            // The id returned by CreatePcj is reused as the id of the result storage entry.
+            String queryId = new CreatePcj().createPcj(sparql, fluoClient);
+            periodicStorage.createPeriodicQuery(queryId, sparql);
+            return PeriodicNotification.builder().id(queryId).period(node.getPeriod())
+                    .timeUnit(node.getUnit()).build();
+        } catch (MalformedQueryException | PeriodicQueryStorageException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Creates a Periodic Query exactly as {@link #createPeriodicQuery(String)} does and then
+     * hands the resulting notification to the given client.  Per the original documentation,
+     * this registers the PeriodicQuery with the PeriodicNotificationApplication to poll
+     * the PeriodicQueryResultStorage table at regular intervals and export results to Kafka.
+     * The PeriodicNotificationApp queries the result table at a regular interval indicated by the Period of
+     * the PeriodicQuery.
+     * @param sparql - sparql query registered to Fluo whose results are stored in PeriodicQueryResultStorage table
+     * @param periodicClient - registers the PeriodicQuery with the {@link PeriodicNotificationApplication}
+     * @return id of the PeriodicQuery and PeriodicQueryResultStorage table (these are the same)
+     */
+    public String createQueryAndRegisterWithKafka(String sparql, PeriodicNotificationClient periodicClient) {
+        PeriodicNotification registered = createPeriodicQuery(sparql);
+        periodicClient.addNotification(registered);
+        return registered.getId();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java
new file mode 100644
index 0000000..b1e8bad
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+/**
+ * Basic life cycle contract: an implementing class can be
+ * started, stopped, and asked whether it is
+ * currently running.
+ *
+ */
+public interface LifeCycle {
+
+    /**
+     * Starts the application.
+     */
+    void start();
+
+    /**
+     * Stops a running application.
+     */
+    void stop();
+
+    /**
+     * Reports whether the application is currently running.
+     * @return true if application is running and false otherwise.
+     */
+    boolean currentlyRunning();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java
new file mode 100644
index 0000000..3ed7979
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import java.util.Objects;
+
+/**
+ * Immutable pair of a Periodic Query id and the id of one bin
+ * of that query's results.  This Object is used
+ * by the {@link BinPruner} to clean up old query results after
+ * they have been processed.
+ */
+public class NodeBin {
+
+    private final long bin;
+    private final String nodeId;
+
+    /** Pairs the id of a Periodic Query ({@code nodeId}) with the id of one bin of its results. */
+    public NodeBin(String nodeId, long bin) {
+        this.nodeId = nodeId;
+        this.bin = bin;
+    }
+
+    /**
+     * @return id of Periodic Query
+     */
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @return bin id of results for a given Periodic Query
+     */
+    public long getBin() {
+        return bin;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof NodeBin)) {
+            return false;
+        }
+        NodeBin that = (NodeBin) other;
+        // Objects.equals tolerates a null nodeId, just as Objects.hash does in hashCode.
+        return bin == that.bin && Objects.equals(nodeId, that.nodeId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(bin, nodeId);
+    }
+
+    @Override
+    public String toString() {
+        return "Node Bin \n" + "   QueryId: " + nodeId + "\n" + "   Bin: " + bin + "\n";
+    }
+
+}