You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/09/04 13:14:07 UTC
[3/4] incubator-rya git commit: RYA-319-Integration of Periodic Query
with CLI. Closes #220.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueriesIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueriesIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueriesIT.java
new file mode 100644
index 0000000..9724704
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueriesIT.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.api;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
+import org.apache.rya.indexing.pcj.fluo.api.ListFluoQueries.FluoQueryStringBuilder;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+
+public class ListFluoQueriesIT extends RyaExportITBase {
+
+    /**
+     * Writes the metadata of two projection queries to the Fluo table, one using
+     * {@link ExportStrategy#KAFKA} and one using {@link ExportStrategy#RYA}, and checks that the
+     * strings returned by {@link ListFluoQueries#listFluoQueries(FluoClient)} equal, as a set, the
+     * strings a {@link FluoQueryStringBuilder} builds for the same two queries.
+     */
+    @Test
+    public void queryMetadataTest() throws Exception {
+        final FluoQueryMetadataDAO metadataDao = new FluoQueryMetadataDAO();
+
+        final String kafkaSparql = "select ?x ?y ?z where {?x <uri:p1> ?y; <uri:p2> 'literal1'. ?y <uri:p3> ?z }";
+        final String ryaSparql = "select ?x ?y ?z where {{select ?x ?y ?z {?x <uri:p1> ?y; <uri:p2> ?z. ?y <uri:p3> ?z }}}";
+
+        // Create the objects that will be serialized.
+        final String kafkaQueryId = NodeType.generateNewFluoIdForType(NodeType.QUERY);
+        final QueryMetadata kafkaMetadata = projectionMetadata(kafkaQueryId, kafkaSparql, ExportStrategy.KAFKA);
+
+        final String ryaQueryId = NodeType.generateNewFluoIdForType(NodeType.QUERY);
+        final QueryMetadata ryaMetadata = projectionMetadata(ryaQueryId, ryaSparql, ExportStrategy.RYA);
+
+        try (FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Write both metadata objects to the Fluo table in a single transaction.
+            try (Transaction tx = fluoClient.newTransaction()) {
+                metadataDao.write(tx, kafkaMetadata);
+                metadataDao.write(tx, ryaMetadata);
+                tx.commit();
+            }
+
+            final List<String> listedQueries = new ListFluoQueries().listFluoQueries(fluoClient);
+
+            // Build the strings the listing is expected to contain.
+            final String expectedKafka = new FluoQueryStringBuilder().setQueryId(kafkaQueryId)
+                    .setQueryType(QueryType.PROJECTION).setQuery(kafkaSparql)
+                    .setExportStrategies(Sets.newHashSet(ExportStrategy.KAFKA)).build();
+            final String expectedRya = new FluoQueryStringBuilder().setQueryId(ryaQueryId)
+                    .setQueryType(QueryType.PROJECTION).setQuery(ryaSparql)
+                    .setExportStrategies(Sets.newHashSet(ExportStrategy.RYA)).build();
+
+            // Compared as sets, so the order in which the queries are listed does not matter.
+            final Set<String> expectedQueries = Sets.newHashSet(expectedKafka, expectedRya);
+            Assert.assertEquals(expectedQueries, Sets.newHashSet(listedQueries));
+        }
+    }
+
+    /**
+     * Builds the metadata of a projection query that uses a fixed variable order and child node id.
+     *
+     * @param queryId - The Fluo id of the query node.
+     * @param sparql - The SPARQL text stored in the metadata.
+     * @param strategy - The single export strategy assigned to the query.
+     * @return The built {@link QueryMetadata}.
+     */
+    private static QueryMetadata projectionMetadata(final String queryId, final String sparql,
+            final ExportStrategy strategy) {
+        final QueryMetadata.Builder builder = QueryMetadata.builder(queryId);
+        builder.setQueryType(QueryType.PROJECTION);
+        builder.setVarOrder(new VariableOrder("y;s;d"));
+        builder.setSparql(sparql);
+        builder.setChildNodeId("childNodeId");
+        builder.setExportStrategies(new HashSet<>(Arrays.asList(strategy)));
+        return builder.build();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
index 47a2f29..66aa04b 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
@@ -90,7 +90,7 @@ public class BatchIT extends RyaExportITBase {
// Tell the Fluo app to maintain the PCJ.
String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
- getRyaInstanceName());
+ getRyaInstanceName()).getQueryId();
List<String> ids = getNodeIdStrings(fluoClient, queryId);
List<String> prefixes = Arrays.asList("urn:subject_1", "urn:subject_1", "urn:object", "urn:subject_1", "urn:subject_1");
@@ -130,7 +130,7 @@ public class BatchIT extends RyaExportITBase {
// Tell the Fluo app to maintain the PCJ.
String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
- getRyaInstanceName());
+ getRyaInstanceName()).getQueryId();
List<String> ids = getNodeIdStrings(fluoClient, queryId);
String joinId = ids.get(2);
@@ -176,7 +176,7 @@ public class BatchIT extends RyaExportITBase {
// Tell the Fluo app to maintain the PCJ.
String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
- getRyaInstanceName());
+ getRyaInstanceName()).getQueryId();
List<String> ids = getNodeIdStrings(fluoClient, queryId);
String joinId = ids.get(2);
@@ -225,7 +225,7 @@ public class BatchIT extends RyaExportITBase {
// Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and
// batch size of joins to 5.
String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
- getRyaInstanceName());
+ getRyaInstanceName()).getQueryId();
List<String> ids = getNodeIdStrings(fluoClient, queryId);
@@ -264,7 +264,7 @@ public class BatchIT extends RyaExportITBase {
// Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and
// batch size of joins to 5.
String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
- getRyaInstanceName());
+ getRyaInstanceName()).getQueryId();
List<String> ids = getNodeIdStrings(fluoClient, queryId);
@@ -305,7 +305,7 @@ public class BatchIT extends RyaExportITBase {
// Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and
// batch size of joins to 5.
String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
- getRyaInstanceName());
+ getRyaInstanceName()).getQueryId();
List<String> ids = getNodeIdStrings(fluoClient, queryId);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
index a1d76cb..27b8222 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
@@ -33,7 +33,6 @@ import org.apache.fluo.api.client.scanner.ColumnScanner;
import org.apache.fluo.api.client.scanner.RowScanner;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Span;
-import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
import org.apache.rya.api.client.RyaClient;
import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
import org.apache.rya.indexing.pcj.fluo.api.DeleteFluoPcj;
@@ -131,7 +130,7 @@ public class CreateDeleteIT extends RyaExportITBase {
// Register the PCJ with Rya.
final RyaClient ryaClient = AccumuloRyaClientFactory.build(createConnectionDetails(), getAccumuloConnector());
- final String pcjId = ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql, Sets.newHashSet(ExportStrategy.NO_OP_EXPORT));
+ final String pcjId = ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql, Sets.newHashSet());
// Write the data to Rya.
final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java
new file mode 100644
index 0000000..e61104a
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.integration;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertEquals;
+
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.xml.datatype.DatatypeFactory;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Span;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery;
+import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
+import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase;
+import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.apache.rya.periodic.notification.notification.CommandNotification.Command;
+import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
+import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+import com.google.common.collect.Sets;
+
+public class CreateDeletePeriodicPCJ extends KafkaExportITBase {
+
+ @Test
+ public void deletePeriodicPCJ() throws Exception {
+ String query = "prefix function: <http://org.apache.rya/function#> " // n
+ + "prefix time: <http://www.w3.org/2006/time#> " // n
+ + "select (count(?obs) as ?total) where {" // n
+ + "Filter(function:periodic(?time, 2, .5, time:hours)) " // n
+ + "?obs <uri:hasTime> ?time. " // n
+ + "?obs <uri:hasId> ?id }"; // n
+
+ // Create the Statements that will be loaded into Rya.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final DatatypeFactory dtf = DatatypeFactory.newInstance();
+ ZonedDateTime time = ZonedDateTime.now();
+
+ ZonedDateTime zTime1 = time.minusMinutes(30);
+ String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
+
+ ZonedDateTime zTime2 = zTime1.minusMinutes(30);
+ String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
+
+ ZonedDateTime zTime3 = zTime2.minusMinutes(30);
+ String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
+
+ ZonedDateTime zTime4 = zTime3.minusMinutes(30);
+ String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT);
+
+ final Collection<Statement> statements = Sets.newHashSet(
+ vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"),
+ vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
+ vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
+ vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"),
+ vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
+ vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")),
+ vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"),
+ vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+ vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")),
+ vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"),
+ vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
+ vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4")));
+
+ runTest(query, statements, 29);
+
+ }
+
+
+
+ private void runTest(String query, Collection<Statement> statements, int expectedEntries) throws Exception {
+ try (FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+
+ String topic = "notification_topic";
+ PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(super.getAccumuloConnector(), RYA_INSTANCE_NAME);
+ PeriodicNotificationClient notificationClient = new KafkaNotificationRegistrationClient(topic,
+ getNotificationProducer("localhost:9092"));
+
+ CreatePeriodicQuery periodicPCJ = new CreatePeriodicQuery(fluoClient, storage);
+ String id = periodicPCJ.createPeriodicQuery(query, notificationClient).getQueryId();
+
+ loadData(statements);
+
+ // Ensure the data was loaded.
+ final List<Bytes> rows = getFluoTableEntries(fluoClient);
+ assertEquals(expectedEntries, rows.size());
+
+ DeletePeriodicQuery deletePeriodic = new DeletePeriodicQuery(fluoClient, storage);
+ deletePeriodic.deletePeriodicQuery(FluoQueryUtils.convertFluoQueryIdToPcjId(id), notificationClient);
+
+ // Ensure all data related to the query has been removed.
+ final List<Bytes> empty_rows = getFluoTableEntries(fluoClient);
+ assertEquals(0, empty_rows.size());
+
+ // Ensure that Periodic Service notified to add and delete PeriodicNotification
+ Set<CommandNotification> notifications;
+ try (KafkaConsumer<String, CommandNotification> consumer = makeNotificationConsumer(topic)) {
+ notifications = getKafkaNotifications(topic, 7000, consumer);
+ }
+ assertEquals(2, notifications.size());
+
+ String notificationId = "";
+ boolean addCalled = false;
+ boolean deleteCalled = false;
+ for (CommandNotification notification : notifications) {
+ if (notificationId.length() == 0) {
+ notificationId = notification.getId();
+ } else {
+ assertEquals(notificationId, notification.getId());
+ }
+
+ if (notification.getCommand() == Command.ADD) {
+ addCalled = true;
+ }
+
+ if (notification.getCommand() == Command.DELETE) {
+ deleteCalled = true;
+ }
+ }
+
+ assertEquals(true, addCalled);
+ assertEquals(true, deleteCalled);
+ }
+ }
+
+ private List<Bytes> getFluoTableEntries(final FluoClient fluoClient) {
+ try (Snapshot snapshot = fluoClient.newSnapshot()) {
+ final List<Bytes> rows = new ArrayList<>();
+ final RowScanner rscanner = snapshot.scanner().over(Span.prefix("")).byRow().build();
+
+ for (final ColumnScanner cscanner : rscanner) {
+ rows.add(cscanner.getRow());
+ }
+
+ return rows;
+ }
+ }
+
+ private KafkaProducer<String, CommandNotification> getNotificationProducer(String bootStrapServers) {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CommandNotificationSerializer.class.getName());
+ return new KafkaProducer<>(props);
+ }
+
+ private KafkaConsumer<String, CommandNotification> makeNotificationConsumer(final String topic) {
+ // setup consumer
+ final Properties consumerProps = new Properties();
+ consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
+ consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
+ consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CommandNotificationSerializer.class.getName());
+
+ // to make sure the consumer starts from the beginning of the topic
+ consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ final KafkaConsumer<String, CommandNotification> consumer = new KafkaConsumer<>(consumerProps);
+ consumer.subscribe(Arrays.asList(topic));
+ return consumer;
+ }
+
+ private Set<CommandNotification> getKafkaNotifications(String topic, int pollTime,
+ KafkaConsumer<String, CommandNotification> consumer) {
+ requireNonNull(topic);
+
+ // Read all of the results from the Kafka topic.
+ final Set<CommandNotification> results = new HashSet<>();
+
+ final ConsumerRecords<String, CommandNotification> records = consumer.poll(pollTime);
+ final Iterator<ConsumerRecord<String, CommandNotification>> recordIterator = records.iterator();
+ while (recordIterator.hasNext()) {
+ results.add(recordIterator.next().value());
+ }
+
+ return results;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
index 8911f56..dbedfb3 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
@@ -90,7 +90,7 @@ public class KafkaExportIT extends KafkaExportITBase {
vf.createStatement(vf.createURI("http://Frank"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")));
// Create the PCJ in Fluo and load the statements into Rya.
- final String pcjId = loadData(sparql, statements);
+ final String pcjId = loadDataAndCreateQuery(sparql, statements);
FluoITHelper.printFluoTable(super.getFluoConfiguration());
@@ -136,7 +136,7 @@ public class KafkaExportIT extends KafkaExportITBase {
vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(4.99)));
// Create the PCJ in Fluo and load the statements into Rya.
- final String pcjId = loadData(sparql, statements);
+ final String pcjId = loadDataAndCreateQuery(sparql, statements);
// Create the expected results of the SPARQL query once the PCJ has been computed.
final MapBindingSet expectedResult = new MapBindingSet();
@@ -163,7 +163,7 @@ public class KafkaExportIT extends KafkaExportITBase {
vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(4.99)));
// Create the PCJ in Fluo and load the statements into Rya.
- final String pcjId = loadData(sparql, statements);
+ final String pcjId = loadDataAndCreateQuery(sparql, statements);
// Create the expected results of the SPARQL query once the PCJ has been computed.
final MapBindingSet expectedResult = new MapBindingSet();
@@ -194,7 +194,7 @@ public class KafkaExportIT extends KafkaExportITBase {
vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(3.99)));
// Create the PCJ in Fluo and load the statements into Rya.
- final String pcjId = loadData(sparql, statements);
+ final String pcjId = loadDataAndCreateQuery(sparql, statements);
// Create the expected results of the SPARQL query once the PCJ has been computed.
final MapBindingSet expectedResult = new MapBindingSet();
@@ -221,7 +221,7 @@ public class KafkaExportIT extends KafkaExportITBase {
vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:count"), vf.createLiteral(2)));
// Create the PCJ in Fluo and load the statements into Rya.
- final String pcjId = loadData(sparql, statements);
+ final String pcjId = loadDataAndCreateQuery(sparql, statements);
// Create the expected results of the SPARQL query once the PCJ has been computed.
final MapBindingSet expectedResult = new MapBindingSet();
@@ -248,7 +248,7 @@ public class KafkaExportIT extends KafkaExportITBase {
vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(8)));
// Create the PCJ in Fluo and load the statements into Rya.
- final String pcjId = loadData(sparql, statements);
+ final String pcjId = loadDataAndCreateQuery(sparql, statements);
try(FluoClient fluo = new FluoClientImpl(super.getFluoConfiguration())) {
FluoITHelper.printFluoTable(fluo);
@@ -280,7 +280,7 @@ public class KafkaExportIT extends KafkaExportITBase {
vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(4.99)));
// Create the PCJ in Fluo and load the statements into Rya.
- final String pcjId = loadData(sparql, statements);
+ final String pcjId = loadDataAndCreateQuery(sparql, statements);
// Create the expected results of the SPARQL query once the PCJ has been computed.
final MapBindingSet expectedResult = new MapBindingSet();
@@ -307,7 +307,7 @@ public class KafkaExportIT extends KafkaExportITBase {
vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(2.75)));
// Create the PCJ in Fluo and load the statements into Rya.
- final String pcjId = loadData(sparql, statements);
+ final String pcjId = loadDataAndCreateQuery(sparql, statements);
// Create the expected results of the SPARQL query once the PCJ has been computed.
final MapBindingSet expectedResult = new MapBindingSet();
@@ -338,7 +338,7 @@ public class KafkaExportIT extends KafkaExportITBase {
vf.createStatement(vf.createURI("urn:banana"), vf.createURI("urn:price"), vf.createLiteral(1.99)));
// Create the PCJ in Fluo and load the statements into Rya.
- final String pcjId = loadData(sparql, statements);
+ final String pcjId = loadDataAndCreateQuery(sparql, statements);
// Create the expected results of the SPARQL query once the PCJ has been computed.
final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
@@ -399,7 +399,7 @@ public class KafkaExportIT extends KafkaExportITBase {
vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:price"), vf.createLiteral(4.99)));
// Create the PCJ in Fluo and load the statements into Rya.
- final String pcjId = loadData(sparql, statements);
+ final String pcjId = loadDataAndCreateQuery(sparql, statements);
// Create the expected results of the SPARQL query once the PCJ has been computed.
final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
@@ -477,7 +477,7 @@ public class KafkaExportIT extends KafkaExportITBase {
vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:price"), vf.createLiteral(4.99)));
// Create the PCJ in Fluo and load the statements into Rya.
- final String pcjId = loadData(sparql, statements);
+ final String pcjId = loadDataAndCreateQuery(sparql, statements);
// Create the expected results of the SPARQL query once the PCJ has been computed.
final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
@@ -554,7 +554,7 @@ public class KafkaExportIT extends KafkaExportITBase {
vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:price"), vf.createLiteral(4.99)));
// Create the PCJ in Fluo and load the statements into Rya.
- final String pcjId = loadData(sparql, statements);
+ final String pcjId = loadDataAndCreateQuery(sparql, statements);
// Create the expected results of the SPARQL query once the PCJ has been computed.
final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
index 0aefaca..4974aee 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
@@ -70,10 +70,6 @@ import com.google.common.collect.Sets;
*/
public class QueryIT extends RyaExportITBase {
- private enum ExporterType {
- Pcj, Periodic
- };
-
@Test
public void optionalStatements() throws Exception {
// A query that has optional statement patterns. This query is looking for all
@@ -112,7 +108,7 @@ public class QueryIT extends RyaExportITBase {
expectedResults.add(bs);
// Verify the end results of the query match the expected results.
- runTest(sparql, statements, expectedResults, ExporterType.Pcj);
+ runTest(sparql, statements, expectedResults, ExportStrategy.RYA);
}
/**
@@ -187,7 +183,7 @@ public class QueryIT extends RyaExportITBase {
expectedResults.add(bs);
// Verify the end results of the query match the expected results.
- runTest(sparql, statements, expectedResults, ExporterType.Pcj);
+ runTest(sparql, statements, expectedResults, ExportStrategy.RYA);
}
@Test
@@ -241,7 +237,7 @@ public class QueryIT extends RyaExportITBase {
expectedResults.add(bs);
// Verify the end results of the query match the expected results.
- runTest(sparql, statements, expectedResults, ExporterType.Pcj);
+ runTest(sparql, statements, expectedResults, ExportStrategy.RYA);
}
@Test
@@ -278,7 +274,7 @@ public class QueryIT extends RyaExportITBase {
expectedResults.add(bs);
// Verify the end results of the query match the expected results.
- runTest(sparql, statements, expectedResults, ExporterType.Pcj);
+ runTest(sparql, statements, expectedResults, ExportStrategy.RYA);
}
@Test
@@ -359,7 +355,7 @@ public class QueryIT extends RyaExportITBase {
expectedResults.add(bs);
// Verify the end results of the query match the expected results.
- runTest(sparql, statements, expectedResults, ExporterType.Pcj);
+ runTest(sparql, statements, expectedResults, ExportStrategy.RYA);
}
@Test
@@ -424,7 +420,7 @@ public class QueryIT extends RyaExportITBase {
expectedResults.add(bs);
// Verify the end results of the query match the expected results.
- runTest(sparql, statements, expectedResults, ExporterType.Pcj);
+ runTest(sparql, statements, expectedResults, ExportStrategy.RYA);
}
@Test
@@ -525,7 +521,7 @@ public class QueryIT extends RyaExportITBase {
expectedResults.add(bs);
// Verify the end results of the query match the expected results.
- runTest(query, statements, expectedResults, ExporterType.Periodic);
+ runTest(query, statements, expectedResults, ExportStrategy.PERIODIC);
}
@Test
@@ -596,7 +592,7 @@ public class QueryIT extends RyaExportITBase {
expectedResults.add(bs);
// Verify the end results of the query match the expected results.
- runTest(query, statements, expectedResults, ExporterType.Periodic);
+ runTest(query, statements, expectedResults, ExportStrategy.PERIODIC);
}
@Test
@@ -713,7 +709,7 @@ public class QueryIT extends RyaExportITBase {
expectedResults.add(bs);
// Verify the end results of the query match the expected results.
- runTest(query, statements, expectedResults, ExporterType.Periodic);
+ runTest(query, statements, expectedResults, ExportStrategy.PERIODIC);
}
@@ -792,7 +788,7 @@ public class QueryIT extends RyaExportITBase {
expectedResults.add(bs);
// Verify the end results of the query match the expected results.
- runTest(query, statements, expectedResults, ExporterType.Periodic);
+ runTest(query, statements, expectedResults, ExportStrategy.PERIODIC);
}
@Test
@@ -876,7 +872,7 @@ public class QueryIT extends RyaExportITBase {
expectedResults.add(bs);
// Verify the end results of the query match the expected results.
- runTest(query, statements, expectedResults, ExporterType.Periodic);
+ runTest(query, statements, expectedResults, ExportStrategy.PERIODIC);
}
@Test(expected= UnsupportedQueryException.class)
@@ -896,11 +892,11 @@ public class QueryIT extends RyaExportITBase {
final Set<BindingSet> expectedResults = new HashSet<>();
// Verify the end results of the query match the expected results.
- runTest(query, statements, expectedResults, ExporterType.Periodic);
+ runTest(query, statements, expectedResults, ExportStrategy.PERIODIC);
}
public void runTest(final String sparql, final Collection<Statement> statements, final Collection<BindingSet> expectedResults,
- ExporterType type) throws Exception {
+ ExportStrategy strategy) throws Exception {
requireNonNull(sparql);
requireNonNull(statements);
requireNonNull(expectedResults);
@@ -910,8 +906,8 @@ public class QueryIT extends RyaExportITBase {
final RyaClient ryaClient = AccumuloRyaClientFactory.build(createConnectionDetails(), accumuloConn);
- switch (type) {
- case Pcj:
+ switch (strategy) {
+ case RYA:
ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql);
addStatementsAndWait(statements);
// Fetch the value that is stored within the PCJ table.
@@ -922,11 +918,11 @@ public class QueryIT extends RyaExportITBase {
assertEquals(expectedResults, results);
}
break;
- case Periodic:
+ case PERIODIC:
PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(accumuloConn, getRyaInstanceName());
String periodicId = periodicStorage.createPeriodicQuery(sparql);
try (FluoClient fluo = new FluoClientImpl(super.getFluoConfiguration())) {
- new CreateFluoPcj().createPcj(periodicId, sparql, Sets.newHashSet(ExportStrategy.RYA), fluo);
+ new CreateFluoPcj().createPcj(periodicId, sparql, Sets.newHashSet(ExportStrategy.PERIODIC), fluo);
}
addStatementsAndWait(statements);
@@ -938,6 +934,8 @@ public class QueryIT extends RyaExportITBase {
}
assertEquals(expectedResults, results);
break;
+ default:
+ throw new RuntimeException("Invalid export option");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
index ed9ce60..59fe54f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
@@ -321,7 +321,7 @@ public class KafkaExportITBase extends AccumuloExportITBase {
return consumer;
}
- protected String loadData(final String sparql, final Collection<Statement> statements) throws Exception {
+ protected String loadDataAndCreateQuery(final String sparql, final Collection<Statement> statements) throws Exception {
requireNonNull(sparql);
requireNonNull(statements);
@@ -334,7 +334,16 @@ public class KafkaExportITBase extends AccumuloExportITBase {
final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql, Sets.newHashSet(ExportStrategy.KAFKA));
- // Write the data to Rya.
+ loadData(statements);
+
+ // The PCJ Id is the topic name the results will be written to.
+ return pcjId;
+ }
+
+ protected void loadData(final Collection<Statement> statements) throws Exception {
+
+ requireNonNull(statements);
+
final SailRepositoryConnection ryaConn = getRyaSailRepository().getConnection();
ryaConn.begin();
ryaConn.add(statements);
@@ -343,9 +352,7 @@ public class KafkaExportITBase extends AccumuloExportITBase {
// Wait for the Fluo application to finish computing the end result.
super.getMiniFluo().waitForObservers();
-
- // The PCJ Id is the topic name the results will be written to.
- return pcjId;
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/.gitignore
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/.gitignore b/extras/rya.periodic.service/periodic.service.api/.gitignore
new file mode 100644
index 0000000..b83d222
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/.gitignore
@@ -0,0 +1 @@
+/target/
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/pom.xml b/extras/rya.periodic.service/periodic.service.api/pom.xml
new file mode 100644
index 0000000..b57beaf
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/pom.xml
@@ -0,0 +1,52 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!-- Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with this
+ work for additional information regarding copyright ownership. The ASF licenses
+ this file to you under the Apache License, Version 2.0 (the "License"); you
+ may not use this file except in compliance with the License. You may obtain
+ a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
+ required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License. -->
+
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.periodic.service</artifactId>
+ <version>3.2.11-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>rya.periodic.service.api</artifactId>
+
+ <name>Apache Rya Periodic Service API</name>
+ <description>API for Periodic Service Application</description>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.8.0</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.openrdf.sesame</groupId>
+ <artifactId>sesame-query</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.indexing.pcj</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java
new file mode 100644
index 0000000..f4a083c
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+
+/**
+ * Object that cleans up old {@link BindingSet}s corresponding to the specified
+ * {@link NodeBin}. This class deletes all BindingSets with the bin
+ * indicated by {@link NodeBin#getBin()}. A BindingSet corresponds to a given
+ * bin if it contains a {@link Binding} with name {@link IncrementalUpdateConstants#PERIODIC_BIN_ID}
+ * and value equal to the given bin.
+ *
+ */
+public interface BinPruner {
+
+ /**
+ * Cleans up all {@link BindingSet}s associated with the indicated {@link NodeBin}.
+ * @param bin - NodeBin that indicates which BindingSets to delete.
+ */
+ public void pruneBindingSetBin(NodeBin bin);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java
new file mode 100644
index 0000000..491576b
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import org.openrdf.query.BindingSet;
+
+/**
+ * An Object that is used to export {@link BindingSet}s to an external repository or queuing system.
+ *
+ */
+public interface BindingSetExporter {
+
+ /**
+ * This method exports the BindingSet to the external repository or queuing system
+ * that this BindingSetExporter is configured to export to.
+ * @param bindingSet - {@link BindingSetRecord} to be exported
+ * @throws BindingSetRecordExportException if the record could not be exported
+ */
+ public void exportNotification(BindingSetRecord bindingSet) throws BindingSetRecordExportException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecord.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecord.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecord.java
new file mode 100644
index 0000000..c3f70f1
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecord.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import org.openrdf.query.BindingSet;
+
+import com.google.common.base.Objects;
+
+/**
+ * Object that associates a {@link BindingSet} with a given Kafka topic.
+ * This ensures that the {@link KafkaPeriodicBindingSetExporter} can export
+ * each BindingSet to its appropriate topic.
+ *
+ */
+public class BindingSetRecord {
+
+ private BindingSet bs;
+ private String topic;
+
+ public BindingSetRecord(BindingSet bs, String topic) {
+ this.bs = bs;
+ this.topic = topic;
+ }
+
+ /**
+ * @return BindingSet in this BindingSetRecord
+ */
+ public BindingSet getBindingSet() {
+ return bs;
+ }
+
+ /**
+ * @return Kafka topic for this BindingSetRecord
+ */
+ public String getTopic() {
+ return topic;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if(this == o) {
+ return true;
+ }
+
+ if(o instanceof BindingSetRecord) {
+ BindingSetRecord record = (BindingSetRecord) o;
+ return Objects.equal(this.bs, record.bs)&&Objects.equal(this.topic,record.topic);
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(bs, topic);
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder().append("Binding Set Record \n").append(" Topic: " + topic + "\n").append(" BindingSet: " + bs + "\n")
+ .toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecordExportException.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecordExportException.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecordExportException.java
new file mode 100644
index 0000000..94e4980
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecordExportException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+/**
+ * A result could not be exported.
+ */
+public class BindingSetRecordExportException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructs an instance of {@link BindingSetRecordExportException}.
+ *
+ * @param message - Explains why the exception was thrown.
+ */
+ public BindingSetRecordExportException(final String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs an instance of {@link BindingSetRecordExportException}.
+ *
+ * @param message - Explains why the exception was thrown.
+ * @param cause - The exception that caused this one to be thrown.
+ */
+ public BindingSetRecordExportException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java
new file mode 100644
index 0000000..b1e8bad
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+/**
+ * Interface providing basic life cycle functionality,
+ * including stopping and starting any class implementing this
+ * interface and checking whether it is running.
+ *
+ */
+public interface LifeCycle {
+
+ /**
+ * Starts the application.
+ */
+ public void start();
+
+ /**
+ * Stops a running application.
+ */
+ public void stop();
+
+ /**
+ * Determine if application is currently running.
+ * @return true if application is running and false otherwise.
+ */
+ public boolean currentlyRunning();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java
new file mode 100644
index 0000000..3ed7979
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import java.util.Objects;
+
+/**
+ * Object used to indicate the id of a given Periodic Query
+ * along with a particular bin of results. This Object is used
+ * by the {@link BinPruner} to clean up old query results after
+ * they have been processed.
+ *
+ */
+public class NodeBin {
+
+ private long bin;
+ private String nodeId;
+
+ public NodeBin(String nodeId, long bin) {
+ this.bin = bin;
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * @return id of Periodic Query
+ */
+ public String getNodeId() {
+ return nodeId;
+ }
+/**
+ * @return bin id of results for a given Periodic Query
+ */
+ public long getBin() {
+ return bin;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (other instanceof NodeBin) {
+ NodeBin bin = (NodeBin) other;
+ return this.bin == bin.bin && this.nodeId.equals(bin.nodeId);
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(bin, nodeId);
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder().append("Node Bin \n").append(" QueryId: " + nodeId + "\n").append(" Bin: " + bin + "\n").toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/Notification.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/Notification.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/Notification.java
new file mode 100644
index 0000000..3e9e0d1
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/Notification.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+/**
+ * Notification Object used by the Periodic Query Service
+ * to inform workers to process results for a given Periodic
+ * Query with the indicated id.
+ *
+ */
+public interface Notification {
+
+ /**
+ * @return id of a Periodic Query
+ */
+ public String getId();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java
new file mode 100644
index 0000000..d53dc17
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+
+/**
+ * Object that manages the periodic notifications for the Periodic Query Service.
+ * This Object processes new requests for periodic updates by registering them with
+ * some sort of service that generates periodic updates (such as a {@link ScheduledExecutorService}).
+ *
+ */
+public interface NotificationCoordinatorExecutor extends LifeCycle {
+
+ /**
+ * Registers or deletes a {@link CommandNotification} with the periodic service to
+ * generate notifications at a regular interval indicated by the CommandNotification.
+ * @param notification - CommandNotification to be registered or deleted from the periodic update
+ * service.
+ */
+ public void processNextCommandNotification(CommandNotification notification);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java
new file mode 100644
index 0000000..4ac9089
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java
@@ -0,0 +1,41 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import org.apache.rya.periodic.notification.notification.TimestampedNotification;
+
+/**
+ * Object that processes new {@link TimestampedNotification}s generated by {@link NotificationCoordinatorExecutor}.
+ * It is expected that the NotificationCoordinatorExecutor will provide this Object with notifications to perform work via some
+ * sort of queuing service such as a BlockingQueue or Kafka. This Object processes the notifications by retrieving
+ * query results associated with the Periodic Query id given by {@link TimestampedNotification#getId()}, parsing them
+ * and then providing them to another service to be exported.
+ *
+ */
+public interface NotificationProcessor {
+
+ /**
+ * Processes {@link TimestampedNotification}s by retrieving the Periodic Query results
+ * associated with the query id given by {@link TimestampedNotification#getId()}.
+ * @param notification - contains information about which query results to retrieve
+ */
+ public void processNotification(TimestampedNotification notification);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java
new file mode 100644
index 0000000..ff08733
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.periodic.notification.notification.BasicNotification;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+
+/**
+ * Object to register {@link PeriodicNotification}s with an external queuing
+ * service to be handled by a {@link NotificationCoordinatorExecutor} service.
+ * The service will generate notifications to process Periodic Query results at regular
+ * intervals corresponding to the period of the PeriodicNotification.
+ *
+ */
+public interface PeriodicNotificationClient extends AutoCloseable {
+
+ /**
+ * Adds a new notification to be registered with the {@link NotificationCoordinatorExecutor}
+ * @param notification - notification to be added
+ */
+ public void addNotification(PeriodicNotification notification);
+
+ /**
+ * Deletes a notification from the {@link NotificationCoordinatorExecutor}.
+ * @param notification - notification to be deleted
+ */
+ public void deleteNotification(BasicNotification notification);
+
+ /**
+ * Deletes a notification from the {@link NotificationCoordinatorExecutor}.
+ * @param notificationId - id corresponding to the notification to be deleted
+ */
+ public void deleteNotification(String notificationId);
+
+ /**
+ * Adds a new notification with the indicated id and period to the {@link NotificationCoordinatorExecutor}
+ * @param id - Periodic Query id
+ * @param period - period indicating frequency at which notifications will be generated
+ * @param delay - initial delay for starting periodic notifications
+ * @param unit - time unit of delay and period
+ */
+ public void addNotification(String id, long period, long delay, TimeUnit unit);
+
+ public void close();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java
new file mode 100644
index 0000000..c31a5c0
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Objects;
+
+/**
+ * Notification Object used by the Periodic Query Service
+ * to inform workers to process results for a given Periodic
+ * Query with the indicated id.
+ *
+ */
+public class BasicNotification implements Notification {
+
+    // Never reassigned; final keeps equals() and hashCode() stable for the object's lifetime.
+    private final String id;
+
+    /**
+     * Creates a BasicNotification
+     * @param id - Fluo query id associated with this Notification
+     */
+    public BasicNotification(String id) {
+        this.id = id;
+    }
+
+    /**
+     * @return the Fluo Query Id that this notification will generate results for
+     */
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    /**
+     * @return true only when other is a BasicNotification whose id equals this id
+     */
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof BasicNotification)) {
+            return false;
+        }
+        return Objects.equal(this.id, ((BasicNotification) other).id);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(id);
+    }
+
+    @Override
+    public String toString() {
+        return "id=" + id;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java
new file mode 100644
index 0000000..597b228
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+/**
+ * This Object contains a Notification Object used by the Periodic Query Service
+ * to inform workers to process results for a given Periodic Query with the
+ * indicated id. Additionally, the CommandNotification contains a
+ * {@link Command} about which action the
+ * {@link NotificationCoordinatorExecutor} should take (adding or deleting).
+ * CommandNotifications are meant to be added to an external work queue (such as
+ * Kafka) to be processed by the NotificationCoordinatorExecutor.
+ *
+ */
+public class CommandNotification implements Notification {
+
+    private final Notification notification;
+    private final Command command;
+
+    public enum Command {
+        ADD, DELETE
+    }
+
+    /**
+     * Creates a new CommandNotification
+     * @param command - the command associated with this notification (either add or delete)
+     * @param notification - the underlying notification associated with this command
+     * @throws NullPointerException if command or notification is null
+     */
+    public CommandNotification(Command command, Notification notification) {
+        this.notification = Preconditions.checkNotNull(notification);
+        this.command = Preconditions.checkNotNull(command);
+    }
+
+    /** @return id of the {@link Notification} contained by this Object */
+    @Override
+    public String getId() {
+        return notification.getId();
+    }
+
+    /**
+     * Returns {@link Notification} contained by this CommandNotification.
+     * @return - Notification contained by this Object
+     */
+    public Notification getNotification() {
+        return this.notification;
+    }
+
+    /**
+     * @return Command contained by this Object (either add or delete)
+     */
+    public Command getCommand() {
+        return this.command;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof CommandNotification)) {
+            return false;
+        }
+        CommandNotification cn = (CommandNotification) other;
+        return Objects.equal(this.command, cn.command) && Objects.equal(this.notification, cn.notification);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(command, notification);
+    }
+
+    @Override
+    public String toString() {
+        return "command=" + command + ";" + notification;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java
new file mode 100644
index 0000000..aa9e581
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Notification Object used by the Periodic Query Service to inform workers to
+ * process results for a given Periodic Query with the indicated id.
+ * Additionally, this Object contains a period that indicates a frequency at
+ * which regular updates are generated.
+ *
+ */
+public class PeriodicNotification implements Notification {
+
+    private final String id;
+    private final long period;
+    private final TimeUnit periodTimeUnit;
+    private final long initialDelay;
+
+    /**
+     * Creates a PeriodicNotification.
+     * @param id - Fluo Query Id that this notification is associated with
+     * @param period - period at which notifications are generated
+     * @param periodTimeUnit - time unit associated with the period and delay
+     * @param initialDelay - amount of time to wait before generating the first notification
+     * @throws NullPointerException if id or periodTimeUnit is null
+     * @throws IllegalArgumentException if period is not positive or initialDelay is negative
+     */
+    public PeriodicNotification(String id, long period, TimeUnit periodTimeUnit, long initialDelay) {
+        this.id = Preconditions.checkNotNull(id, "id must not be null");
+        this.periodTimeUnit = Preconditions.checkNotNull(periodTimeUnit, "periodTimeUnit must not be null");
+        Preconditions.checkArgument(period > 0, "period must be positive but was %s", period);
+        Preconditions.checkArgument(initialDelay >= 0, "initialDelay must not be negative but was %s", initialDelay);
+        this.period = period;
+        this.initialDelay = initialDelay;
+    }
+
+    /**
+     * Create a PeriodicNotification
+     * @param other - other PeriodicNotification used in copy constructor
+     */
+    public PeriodicNotification(PeriodicNotification other) {
+        this(other.id, other.period, other.periodTimeUnit, other.initialDelay);
+    }
+
+    /** @return Fluo Query Id that this notification is associated with */
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    /**
+     * @return - period at which regular notifications are generated
+     */
+    public long getPeriod() {
+        return period;
+    }
+
+    /**
+     * @return time unit of period and initial delay
+     */
+    public TimeUnit getTimeUnit() {
+        return periodTimeUnit;
+    }
+
+    /**
+     * @return amount of time to delay before beginning to generate notifications
+     */
+    public long getInitialDelay() {
+        return initialDelay;
+    }
+
+    @Override
+    public String toString() {
+        return "id=" + id + ";period=" + period + ";periodTimeUnit=" + periodTimeUnit
+                + ";initialDelay=" + initialDelay;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof PeriodicNotification)) {
+            return false;
+        }
+        PeriodicNotification notification = (PeriodicNotification) other;
+        return Objects.equals(this.id, notification.id) && (this.period == notification.period)
+                && Objects.equals(this.periodTimeUnit, notification.periodTimeUnit) && (this.initialDelay == notification.initialDelay);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, period, periodTimeUnit, initialDelay);
+    }
+
+    /** @return a new {@link Builder} whose initial delay defaults to 0 */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /** Collects constructor arguments; they are validated when {@link Builder#build()} constructs the notification. */
+    public static class Builder {
+
+        private String id;
+        private long period;
+        private TimeUnit periodTimeUnit;
+        private long initialDelay = 0;
+
+        /**
+         * @param id - periodic query id
+         * @return - builder to chain method calls
+         */
+        public Builder id(String id) {
+            this.id = id;
+            return this;
+        }
+
+        /**
+         * @param period of the periodic notification for generating regular notifications
+         * @return - builder to chain method calls
+         */
+        public Builder period(long period) {
+            this.period = period;
+            return this;
+        }
+
+        /**
+         * @param timeUnit of period and initial delay
+         * @return - builder to chain method calls
+         */
+        public Builder timeUnit(TimeUnit timeUnit) {
+            this.periodTimeUnit = timeUnit;
+            return this;
+        }
+
+        /**
+         * @param initialDelay - amount of time to wait before generating notifications
+         * @return - builder to chain method calls
+         */
+        public Builder initialDelay(long initialDelay) {
+            this.initialDelay = initialDelay;
+            return this;
+        }
+
+        /**
+         * Builds PeriodicNotification
+         * @return PeriodicNotification constructed from Builder specified parameters
+         */
+        public PeriodicNotification build() {
+            return new PeriodicNotification(id, period, periodTimeUnit, initialDelay);
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java
new file mode 100644
index 0000000..38073ce
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link PeriodicNotification} Object used by the Periodic Query Service to inform workers to
+ * process results for a given Periodic Query with the indicated id. Additionally
+ * this Object contains a {@link Date} object to indicate the date time at which this
+ * notification was generated.
+ *
+ */
+public class TimestampedNotification extends PeriodicNotification {
+
+    private final Date date;
+
+    /**
+     * Constructs a TimestampedNotification
+     * @param id - Fluo Query Id associated with this Notification
+     * @param period - period at which notifications are generated
+     * @param periodTimeUnit - time unit associated with period and initial delay
+     * @param initialDelay - amount of time to wait before generating first notification
+     */
+    public TimestampedNotification(String id, long period, TimeUnit periodTimeUnit, long initialDelay) {
+        super(id, period, periodTimeUnit, initialDelay);
+        this.date = new Date();
+    }
+
+    /**
+     * Creates a TimestampedNotification
+     * @param notification - PeriodicNotification used to create this TimestampedNotification.
+     * This constructor creates a time stamp for the TimestampedNotification.
+     */
+    public TimestampedNotification(PeriodicNotification notification) {
+        super(notification);
+        this.date = new Date();
+    }
+
+    /**
+     * @return copy of the timestamp at which this notification was generated; Date is mutable, so the field itself is not exposed
+     */
+    public Date getTimestamp() {
+        return new Date(date.getTime());
+    }
+
+    @Override
+    public String toString() {
+        return super.toString() + ";date=" + date;
+    }
+
+}