You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/02 21:01:55 UTC
[2/9] incubator-rya git commit: RYA-280-Periodic Query Service.
Closes #177.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/Notification.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/Notification.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/Notification.java
new file mode 100644
index 0000000..3e9e0d1
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/Notification.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+/**
+ * Notification Object used by the Periodic Query Service
+ * to inform workers to process results for a given Periodic
+ * Query with the indicated id.
+ *
+ */
+public interface Notification {
+
+ /**
+ * @return id of a Periodic Query
+ */
+ public String getId();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java
new file mode 100644
index 0000000..d53dc17
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+
+/**
+ * Object that manages the periodic notifications for the Periodic Query Service.
+ * This Object processes new requests for periodic updates by registering them with
+ * some sort of service that generates periodic updates (such as a {@link ScheduledExecutorService}).
+ *
+ */
+public interface NotificationCoordinatorExecutor extends LifeCycle {
+
+ /**
+ * Registers or deletes a {@link CommandNotification}s with the periodic service to
+ * generate notifications at a regular interval indicated by the CommandNotification.
+ * @param notification - CommandNotification to be registered or deleted from the periodic update
+ * service.
+ */
+ public void processNextCommandNotification(CommandNotification notification);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java
new file mode 100644
index 0000000..4ac9089
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java
@@ -0,0 +1,41 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import org.apache.rya.periodic.notification.notification.TimestampedNotification;
+
+/**
+ * Object that processes new {@link TimestampedNotification}s generated by {@link NotificationCoordinatorExecutor}.
+ * It is expected that the NotificationCoordinatorExecutor will this Object with notifications to perform work via some sort
+ * sort of queuing service such as a BlockingQueue or Kafka. This Object processes the notifications by retrieving
+ * query results associated with the Periodic Query id given by {@link TimestampedNotification#getId()}, parsing them
+ * and then providing them to another service to be exported.
+ *
+ */
+public interface NotificationProcessor {
+
+ /**
+ * Processes {@link TimestampedNotification}s by retrieving the Periodic Query results
+ * associated the query id given by {@link TimestampedNotification#getId()}.
+ * @param notification - contains information about which query results to retrieve
+ */
+ public void processNotification(TimestampedNotification notification);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java
new file mode 100644
index 0000000..ff08733
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.api;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.periodic.notification.notification.BasicNotification;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+
+/**
+ * Object to register {@link PeriodicNotification}s with an external queuing
+ * service to be handled by a {@link NotificationCoordinatorExecutor} service.
+ * The service will generate notifications to process Periodic Query results at regular
+ * intervals corresponding the period of the PeriodicNotification.
+ *
+ */
+public interface PeriodicNotificationClient extends AutoCloseable {
+
+ /**
+ * Adds a new notification to be registered with the {@link NotificationCoordinatorExecutor}
+ * @param notification - notification to be added
+ */
+ public void addNotification(PeriodicNotification notification);
+
+ /**
+ * Deletes a notification from the {@link NotificationCoordinatorExecutor}.
+ * @param notification - notification to be deleted
+ */
+ public void deleteNotification(BasicNotification notification);
+
+ /**
+ * Deletes a notification from the {@link NotificationCoordinatorExecutor}.
+ * @param notification - id corresponding to the notification to be deleted
+ */
+ public void deleteNotification(String notificationId);
+
+ /**
+ * Adds a new notification with the indicated id and period to the {@link NotificationCoordinatorExecutor}
+ * @param id - Periodic Query id
+ * @param period - period indicating frequency at which notifications will be generated
+ * @param delay - initial delay for starting periodic notifications
+ * @param unit - time unit of delay and period
+ */
+ public void addNotification(String id, long period, long delay, TimeUnit unit);
+
+ public void close();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicApplicationException.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicApplicationException.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicApplicationException.java
new file mode 100644
index 0000000..b2c3709
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicApplicationException.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.application;
+
+/**
+ * Exception thrown when attempting to create a {@link PeriodicNotificationApplication}.
+ * Indicates that a factory was unable to create some component of the application
+ * because something was configured incorrectly.
+ *
+ */
+public class PeriodicApplicationException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Creates a PeriodicApplicationException.
+ * @param message - message contained in Exception
+ */
+ public PeriodicApplicationException(String message) {
+ super(message);
+ }
+
+ /**
+ * Creates a PeriodicApplicationException.
+ * @param message - message contained in Exception
+ * @param t - Exception that spawned this PeriodicApplicationException
+ */
+ public PeriodicApplicationException(String message, Throwable t) {
+ super(message, t);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java
new file mode 100644
index 0000000..6dd7126
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.application;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
+import org.apache.rya.periodic.notification.api.BinPruner;
+import org.apache.rya.periodic.notification.api.LifeCycle;
+import org.apache.rya.periodic.notification.api.NodeBin;
+import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
+import org.apache.rya.periodic.notification.exporter.BindingSetRecord;
+import org.apache.rya.periodic.notification.exporter.KafkaExporterExecutor;
+import org.apache.rya.periodic.notification.processor.NotificationProcessorExecutor;
+import org.apache.rya.periodic.notification.pruner.PeriodicQueryPrunerExecutor;
+import org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationProvider;
+import org.openrdf.query.algebra.evaluation.function.Function;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * The PeriodicNotificationApplication runs the key components of the Periodic
+ * Query Service. It consists of a {@link KafkaNotificationProvider}, a
+ * {@link NotificationCoordinatorExecutor}, a
+ * {@link NotificationProcessorExecutor}, a {@link KafkaExporterExecutor}, and a
+ * {@link PeriodicQueryPrunerExecutor}. These services run in coordination with
+ * one another to perform the following tasks in the indicated order: <br>
+ * <li>Retrieve new requests to generate periodic notifications from Kafka
+ * <li>Register them with the {@link NotificationCoordinatorExecutor} to
+ * generate the periodic notifications
+ * <li>As notifications are generated, they are added to a work queue that is
+ * monitored by the {@link NotificationProcessorExecutor}.
+ * <li>The processor processes the notifications by reading all of the query
+ * results corresponding to the bin and query id indicated by the notification.
+ * <li>After reading the results, the processor adds a {@link BindingSetRecord}
+ * to a work queue monitored by the {@link KafkaExporterExecutor}.
+ * <li>The processor then adds a {@link NodeBin} to a workqueue monitored by the
+ * {@link BinPruner}
+ * <li>The exporter processes the BindingSetRecord by exporing the result to
+ * Kafka
+ * <li>The BinPruner processes the NodeBin by cleaning up the results for the
+ * indicated bin and query in Accumulo and Fluo. <br>
+ * <br>
+ * The purpose of this Periodic Query Service is to facilitate the ability to
+ * answer Periodic Queries using the Rya Fluo application, where a Periodic
+ * Query is any query requesting periodic updates about events that occurred
+ * within a given window of time of this instant. This is also known as a
+ * rolling window query. Period Queries can be expressed using SPARQL by
+ * including the {@link Function} indicated by the URI
+ * {@link PeriodicQueryUtil#PeriodicQueryURI}. The user must provide this
+ * Function with the following arguments: the temporal variable in the query
+ * that will be filtered on, the window of time that events must occur within,
+ * the period at which the user wants to receive updates, and the time unit. The
+ * following query requests all observations that occurred within the last
+ * minute and requests updates every 15 seconds. It also performs a count on
+ * those observations. <br>
+ * <br>
+ * <li>prefix function: http://org.apache.rya/function#
+ * <li>"prefix time: http://www.w3.org/2006/time#
+ * <li>"select (count(?obs) as ?total) where {
+ * <li>"Filter(function:periodic(?time, 1, .25, time:minutes))
+ * <li>"?obs uri:hasTime ?time.
+ * <li>"?obs uri:hasId ?id }
+ * <li>
+ */
+public class PeriodicNotificationApplication implements LifeCycle {
+
+ private static final Logger log = Logger.getLogger(PeriodicNotificationApplication.class);
+ private NotificationCoordinatorExecutor coordinator;
+ private KafkaNotificationProvider provider;
+ private PeriodicQueryPrunerExecutor pruner;
+ private NotificationProcessorExecutor processor;
+ private KafkaExporterExecutor exporter;
+ private boolean running = false;
+
+ /**
+ * Creates a PeriodicNotificationApplication
+ * @param provider - {@link KafkaNotificationProvider} that retrieves new Notificaiton requests from Kafka
+ * @param coordinator - {NotificationCoordinator} that manages PeriodicNotifications.
+ * @param processor - {@link NotificationProcessorExecutor} that processes PeriodicNotifications
+ * @param exporter - {@link KafkaExporterExecutor} that exports periodic results
+ * @param pruner - {@link PeriodicQueryPrunerExecutor} that cleans up old periodic bins
+ */
+ public PeriodicNotificationApplication(KafkaNotificationProvider provider, NotificationCoordinatorExecutor coordinator,
+ NotificationProcessorExecutor processor, KafkaExporterExecutor exporter, PeriodicQueryPrunerExecutor pruner) {
+ this.provider = Preconditions.checkNotNull(provider);
+ this.coordinator = Preconditions.checkNotNull(coordinator);
+ this.processor = Preconditions.checkNotNull(processor);
+ this.exporter = Preconditions.checkNotNull(exporter);
+ this.pruner = Preconditions.checkNotNull(pruner);
+ }
+
+ @Override
+ public void start() {
+ if (!running) {
+ log.info("Starting PeriodicNotificationApplication.");
+ coordinator.start();
+ provider.start();
+ processor.start();
+ pruner.start();
+ exporter.start();
+ running = true;
+ }
+ }
+
+ @Override
+ public void stop() {
+ log.info("Stopping PeriodicNotificationApplication.");
+ provider.stop();
+ coordinator.stop();
+ processor.stop();
+ pruner.stop();
+ exporter.stop();
+ running = false;
+ }
+
+ /**
+ * @return boolean indicating whether the application is running
+ */
+ @Override
+ public boolean currentlyRunning() {
+ return running;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+
+ private PeriodicQueryPrunerExecutor pruner;
+ private KafkaNotificationProvider provider;
+ private NotificationProcessorExecutor processor;
+ private KafkaExporterExecutor exporter;
+ private NotificationCoordinatorExecutor coordinator;
+
+ /**
+ * Sets the PeriodicQueryPrunerExecutor.
+ * @param pruner - PeriodicQueryPrunerExecutor for cleaning up old periodic bins
+ * @return this Builder for chaining method calls
+ */
+ public Builder setPruner(PeriodicQueryPrunerExecutor pruner) {
+ this.pruner = pruner;
+ return this;
+ }
+
+ /**
+ * Sets the KafkaNotificationProvider
+ * @param provider - KafkaNotificationProvider for retrieving new periodic notification requests from Kafka
+ * @return this Builder for chaining method calls
+ */
+ public Builder setProvider(KafkaNotificationProvider provider) {
+ this.provider = provider;
+ return this;
+ }
+
+ public Builder setProcessor(NotificationProcessorExecutor processor) {
+ this.processor = processor;
+ return this;
+ }
+
+ /**
+ * Sets KafkaExporterExecutor
+ * @param exporter for exporting periodic query results to Kafka
+ * @return this Builder for chaining method calls
+ */
+ public Builder setExporter(KafkaExporterExecutor exporter) {
+ this.exporter = exporter;
+ return this;
+ }
+
+ /**
+ * Sets NotificationCoordinatorExecutor
+ * @param coordinator for managing and generating periodic notifications
+ * @return this Builder for chaining method calls
+ */
+ public Builder setCoordinator(NotificationCoordinatorExecutor coordinator) {
+ this.coordinator = coordinator;
+ return this;
+ }
+
+ /**
+ * Creates a PeriodicNotificationApplication
+ * @return PeriodicNotificationApplication for periodically polling Rya Fluo Application
+ */
+ public PeriodicNotificationApplication build() {
+ return new PeriodicNotificationApplication(provider, coordinator, processor, exporter, pruner);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
new file mode 100644
index 0000000..d69efe5
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.application;
+
+import java.util.Properties;
+
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+
+import jline.internal.Preconditions;
+
+/**
+ * Configuration object for creating a {@link PeriodicNotificationApplication}.
+ */
+public class PeriodicNotificationApplicationConfiguration extends AccumuloRdfConfiguration {
+
+ public static String FLUO_APP_NAME = "fluo.app.name";
+ public static String FLUO_TABLE_NAME = "fluo.table.name";
+ public static String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
+ public static String NOTIFICATION_TOPIC = "kafka.notification.topic";
+ public static String NOTIFICATION_GROUP_ID = "kafka.notification.group.id";
+ public static String NOTIFICATION_CLIENT_ID = "kafka.notification.client.id";
+ public static String COORDINATOR_THREADS = "cep.coordinator.threads";
+ public static String PRODUCER_THREADS = "cep.producer.threads";
+ public static String EXPORTER_THREADS = "cep.exporter.threads";
+ public static String PROCESSOR_THREADS = "cep.processor.threads";
+ public static String PRUNER_THREADS = "cep.pruner.threads";
+
+ public PeriodicNotificationApplicationConfiguration() {}
+
+ /**
+ * Creates an PeriodicNotificationApplicationConfiguration object from a Properties file. This method assumes
+ * that all values in the Properties file are Strings and that the Properties file uses the keys below.
+ * See rya.cep/cep.integration.tests/src/test/resources/properties/notification.properties for an example.
+ * <br>
+ * <ul>
+ * <li>"accumulo.auths" - String of Accumulo authorizations. Default is empty String.
+ * <li>"accumulo.instance" - Accumulo instance name (required)
+ * <li>"accumulo.user" - Accumulo user (required)
+ * <li>"accumulo.password" - Accumulo password (required)
+ * <li>"accumulo.rya.prefix" - Prefix for Accumulo backed Rya instance. Default is "rya_"
+ * <li>"accumulo.zookeepers" - Zookeepers for underlying Accumulo instance (required)
+ * <li>"fluo.app.name" - Name of Fluo Application (required)
+ * <li>"fluo.table.name" - Name of Fluo Table (required)
+ * <li>"kafka.bootstrap.servers" - Kafka Bootstrap servers for Producers and Consumers (required)
+ * <li>"kafka.notification.topic" - Topic to which new Periodic Notifications are published. Default is "notifications".
+ * <li>"kafka.notification.client.id" - Client Id for notification topic. Default is "consumer0"
+ * <li>"kafka.notification.group.id" - Group Id for notification topic. Default is "group0"
+ * <li>"cep.coordinator.threads" - Number of threads used by coordinator. Default is 1.
+ * <li>"cep.producer.threads" - Number of threads used by producer. Default is 1.
+ * <li>"cep.exporter.threads" - Number of threads used by exporter. Default is 1.
+ * <li>"cep.processor.threads" - Number of threads used by processor. Default is 1.
+ * <li>"cep.pruner.threads" - Number of threads used by pruner. Default is 1.
+ * </ul>
+ * <br>
+ * @param props - Properties file containing Accumulo specific configuration parameters
+ * @return AccumumuloRdfConfiguration with properties set
+ */
+ public PeriodicNotificationApplicationConfiguration(Properties props) {
+ super(fromProperties(props));
+ setFluoAppName(props.getProperty(FLUO_APP_NAME));
+ setFluoTableName(props.getProperty(FLUO_TABLE_NAME));
+ setBootStrapServers(props.getProperty(KAFKA_BOOTSTRAP_SERVERS));
+ setNotificationClientId(props.getProperty(NOTIFICATION_CLIENT_ID, "consumer0"));
+ setNotificationTopic(props.getProperty(NOTIFICATION_TOPIC, "notifications"));
+ setNotificationGroupId(props.getProperty(NOTIFICATION_GROUP_ID, "group0"));
+ setProducerThreads(Integer.parseInt(props.getProperty(PRODUCER_THREADS, "1")));
+ setProcessorThreads(Integer.parseInt(props.getProperty(PROCESSOR_THREADS, "1")));
+ setExporterThreads(Integer.parseInt(props.getProperty(EXPORTER_THREADS, "1")));
+ setPrunerThreads(Integer.parseInt(props.getProperty(PRUNER_THREADS, "1")));
+ setCoordinatorThreads(Integer.parseInt(props.getProperty(COORDINATOR_THREADS, "1")));
+ }
+
+ /**
+ * Sets the name of the Fluo Application
+ * @param fluoAppName
+ */
+ public void setFluoAppName(String fluoAppName) {
+ set(FLUO_APP_NAME, Preconditions.checkNotNull(fluoAppName));
+ }
+
+ /**
+ * Sets the name of the Fluo table
+ * @param fluoTableName
+ */
+ public void setFluoTableName(String fluoTableName) {
+ set(FLUO_TABLE_NAME, Preconditions.checkNotNull(fluoTableName));
+ }
+
+ /**
+ * Sets the Kafka bootstrap servers
+ * @param bootStrapServers
+ */
+ public void setBootStrapServers(String bootStrapServers) {
+ set(KAFKA_BOOTSTRAP_SERVERS, Preconditions.checkNotNull(bootStrapServers));
+ }
+
+ /**
+ * Sets the Kafka topic name for new notification requests
+ * @param notificationTopic
+ */
+ public void setNotificationTopic(String notificationTopic) {
+ set(NOTIFICATION_TOPIC, Preconditions.checkNotNull(notificationTopic));
+ }
+
+ /**
+ * Sets the GroupId for new notification request topic
+ * @param notificationGroupId
+ */
+ public void setNotificationGroupId(String notificationGroupId) {
+ set(NOTIFICATION_GROUP_ID, Preconditions.checkNotNull(notificationGroupId));
+ }
+
+ /**
+ * Sets the ClientId for the Kafka notification topic
+ * @param notificationClientId
+ */
+ public void setNotificationClientId(String notificationClientId) {
+ set(NOTIFICATION_GROUP_ID, Preconditions.checkNotNull(notificationClientId));
+ }
+
+ /**
+ * Sets the number of threads for the coordinator
+ * @param threads
+ */
+ public void setCoordinatorThreads(int threads) {
+ setInt(COORDINATOR_THREADS, threads);
+ }
+
+ /**
+ * Sets the number of threads for the exporter
+ * @param threads
+ */
+ public void setExporterThreads(int threads) {
+ setInt(EXPORTER_THREADS, threads);
+ }
+
+ /**
+ * Sets the number of threads for the producer for reading new periodic notifications
+ * @param threads
+ */
+ public void setProducerThreads(int threads) {
+ setInt(PRODUCER_THREADS, threads);
+ }
+
+ /**
+ * Sets the number of threads for the bin pruner
+ * @param threads
+ */
+ public void setPrunerThreads(int threads) {
+ setInt(PRUNER_THREADS, threads);
+ }
+
+ /**
+ * Sets the number of threads for the Notification processor
+ * @param threads
+ */
+ public void setProcessorThreads(int threads) {
+ setInt(PROCESSOR_THREADS, threads);
+ }
+
+ /**
+ * @return name of the Fluo application
+ */
+ public String getFluoAppName() {
+ return get(FLUO_APP_NAME);
+ }
+
+ /**
+ * @return name of the Fluo table
+ */
+ public String getFluoTableName() {
+ return get(FLUO_TABLE_NAME);
+ }
+
+ /**
+ * @return Kafka bootstrap servers
+ */
+ public String getBootStrapServers() {
+ return get(KAFKA_BOOTSTRAP_SERVERS);
+ }
+
+ /**
+ * @return notification topic
+ */
+ public String getNotificationTopic() {
+ return get(NOTIFICATION_TOPIC, "notifications");
+ }
+
+ /**
+ * @return Kafka GroupId for the notificaton topic
+ */
+ public String getNotificationGroupId() {
+ return get(NOTIFICATION_GROUP_ID, "group0");
+ }
+
+ /**
+ * @return Kafka ClientId for the notification topic
+ */
+ public String getNotificationClientId() {
+ return get(NOTIFICATION_CLIENT_ID, "consumer0");
+ }
+
+ /**
+ * @return the number of threads for the coordinator
+ */
+ public int getCoordinatorThreads() {
+ return getInt(COORDINATOR_THREADS, 1);
+ }
+
+ /**
+ * @return the number of threads for the exporter
+ */
+ public int getExporterThreads() {
+ return getInt(EXPORTER_THREADS, 1);
+ }
+
+ /**
+ * @return the number of threads for the notification producer
+ */
+ public int getProducerThreads() {
+ return getInt(PRODUCER_THREADS, 1);
+ }
+
+ /**
+ * @return the number of threads for the bin pruner
+ */
+ public int getPrunerThreads() {
+ return getInt(PRUNER_THREADS, 1);
+ }
+
+ /**
+ * @return number of threads for the processor
+ */
+ public int getProcessorThreads() {
+ return getInt(PROCESSOR_THREADS, 1);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java
new file mode 100644
index 0000000..248b2bf
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.application;
+
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
+import org.apache.rya.periodic.notification.api.NodeBin;
+import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
+import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
+import org.apache.rya.periodic.notification.exporter.BindingSetRecord;
+import org.apache.rya.periodic.notification.exporter.KafkaExporterExecutor;
+import org.apache.rya.periodic.notification.notification.TimestampedNotification;
+import org.apache.rya.periodic.notification.processor.NotificationProcessorExecutor;
+import org.apache.rya.periodic.notification.pruner.PeriodicQueryPrunerExecutor;
+import org.apache.rya.periodic.notification.recovery.PeriodicNotificationProvider;
+import org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationProvider;
+import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
+import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
+import org.openrdf.query.BindingSet;
+
+/**
+ * Factory for creating a {@link PeriodicNotificationApplication}.
+ */
+public class PeriodicNotificationApplicationFactory {
+
+ /**
+ * Create a PeriodicNotificationApplication.
+ * @param props - Properties file that specifies the parameters needed to create the application
+ * @return PeriodicNotificationApplication to periodically poll Rya Fluo for new results
+ * @throws PeriodicApplicationException
+ */
+ public static PeriodicNotificationApplication getPeriodicApplication(Properties props) throws PeriodicApplicationException {
+ PeriodicNotificationApplicationConfiguration conf = new PeriodicNotificationApplicationConfiguration(props);
+ Properties kafkaProps = getKafkaProperties(conf);
+
+ BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
+ BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>();
+ BlockingQueue<BindingSetRecord> bindingSets = new LinkedBlockingQueue<>();
+
+ FluoClient fluo = null;
+ try {
+ PeriodicQueryResultStorage storage = getPeriodicQueryResultStorage(conf);
+ fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf);
+ NotificationCoordinatorExecutor coordinator = getCoordinator(conf.getCoordinatorThreads(), notifications);
+ addRegisteredNotices(coordinator, fluo.newSnapshot());
+ KafkaExporterExecutor exporter = getExporter(conf.getExporterThreads(), kafkaProps, bindingSets);
+ PeriodicQueryPrunerExecutor pruner = getPruner(storage, fluo, conf.getPrunerThreads(), bins);
+ NotificationProcessorExecutor processor = getProcessor(storage, notifications, bins, bindingSets, conf.getProcessorThreads());
+ KafkaNotificationProvider provider = getProvider(conf.getProducerThreads(), conf.getNotificationTopic(), coordinator, kafkaProps);
+ return PeriodicNotificationApplication.builder().setCoordinator(coordinator).setProvider(provider).setExporter(exporter)
+ .setProcessor(processor).setPruner(pruner).build();
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ throw new PeriodicApplicationException(e.getMessage());
+ }
+ }
+
+ private static void addRegisteredNotices(NotificationCoordinatorExecutor coord, Snapshot sx) {
+ coord.start();
+ PeriodicNotificationProvider provider = new PeriodicNotificationProvider();
+ provider.processRegisteredNotifications(coord, sx);
+ }
+
+ private static NotificationCoordinatorExecutor getCoordinator(int numThreads, BlockingQueue<TimestampedNotification> notifications) {
+ return new PeriodicNotificationCoordinatorExecutor(numThreads, notifications);
+ }
+
+ private static KafkaExporterExecutor getExporter(int numThreads, Properties props, BlockingQueue<BindingSetRecord> bindingSets) {
+ KafkaProducer<String, BindingSet> producer = new KafkaProducer<>(props, new StringSerializer(), new BindingSetSerDe());
+ return new KafkaExporterExecutor(producer, numThreads, bindingSets);
+ }
+
+ private static PeriodicQueryPrunerExecutor getPruner(PeriodicQueryResultStorage storage, FluoClient fluo, int numThreads,
+ BlockingQueue<NodeBin> bins) {
+ return new PeriodicQueryPrunerExecutor(storage, fluo, numThreads, bins);
+ }
+
+ private static NotificationProcessorExecutor getProcessor(PeriodicQueryResultStorage periodicStorage,
+ BlockingQueue<TimestampedNotification> notifications, BlockingQueue<NodeBin> bins, BlockingQueue<BindingSetRecord> bindingSets,
+ int numThreads) {
+ return new NotificationProcessorExecutor(periodicStorage, notifications, bins, bindingSets, numThreads);
+ }
+
+ private static KafkaNotificationProvider getProvider(int numThreads, String topic, NotificationCoordinatorExecutor coord,
+ Properties props) {
+ return new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord,
+ numThreads);
+ }
+
+ private static PeriodicQueryResultStorage getPeriodicQueryResultStorage(PeriodicNotificationApplicationConfiguration conf)
+ throws AccumuloException, AccumuloSecurityException {
+ Instance instance = new ZooKeeperInstance(conf.getAccumuloInstance(), conf.getAccumuloZookeepers());
+ Connector conn = instance.getConnector(conf.getAccumuloUser(), new PasswordToken(conf.getAccumuloPassword()));
+ String ryaInstance = conf.getTablePrefix();
+ return new AccumuloPeriodicQueryResultStorage(conn, ryaInstance);
+ }
+
+ private static Properties getKafkaProperties(PeriodicNotificationApplicationConfiguration conf) {
+ Properties kafkaProps = new Properties();
+ kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getBootStrapServers());
+ kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, conf.getNotificationClientId());
+ kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, conf.getNotificationGroupId());
+ kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ return kafkaProps;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/coordinator/PeriodicNotificationCoordinatorExecutor.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/coordinator/PeriodicNotificationCoordinatorExecutor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/coordinator/PeriodicNotificationCoordinatorExecutor.java
new file mode 100644
index 0000000..0486244
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/coordinator/PeriodicNotificationCoordinatorExecutor.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.coordinator;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.rya.periodic.notification.api.Notification;
+import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
+import org.apache.rya.periodic.notification.api.NotificationProcessor;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+import org.apache.rya.periodic.notification.notification.TimestampedNotification;
+import org.apache.rya.periodic.notification.notification.CommandNotification.Command;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Implementation of {@link NotificationCoordinatorExecutor} that generates regular notifications
+ * as indicated by {@link PeriodicNotification}s that are registered with this Object. When notifications
+ * are generated they are placed on a work queue to be processed by the {@link NotificationProcessor}.
+ *
+ */
+public class PeriodicNotificationCoordinatorExecutor implements NotificationCoordinatorExecutor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PeriodicNotificationCoordinatorExecutor.class);
+ private int numThreads;
+ private ScheduledExecutorService producerThreadPool;
+ private Map<String, ScheduledFuture<?>> serviceMap = new HashMap<>();
+ private BlockingQueue<TimestampedNotification> notifications;
+ private final ReentrantLock lock = new ReentrantLock(true);
+ private boolean running = false;
+
+ public PeriodicNotificationCoordinatorExecutor(int numThreads, BlockingQueue<TimestampedNotification> notifications) {
+ this.numThreads = numThreads;
+ this.notifications = notifications;
+ }
+
+ @Override
+ public void processNextCommandNotification(CommandNotification notification) {
+ lock.lock();
+ try {
+ processNotification(notification);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void start() {
+ if (!running) {
+ producerThreadPool = Executors.newScheduledThreadPool(numThreads);
+ running = true;
+ }
+ }
+
+ @Override
+ public void stop() {
+
+ if (producerThreadPool != null) {
+ producerThreadPool.shutdown();
+ }
+
+ running = false;
+
+ try {
+ if (!producerThreadPool.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
+ producerThreadPool.shutdownNow();
+ }
+ } catch (Exception e) {
+ LOG.info("Service Executor Shutdown has been called. Terminating NotificationRunnable");
+ }
+ }
+
+ private void processNotification(CommandNotification notification) {
+ Command command = notification.getCommand();
+ Notification periodic = notification.getNotification();
+ switch (command) {
+ case ADD:
+ addNotification(periodic);
+ break;
+ case DELETE:
+ deleteNotification(periodic);
+ break;
+ }
+ }
+
+ private void addNotification(Notification notification) {
+ Preconditions.checkArgument(notification instanceof PeriodicNotification);
+ PeriodicNotification notify = (PeriodicNotification) notification;
+ if (!serviceMap.containsKey(notification.getId())) {
+ ScheduledFuture<?> future = producerThreadPool.scheduleAtFixedRate(new NotificationProducer(notify), notify.getInitialDelay(),
+ notify.getPeriod(), notify.getTimeUnit());
+ serviceMap.put(notify.getId(), future);
+ }
+ }
+
+ private boolean deleteNotification(Notification notification) {
+ if (serviceMap.containsKey(notification.getId())) {
+ ScheduledFuture<?> future = serviceMap.remove(notification.getId());
+ future.cancel(true);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Scheduled Task that places a {@link PeriodicNotification}
+ * in the work queue at regular intervals.
+ *
+ */
+ class NotificationProducer implements Runnable {
+
+ private PeriodicNotification notification;
+
+ public NotificationProducer(PeriodicNotification notification) {
+ this.notification = notification;
+ }
+
+ public void run() {
+ try {
+ notifications.put(new TimestampedNotification(notification));
+ } catch (InterruptedException e) {
+ LOG.info("Unable to add notification. Process interrupted. ");
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ @Override
+ public boolean currentlyRunning() {
+ return running;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/BindingSetRecord.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/BindingSetRecord.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/BindingSetRecord.java
new file mode 100644
index 0000000..471b021
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/BindingSetRecord.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.exporter;
+
+import org.openrdf.query.BindingSet;
+
+import com.google.common.base.Objects;
+
+/**
+ * Object that associates a {@link BindingSet} with a given Kafka topic.
+ * This ensures that the {@link KafkaPeriodicBindingSetExporter} can export
+ * each BindingSet to its appropriate topic.
+ *
+ */
+public class BindingSetRecord {
+
+ private BindingSet bs;
+ private String topic;
+
+ public BindingSetRecord(BindingSet bs, String topic) {
+ this.bs = bs;
+ this.topic = topic;
+ }
+
+ /**
+ * @return BindingSet in this BindingSetRecord
+ */
+ public BindingSet getBindingSet() {
+ return bs;
+ }
+
+ /**
+ * @return Kafka topic for this BindingSetRecord
+ */
+ public String getTopic() {
+ return topic;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if(this == o) {
+ return true;
+ }
+
+ if(o instanceof BindingSetRecord) {
+ BindingSetRecord record = (BindingSetRecord) o;
+ return Objects.equal(this.bs, record.bs)&&Objects.equal(this.topic,record.topic);
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(bs, topic);
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder().append("Binding Set Record \n").append(" Topic: " + topic + "\n").append(" BindingSet: " + bs + "\n")
+ .toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java
new file mode 100644
index 0000000..4880015
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.exporter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.log4j.Logger;
+import org.apache.rya.periodic.notification.api.BindingSetExporter;
+import org.apache.rya.periodic.notification.api.LifeCycle;
+import org.openrdf.query.BindingSet;
+
+import jline.internal.Preconditions;
+
+/**
+ * Executor service that runs {@link KafkaPeriodicBindingSetExporter}s.
+ *
+ */
+public class KafkaExporterExecutor implements LifeCycle {
+
+ private static final Logger log = Logger.getLogger(BindingSetExporter.class);
+ private KafkaProducer<String, BindingSet> producer;
+ private BlockingQueue<BindingSetRecord> bindingSets;
+ private ExecutorService executor;
+ private List<KafkaPeriodicBindingSetExporter> exporters;
+ private int num_Threads;
+ private boolean running = false;
+
+ /**
+ * Creates a KafkaExporterExecutor for exporting periodic query results to Kafka.
+ * @param producer for publishing results to Kafka
+ * @param num_Threads number of threads used to publish results
+ * @param bindingSets - work queue containing {@link BindingSet}s to be published
+ */
+ public KafkaExporterExecutor(KafkaProducer<String, BindingSet> producer, int num_Threads, BlockingQueue<BindingSetRecord> bindingSets) {
+ Preconditions.checkNotNull(producer);
+ Preconditions.checkNotNull(bindingSets);
+ this.producer = producer;
+ this.bindingSets = bindingSets;
+ this.num_Threads = num_Threads;
+ this.exporters = new ArrayList<>();
+ }
+
+ @Override
+ public void start() {
+ if (!running) {
+ executor = Executors.newFixedThreadPool(num_Threads);
+
+ for (int threadNumber = 0; threadNumber < num_Threads; threadNumber++) {
+ log.info("Creating exporter:" + threadNumber);
+ KafkaPeriodicBindingSetExporter exporter = new KafkaPeriodicBindingSetExporter(producer, threadNumber, bindingSets);
+ exporters.add(exporter);
+ executor.submit(exporter);
+ }
+ running = true;
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (executor != null) {
+ executor.shutdown();
+ }
+
+ if (exporters != null && exporters.size() > 0) {
+ exporters.forEach(x -> x.shutdown());
+ }
+
+ if (producer != null) {
+ producer.close();
+ }
+
+ running = false;
+ try {
+ if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
+ log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
+ executor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ log.info("Interrupted during shutdown, exiting uncleanly");
+ }
+ }
+
+ @Override
+ public boolean currentlyRunning() {
+ return running;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java
new file mode 100644
index 0000000..9baede3
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.exporter;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
+import org.apache.rya.periodic.notification.api.BindingSetExporter;
+import org.openrdf.model.Literal;
+import org.openrdf.query.BindingSet;
+
+import jline.internal.Preconditions;
+
+/**
+ * Object that exports {@link BindingSet}s to the Kafka topic indicated by
+ * the {@link BindingSetRecord}.
+ *
+ */
+public class KafkaPeriodicBindingSetExporter implements BindingSetExporter, Runnable {
+
+ private static final Logger log = Logger.getLogger(BindingSetExporter.class);
+ private KafkaProducer<String, BindingSet> producer;
+ private BlockingQueue<BindingSetRecord> bindingSets;
+ private AtomicBoolean closed = new AtomicBoolean(false);
+ private int threadNumber;
+
+ public KafkaPeriodicBindingSetExporter(KafkaProducer<String, BindingSet> producer, int threadNumber,
+ BlockingQueue<BindingSetRecord> bindingSets) {
+ Preconditions.checkNotNull(producer);
+ Preconditions.checkNotNull(bindingSets);
+ this.threadNumber = threadNumber;
+ this.producer = producer;
+ this.bindingSets = bindingSets;
+ }
+
+ /**
+ * Exports BindingSets to Kafka. The BindingSet and topic are extracted from
+ * the indicated BindingSetRecord and the BindingSet is then exported to the topic.
+ */
+ @Override
+ public void exportNotification(BindingSetRecord record) throws ResultExportException {
+ String bindingName = IncrementalUpdateConstants.PERIODIC_BIN_ID;
+ BindingSet bindingSet = record.getBindingSet();
+ String topic = record.getTopic();
+ long binId = ((Literal) bindingSet.getValue(bindingName)).longValue();
+ final Future<RecordMetadata> future = producer
+ .send(new ProducerRecord<String, BindingSet>(topic, Long.toString(binId), bindingSet));
+ try {
+ //wait for confirmation that results have been received
+ future.get(5, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ throw new ResultExportException(e.getMessage());
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (!closed.get()) {
+ exportNotification(bindingSets.take());
+ }
+ } catch (InterruptedException | ResultExportException e) {
+ log.trace("Thread " + threadNumber + " is unable to process message.");
+ }
+ }
+
+
+ public void shutdown() {
+ closed.set(true);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java
new file mode 100644
index 0000000..c31a5c0
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Objects;
+
+/**
+ * Notification Object used by the Periodic Query Service
+ * to inform workers to process results for a given Periodic
+ * Query with the indicated id.
+ *
+ */
+public class BasicNotification implements Notification {
+
+ private String id;
+
+ /**
+ * Creates a BasicNotification
+ * @param id - Fluo query id associated with this Notification
+ */
+ public BasicNotification(String id) {
+ this.id = id;
+ }
+
+ /**
+ * @return the Fluo Query Id that this notification will generate results for
+ */
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (other instanceof BasicNotification) {
+ BasicNotification not = (BasicNotification) other;
+ return Objects.equal(this.id, not.id);
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(id);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ return builder.append("id").append("=").append(id).toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java
new file mode 100644
index 0000000..597b228
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+/**
+ * This Object contains a Notification Object used by the Periodic Query Service
+ * to inform workers to process results for a given Periodic Query with the
+ * indicated id. Additionally, the CommandNotification contains a
+ * {@link Command} about which action the
+ * {@link NotificationCoordinatorExecutor} should take (adding or deleting).
+ * CommandNotifications are meant to be added to an external work queue (such as
+ * Kafka) to be processed by the NotificationCoordinatorExecutor.
+ *
+ */
+public class CommandNotification implements Notification {
+
+ private Notification notification;
+ private Command command;
+
+ public enum Command {
+ ADD, DELETE
+ };
+
+ /**
+ * Creates a new CommandNotification
+ * @param command - the command associated with this notification (either add, update, or delete)
+ * @param notification - the underlying notification associated with this command
+ */
+ public CommandNotification(Command command, Notification notification) {
+ this.notification = Preconditions.checkNotNull(notification);
+ this.command = Preconditions.checkNotNull(command);
+ }
+
+ @Override
+ public String getId() {
+ return notification.getId();
+ }
+
+ /**
+ * Returns {@link Notification} contained by this CommmandNotification.
+ * @return - Notification contained by this Object
+ */
+ public Notification getNotification() {
+ return this.notification;
+ }
+
+ /**
+ * @return Command contained by this Object (either add or delete)
+ */
+ public Command getCommand() {
+ return this.command;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (other instanceof CommandNotification) {
+ CommandNotification cn = (CommandNotification) other;
+ return Objects.equal(this.command, cn.command) && Objects.equal(this.notification, cn.notification);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(command, notification);
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder().append("command").append("=").append(command.toString()).append(";")
+ .append(notification.toString()).toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java
new file mode 100644
index 0000000..aa9e581
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Notification Object used by the Periodic Query Service to inform workers to
+ * process results for a given Periodic Query with the indicated id.
+ * Additionally, this Object contains a period that indicates a frequency at
+ * which regular updates are generated.
+ *
+ */
+public class PeriodicNotification implements Notification {
+
+ private String id;
+ private long period;
+ private TimeUnit periodTimeUnit;
+ private long initialDelay;
+
+ /**
+ * Creates a PeriodicNotification.
+ * @param id - Fluo Query Id that this notification is associated with
+ * @param period - period at which notifications are generated
+ * @param periodTimeUnit - time unit associated with the period and delay
+ * @param initialDelay - amount of time to wait before generating the first notification
+ */
+ public PeriodicNotification(String id, long period, TimeUnit periodTimeUnit, long initialDelay) {
+ this.id = Preconditions.checkNotNull(id);
+ this.periodTimeUnit = Preconditions.checkNotNull(periodTimeUnit);
+ Preconditions.checkArgument(period > 0 && initialDelay >= 0);
+ this.period = period;
+ this.initialDelay = initialDelay;
+ }
+
+
+ /**
+ * Create a PeriodicNotification
+ * @param other - other PeriodicNotification used in copy constructor
+ */
+ public PeriodicNotification(PeriodicNotification other) {
+ this(other.id, other.period, other.periodTimeUnit, other.initialDelay);
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ /**
+ * @return - period at which regular notifications are generated
+ */
+ public long getPeriod() {
+ return period;
+ }
+
+ /**
+ * @return time unit of period and initial delay
+ */
+ public TimeUnit getTimeUnit() {
+ return periodTimeUnit;
+ }
+
+ /**
+ * @return amount of time to delay before beginning to generate notifications
+ */
+ public long getInitialDelay() {
+ return initialDelay;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ String delim = "=";
+ String delim2 = ";";
+ return builder.append("id").append(delim).append(id).append(delim2).append("period").append(delim).append(period).append(delim2)
+ .append("periodTimeUnit").append(delim).append(periodTimeUnit).append(delim2).append("initialDelay").append(delim)
+ .append(initialDelay).toString();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (!(other instanceof PeriodicNotification)) {
+ return false;
+ }
+
+ PeriodicNotification notification = (PeriodicNotification) other;
+ return Objects.equals(this.id, notification.id) && (this.period == notification.period)
+ && Objects.equals(this.periodTimeUnit, notification.periodTimeUnit) && (this.initialDelay == notification.initialDelay);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, period, periodTimeUnit, initialDelay);
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+
+ private String id;
+ private long period;
+ private TimeUnit periodTimeUnit;
+ private long initialDelay = 0;
+
+ /**
+ * @param id - periodic query id
+ * @return - builder to chain method calls
+ */
+ public Builder id(String id) {
+ this.id = id;
+ return this;
+ }
+
+ /**
+ * @param period of the periodic notification for generating regular notifications
+ * @return - builder to chain method calls
+ */
+ public Builder period(long period) {
+ this.period = period;
+ return this;
+ }
+
+ /**
+ * @param timeUnit of period and initial delay
+ * @return - builder to chain method calls
+ */
+ public Builder timeUnit(TimeUnit timeUnit) {
+ this.periodTimeUnit = timeUnit;
+ return this;
+ }
+
+ /**
+ * @param initialDelay - amount of time to wait before generating notifications
+ * @return - builder to chain method calls
+ */
+ public Builder initialDelay(long initialDelay) {
+ this.initialDelay = initialDelay;
+ return this;
+ }
+
+ /**
+ * Builds PeriodicNotification
+ * @return PeriodicNotification constructed from Builder specified parameters
+ */
+ public PeriodicNotification build() {
+ return new PeriodicNotification(id, period, periodTimeUnit, initialDelay);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java
new file mode 100644
index 0000000..38073ce
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link PeriodicNotification} Object used by the Periodic Query Service to inform workers to
+ * process results for a given Periodic Query with the indicated id. Additionally
+ * this Object contains a {@link Date} object to indicate the date time at which this
+ * notification was generated.
+ *
+ */
+public class TimestampedNotification extends PeriodicNotification {
+
+ private Date date;
+
+ /**
+ * Constructs a TimestampedNotification
+ * @param id - Fluo Query Id associated with this Notification
+ * @param period - period at which notifications are generated
+ * @param periodTimeUnit - time unit associated with period and initial delay
+ * @param initialDelay - amount of time to wait before generating first notification
+ */
+ public TimestampedNotification(String id, long period, TimeUnit periodTimeUnit, long initialDelay) {
+ super(id, period, periodTimeUnit, initialDelay);
+ date = new Date();
+ }
+
+ /**
+ * Creates a TimestampedNotification
+ * @param notification - PeriodicNotification used to create this TimestampedNotification.
+ * This constructor creates a time stamp for the TimestampedNotification.
+ */
+ public TimestampedNotification(PeriodicNotification notification) {
+ super(notification);
+ date = new Date();
+ }
+
+ /**
+ * @return timestamp at which this notification was generated
+ */
+ public Date getTimestamp() {
+ return date;
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + ";date=" + date;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java
new file mode 100644
index 0000000..a363d5d
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java
@@ -0,0 +1,114 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */package org.apache.rya.periodic.notification.processor;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.periodic.notification.api.LifeCycle;
+import org.apache.rya.periodic.notification.api.NodeBin;
+import org.apache.rya.periodic.notification.exporter.BindingSetRecord;
+import org.apache.rya.periodic.notification.notification.TimestampedNotification;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Executor service that runs {@link TimestampedNotificationProcessor}s with basic
+ * functionality for starting, stopping, and determining whether notification processors are
+ * being executed.
+ *
+ */
+public class NotificationProcessorExecutor implements LifeCycle {
+
+ private static final Logger log = Logger.getLogger(TimestampedNotificationProcessor.class);
+ private BlockingQueue<TimestampedNotification> notifications; // notifications
+ private BlockingQueue<NodeBin> bins; // entries to delete from Fluo
+ private BlockingQueue<BindingSetRecord> bindingSets; // query results to
+ // export
+ private PeriodicQueryResultStorage periodicStorage;
+ private List<TimestampedNotificationProcessor> processors;
+ private int numberThreads;
+ private ExecutorService executor;
+ private boolean running = false;
+
+ /**
+ * Creates NotificationProcessorExecutor.
+ * @param periodicStorage - storage layer that periodic results are read from
+ * @param notifications - notifications are pulled from this queue, and the timestamp indicates which bin of results to query for
+ * @param bins - after notifications are processed, they are added to the bin to be deleted
+ * @param bindingSets - results read from the storage layer to be exported
+ * @param numberThreads - number of threads used for processing
+ */
+ public NotificationProcessorExecutor(PeriodicQueryResultStorage periodicStorage, BlockingQueue<TimestampedNotification> notifications,
+ BlockingQueue<NodeBin> bins, BlockingQueue<BindingSetRecord> bindingSets, int numberThreads) {
+ this.notifications = Preconditions.checkNotNull(notifications);
+ this.bins = Preconditions.checkNotNull(bins);
+ this.bindingSets = Preconditions.checkNotNull(bindingSets);
+ this.periodicStorage = periodicStorage;
+ this.numberThreads = numberThreads;
+ processors = new ArrayList<>();
+ }
+
+ @Override
+ public void start() {
+ if (!running) {
+ executor = Executors.newFixedThreadPool(numberThreads);
+ for (int threadNumber = 0; threadNumber < numberThreads; threadNumber++) {
+ log.info("Creating exporter:" + threadNumber);
+ TimestampedNotificationProcessor processor = TimestampedNotificationProcessor.builder().setBindingSets(bindingSets)
+ .setBins(bins).setPeriodicStorage(periodicStorage).setNotifications(notifications).setThreadNumber(threadNumber)
+ .build();
+ processors.add(processor);
+ executor.submit(processor);
+ }
+ running = true;
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (processors != null && processors.size() > 0) {
+ processors.forEach(x -> x.shutdown());
+ }
+ if (executor != null) {
+ executor.shutdown();
+ }
+ running = false;
+ try {
+ if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
+ log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
+ executor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ log.info("Interrupted during shutdown, exiting uncleanly");
+ }
+ }
+
+ @Override
+ public boolean currentlyRunning() {
+ return running;
+ }
+
+}