You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/02 21:01:56 UTC
[3/9] incubator-rya git commit: RYA-280-Periodic Query Service.
Closes #177.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
new file mode 100644
index 0000000..cb7557c
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.application;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.xml.datatype.DatatypeConfigurationException;
+import javax.xml.datatype.DatatypeFactory;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
+import org.apache.rya.periodic.notification.api.CreatePeriodicQuery;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationRegistrationClient;
+import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
+import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.Time;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+
+public class PeriodicNotificationApplicationIT extends RyaExportITBase {
+
+ private PeriodicNotificationApplication app;
+ private KafkaNotificationRegistrationClient registrar;
+ private KafkaProducer<String, CommandNotification> producer;
+ private Properties props;
+ private Properties kafkaProps;
+ PeriodicNotificationApplicationConfiguration conf;
+
+ private static final String ZKHOST = "127.0.0.1";
+ private static final String BROKERHOST = "127.0.0.1";
+ private static final String BROKERPORT = "9092";
+ private ZkUtils zkUtils;
+ private KafkaServer kafkaServer;
+ private EmbeddedZookeeper zkServer;
+ private ZkClient zkClient;
+
+ @Before
+ public void init() throws Exception {
+ setUpKafka();
+ props = getProps();
+ conf = new PeriodicNotificationApplicationConfiguration(props);
+ kafkaProps = getKafkaProperties(conf);
+ app = PeriodicNotificationApplicationFactory.getPeriodicApplication(props);
+ producer = new KafkaProducer<>(kafkaProps, new StringSerializer(), new CommandNotificationSerializer());
+ registrar = new KafkaNotificationRegistrationClient(conf.getNotificationTopic(), producer);
+ }
+
+ private void setUpKafka() throws Exception {
+ // Setup Kafka.
+ zkServer = new EmbeddedZookeeper();
+ final String zkConnect = ZKHOST + ":" + zkServer.port();
+ zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
+ zkUtils = ZkUtils.apply(zkClient, false);
+
+ // setup Brokersparql
+ final Properties brokerProps = new Properties();
+ brokerProps.setProperty("zookeeper.connect", zkConnect);
+ brokerProps.setProperty("broker.id", "0");
+ brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+ brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
+ final KafkaConfig config = new KafkaConfig(brokerProps);
+ final Time mock = new MockTime();
+ kafkaServer = TestUtils.createServer(config, mock);
+ }
+
+ @Test
+ public void periodicApplicationWithAggAndGroupByTest() throws Exception {
+
+ String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+ + "prefix time: <http://www.w3.org/2006/time#> " // n
+ + "select ?type (count(?obs) as ?total) where {" // n
+ + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n
+ + "?obs <uri:hasTime> ?time. " // n
+ + "?obs <uri:hasObsType> ?type } group by ?type"; // n
+
+ //make data
+ int periodMult = 15;
+ final ValueFactory vf = new ValueFactoryImpl();
+ final DatatypeFactory dtf = DatatypeFactory.newInstance();
+ //Sleep until current time aligns nicely with period to makell
+ //results more predictable
+ while(System.currentTimeMillis() % (periodMult*1000) > 500);
+ ZonedDateTime time = ZonedDateTime.now();
+
+ ZonedDateTime zTime1 = time.minusSeconds(2*periodMult);
+ String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
+
+ ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult);
+ String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
+
+ ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult);
+ String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
+
+ final Collection<Statement> statements = Sets.newHashSet(
+ vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"),
+ vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
+ vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasObsType"), vf.createLiteral("ship")),
+ vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"),
+ vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
+ vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasObsType"), vf.createLiteral("airplane")),
+ vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"),
+ vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
+ vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasObsType"), vf.createLiteral("ship")),
+ vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"),
+ vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
+ vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasObsType"), vf.createLiteral("airplane")),
+ vf.createStatement(vf.createURI("urn:obs_5"), vf.createURI("uri:hasTime"),
+ vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+ vf.createStatement(vf.createURI("urn:obs_5"), vf.createURI("uri:hasObsType"), vf.createLiteral("automobile")));
+
+ try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) {
+ Connector connector = ConfigUtils.getConnector(conf);
+ PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
+ CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
+ String id = periodicQuery.createQueryAndRegisterWithKafka(sparql, registrar);
+ addData(statements);
+ app.start();
+//
+ Multimap<Long, BindingSet> actual = HashMultimap.create();
+ try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) {
+ consumer.subscribe(Arrays.asList(id));
+ long end = System.currentTimeMillis() + 4*periodMult*1000;
+ long lastBinId = 0L;
+ long binId = 0L;
+ List<Long> ids = new ArrayList<>();
+ while (System.currentTimeMillis() < end) {
+ ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000);
+ for(ConsumerRecord<String, BindingSet> record: records){
+ BindingSet result = record.value();
+ binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue());
+ if(lastBinId != binId) {
+ lastBinId = binId;
+ ids.add(binId);
+ }
+ actual.put(binId, result);
+ }
+ }
+
+ Map<Long, Set<BindingSet>> expected = new HashMap<>();
+
+ Set<BindingSet> expected1 = new HashSet<>();
+ QueryBindingSet bs1 = new QueryBindingSet();
+ bs1.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0)));
+ bs1.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER));
+ bs1.addBinding("type", vf.createLiteral("airplane"));
+
+ QueryBindingSet bs2 = new QueryBindingSet();
+ bs2.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0)));
+ bs2.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER));
+ bs2.addBinding("type", vf.createLiteral("ship"));
+
+ QueryBindingSet bs3 = new QueryBindingSet();
+ bs3.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0)));
+ bs3.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER));
+ bs3.addBinding("type", vf.createLiteral("automobile"));
+
+ expected1.add(bs1);
+ expected1.add(bs2);
+ expected1.add(bs3);
+
+ Set<BindingSet> expected2 = new HashSet<>();
+ QueryBindingSet bs4 = new QueryBindingSet();
+ bs4.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(1)));
+ bs4.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER));
+ bs4.addBinding("type", vf.createLiteral("airplane"));
+
+ QueryBindingSet bs5 = new QueryBindingSet();
+ bs5.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(1)));
+ bs5.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER));
+ bs5.addBinding("type", vf.createLiteral("ship"));
+
+ expected2.add(bs4);
+ expected2.add(bs5);
+
+ Set<BindingSet> expected3 = new HashSet<>();
+ QueryBindingSet bs6 = new QueryBindingSet();
+ bs6.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(2)));
+ bs6.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER));
+ bs6.addBinding("type", vf.createLiteral("ship"));
+
+ QueryBindingSet bs7 = new QueryBindingSet();
+ bs7.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(2)));
+ bs7.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER));
+ bs7.addBinding("type", vf.createLiteral("airplane"));
+
+ expected3.add(bs6);
+ expected3.add(bs7);
+
+ expected.put(ids.get(0), expected1);
+ expected.put(ids.get(1), expected2);
+ expected.put(ids.get(2), expected3);
+
+ Assert.assertEquals(3, actual.asMap().size());
+ for(Long ident: ids) {
+ Assert.assertEquals(expected.get(ident), actual.get(ident));
+ }
+ }
+
+ Set<BindingSet> expectedResults = new HashSet<>();
+ try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) {
+ results.forEachRemaining(x -> expectedResults.add(x));
+ Assert.assertEquals(0, expectedResults.size());
+ }
+ }
+ }
+
+
+ @Test
+ public void periodicApplicationWithAggTest() throws Exception {
+
+ String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+ + "prefix time: <http://www.w3.org/2006/time#> " // n
+ + "select (count(?obs) as ?total) where {" // n
+ + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n
+ + "?obs <uri:hasTime> ?time. " // n
+ + "?obs <uri:hasId> ?id } "; // n
+
+ //make data
+ int periodMult = 15;
+ final ValueFactory vf = new ValueFactoryImpl();
+ final DatatypeFactory dtf = DatatypeFactory.newInstance();
+ //Sleep until current time aligns nicely with period to make
+ //results more predictable
+ while(System.currentTimeMillis() % (periodMult*1000) > 500);
+ ZonedDateTime time = ZonedDateTime.now();
+
+ ZonedDateTime zTime1 = time.minusSeconds(2*periodMult);
+ String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
+
+ ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult);
+ String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
+
+ ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult);
+ String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
+
+ final Collection<Statement> statements = Sets.newHashSet(
+ vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"),
+ vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
+ vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
+ vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"),
+ vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
+ vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")),
+ vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"),
+ vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+ vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")));
+
+ try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) {
+ Connector connector = ConfigUtils.getConnector(conf);
+ PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
+ CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
+ String id = periodicQuery.createQueryAndRegisterWithKafka(sparql, registrar);
+ addData(statements);
+ app.start();
+//
+ Multimap<Long, BindingSet> expected = HashMultimap.create();
+ try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) {
+ consumer.subscribe(Arrays.asList(id));
+ long end = System.currentTimeMillis() + 4*periodMult*1000;
+ long lastBinId = 0L;
+ long binId = 0L;
+ List<Long> ids = new ArrayList<>();
+ while (System.currentTimeMillis() < end) {
+ ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000);
+ for(ConsumerRecord<String, BindingSet> record: records){
+ BindingSet result = record.value();
+ binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue());
+ if(lastBinId != binId) {
+ lastBinId = binId;
+ ids.add(binId);
+ }
+ expected.put(binId, result);
+ }
+ }
+
+ Assert.assertEquals(3, expected.asMap().size());
+ int i = 0;
+ for(Long ident: ids) {
+ Assert.assertEquals(1, expected.get(ident).size());
+ BindingSet bs = expected.get(ident).iterator().next();
+ Value val = bs.getValue("total");
+ int total = Integer.parseInt(val.stringValue());
+ Assert.assertEquals(3-i, total);
+ i++;
+ }
+ }
+
+
+ Set<BindingSet> expectedResults = new HashSet<>();
+ try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) {
+ results.forEachRemaining(x -> expectedResults.add(x));
+ Assert.assertEquals(0, expectedResults.size());
+ }
+ }
+
+ }
+
+
+ @Test
+ public void periodicApplicationTest() throws Exception {
+
+ String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+ + "prefix time: <http://www.w3.org/2006/time#> " // n
+ + "select ?obs ?id where {" // n
+ + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n
+ + "?obs <uri:hasTime> ?time. " // n
+ + "?obs <uri:hasId> ?id } "; // n
+
+ //make data
+ int periodMult = 15;
+ final ValueFactory vf = new ValueFactoryImpl();
+ final DatatypeFactory dtf = DatatypeFactory.newInstance();
+ //Sleep until current time aligns nicely with period to make
+ //results more predictable
+ while(System.currentTimeMillis() % (periodMult*1000) > 500);
+ ZonedDateTime time = ZonedDateTime.now();
+
+ ZonedDateTime zTime1 = time.minusSeconds(2*periodMult);
+ String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
+
+ ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult);
+ String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
+
+ ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult);
+ String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
+
+ final Collection<Statement> statements = Sets.newHashSet(
+ vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"),
+ vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
+ vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
+ vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"),
+ vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
+ vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")),
+ vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"),
+ vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+ vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")));
+
+ try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) {
+ Connector connector = ConfigUtils.getConnector(conf);
+ PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
+ CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
+ String id = periodicQuery.createQueryAndRegisterWithKafka(sparql, registrar);
+ addData(statements);
+ app.start();
+//
+ Multimap<Long, BindingSet> expected = HashMultimap.create();
+ try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) {
+ consumer.subscribe(Arrays.asList(id));
+ long end = System.currentTimeMillis() + 4*periodMult*1000;
+ long lastBinId = 0L;
+ long binId = 0L;
+ List<Long> ids = new ArrayList<>();
+ while (System.currentTimeMillis() < end) {
+ ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000);
+ for(ConsumerRecord<String, BindingSet> record: records){
+ BindingSet result = record.value();
+ binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue());
+ if(lastBinId != binId) {
+ lastBinId = binId;
+ ids.add(binId);
+ }
+ expected.put(binId, result);
+ }
+ }
+
+ Assert.assertEquals(3, expected.asMap().size());
+ int i = 0;
+ for(Long ident: ids) {
+ Assert.assertEquals(3-i, expected.get(ident).size());
+ i++;
+ }
+ }
+
+
+ Set<BindingSet> expectedResults = new HashSet<>();
+ try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) {
+ results.forEachRemaining(x -> expectedResults.add(x));
+ Assert.assertEquals(0, expectedResults.size());
+ }
+ }
+
+ }
+
+
+ @After
+ public void shutdown() {
+ registrar.close();
+ app.stop();
+ teardownKafka();
+ }
+
+ private void teardownKafka() {
+ kafkaServer.shutdown();
+ zkClient.close();
+ zkServer.shutdown();
+ }
+
+ private void addData(Collection<Statement> statements) throws DatatypeConfigurationException {
+ // add statements to Fluo
+ try (FluoClient fluo = new FluoClientImpl(getFluoConfiguration())) {
+ InsertTriples inserter = new InsertTriples();
+ statements.forEach(x -> inserter.insert(fluo, RdfToRyaConversions.convertStatement(x)));
+ getMiniFluo().waitForObservers();
+// FluoITHelper.printFluoTable(fluo);
+ }
+
+ }
+
+ private Properties getKafkaProperties(PeriodicNotificationApplicationConfiguration conf) {
+ Properties kafkaProps = new Properties();
+ kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getBootStrapServers());
+ kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, conf.getNotificationClientId());
+ kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, conf.getNotificationGroupId());
+ kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ return kafkaProps;
+ }
+
+
+ private Properties getProps() throws IOException {
+
+ Properties props = new Properties();
+ try(InputStream in = new FileInputStream("src/test/resources/notification.properties")) {
+ props.load(in);
+ }
+
+ FluoConfiguration fluoConf = getFluoConfiguration();
+ props.setProperty("accumulo.user", getUsername());
+ props.setProperty("accumulo.password", getPassword());
+ props.setProperty("accumulo.instance", getMiniAccumuloCluster().getInstanceName());
+ props.setProperty("accumulo.zookeepers", getMiniAccumuloCluster().getZooKeepers());
+ props.setProperty("accumulo.rya.prefix", getRyaInstanceName());
+ props.setProperty(PeriodicNotificationApplicationConfiguration.FLUO_APP_NAME, fluoConf.getApplicationName());
+ props.setProperty(PeriodicNotificationApplicationConfiguration.FLUO_TABLE_NAME, fluoConf.getAccumuloTable());
+ return props;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
new file mode 100644
index 0000000..1902248
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.application;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.fluo.recipes.test.AccumuloExportITBase;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
+import org.apache.rya.periodic.notification.notification.TimestampedNotification;
+import org.apache.rya.periodic.notification.recovery.PeriodicNotificationProvider;
+import org.junit.Test;
+import org.openrdf.query.MalformedQueryException;
+
+import org.junit.Assert;
+
+public class PeriodicNotificationProviderIT extends AccumuloExportITBase {
+
+ @Test
+ public void testProvider() throws MalformedQueryException, InterruptedException {
+
+ String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+ + "prefix time: <http://www.w3.org/2006/time#> " // n
+ + "select ?id (count(?obs) as ?total) where {" // n
+ + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n
+ + "?obs <uri:hasTime> ?time. " // n
+ + "?obs <uri:hasId> ?id } group by ?id"; // n
+
+ BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
+ PeriodicNotificationCoordinatorExecutor coord = new PeriodicNotificationCoordinatorExecutor(2, notifications);
+ PeriodicNotificationProvider provider = new PeriodicNotificationProvider();
+ CreatePcj pcj = new CreatePcj();
+
+ String id = null;
+ try(FluoClient fluo = new FluoClientImpl(getFluoConfiguration())) {
+ id = pcj.createPcj(sparql, fluo);
+ provider.processRegisteredNotifications(coord, fluo.newSnapshot());
+ }
+
+ TimestampedNotification notification = notifications.take();
+ Assert.assertEquals(5000, notification.getInitialDelay());
+ Assert.assertEquals(15000, notification.getPeriod());
+ Assert.assertEquals(TimeUnit.MILLISECONDS, notification.getTimeUnit());
+ Assert.assertEquals(id, notification.getId());
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
new file mode 100644
index 0000000..c0efc4f
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.exporter;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.kafka.base.KafkaITBase;
+import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
+import org.junit.Assert;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+public class PeriodicNotificationExporterIT extends KafkaITBase {
+
+ private static final ValueFactory vf = new ValueFactoryImpl();
+
+ @Test
+ public void testExporter() throws InterruptedException {
+
+ BlockingQueue<BindingSetRecord> records = new LinkedBlockingQueue<>();
+ Properties props = createKafkaConfig();
+
+ KafkaExporterExecutor exporter = new KafkaExporterExecutor(new KafkaProducer<String, BindingSet>(props), 1, records);
+ exporter.start();
+
+ QueryBindingSet bs1 = new QueryBindingSet();
+ bs1.addBinding(PeriodicQueryResultStorage.PeriodicBinId, vf.createLiteral(1L));
+ bs1.addBinding("name", vf.createURI("uri:Bob"));
+ BindingSetRecord record1 = new BindingSetRecord(bs1, "topic1");
+
+ QueryBindingSet bs2 = new QueryBindingSet();
+ bs2.addBinding(PeriodicQueryResultStorage.PeriodicBinId, vf.createLiteral(2L));
+ bs2.addBinding("name", vf.createURI("uri:Joe"));
+ BindingSetRecord record2 = new BindingSetRecord(bs2, "topic2");
+
+ records.add(record1);
+ records.add(record2);
+
+ Set<BindingSet> expected1 = new HashSet<>();
+ expected1.add(bs1);
+ Set<BindingSet> expected2 = new HashSet<>();
+ expected2.add(bs2);
+
+ Set<BindingSet> actual1 = getBindingSetsFromKafka("topic1");
+ Set<BindingSet> actual2 = getBindingSetsFromKafka("topic2");
+
+ Assert.assertEquals(expected1, actual1);
+ Assert.assertEquals(expected2, actual2);
+
+ exporter.stop();
+
+ }
+
+
+ private Properties createKafkaConfig() {
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
+ props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
+ props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BindingSetSerDe.class.getName());
+ props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BindingSetSerDe.class.getName());
+
+ return props;
+ }
+
+
+ private KafkaConsumer<String, BindingSet> makeBindingSetConsumer(final String TopicName) {
+ // setup consumer
+ final Properties consumerProps = createKafkaConfig();
+ final KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(consumerProps);
+ consumer.subscribe(Arrays.asList(TopicName));
+ return consumer;
+ }
+
+ private Set<BindingSet> getBindingSetsFromKafka(String topic) {
+ KafkaConsumer<String, BindingSet> consumer = null;
+
+ try {
+ consumer = makeBindingSetConsumer(topic);
+ ConsumerRecords<String, BindingSet> records = consumer.poll(5000);
+
+ Set<BindingSet> bindingSets = new HashSet<>();
+ records.forEach(x -> bindingSets.add(x.value()));
+
+ return bindingSets;
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (consumer != null) {
+ consumer.close();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java
new file mode 100644
index 0000000..fa60e48
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.processor;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.fluo.recipes.test.AccumuloExportITBase;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.periodic.notification.api.NodeBin;
+import org.apache.rya.periodic.notification.exporter.BindingSetRecord;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+import org.apache.rya.periodic.notification.notification.TimestampedNotification;
+import org.junit.Assert;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+public class PeriodicNotificationProcessorIT extends AccumuloExportITBase {
+
+ private static final ValueFactory vf = new ValueFactoryImpl();
+ private static final String RYA_INSTANCE_NAME = "rya_";
+
+ @Test
+ public void periodicProcessorTest() throws Exception {
+
+ String id = UUID.randomUUID().toString().replace("-", "");
+ BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
+ BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>();
+ BlockingQueue<BindingSetRecord> bindingSets = new LinkedBlockingQueue<>();
+
+ TimestampedNotification ts1 = new TimestampedNotification(
+ PeriodicNotification.builder().id(id).initialDelay(0).period(2000).timeUnit(TimeUnit.SECONDS).build());
+ long binId1 = (ts1.getTimestamp().getTime()/ts1.getPeriod())*ts1.getPeriod();
+
+ Thread.sleep(2000);
+
+ TimestampedNotification ts2 = new TimestampedNotification(
+ PeriodicNotification.builder().id(id).initialDelay(0).period(2000).timeUnit(TimeUnit.SECONDS).build());
+ long binId2 = (ts2.getTimestamp().getTime()/ts2.getPeriod())*ts2.getPeriod();
+
+ Set<NodeBin> expectedBins = new HashSet<>();
+ expectedBins.add(new NodeBin(id, binId1));
+ expectedBins.add(new NodeBin(id, binId2));
+
+ Set<BindingSet> expected = new HashSet<>();
+ Set<VisibilityBindingSet> storageResults = new HashSet<>();
+
+ QueryBindingSet bs1 = new QueryBindingSet();
+ bs1.addBinding("periodicBinId", vf.createLiteral(binId1));
+ bs1.addBinding("id", vf.createLiteral(1));
+ expected.add(bs1);
+ storageResults.add(new VisibilityBindingSet(bs1));
+
+ QueryBindingSet bs2 = new QueryBindingSet();
+ bs2.addBinding("periodicBinId", vf.createLiteral(binId1));
+ bs2.addBinding("id", vf.createLiteral(2));
+ expected.add(bs2);
+ storageResults.add(new VisibilityBindingSet(bs2));
+
+ QueryBindingSet bs3 = new QueryBindingSet();
+ bs3.addBinding("periodicBinId", vf.createLiteral(binId2));
+ bs3.addBinding("id", vf.createLiteral(3));
+ expected.add(bs3);
+ storageResults.add(new VisibilityBindingSet(bs3));
+
+ QueryBindingSet bs4 = new QueryBindingSet();
+ bs4.addBinding("periodicBinId", vf.createLiteral(binId2));
+ bs4.addBinding("id", vf.createLiteral(4));
+ expected.add(bs4);
+ storageResults.add(new VisibilityBindingSet(bs4));
+
+ PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(super.getAccumuloConnector(),
+ RYA_INSTANCE_NAME);
+ periodicStorage.createPeriodicQuery(id, "select ?id where {?obs <urn:hasId> ?id.}", new VariableOrder("periodicBinId", "id"));
+ periodicStorage.addPeriodicQueryResults(id, storageResults);
+
+ NotificationProcessorExecutor processor = new NotificationProcessorExecutor(periodicStorage, notifications, bins, bindingSets, 1);
+ processor.start();
+
+ notifications.add(ts1);
+ notifications.add(ts2);
+
+ Thread.sleep(5000);
+
+ Assert.assertEquals(expectedBins.size(), bins.size());
+ Assert.assertEquals(true, bins.containsAll(expectedBins));
+
+ Set<BindingSet> actual = new HashSet<>();
+ bindingSets.forEach(x -> actual.add(x.getBindingSet()));
+ Assert.assertEquals(expected, actual);
+
+ processor.stop();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
new file mode 100644
index 0000000..27acc9c
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.pruner;
+
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.xml.datatype.DatatypeFactory;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.fluo.recipes.test.FluoITHelper;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
+import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
+import org.apache.rya.periodic.notification.api.CreatePeriodicQuery;
+import org.apache.rya.periodic.notification.api.NodeBin;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+import org.junit.Assert;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.Sets;
+
+public class PeriodicNotificationBinPrunerIT extends RyaExportITBase {
+
+
+ @Test
+ public void periodicPrunerTest() throws Exception {
+
+ String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+ + "prefix time: <http://www.w3.org/2006/time#> " // n
+ + "select ?id (count(?obs) as ?total) where {" // n
+ + "Filter(function:periodic(?time, 2, .5, time:hours)) " // n
+ + "?obs <uri:hasTime> ?time. " // n
+ + "?obs <uri:hasId> ?id } group by ?id"; // n
+
+ FluoClient fluo = new FluoClientImpl(super.getFluoConfiguration());
+
+ // initialize resources and create pcj
+ PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(super.getAccumuloConnector(),
+ getRyaInstanceName());
+ CreatePeriodicQuery createPeriodicQuery = new CreatePeriodicQuery(fluo, periodicStorage);
+ PeriodicNotification notification = createPeriodicQuery.createPeriodicQuery(sparql);
+ String queryId = notification.getId();
+
+ // create statements to ingest into Fluo
+ final ValueFactory vf = new ValueFactoryImpl();
+ final DatatypeFactory dtf = DatatypeFactory.newInstance();
+ ZonedDateTime time = ZonedDateTime.now();
+ long currentTime = time.toInstant().toEpochMilli();
+
+ ZonedDateTime zTime1 = time.minusMinutes(30);
+ String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
+
+ ZonedDateTime zTime2 = zTime1.minusMinutes(30);
+ String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
+
+ ZonedDateTime zTime3 = zTime2.minusMinutes(30);
+ String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
+
+ ZonedDateTime zTime4 = zTime3.minusMinutes(30);
+ String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT);
+
+ final Collection<Statement> statements = Sets.newHashSet(
+ vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"),
+ vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
+ vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
+ vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"),
+ vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
+ vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")),
+ vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"),
+ vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+ vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")),
+ vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"),
+ vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
+ vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4")),
+ vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"),
+ vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
+ vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
+ vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"),
+ vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+ vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")));
+
+ // add statements to Fluo
+ InsertTriples inserter = new InsertTriples();
+ statements.forEach(x -> inserter.insert(fluo, RdfToRyaConversions.convertStatement(x)));
+
+ super.getMiniFluo().waitForObservers();
+
+ // FluoITHelper.printFluoTable(fluo);
+
+ // Create the expected results of the SPARQL query once the PCJ has been
+ // computed.
+ final Set<BindingSet> expected1 = new HashSet<>();
+ final Set<BindingSet> expected2 = new HashSet<>();
+ final Set<BindingSet> expected3 = new HashSet<>();
+ final Set<BindingSet> expected4 = new HashSet<>();
+
+ long period = 1800000;
+ long binId = (currentTime / period) * period;
+
+ long bin1 = binId;
+ long bin2 = binId + period;
+ long bin3 = binId + 2 * period;
+ long bin4 = binId + 3 * period;
+
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
+ bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
+ bs.addBinding("periodicBinId", vf.createLiteral(bin1));
+ expected1.add(bs);
+
+ bs = new MapBindingSet();
+ bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
+ bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
+ bs.addBinding("periodicBinId", vf.createLiteral(bin1));
+ expected1.add(bs);
+
+ bs = new MapBindingSet();
+ bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+ bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING));
+ bs.addBinding("periodicBinId", vf.createLiteral(bin1));
+ expected1.add(bs);
+
+ bs = new MapBindingSet();
+ bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+ bs.addBinding("id", vf.createLiteral("id_4", XMLSchema.STRING));
+ bs.addBinding("periodicBinId", vf.createLiteral(bin1));
+ expected1.add(bs);
+
+ bs = new MapBindingSet();
+ bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+ bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
+ bs.addBinding("periodicBinId", vf.createLiteral(bin2));
+ expected2.add(bs);
+
+ bs = new MapBindingSet();
+ bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
+ bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
+ bs.addBinding("periodicBinId", vf.createLiteral(bin2));
+ expected2.add(bs);
+
+ bs = new MapBindingSet();
+ bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+ bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING));
+ bs.addBinding("periodicBinId", vf.createLiteral(bin2));
+ expected2.add(bs);
+
+ bs = new MapBindingSet();
+ bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+ bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
+ bs.addBinding("periodicBinId", vf.createLiteral(bin3));
+ expected3.add(bs);
+
+ bs = new MapBindingSet();
+ bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+ bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
+ bs.addBinding("periodicBinId", vf.createLiteral(bin3));
+ expected3.add(bs);
+
+ bs = new MapBindingSet();
+ bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+ bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
+ bs.addBinding("periodicBinId", vf.createLiteral(bin4));
+ expected4.add(bs);
+
+ // make sure that expected and actual results align after ingest
+ compareResults(periodicStorage, queryId, bin1, expected1);
+ compareResults(periodicStorage, queryId, bin2, expected2);
+ compareResults(periodicStorage, queryId, bin3, expected3);
+ compareResults(periodicStorage, queryId, bin4, expected4);
+
+ BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>();
+ PeriodicQueryPrunerExecutor pruner = new PeriodicQueryPrunerExecutor(periodicStorage, fluo, 1, bins);
+ pruner.start();
+
+ bins.add(new NodeBin(queryId, bin1));
+ bins.add(new NodeBin(queryId, bin2));
+ bins.add(new NodeBin(queryId, bin3));
+ bins.add(new NodeBin(queryId, bin4));
+
+ Thread.sleep(10000);
+
+ compareResults(periodicStorage, queryId, bin1, new HashSet<>());
+ compareResults(periodicStorage, queryId, bin2, new HashSet<>());
+ compareResults(periodicStorage, queryId, bin3, new HashSet<>());
+ compareResults(periodicStorage, queryId, bin4, new HashSet<>());
+
+ compareFluoCounts(fluo, queryId, bin1);
+ compareFluoCounts(fluo, queryId, bin2);
+ compareFluoCounts(fluo, queryId, bin3);
+ compareFluoCounts(fluo, queryId, bin4);
+
+ pruner.stop();
+
+ }
+
+ private void compareResults(PeriodicQueryResultStorage periodicStorage, String queryId, long bin, Set<BindingSet> expected) throws PeriodicQueryStorageException, Exception {
+ try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(bin))) {
+ Set<BindingSet> actual = new HashSet<>();
+ while(iter.hasNext()) {
+ actual.add(iter.next());
+ }
+ Assert.assertEquals(expected, actual);
+ }
+ }
+
+ private void compareFluoCounts(FluoClient client, String queryId, long bin) {
+ QueryBindingSet bs = new QueryBindingSet();
+ bs.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, new LiteralImpl(Long.toString(bin), XMLSchema.LONG));
+
+ VariableOrder varOrder = new VariableOrder(IncrementalUpdateConstants.PERIODIC_BIN_ID);
+
+ try(Snapshot sx = client.newSnapshot()) {
+ String fluoQueryId = sx.get(Bytes.of(queryId), FluoQueryColumns.PCJ_ID_QUERY_ID).toString();
+ Set<String> ids = new HashSet<>();
+ PeriodicQueryUtil.getPeriodicQueryNodeAncestorIds(sx, fluoQueryId, ids);
+ for(String id: ids) {
+ NodeType optNode = NodeType.fromNodeId(id).orNull();
+ if(optNode == null) throw new RuntimeException("Invalid NodeType.");
+ Bytes prefix = RowKeyUtil.makeRowKey(id,varOrder, bs);
+ RowScanner scanner = sx.scanner().fetch(optNode.getResultColumn()).over(Span.prefix(prefix)).byRow().build();
+ int count = 0;
+ Iterator<ColumnScanner> colScannerIter = scanner.iterator();
+ while(colScannerIter.hasNext()) {
+ ColumnScanner colScanner = colScannerIter.next();
+ String row = colScanner.getRow().toString();
+ Iterator<ColumnValue> values = colScanner.iterator();
+ while(values.hasNext()) {
+ values.next();
+ count++;
+ }
+ }
+ Assert.assertEquals(0, count);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
new file mode 100644
index 0000000..bde406f
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */package org.apache.rya.periodic.notification.registration.kafka;
+
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase;
+import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.apache.rya.periodic.notification.notification.TimestampedNotification;
+import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PeriodicCommandNotificationConsumerIT extends KafkaExportITBase {
+
+ private static final String topic = "topic";
+ private KafkaNotificationRegistrationClient registration;
+ private PeriodicNotificationCoordinatorExecutor coord;
+ private KafkaNotificationProvider provider;
+
+ @Test
+ public void kafkaNotificationProviderTest() throws InterruptedException {
+
+ BasicConfigurator.configure();
+
+ BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
+ Properties props = createKafkaConfig();
+ KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props);
+ registration = new KafkaNotificationRegistrationClient(topic, producer);
+ coord = new PeriodicNotificationCoordinatorExecutor(1, notifications);
+ provider = new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1);
+ provider.start();
+
+ registration.addNotification("1", 1, 0, TimeUnit.SECONDS);
+ Thread.sleep(4000);
+ // check that notifications are being added to the blocking queue
+ Assert.assertEquals(true, notifications.size() > 0);
+
+ registration.deleteNotification("1");
+ Thread.sleep(2000);
+ int size = notifications.size();
+ // sleep for 2 seconds to ensure no more messages being produced
+ Thread.sleep(2000);
+ Assert.assertEquals(size, notifications.size());
+
+ tearDown();
+ }
+
+ @Test
+ public void kafkaNotificationMillisProviderTest() throws InterruptedException {
+
+ BasicConfigurator.configure();
+
+ BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
+ Properties props = createKafkaConfig();
+ KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props);
+ registration = new KafkaNotificationRegistrationClient(topic, producer);
+ coord = new PeriodicNotificationCoordinatorExecutor(1, notifications);
+ provider = new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1);
+ provider.start();
+
+ registration.addNotification("1", 1000, 0, TimeUnit.MILLISECONDS);
+ Thread.sleep(4000);
+ // check that notifications are being added to the blocking queue
+ Assert.assertEquals(true, notifications.size() > 0);
+
+ registration.deleteNotification("1");
+ Thread.sleep(2000);
+ int size = notifications.size();
+ // sleep for 2 seconds to ensure no more messages being produced
+ Thread.sleep(2000);
+ Assert.assertEquals(size, notifications.size());
+
+ tearDown();
+ }
+
+ private void tearDown() {
+ registration.close();
+ provider.stop();
+ coord.stop();
+ }
+
+ private Properties createKafkaConfig() {
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
+ props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
+ props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CommandNotificationSerializer.class.getName());
+
+ return props;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/notification.properties
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/notification.properties b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/notification.properties
new file mode 100644
index 0000000..4b25b93
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/notification.properties
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#/
+accumulo.auths=
+accumulo.instance="instance"
+accumulo.user="root"
+accumulo.password="secret"
+accumulo.rya.prefix="rya_"
+accumulo.zookeepers=
+fluo.app.name="fluo_app"
+fluo.table.name="fluo_table"
+kafka.bootstrap.servers=127.0.0.1:9092
+kafka.notification.topic=notifications
+kafka.notification.client.id=consumer0
+kafka.notification.group.id=group0
+cep.coordinator.threads=1
+cep.producer.threads=1
+cep.exporter.threads=1
+cep.processor.threads=1
+cep.pruner.threads=1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/pom.xml b/extras/rya.periodic.service/periodic.service.notification/pom.xml
new file mode 100644
index 0000000..2173888
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/pom.xml
@@ -0,0 +1,107 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <!-- Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with this
+ work for additional information regarding copyright ownership. The ASF licenses
+ this file to you under the Apache License, Version 2.0 (the "License"); you
+ may not use this file except in compliance with the License. You may obtain
+ a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
+ required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License. -->
+ <parent>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.periodic.service</artifactId>
+ <version>3.2.11-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>rya.periodic.service.notification</artifactId>
+
+ <name>Apache Rya Periodic Service Notification</name>
+ <description>Notifications for Rya Periodic Service</description>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.twill</groupId>
+ <artifactId>twill-api</artifactId>
+ <version>0.11.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.twill</groupId>
+ <artifactId>twill-yarn</artifactId>
+ <version>0.11.0</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>kafka_2.10</artifactId>
+ <groupId>org.apache.kafka</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.8.0</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.fluo</groupId>
+ <artifactId>fluo-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.fluo</groupId>
+ <artifactId>fluo-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.indexing</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.openrdf.sesame</groupId>
+ <artifactId>sesame-query</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.indexing.pcj</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.pcj.fluo.app</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <encoding>UTF-8</encoding>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java
new file mode 100644
index 0000000..571ee1c
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.openrdf.query.Binding;
+
+/**
+ * Object that cleans up old {@link BindingSet}s corresponding to the specified
+ * {@link NodeBin}. This class deletes all BindingSets with the bin
+ * indicated by {@link NodeBin#getBin()}. A BindingSet corresponds to a given
+ * bin if it contains a {@link Binding} with name {@link IncrementalUpdateConstants#PERIODIC_BIN_ID}
+ * and value equal to the given bin.
+ *
+ */
+public interface BinPruner {
+
+ /**
+ * Cleans up all {@link BindingSet}s associated with the indicated {@link NodeBin}.
+ * @param bin - NodeBin that indicates which BindingSets to delete..
+ */
+ public void pruneBindingSetBin(NodeBin bin);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java
new file mode 100644
index 0000000..500a435
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
+import org.apache.rya.periodic.notification.exporter.BindingSetRecord;
+
+/**
+ * An Object that is used to export {@link BindingSet}s to an external repository or queuing system.
+ *
+ */
+public interface BindingSetExporter {
+
+ /**
+ * This method exports the BindingSet to the external repository or queuing system
+ * that this BindingSetExporter is configured to export to.
+ * @param bindingSet - {@link BindingSet} to be exported
+ * @throws ResultExportException
+ */
+ public void exportNotification(BindingSetRecord bindingSet) throws ResultExportException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java
new file mode 100644
index 0000000..7f71b52
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import java.util.Optional;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryNode;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.periodic.notification.application.PeriodicNotificationApplication;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.evaluation.function.Function;
+
+/**
+ * Object that creates a Periodic Query. A Periodic Query is any query
+ * requesting periodic updates about events that occurred within a given
+ * window of time of this instant. This is also known as a rolling window
+ * query. Period Queries can be expressed using SPARQL by including the
+ * {@link Function} indicated by the URI {@link PeriodicQueryUtil#PeriodicQueryURI}
+ * in the query. The user must provide this Function with the following arguments:
+ * the temporal variable in the query that will be filtered on, the window of time
+ * that events must occur within, the period at which the user wants to receive updates,
+ * and the time unit. The following query requests all observations that occurred
+ * within the last minute and requests updates every 15 seconds. It also performs
+ * a count on those observations.
+ * <li>
+ * <li> prefix function: http://org.apache.rya/function#
+ * <li> "prefix time: http://www.w3.org/2006/time#
+ * <li> "select (count(?obs) as ?total) where {
+ * <li> "Filter(function:periodic(?time, 1, .25, time:minutes))
+ * <li> "?obs uri:hasTime ?time.
+ * <li> "?obs uri:hasId ?id }
+ * <li>
+ *
+ * This class is responsible for taking a Periodic Query expressed as a SPARQL query
+ * and adding to Fluo and Kafka so that it can be processed by the {@link PeriodicNotificationApplication}.
+ */
+public class CreatePeriodicQuery {
+
+ private FluoClient fluoClient;
+ private PeriodicQueryResultStorage periodicStorage;
+ Function funciton;
+ PeriodicQueryUtil util;
+
+
+ public CreatePeriodicQuery(FluoClient fluoClient, PeriodicQueryResultStorage periodicStorage) {
+ this.fluoClient = fluoClient;
+ this.periodicStorage = periodicStorage;
+ }
+
+ /**
+ * Creates a Periodic Query by adding the query to Fluo and using the resulting
+ * Fluo id to create a {@link PeriodicQueryResultStorage} table.
+ * @param sparql - sparql query registered to Fluo whose results are stored in PeriodicQueryResultStorage table
+ * @return PeriodicNotification that can be used to register register this query with the {@link PeriodicNotificationApplication}.
+ */
+ public PeriodicNotification createPeriodicQuery(String sparql) {
+ try {
+ Optional<PeriodicQueryNode> optNode = PeriodicQueryUtil.getPeriodicNode(sparql);
+ if(optNode.isPresent()) {
+ PeriodicQueryNode periodicNode = optNode.get();
+ CreatePcj createPcj = new CreatePcj();
+ String queryId = createPcj.createPcj(sparql, fluoClient);
+ periodicStorage.createPeriodicQuery(queryId, sparql);
+ PeriodicNotification notification = PeriodicNotification.builder().id(queryId).period(periodicNode.getPeriod())
+ .timeUnit(periodicNode.getUnit()).build();
+ return notification;
+ } else {
+ throw new RuntimeException("Invalid PeriodicQuery. Query must possess a PeriodicQuery Filter.");
+ }
+ } catch (MalformedQueryException | PeriodicQueryStorageException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Creates a Periodic Query by adding the query to Fluo and using the resulting
+ * Fluo id to create a {@link PeriodicQueryResultStorage} table. In addition, this
+ * method registers the PeriodicQuery with the PeriodicNotificationApplication to poll
+ * the PeriodicQueryResultStorage table at regular intervals and export results to Kafka.
+ * The PeriodicNotificationApp queries the result table at a regular interval indicated by the Period of
+ * the PeriodicQuery.
+ * @param sparql - sparql query registered to Fluo whose results are stored in PeriodicQueryResultStorage table
+ * @param PeriodicNotificationClient - registers the PeriodicQuery with the {@link PeriodicNotificationApplication}
+ * @return id of the PeriodicQuery and PeriodicQueryResultStorage table (these are the same)
+ */
+ public String createQueryAndRegisterWithKafka(String sparql, PeriodicNotificationClient periodicClient) {
+ PeriodicNotification notification = createPeriodicQuery(sparql);
+ periodicClient.addNotification(notification);
+ return notification.getId();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java
new file mode 100644
index 0000000..b1e8bad
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+/**
+ * Interface providing basic life cycle functionality,
+ * including stopping and starting any class implementing this
+ * interface and checking whether is it running.
+ *
+ */
+public interface LifeCycle {
+
+ /**
+ * Starts a running application.
+ */
+ public void start();
+
+ /**
+ * Stops a running application.
+ */
+ public void stop();
+
+ /**
+ * Determine if application is currently running.
+ * @return true if application is running and false otherwise.
+ */
+ public boolean currentlyRunning();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java
new file mode 100644
index 0000000..3ed7979
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import java.util.Objects;
+
+/**
+ * Object used to indicate the id of a given Periodic Query
+ * along with a particular bin of results. This Object is used
+ * by the {@link BinPruner} to clean up old query results after
+ * they have been processed.
+ *
+ */
+public class NodeBin {
+
+ private long bin;
+ private String nodeId;
+
+ public NodeBin(String nodeId, long bin) {
+ this.bin = bin;
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * @return id of Periodic Query
+ */
+ public String getNodeId() {
+ return nodeId;
+ }
+/**
+ * @return bin id of results for a given Periodic Query
+ */
+ public long getBin() {
+ return bin;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (other instanceof NodeBin) {
+ NodeBin bin = (NodeBin) other;
+ return this.bin == bin.bin && this.nodeId.equals(bin.nodeId);
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(bin, nodeId);
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder().append("Node Bin \n").append(" QueryId: " + nodeId + "\n").append(" Bin: " + bin + "\n").toString();
+ }
+
+}