You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/09/11 16:14:40 UTC
[7/7] incubator-rya git commit: RYA-355 Refactored the periodic
notification service structure. Closes #221.
RYA-355 Refactored the periodic notification service structure. Closes #221.
New artifactIds align better with code packaging, semantics.
Light pom cleaning.
Eliminates redundant text in file paths.
Makes adding the twill app for RYA-356 a little cleaner.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/de365c17
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/de365c17
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/de365c17
Branch: refs/heads/master
Commit: de365c1795b700cc5383be1927c4e598c37d948a
Parents: 28b0a52
Author: jdasch <hc...@gmail.com>
Authored: Fri Sep 1 20:51:25 2017 -0400
Committer: Caleb Meier <ca...@parsons.com>
Committed: Mon Sep 11 09:13:53 2017 -0700
----------------------------------------------------------------------
extras/indexing/pom.xml | 2 +-
extras/periodic.notification/api/.gitignore | 1 +
extras/periodic.notification/api/pom.xml | 52 ++
.../periodic/notification/api/BinPruner.java | 40 ++
.../notification/api/BindingSetExporter.java | 37 ++
.../notification/api/BindingSetRecord.java | 80 +++
.../api/BindingSetRecordExportException.java | 45 ++
.../periodic/notification/api/LifeCycle.java | 45 ++
.../rya/periodic/notification/api/NodeBin.java | 77 +++
.../periodic/notification/api/Notification.java | 34 ++
.../api/NotificationCoordinatorExecutor.java | 41 ++
.../notification/api/NotificationProcessor.java | 41 ++
.../api/PeriodicNotificationClient.java | 64 +++
.../notification/BasicNotification.java | 76 +++
.../notification/CommandNotification.java | 99 ++++
.../notification/PeriodicNotification.java | 178 +++++++
.../notification/TimestampedNotification.java | 69 +++
.../KafkaNotificationRegistrationClient.java | 80 +++
.../BasicNotificationTypeAdapter.java | 55 +++
.../serialization/BindingSetSerDe.java | 105 ++++
.../CommandNotificationSerializer.java | 76 +++
.../CommandNotificationTypeAdapter.java | 89 ++++
.../PeriodicNotificationTypeAdapter.java | 73 +++
extras/periodic.notification/pom.xml | 40 ++
extras/periodic.notification/service/pom.xml | 102 ++++
.../PeriodicApplicationException.java | 47 ++
.../PeriodicNotificationApplication.java | 207 ++++++++
...dicNotificationApplicationConfiguration.java | 254 ++++++++++
.../PeriodicNotificationApplicationFactory.java | 140 ++++++
...PeriodicNotificationCoordinatorExecutor.java | 159 ++++++
.../exporter/KafkaExporterExecutor.java | 110 +++++
.../KafkaPeriodicBindingSetExporter.java | 99 ++++
.../NotificationProcessorExecutor.java | 114 +++++
.../TimestampedNotificationProcessor.java | 203 ++++++++
.../notification/pruner/AccumuloBinPruner.java | 66 +++
.../notification/pruner/FluoBinPruner.java | 76 +++
.../pruner/PeriodicQueryPruner.java | 107 ++++
.../pruner/PeriodicQueryPrunerExecutor.java | 104 ++++
.../recovery/PeriodicNotificationProvider.java | 142 ++++++
.../kafka/KafkaNotificationProvider.java | 123 +++++
.../kafka/PeriodicNotificationConsumer.java | 88 ++++
.../CommandNotificationSerializerTest.java | 60 +++
extras/periodic.notification/tests/pom.xml | 62 +++
.../PeriodicNotificationApplicationIT.java | 493 +++++++++++++++++++
.../PeriodicNotificationProviderIT.java | 71 +++
.../PeriodicNotificationExporterIT.java | 143 ++++++
.../PeriodicNotificationProcessorIT.java | 121 +++++
.../pruner/PeriodicNotificationBinPrunerIT.java | 283 +++++++++++
.../PeriodicCommandNotificationConsumerIT.java | 139 ++++++
.../tests/src/test/resources/log4j.properties | 37 ++
.../src/test/resources/notification.properties | 35 ++
extras/pom.xml | 2 +-
extras/rya.pcj.fluo/pcj.fluo.api/pom.xml | 12 +-
.../periodic.service.api/.gitignore | 1 -
.../periodic.service.api/pom.xml | 52 --
.../periodic/notification/api/BinPruner.java | 40 --
.../notification/api/BindingSetExporter.java | 37 --
.../notification/api/BindingSetRecord.java | 80 ---
.../api/BindingSetRecordExportException.java | 45 --
.../periodic/notification/api/LifeCycle.java | 45 --
.../rya/periodic/notification/api/NodeBin.java | 77 ---
.../periodic/notification/api/Notification.java | 34 --
.../api/NotificationCoordinatorExecutor.java | 41 --
.../notification/api/NotificationProcessor.java | 41 --
.../api/PeriodicNotificationClient.java | 64 ---
.../notification/BasicNotification.java | 76 ---
.../notification/CommandNotification.java | 99 ----
.../notification/PeriodicNotification.java | 178 -------
.../notification/TimestampedNotification.java | 69 ---
.../KafkaNotificationRegistrationClient.java | 80 ---
.../BasicNotificationTypeAdapter.java | 55 ---
.../serialization/BindingSetSerDe.java | 105 ----
.../CommandNotificationSerializer.java | 76 ---
.../CommandNotificationTypeAdapter.java | 89 ----
.../PeriodicNotificationTypeAdapter.java | 73 ---
.../periodic.service.integration.tests/pom.xml | 62 ---
.../PeriodicNotificationApplicationIT.java | 493 -------------------
.../PeriodicNotificationProviderIT.java | 71 ---
.../PeriodicNotificationExporterIT.java | 143 ------
.../PeriodicNotificationProcessorIT.java | 121 -----
.../pruner/PeriodicNotificationBinPrunerIT.java | 283 -----------
.../PeriodicCommandNotificationConsumerIT.java | 139 ------
.../src/test/resources/log4j.properties | 37 --
.../src/test/resources/notification.properties | 35 --
.../periodic.service.notification/pom.xml | 112 -----
.../PeriodicApplicationException.java | 47 --
.../PeriodicNotificationApplication.java | 207 --------
...dicNotificationApplicationConfiguration.java | 254 ----------
.../PeriodicNotificationApplicationFactory.java | 140 ------
...PeriodicNotificationCoordinatorExecutor.java | 159 ------
.../exporter/KafkaExporterExecutor.java | 110 -----
.../KafkaPeriodicBindingSetExporter.java | 99 ----
.../NotificationProcessorExecutor.java | 114 -----
.../TimestampedNotificationProcessor.java | 203 --------
.../notification/pruner/AccumuloBinPruner.java | 66 ---
.../notification/pruner/FluoBinPruner.java | 76 ---
.../pruner/PeriodicQueryPruner.java | 107 ----
.../pruner/PeriodicQueryPrunerExecutor.java | 104 ----
.../recovery/PeriodicNotificationProvider.java | 142 ------
.../kafka/KafkaNotificationProvider.java | 123 -----
.../kafka/PeriodicNotificationConsumer.java | 88 ----
.../CommandNotificationSerializerTest.java | 60 ---
extras/rya.periodic.service/pom.xml | 40 --
pom.xml | 11 +-
104 files changed, 5093 insertions(+), 5108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/indexing/pom.xml
----------------------------------------------------------------------
diff --git a/extras/indexing/pom.xml b/extras/indexing/pom.xml
index 16a205f..1d3a32b 100644
--- a/extras/indexing/pom.xml
+++ b/extras/indexing/pom.xml
@@ -83,7 +83,7 @@
</dependency>
<dependency>
<groupId>org.apache.rya</groupId>
- <artifactId>rya.periodic.service.api</artifactId>
+ <artifactId>rya.periodic.notification.api</artifactId>
</dependency>
<!-- OpenRDF -->
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/.gitignore
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/.gitignore b/extras/periodic.notification/api/.gitignore
new file mode 100644
index 0000000..b83d222
--- /dev/null
+++ b/extras/periodic.notification/api/.gitignore
@@ -0,0 +1 @@
+/target/
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/pom.xml
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/pom.xml b/extras/periodic.notification/api/pom.xml
new file mode 100644
index 0000000..aecd723
--- /dev/null
+++ b/extras/periodic.notification/api/pom.xml
@@ -0,0 +1,52 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!-- Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with this
+ work for additional information regarding copyright ownership. The ASF licenses
+ this file to you under the Apache License, Version 2.0 (the "License"); you
+ may not use this file except in compliance with the License. You may obtain
+ a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
+ required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License. -->
+
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.periodic.notification.parent</artifactId>
+ <version>3.2.11-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>rya.periodic.notification.api</artifactId>
+
+ <name>Apache Rya Periodic Notification API</name>
+ <description>API for Periodic Notification Applications</description>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.8.0</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.openrdf.sesame</groupId>
+ <artifactId>sesame-query</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.indexing.pcj</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java
new file mode 100644
index 0000000..f4a083c
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+
+/**
+ * Object that cleans up old {@link BindingSet}s corresponding to the specified
+ * {@link NodeBin}. This class deletes all BindingSets with the bin
+ * indicated by {@link NodeBin#getBin()}. A BindingSet corresponds to a given
+ * bin if it contains a {@link Binding} with name {@link IncrementalUpdateConstants#PERIODIC_BIN_ID}
+ * and value equal to the given bin.
+ *
+ */
+public interface BinPruner {
+
+ /**
+ * Cleans up all {@link BindingSet}s associated with the indicated {@link NodeBin}.
+ * @param bin - NodeBin that indicates which BindingSets to delete..
+ */
+ public void pruneBindingSetBin(NodeBin bin);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java
new file mode 100644
index 0000000..491576b
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import org.openrdf.query.BindingSet;
+
+/**
+ * An Object that is used to export {@link BindingSet}s to an external repository or queuing system.
+ *
+ */
+public interface BindingSetExporter {
+
+ /**
+ * This method exports the BindingSet to the external repository or queuing system
+ * that this BindingSetExporter is configured to export to.
+ * @param bindingSet - {@link BindingSet} to be exported
+ * @throws ResultExportException
+ */
+ public void exportNotification(BindingSetRecord bindingSet) throws BindingSetRecordExportException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecord.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecord.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecord.java
new file mode 100644
index 0000000..c3f70f1
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecord.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import org.openrdf.query.BindingSet;
+
+import com.google.common.base.Objects;
+
+/**
+ * Object that associates a {@link BindingSet} with a given Kafka topic.
+ * This ensures that the {@link KafkaPeriodicBindingSetExporter} can export
+ * each BindingSet to its appropriate topic.
+ *
+ */
+public class BindingSetRecord {
+
+ private BindingSet bs;
+ private String topic;
+
+ public BindingSetRecord(BindingSet bs, String topic) {
+ this.bs = bs;
+ this.topic = topic;
+ }
+
+ /**
+ * @return BindingSet in this BindingSetRecord
+ */
+ public BindingSet getBindingSet() {
+ return bs;
+ }
+
+ /**
+ * @return Kafka topic for this BindingSetRecord
+ */
+ public String getTopic() {
+ return topic;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if(this == o) {
+ return true;
+ }
+
+ if(o instanceof BindingSetRecord) {
+ BindingSetRecord record = (BindingSetRecord) o;
+ return Objects.equal(this.bs, record.bs)&&Objects.equal(this.topic,record.topic);
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(bs, topic);
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder().append("Binding Set Record \n").append(" Topic: " + topic + "\n").append(" BindingSet: " + bs + "\n")
+ .toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecordExportException.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecordExportException.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecordExportException.java
new file mode 100644
index 0000000..94e4980
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecordExportException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+/**
+ * A result could not be exported.
+ */
+public class BindingSetRecordExportException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructs an instance of {@link BindingSetRecordExportException}.
+ *
+ * @param message - Explains why the exception was thrown.
+ */
+ public BindingSetRecordExportException(final String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs an instance of {@link BindingSetRecordExportException}.
+ *
+ * @param message - Explains why the exception was thrown.
+ * @param cause - The exception that caused this one to be thrown.
+ */
+ public BindingSetRecordExportException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java
new file mode 100644
index 0000000..b1e8bad
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+/**
+ * Interface providing basic life cycle functionality,
+ * including stopping and starting any class implementing this
+ * interface and checking whether is it running.
+ *
+ */
+public interface LifeCycle {
+
+ /**
+ * Starts a running application.
+ */
+ public void start();
+
+ /**
+ * Stops a running application.
+ */
+ public void stop();
+
+ /**
+ * Determine if application is currently running.
+ * @return true if application is running and false otherwise.
+ */
+ public boolean currentlyRunning();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java
new file mode 100644
index 0000000..3ed7979
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import java.util.Objects;
+
+/**
+ * Object used to indicate the id of a given Periodic Query
+ * along with a particular bin of results. This Object is used
+ * by the {@link BinPruner} to clean up old query results after
+ * they have been processed.
+ *
+ */
+public class NodeBin {
+
+ private long bin;
+ private String nodeId;
+
+ public NodeBin(String nodeId, long bin) {
+ this.bin = bin;
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * @return id of Periodic Query
+ */
+ public String getNodeId() {
+ return nodeId;
+ }
+/**
+ * @return bin id of results for a given Periodic Query
+ */
+ public long getBin() {
+ return bin;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (other instanceof NodeBin) {
+ NodeBin bin = (NodeBin) other;
+ return this.bin == bin.bin && this.nodeId.equals(bin.nodeId);
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(bin, nodeId);
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder().append("Node Bin \n").append(" QueryId: " + nodeId + "\n").append(" Bin: " + bin + "\n").toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/Notification.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/Notification.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/Notification.java
new file mode 100644
index 0000000..3e9e0d1
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/Notification.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+/**
+ * Notification Object used by the Periodic Query Service
+ * to inform workers to process results for a given Periodic
+ * Query with the indicated id.
+ *
+ */
+public interface Notification {
+
+ /**
+ * @return id of a Periodic Query
+ */
+ public String getId();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java
new file mode 100644
index 0000000..d53dc17
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+
+/**
+ * Object that manages the periodic notifications for the Periodic Query Service.
+ * This Object processes new requests for periodic updates by registering them with
+ * some sort of service that generates periodic updates (such as a {@link ScheduledExecutorService}).
+ *
+ */
+public interface NotificationCoordinatorExecutor extends LifeCycle {
+
+ /**
+ * Registers or deletes a {@link CommandNotification}s with the periodic service to
+ * generate notifications at a regular interval indicated by the CommandNotification.
+ * @param notification - CommandNotification to be registered or deleted from the periodic update
+ * service.
+ */
+ public void processNextCommandNotification(CommandNotification notification);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java
new file mode 100644
index 0000000..4ac9089
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java
@@ -0,0 +1,41 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import org.apache.rya.periodic.notification.notification.TimestampedNotification;
+
+/**
+ * Object that processes new {@link TimestampedNotification}s generated by {@link NotificationCoordinatorExecutor}.
+ * It is expected that the NotificationCoordinatorExecutor will this Object with notifications to perform work via some sort
+ * sort of queuing service such as a BlockingQueue or Kafka. This Object processes the notifications by retrieving
+ * query results associated with the Periodic Query id given by {@link TimestampedNotification#getId()}, parsing them
+ * and then providing them to another service to be exported.
+ *
+ */
+public interface NotificationProcessor {
+
+ /**
+ * Processes {@link TimestampedNotification}s by retrieving the Periodic Query results
+ * associated the query id given by {@link TimestampedNotification#getId()}.
+ * @param notification - contains information about which query results to retrieve
+ */
+ public void processNotification(TimestampedNotification notification);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java
new file mode 100644
index 0000000..ff08733
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.periodic.notification.notification.BasicNotification;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+
+/**
+ * Object to register {@link PeriodicNotification}s with an external queuing
+ * service to be handled by a {@link NotificationCoordinatorExecutor} service.
+ * The service will generate notifications to process Periodic Query results at regular
+ * intervals corresponding the period of the PeriodicNotification.
+ *
+ */
+public interface PeriodicNotificationClient extends AutoCloseable {
+
+ /**
+ * Adds a new notification to be registered with the {@link NotificationCoordinatorExecutor}
+ * @param notification - notification to be added
+ */
+ public void addNotification(PeriodicNotification notification);
+
+ /**
+ * Deletes a notification from the {@link NotificationCoordinatorExecutor}.
+ * @param notification - notification to be deleted
+ */
+ public void deleteNotification(BasicNotification notification);
+
+ /**
+ * Deletes a notification from the {@link NotificationCoordinatorExecutor}.
+ * @param notification - id corresponding to the notification to be deleted
+ */
+ public void deleteNotification(String notificationId);
+
+ /**
+ * Adds a new notification with the indicated id and period to the {@link NotificationCoordinatorExecutor}
+ * @param id - Periodic Query id
+ * @param period - period indicating frequency at which notifications will be generated
+ * @param delay - initial delay for starting periodic notifications
+ * @param unit - time unit of delay and period
+ */
+ public void addNotification(String id, long period, long delay, TimeUnit unit);
+
+ public void close();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java
new file mode 100644
index 0000000..c31a5c0
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Objects;
+
+/**
+ * Notification Object used by the Periodic Query Service
+ * to inform workers to process results for a given Periodic
+ * Query with the indicated id.
+ *
+ */
+public class BasicNotification implements Notification {
+
+ private String id;
+
+ /**
+ * Creates a BasicNotification
+ * @param id - Fluo query id associated with this Notification
+ */
+ public BasicNotification(String id) {
+ this.id = id;
+ }
+
+ /**
+ * @return the Fluo Query Id that this notification will generate results for
+ */
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (other instanceof BasicNotification) {
+ BasicNotification not = (BasicNotification) other;
+ return Objects.equal(this.id, not.id);
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(id);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ return builder.append("id").append("=").append(id).toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java
new file mode 100644
index 0000000..597b228
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+/**
+ * This Object contains a Notification Object used by the Periodic Query Service
+ * to inform workers to process results for a given Periodic Query with the
+ * indicated id. Additionally, the CommandNotification contains a
+ * {@link Command} about which action the
+ * {@link NotificationCoordinatorExecutor} should take (adding or deleting).
+ * CommandNotifications are meant to be added to an external work queue (such as
+ * Kafka) to be processed by the NotificationCoordinatorExecutor.
+ *
+ */
+public class CommandNotification implements Notification {
+
+ private Notification notification;
+ private Command command;
+
+ public enum Command {
+ ADD, DELETE
+ };
+
+ /**
+ * Creates a new CommandNotification
+ * @param command - the command associated with this notification (either add, update, or delete)
+ * @param notification - the underlying notification associated with this command
+ */
+ public CommandNotification(Command command, Notification notification) {
+ this.notification = Preconditions.checkNotNull(notification);
+ this.command = Preconditions.checkNotNull(command);
+ }
+
+ @Override
+ public String getId() {
+ return notification.getId();
+ }
+
+ /**
+ * Returns {@link Notification} contained by this CommmandNotification.
+ * @return - Notification contained by this Object
+ */
+ public Notification getNotification() {
+ return this.notification;
+ }
+
+ /**
+ * @return Command contained by this Object (either add or delete)
+ */
+ public Command getCommand() {
+ return this.command;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (other instanceof CommandNotification) {
+ CommandNotification cn = (CommandNotification) other;
+ return Objects.equal(this.command, cn.command) && Objects.equal(this.notification, cn.notification);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(command, notification);
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder().append("command").append("=").append(command.toString()).append(";")
+ .append(notification.toString()).toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java
new file mode 100644
index 0000000..aa9e581
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Notification Object used by the Periodic Query Service to inform workers to
+ * process results for a given Periodic Query with the indicated id.
+ * Additionally, this Object contains a period that indicates a frequency at
+ * which regular updates are generated.
+ *
+ */
+public class PeriodicNotification implements Notification {
+
+ private String id;
+ private long period;
+ private TimeUnit periodTimeUnit;
+ private long initialDelay;
+
+ /**
+ * Creates a PeriodicNotification.
+ * @param id - Fluo Query Id that this notification is associated with
+ * @param period - period at which notifications are generated
+ * @param periodTimeUnit - time unit associated with the period and delay
+ * @param initialDelay - amount of time to wait before generating the first notification
+ */
+ public PeriodicNotification(String id, long period, TimeUnit periodTimeUnit, long initialDelay) {
+ this.id = Preconditions.checkNotNull(id);
+ this.periodTimeUnit = Preconditions.checkNotNull(periodTimeUnit);
+ Preconditions.checkArgument(period > 0 && initialDelay >= 0);
+ this.period = period;
+ this.initialDelay = initialDelay;
+ }
+
+
+ /**
+ * Create a PeriodicNotification
+ * @param other - other PeriodicNotification used in copy constructor
+ */
+ public PeriodicNotification(PeriodicNotification other) {
+ this(other.id, other.period, other.periodTimeUnit, other.initialDelay);
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ /**
+ * @return - period at which regular notifications are generated
+ */
+ public long getPeriod() {
+ return period;
+ }
+
+ /**
+ * @return time unit of period and initial delay
+ */
+ public TimeUnit getTimeUnit() {
+ return periodTimeUnit;
+ }
+
+ /**
+ * @return amount of time to delay before beginning to generate notifications
+ */
+ public long getInitialDelay() {
+ return initialDelay;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ String delim = "=";
+ String delim2 = ";";
+ return builder.append("id").append(delim).append(id).append(delim2).append("period").append(delim).append(period).append(delim2)
+ .append("periodTimeUnit").append(delim).append(periodTimeUnit).append(delim2).append("initialDelay").append(delim)
+ .append(initialDelay).toString();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (!(other instanceof PeriodicNotification)) {
+ return false;
+ }
+
+ PeriodicNotification notification = (PeriodicNotification) other;
+ return Objects.equals(this.id, notification.id) && (this.period == notification.period)
+ && Objects.equals(this.periodTimeUnit, notification.periodTimeUnit) && (this.initialDelay == notification.initialDelay);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, period, periodTimeUnit, initialDelay);
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+
+ private String id;
+ private long period;
+ private TimeUnit periodTimeUnit;
+ private long initialDelay = 0;
+
+ /**
+ * @param id - periodic query id
+ * @return - builder to chain method calls
+ */
+ public Builder id(String id) {
+ this.id = id;
+ return this;
+ }
+
+ /**
+ * @param period of the periodic notification for generating regular notifications
+ * @return - builder to chain method calls
+ */
+ public Builder period(long period) {
+ this.period = period;
+ return this;
+ }
+
+ /**
+ * @param timeUnit of period and initial delay
+ * @return - builder to chain method calls
+ */
+ public Builder timeUnit(TimeUnit timeUnit) {
+ this.periodTimeUnit = timeUnit;
+ return this;
+ }
+
+ /**
+ * @param initialDelay - amount of time to wait before generating notifications
+ * @return - builder to chain method calls
+ */
+ public Builder initialDelay(long initialDelay) {
+ this.initialDelay = initialDelay;
+ return this;
+ }
+
+ /**
+ * Builds PeriodicNotification
+ * @return PeriodicNotification constructed from Builder specified parameters
+ */
+ public PeriodicNotification build() {
+ return new PeriodicNotification(id, period, periodTimeUnit, initialDelay);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java
new file mode 100644
index 0000000..38073ce
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link PeriodicNotification} Object used by the Periodic Query Service to inform workers to
+ * process results for a given Periodic Query with the indicated id. Additionally
+ * this Object contains a {@link Date} object to indicate the date time at which this
+ * notification was generated.
+ *
+ */
+public class TimestampedNotification extends PeriodicNotification {
+
+ private Date date;
+
+ /**
+ * Constructs a TimestampedNotification
+ * @param id - Fluo Query Id associated with this Notification
+ * @param period - period at which notifications are generated
+ * @param periodTimeUnit - time unit associated with period and initial delay
+ * @param initialDelay - amount of time to wait before generating first notification
+ */
+ public TimestampedNotification(String id, long period, TimeUnit periodTimeUnit, long initialDelay) {
+ super(id, period, periodTimeUnit, initialDelay);
+ date = new Date();
+ }
+
+ /**
+ * Creates a TimestampedNotification
+ * @param notification - PeriodicNotification used to create this TimestampedNotification.
+ * This constructor creates a time stamp for the TimestampedNotification.
+ */
+ public TimestampedNotification(PeriodicNotification notification) {
+ super(notification);
+ date = new Date();
+ }
+
+ /**
+ * @return timestamp at which this notification was generated
+ */
+ public Date getTimestamp() {
+ return date;
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + ";date=" + date;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java
new file mode 100644
index 0000000..bb438be
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.registration;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.rya.periodic.notification.api.Notification;
+import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
+import org.apache.rya.periodic.notification.notification.BasicNotification;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.apache.rya.periodic.notification.notification.CommandNotification.Command;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+
+/**
+ * Implementation of {@link PeriodicNotificaitonClient} used to register new notification
+ * requests with the PeriodicQueryService.
+ *
+ */
+public class KafkaNotificationRegistrationClient implements PeriodicNotificationClient {
+
+ private KafkaProducer<String, CommandNotification> producer;
+ private String topic;
+
+ public KafkaNotificationRegistrationClient(String topic, KafkaProducer<String, CommandNotification> producer) {
+ this.topic = topic;
+ this.producer = producer;
+ }
+
+ @Override
+ public void addNotification(PeriodicNotification notification) {
+ processNotification(new CommandNotification(Command.ADD, notification));
+
+ }
+
+ @Override
+ public void deleteNotification(BasicNotification notification) {
+ processNotification(new CommandNotification(Command.DELETE, notification));
+ }
+
+ @Override
+ public void deleteNotification(String notificationId) {
+ processNotification(new CommandNotification(Command.DELETE, new BasicNotification(notificationId)));
+ }
+
+ @Override
+ public void addNotification(String id, long period, long delay, TimeUnit unit) {
+ Notification notification = PeriodicNotification.builder().id(id).period(period).initialDelay(delay).timeUnit(unit).build();
+ processNotification(new CommandNotification(Command.ADD, notification));
+ }
+
+
+ private void processNotification(CommandNotification notification) {
+ producer.send(new ProducerRecord<String, CommandNotification>(topic, notification.getId(), notification));
+ }
+
+ @Override
+ public void close() {
+ producer.close();
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java
new file mode 100644
index 0000000..bd29d29
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.lang.reflect.Type;
+
+import org.apache.rya.periodic.notification.notification.BasicNotification;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * {@link TypeAdapter} for {@link BasicNotification}s. Used in {@link CommandNotificationTypeAdapter} to
+ * serialize {@link CommandNotification}s.
+ *
+ */
+public class BasicNotificationTypeAdapter implements JsonDeserializer<BasicNotification>, JsonSerializer<BasicNotification> {
+
+ @Override
+ public JsonElement serialize(BasicNotification arg0, Type arg1, JsonSerializationContext arg2) {
+ JsonObject result = new JsonObject();
+ result.add("id", new JsonPrimitive(arg0.getId()));
+ return result;
+ }
+
+ @Override
+ public BasicNotification deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2) throws JsonParseException {
+ JsonObject json = arg0.getAsJsonObject();
+ String id = json.get("id").getAsString();
+ return new BasicNotification(id);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
new file mode 100644
index 0000000..50180ad
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.base.Joiner;
+import com.google.common.primitives.Bytes;
+
+/**
+ * Kafka {@link Serializer} and {@link Deserializer} for producing and consuming messages
+ * from Kafka.
+ *
+ */
+public class BindingSetSerDe implements Serializer<BindingSet>, Deserializer<BindingSet> {
+
+ private static final Logger log = Logger.getLogger(BindingSetSerDe.class);
+ private static final AccumuloPcjSerializer serializer = new AccumuloPcjSerializer();
+ private static final byte[] DELIM_BYTE = "\u0002".getBytes();
+
+ private byte[] toBytes(BindingSet bindingSet) {
+ try {
+ return getBytes(getVarOrder(bindingSet), bindingSet);
+ } catch(Exception e) {
+ log.trace("Unable to serialize BindingSet: " + bindingSet);
+ return new byte[0];
+ }
+ }
+
+ private BindingSet fromBytes(byte[] bsBytes) {
+ try{
+ int firstIndex = Bytes.indexOf(bsBytes, DELIM_BYTE);
+ byte[] varOrderBytes = Arrays.copyOf(bsBytes, firstIndex);
+ byte[] bsBytesNoVarOrder = Arrays.copyOfRange(bsBytes, firstIndex + 1, bsBytes.length);
+ VariableOrder varOrder = new VariableOrder(new String(varOrderBytes,"UTF-8").split(";"));
+ return getBindingSet(varOrder, bsBytesNoVarOrder);
+ } catch(Exception e) {
+ log.trace("Unable to deserialize BindingSet: " + bsBytes);
+ return new QueryBindingSet();
+ }
+ }
+
+ private VariableOrder getVarOrder(BindingSet bs) {
+ return new VariableOrder(bs.getBindingNames());
+ }
+
+ private byte[] getBytes(VariableOrder varOrder, BindingSet bs) throws UnsupportedEncodingException, BindingSetConversionException {
+ byte[] bsBytes = serializer.convert(bs, varOrder);
+ String varOrderString = Joiner.on(";").join(varOrder.getVariableOrders());
+ byte[] varOrderBytes = varOrderString.getBytes("UTF-8");
+ return Bytes.concat(varOrderBytes, DELIM_BYTE, bsBytes);
+ }
+
+ private BindingSet getBindingSet(VariableOrder varOrder, byte[] bsBytes) throws BindingSetConversionException {
+ return serializer.convert(bsBytes, varOrder);
+ }
+
+ @Override
+ public BindingSet deserialize(String topic, byte[] bytes) {
+ return fromBytes(bytes);
+ }
+
+ @Override
+ public void close() {
+ // Do nothing. Nothing to close.
+ }
+
+ @Override
+ public void configure(Map<String, ?> arg0, boolean arg1) {
+ // Do nothing. Nothing to configure.
+ }
+
+ @Override
+ public byte[] serialize(String topic, BindingSet bs) {
+ return toBytes(bs);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java
new file mode 100644
index 0000000..302e1be
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.rya.periodic.notification.api.Notification;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * Kafka {@link Serializer} and {@link Deserializer} for producing and consuming {@link CommandNotification}s
+ * to and from Kafka.
+ *
+ */
+public class CommandNotificationSerializer implements Serializer<CommandNotification>, Deserializer<CommandNotification> {
+
+ private static Gson gson = new GsonBuilder()
+ .registerTypeHierarchyAdapter(Notification.class, new CommandNotificationTypeAdapter()).create();
+ private static final Logger LOG = LoggerFactory.getLogger(CommandNotificationSerializer.class);
+
+ @Override
+ public CommandNotification deserialize(String topic, byte[] bytes) {
+ String json = null;
+ try {
+ json = new String(bytes, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ LOG.info("Unable to deserialize notification for topic: " + topic);
+ }
+ return gson.fromJson(json, CommandNotification.class);
+ }
+
+ @Override
+ public byte[] serialize(String topic, CommandNotification command) {
+ try {
+ return gson.toJson(command).getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ LOG.info("Unable to serialize notification: " + command + "for topic: " + topic);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ // Do nothing. Nothing to close
+ }
+
+ @Override
+ public void configure(Map<String, ?> arg0, boolean arg1) {
+ // Do nothing. Nothing to configure
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java
new file mode 100644
index 0000000..a9fb7e1
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.lang.reflect.Type;
+
+import org.apache.rya.periodic.notification.api.Notification;
+import org.apache.rya.periodic.notification.notification.BasicNotification;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+import org.apache.rya.periodic.notification.notification.CommandNotification.Command;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * {@link TypeAdapter} used to serialize and deserialize {@link CommandNotification}s.
+ * This TypeAdapter is used in {@link CommandNotificationSerializer} for producing and
+ * consuming messages to and from Kafka.
+ *
+ */
+public class CommandNotificationTypeAdapter
+ implements JsonDeserializer<CommandNotification>, JsonSerializer<CommandNotification> {
+
+ @Override
+ public JsonElement serialize(CommandNotification arg0, Type arg1, JsonSerializationContext arg2) {
+ JsonObject result = new JsonObject();
+ result.add("command", new JsonPrimitive(arg0.getCommand().name()));
+ Notification notification = arg0.getNotification();
+ if (notification instanceof PeriodicNotification) {
+ result.add("type", new JsonPrimitive(PeriodicNotification.class.getSimpleName()));
+ PeriodicNotificationTypeAdapter adapter = new PeriodicNotificationTypeAdapter();
+ result.add("notification",
+ adapter.serialize((PeriodicNotification) notification, PeriodicNotification.class, arg2));
+ } else if (notification instanceof BasicNotification) {
+ result.add("type", new JsonPrimitive(BasicNotification.class.getSimpleName()));
+ BasicNotificationTypeAdapter adapter = new BasicNotificationTypeAdapter();
+ result.add("notification",
+ adapter.serialize((BasicNotification) notification, BasicNotification.class, arg2));
+ } else {
+ throw new IllegalArgumentException("Invalid notification type.");
+ }
+ return result;
+ }
+
+ @Override
+ public CommandNotification deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2)
+ throws JsonParseException {
+
+ JsonObject json = arg0.getAsJsonObject();
+ Command command = Command.valueOf(json.get("command").getAsString());
+ String type = json.get("type").getAsString();
+ Notification notification = null;
+ if (type.equals(PeriodicNotification.class.getSimpleName())) {
+ notification = (new PeriodicNotificationTypeAdapter()).deserialize(json.get("notification"),
+ PeriodicNotification.class, arg2);
+ } else if (type.equals(BasicNotification.class.getSimpleName())) {
+ notification = (new BasicNotificationTypeAdapter()).deserialize(json.get("notification"),
+ BasicNotification.class, arg2);
+ } else {
+ throw new JsonParseException("Cannot deserialize Json");
+ }
+
+ return new CommandNotification(command, notification);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java
new file mode 100644
index 0000000..fcc0ba2
--- /dev/null
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.lang.reflect.Type;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification.Builder;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * {@link TypeAdapter} used to serialize and deserialize {@link PeriodicNotification}s.
+ * This TypeAdapter is used in {@link CommandNotificationTypeAdapter} which is used in
+ * {@link CommandNotificationSerializer} for producing and consuming messages to and from
+ * Kafka.
+ *
+ */
+public class PeriodicNotificationTypeAdapter
+ implements JsonSerializer<PeriodicNotification>, JsonDeserializer<PeriodicNotification> {
+
+ @Override
+ public PeriodicNotification deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2)
+ throws JsonParseException {
+
+ JsonObject json = arg0.getAsJsonObject();
+ String id = json.get("id").getAsString();
+ long period = json.get("period").getAsLong();
+ TimeUnit periodTimeUnit = TimeUnit.valueOf(json.get("timeUnit").getAsString());
+ long initialDelay = json.get("initialDelay").getAsLong();
+ Builder builder = PeriodicNotification.builder().id(id).period(period)
+ .initialDelay(initialDelay).timeUnit(periodTimeUnit);
+
+ return builder.build();
+ }
+
+ @Override
+ public JsonElement serialize(PeriodicNotification arg0, Type arg1, JsonSerializationContext arg2) {
+
+ JsonObject result = new JsonObject();
+ result.add("id", new JsonPrimitive(arg0.getId()));
+ result.add("period", new JsonPrimitive(arg0.getPeriod()));
+ result.add("initialDelay", new JsonPrimitive(arg0.getInitialDelay()));
+ result.add("timeUnit", new JsonPrimitive(arg0.getTimeUnit().name()));
+
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/pom.xml
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/pom.xml b/extras/periodic.notification/pom.xml
new file mode 100644
index 0000000..814f14f
--- /dev/null
+++ b/extras/periodic.notification/pom.xml
@@ -0,0 +1,40 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>rya.periodic.notification.parent</artifactId>
+
+ <parent>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.extras</artifactId>
+ <version>3.2.11-incubating-SNAPSHOT</version>
+ </parent>
+
+ <name>Apache Rya Periodic Notification Parent</name>
+ <description>Parent POM for Rya Periodic Notification Projects</description>
+
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>api</module>
+ <module>service</module>
+ <module>tests</module>
+ </modules>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/pom.xml
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/pom.xml b/extras/periodic.notification/service/pom.xml
new file mode 100644
index 0000000..cc78646
--- /dev/null
+++ b/extras/periodic.notification/service/pom.xml
@@ -0,0 +1,102 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <!-- Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with this
+ work for additional information regarding copyright ownership. The ASF licenses
+ this file to you under the Apache License, Version 2.0 (the "License"); you
+ may not use this file except in compliance with the License. You may obtain
+ a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
+ required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License. -->
+ <parent>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.periodic.notification.parent</artifactId>
+ <version>3.2.11-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>rya.periodic.notification.service</artifactId>
+
+ <name>Apache Rya Periodic Notification Service</name>
+ <description>Notifications for Rya Periodic Service</description>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.twill</groupId>
+ <artifactId>twill-api</artifactId>
+ <version>0.11.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.twill</groupId>
+ <artifactId>twill-yarn</artifactId>
+ <version>0.11.0</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>kafka_2.10</artifactId>
+ <groupId>org.apache.kafka</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.8.0</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.fluo</groupId>
+ <artifactId>fluo-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.fluo</groupId>
+ <artifactId>fluo-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.indexing</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.openrdf.sesame</groupId>
+ <artifactId>sesame-query</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.indexing.pcj</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.pcj.fluo.app</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.periodic.notification.api</artifactId>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicApplicationException.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicApplicationException.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicApplicationException.java
new file mode 100644
index 0000000..b2c3709
--- /dev/null
+++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicApplicationException.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.application;
+
+/**
+ * Exception thrown when attempting to create a {@link PeriodicNotificationApplication}.
+ * Indicates that a factory was unable to create some component of the application
+ * because something was configured incorrectly.
+ *
+ */
+public class PeriodicApplicationException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Creates a PeriodicApplicationException.
+ * @param message - message contained in Exception
+ */
+ public PeriodicApplicationException(String message) {
+ super(message);
+ }
+
+ /**
+ * Creates a PeriodicApplicationException.
+ * @param message - message contained in Exception
+ * @param t - Exception that spawned this PeriodicApplicationException
+ */
+ public PeriodicApplicationException(String message, Throwable t) {
+ super(message, t);
+ }
+}