You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/09/11 16:14:35 UTC

[2/7] incubator-rya git commit: RYA-355 Refactored the periodic notification service structure. Closes #221.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
deleted file mode 100644
index d69efe5..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.application;
-
-import java.util.Properties;
-
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-
-import jline.internal.Preconditions;
-
-/**
- * Configuration object for creating a {@link PeriodicNotificationApplication}.
- */
-public class PeriodicNotificationApplicationConfiguration extends AccumuloRdfConfiguration {
-
-    public static String FLUO_APP_NAME = "fluo.app.name";
-    public static String FLUO_TABLE_NAME = "fluo.table.name";
-    public static String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
-    public static String NOTIFICATION_TOPIC = "kafka.notification.topic";
-    public static String NOTIFICATION_GROUP_ID = "kafka.notification.group.id";
-    public static String NOTIFICATION_CLIENT_ID = "kafka.notification.client.id";
-    public static String COORDINATOR_THREADS = "cep.coordinator.threads";
-    public static String PRODUCER_THREADS = "cep.producer.threads";
-    public static String EXPORTER_THREADS = "cep.exporter.threads";
-    public static String PROCESSOR_THREADS = "cep.processor.threads";
-    public static String PRUNER_THREADS = "cep.pruner.threads";
-    
-    public PeriodicNotificationApplicationConfiguration() {}
-    
-    /**
-     * Creates an PeriodicNotificationApplicationConfiguration object from a Properties file.  This method assumes
-     * that all values in the Properties file are Strings and that the Properties file uses the keys below.
-     * See rya.cep/cep.integration.tests/src/test/resources/properties/notification.properties for an example.
-     * <br>
-     * <ul>
-     * <li>"accumulo.auths" - String of Accumulo authorizations. Default is empty String.
-     * <li>"accumulo.instance" - Accumulo instance name (required)
-     * <li>"accumulo.user" - Accumulo user (required)
-     * <li>"accumulo.password" - Accumulo password (required)
-     * <li>"accumulo.rya.prefix" - Prefix for Accumulo backed Rya instance.  Default is "rya_"
-     * <li>"accumulo.zookeepers" - Zookeepers for underlying Accumulo instance (required)
-     * <li>"fluo.app.name" - Name of Fluo Application (required)
-     * <li>"fluo.table.name" - Name of Fluo Table (required)
-     * <li>"kafka.bootstrap.servers" - Kafka Bootstrap servers for Producers and Consumers (required)
-     * <li>"kafka.notification.topic" - Topic to which new Periodic Notifications are published. Default is "notifications".
-     * <li>"kafka.notification.client.id" - Client Id for notification topic.  Default is "consumer0"
-     * <li>"kafka.notification.group.id" - Group Id for notification topic.  Default is "group0"
-     * <li>"cep.coordinator.threads" - Number of threads used by coordinator. Default is 1.
-     * <li>"cep.producer.threads" - Number of threads used by producer.  Default is 1.
-     * <li>"cep.exporter.threads" - Number of threads used by exporter.  Default is 1.
-     * <li>"cep.processor.threads" - Number of threads used by processor.  Default is 1.
-     * <li>"cep.pruner.threads" - Number of threads used by pruner.  Default is 1.
-     * </ul>
-     * <br>
-     * @param props - Properties file containing Accumulo specific configuration parameters
-     * @return AccumumuloRdfConfiguration with properties set
-     */
-    public PeriodicNotificationApplicationConfiguration(Properties props) {
-       super(fromProperties(props));
-       setFluoAppName(props.getProperty(FLUO_APP_NAME));
-       setFluoTableName(props.getProperty(FLUO_TABLE_NAME));
-       setBootStrapServers(props.getProperty(KAFKA_BOOTSTRAP_SERVERS));
-       setNotificationClientId(props.getProperty(NOTIFICATION_CLIENT_ID, "consumer0"));
-       setNotificationTopic(props.getProperty(NOTIFICATION_TOPIC, "notifications"));
-       setNotificationGroupId(props.getProperty(NOTIFICATION_GROUP_ID, "group0"));
-       setProducerThreads(Integer.parseInt(props.getProperty(PRODUCER_THREADS, "1")));
-       setProcessorThreads(Integer.parseInt(props.getProperty(PROCESSOR_THREADS, "1")));
-       setExporterThreads(Integer.parseInt(props.getProperty(EXPORTER_THREADS, "1")));
-       setPrunerThreads(Integer.parseInt(props.getProperty(PRUNER_THREADS, "1")));
-       setCoordinatorThreads(Integer.parseInt(props.getProperty(COORDINATOR_THREADS, "1")));
-    }
-    
-    /**
-     * Sets the name of the Fluo Application
-     * @param fluoAppName 
-     */
-    public void setFluoAppName(String fluoAppName) {
-        set(FLUO_APP_NAME, Preconditions.checkNotNull(fluoAppName));
-    }
-    
-    /**
-     * Sets the name of the Fluo table
-     * @param fluoTableName
-     */
-    public void setFluoTableName(String fluoTableName) {
-       set(FLUO_TABLE_NAME, Preconditions.checkNotNull(fluoTableName)); 
-    }
-    
-    /**
-     * Sets the Kafka bootstrap servers
-     * @param bootStrapServers
-     */
-    public void setBootStrapServers(String bootStrapServers) {
-        set(KAFKA_BOOTSTRAP_SERVERS, Preconditions.checkNotNull(bootStrapServers)); 
-    }
-    
-    /**
-     * Sets the Kafka topic name for new notification requests
-     * @param notificationTopic
-     */
-    public void setNotificationTopic(String notificationTopic) {
-        set(NOTIFICATION_TOPIC, Preconditions.checkNotNull(notificationTopic));
-    }
-    
-    /**
-     * Sets the GroupId for new notification request topic
-     * @param notificationGroupId
-     */
-    public void setNotificationGroupId(String notificationGroupId) {
-        set(NOTIFICATION_GROUP_ID, Preconditions.checkNotNull(notificationGroupId));
-    }
-    
-    /**
-     * Sets the ClientId for the Kafka notification topic
-     * @param notificationClientId
-     */
-    public void setNotificationClientId(String notificationClientId) {
-        set(NOTIFICATION_GROUP_ID, Preconditions.checkNotNull(notificationClientId));
-    }
-    
-    /**
-     * Sets the number of threads for the coordinator
-     * @param threads
-     */
-    public void setCoordinatorThreads(int threads) {
-        setInt(COORDINATOR_THREADS, threads);
-    }
-    
-    /**
-     * Sets the number of threads for the exporter
-     * @param threads
-     */
-    public void setExporterThreads(int threads) {
-        setInt(EXPORTER_THREADS, threads);
-    }
-    
-    /**
-     * Sets the number of threads for the producer for reading new periodic notifications
-     * @param threads
-     */
-    public void setProducerThreads(int threads) {
-        setInt(PRODUCER_THREADS, threads);
-    }
-    
-    /**
-     * Sets the number of threads for the bin pruner
-     * @param threads
-     */
-    public void setPrunerThreads(int threads) {
-        setInt(PRUNER_THREADS, threads);
-    }
-    
-    /**
-     * Sets the number of threads for the Notification processor
-     * @param threads
-     */
-    public void setProcessorThreads(int threads) {
-        setInt(PROCESSOR_THREADS, threads);
-    }
-    
-    /**
-     * @return name of the Fluo application
-     */
-    public String getFluoAppName() {
-        return get(FLUO_APP_NAME);
-    }
-    
-    /**
-     * @return name of the Fluo table
-     */
-    public String getFluoTableName() {
-       return get(FLUO_TABLE_NAME); 
-    }
-    
-    /**
-     * @return Kafka bootstrap servers
-     */
-    public String getBootStrapServers() {
-        return get(KAFKA_BOOTSTRAP_SERVERS); 
-    }
-    
-    /**
-     * @return notification topic
-     */
-    public String getNotificationTopic() {
-        return get(NOTIFICATION_TOPIC, "notifications");
-    }
-    
-    /**
-     * @return Kafka GroupId for the notificaton topic
-     */
-    public String getNotificationGroupId() {
-        return get(NOTIFICATION_GROUP_ID, "group0");
-    }
-    
-    /**
-     * @return Kafka ClientId for the notification topic
-     */
-    public String getNotificationClientId() {
-        return get(NOTIFICATION_CLIENT_ID, "consumer0");
-    }
-    
-    /**
-     * @return the number of threads for the coordinator
-     */
-    public int getCoordinatorThreads() {
-        return getInt(COORDINATOR_THREADS, 1);
-    }
-    
-    /**
-     * @return the number of threads for the exporter
-     */
-    public int getExporterThreads() {
-        return getInt(EXPORTER_THREADS, 1);
-    }
-    
-    /**
-     * @return the number of threads for the notification producer
-     */
-    public int getProducerThreads() {
-        return getInt(PRODUCER_THREADS, 1);
-    }
-    
-    /**
-     * @return the number of threads for the bin pruner
-     */
-    public int getPrunerThreads() {
-        return getInt(PRUNER_THREADS, 1);
-    }
-    
-    /**
-     * @return number of threads for the processor
-     */
-    public int getProcessorThreads() {
-        return getInt(PROCESSOR_THREADS, 1);
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java
deleted file mode 100644
index 771a4ab..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.application;
-
-import java.util.Optional;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.Snapshot;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory;
-import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
-import org.apache.rya.periodic.notification.api.BindingSetRecord;
-import org.apache.rya.periodic.notification.api.NodeBin;
-import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
-import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
-import org.apache.rya.periodic.notification.exporter.KafkaExporterExecutor;
-import org.apache.rya.periodic.notification.notification.TimestampedNotification;
-import org.apache.rya.periodic.notification.processor.NotificationProcessorExecutor;
-import org.apache.rya.periodic.notification.pruner.PeriodicQueryPrunerExecutor;
-import org.apache.rya.periodic.notification.recovery.PeriodicNotificationProvider;
-import org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationProvider;
-import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
-import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
-import org.openrdf.query.BindingSet;
-
-/**
- * Factory for creating a {@link PeriodicNotificationApplication}.
- */
-public class PeriodicNotificationApplicationFactory {
-
-    /**
-     * Create a PeriodicNotificationApplication.
-     * @param props - Properties file that specifies the parameters needed to create the application
-     * @return PeriodicNotificationApplication to periodically poll Rya Fluo for new results
-     * @throws PeriodicApplicationException
-     */
-    public static PeriodicNotificationApplication getPeriodicApplication(Properties props) throws PeriodicApplicationException {
-        PeriodicNotificationApplicationConfiguration conf = new PeriodicNotificationApplicationConfiguration(props);
-        Properties kafkaProps = getKafkaProperties(conf);
-
-        BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
-        BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>();
-        BlockingQueue<BindingSetRecord> bindingSets = new LinkedBlockingQueue<>();
-
-        FluoClient fluo = null;
-        try {
-            PeriodicQueryResultStorage storage = getPeriodicQueryResultStorage(conf);
-            fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf);
-            NotificationCoordinatorExecutor coordinator = getCoordinator(conf.getCoordinatorThreads(), notifications);
-            addRegisteredNotices(coordinator, fluo.newSnapshot());
-            KafkaExporterExecutor exporter = getExporter(conf.getExporterThreads(), kafkaProps, bindingSets);
-            PeriodicQueryPrunerExecutor pruner = getPruner(storage, fluo, conf.getPrunerThreads(), bins);
-            NotificationProcessorExecutor processor = getProcessor(storage, notifications, bins, bindingSets, conf.getProcessorThreads());
-            KafkaNotificationProvider provider = getProvider(conf.getProducerThreads(), conf.getNotificationTopic(), coordinator, kafkaProps);
-            return PeriodicNotificationApplication.builder().setCoordinator(coordinator).setProvider(provider).setExporter(exporter)
-                    .setProcessor(processor).setPruner(pruner).build();
-        } catch (AccumuloException | AccumuloSecurityException e) {
-            throw new PeriodicApplicationException(e.getMessage());
-        } 
-    }
-    
-    private static void addRegisteredNotices(NotificationCoordinatorExecutor coord, Snapshot sx) {
-        coord.start();
-        PeriodicNotificationProvider provider = new PeriodicNotificationProvider();
-        provider.processRegisteredNotifications(coord, sx);
-    }
-
-    private static NotificationCoordinatorExecutor getCoordinator(int numThreads, BlockingQueue<TimestampedNotification> notifications) {
-        return new PeriodicNotificationCoordinatorExecutor(numThreads, notifications);
-    }
-
-    private static KafkaExporterExecutor getExporter(int numThreads, Properties props, BlockingQueue<BindingSetRecord> bindingSets) {
-        KafkaProducer<String, BindingSet> producer = new KafkaProducer<>(props, new StringSerializer(), new BindingSetSerDe());
-        return new KafkaExporterExecutor(producer, numThreads, bindingSets);
-    }
-
-    private static PeriodicQueryPrunerExecutor getPruner(PeriodicQueryResultStorage storage, FluoClient fluo, int numThreads,
-            BlockingQueue<NodeBin> bins) {
-        return new PeriodicQueryPrunerExecutor(storage, fluo, numThreads, bins);
-    }
-
-    private static NotificationProcessorExecutor getProcessor(PeriodicQueryResultStorage periodicStorage,
-            BlockingQueue<TimestampedNotification> notifications, BlockingQueue<NodeBin> bins, BlockingQueue<BindingSetRecord> bindingSets,
-            int numThreads) {
-        return new NotificationProcessorExecutor(periodicStorage, notifications, bins, bindingSets, numThreads);
-    }
-
-    private static KafkaNotificationProvider getProvider(int numThreads, String topic, NotificationCoordinatorExecutor coord,
-            Properties props) {
-        return new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord,
-                numThreads);
-    }
-
-    private static PeriodicQueryResultStorage getPeriodicQueryResultStorage(PeriodicNotificationApplicationConfiguration conf)
-            throws AccumuloException, AccumuloSecurityException {
-        Instance instance = new ZooKeeperInstance(conf.getAccumuloInstance(), conf.getAccumuloZookeepers());
-        Connector conn = instance.getConnector(conf.getAccumuloUser(), new PasswordToken(conf.getAccumuloPassword()));
-        String ryaInstance = conf.getTablePrefix();
-        return new AccumuloPeriodicQueryResultStorage(conn, ryaInstance);
-    }
-    
-    private static Properties getKafkaProperties(PeriodicNotificationApplicationConfiguration conf) { 
-        Properties kafkaProps = new Properties();
-        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getBootStrapServers());
-        kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, conf.getNotificationClientId());
-        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, conf.getNotificationGroupId());
-        kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        return kafkaProps;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/coordinator/PeriodicNotificationCoordinatorExecutor.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/coordinator/PeriodicNotificationCoordinatorExecutor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/coordinator/PeriodicNotificationCoordinatorExecutor.java
deleted file mode 100644
index 0486244..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/coordinator/PeriodicNotificationCoordinatorExecutor.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.coordinator;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.rya.periodic.notification.api.Notification;
-import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
-import org.apache.rya.periodic.notification.api.NotificationProcessor;
-import org.apache.rya.periodic.notification.notification.CommandNotification;
-import org.apache.rya.periodic.notification.notification.PeriodicNotification;
-import org.apache.rya.periodic.notification.notification.TimestampedNotification;
-import org.apache.rya.periodic.notification.notification.CommandNotification.Command;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Implementation of {@link NotificationCoordinatorExecutor} that generates regular notifications
- * as indicated by {@link PeriodicNotification}s that are registered with this Object. When notifications
- * are generated they are placed on a work queue to be processed by the {@link NotificationProcessor}.
- *
- */
-public class PeriodicNotificationCoordinatorExecutor implements NotificationCoordinatorExecutor {
-
-    private static final Logger LOG = LoggerFactory.getLogger(PeriodicNotificationCoordinatorExecutor.class);
-    private int numThreads;
-    private ScheduledExecutorService producerThreadPool;
-    private Map<String, ScheduledFuture<?>> serviceMap = new HashMap<>();
-    private BlockingQueue<TimestampedNotification> notifications;
-    private final ReentrantLock lock = new ReentrantLock(true);
-    private boolean running = false;
-
-    public PeriodicNotificationCoordinatorExecutor(int numThreads, BlockingQueue<TimestampedNotification> notifications) {
-        this.numThreads = numThreads;
-        this.notifications = notifications;
-    }
-
-    @Override
-    public void processNextCommandNotification(CommandNotification notification) {
-        lock.lock();
-        try {
-            processNotification(notification);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    @Override
-    public void start() {
-        if (!running) {
-            producerThreadPool = Executors.newScheduledThreadPool(numThreads);
-            running = true;
-        }
-    }
-
-    @Override
-    public void stop() {
-
-        if (producerThreadPool != null) {
-            producerThreadPool.shutdown();
-        }
-
-        running = false;
-
-        try {
-            if (!producerThreadPool.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
-                producerThreadPool.shutdownNow();
-            }
-        } catch (Exception e) {
-            LOG.info("Service Executor Shutdown has been called.  Terminating NotificationRunnable");
-        }
-    }
-
-    private void processNotification(CommandNotification notification) {
-        Command command = notification.getCommand();
-        Notification periodic = notification.getNotification();
-        switch (command) {
-        case ADD:
-            addNotification(periodic);
-            break;
-        case DELETE:
-            deleteNotification(periodic);
-            break;
-        }
-    }
-
-    private void addNotification(Notification notification) {
-        Preconditions.checkArgument(notification instanceof PeriodicNotification);
-        PeriodicNotification notify = (PeriodicNotification) notification;
-        if (!serviceMap.containsKey(notification.getId())) {
-            ScheduledFuture<?> future = producerThreadPool.scheduleAtFixedRate(new NotificationProducer(notify), notify.getInitialDelay(),
-                    notify.getPeriod(), notify.getTimeUnit());
-            serviceMap.put(notify.getId(), future);
-        }
-    }
-
-    private boolean deleteNotification(Notification notification) {
-        if (serviceMap.containsKey(notification.getId())) {
-            ScheduledFuture<?> future = serviceMap.remove(notification.getId());
-            future.cancel(true);
-            return true;
-        }
-        return false;
-    }
-
-    /**
-     * Scheduled Task that places a {@link PeriodicNotification}
-     * in the work queue at regular intervals. 
-     *
-     */
-    class NotificationProducer implements Runnable {
-
-        private PeriodicNotification notification;
-
-        public NotificationProducer(PeriodicNotification notification) {
-            this.notification = notification;
-        }
-
-        public void run() {
-            try {
-                notifications.put(new TimestampedNotification(notification));
-            } catch (InterruptedException e) {
-                LOG.info("Unable to add notification.  Process interrupted. ");
-                throw new RuntimeException(e);
-            }
-        }
-
-    }
-
-    @Override
-    public boolean currentlyRunning() {
-        return running;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java
deleted file mode 100644
index c2e5ebf..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.exporter;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.log4j.Logger;
-import org.apache.rya.periodic.notification.api.BindingSetExporter;
-import org.apache.rya.periodic.notification.api.BindingSetRecord;
-import org.apache.rya.periodic.notification.api.LifeCycle;
-import org.openrdf.query.BindingSet;
-
-import jline.internal.Preconditions;
-
-/**
- * Executor service that runs {@link KafkaPeriodicBindingSetExporter}s.  
- *
- */
-public class KafkaExporterExecutor implements LifeCycle {
-
-    private static final Logger log = Logger.getLogger(BindingSetExporter.class);
-    private KafkaProducer<String, BindingSet> producer;
-    private BlockingQueue<BindingSetRecord> bindingSets;
-    private ExecutorService executor;
-    private List<KafkaPeriodicBindingSetExporter> exporters;
-    private int num_Threads;
-    private boolean running = false;
-
-    /**
-     * Creates a KafkaExporterExecutor for exporting periodic query results to Kafka.
-     * @param producer for publishing results to Kafka
-     * @param num_Threads number of threads used to publish results
-     * @param bindingSets - work queue containing {@link BindingSet}s to be published
-     */
-    public KafkaExporterExecutor(KafkaProducer<String, BindingSet> producer, int num_Threads, BlockingQueue<BindingSetRecord> bindingSets) {
-        Preconditions.checkNotNull(producer);
-        Preconditions.checkNotNull(bindingSets);
-        this.producer = producer;
-        this.bindingSets = bindingSets;
-        this.num_Threads = num_Threads;
-        this.exporters = new ArrayList<>();
-    }
-
-    @Override
-    public void start() {
-        if (!running) {
-            executor = Executors.newFixedThreadPool(num_Threads);
-
-            for (int threadNumber = 0; threadNumber < num_Threads; threadNumber++) {
-                log.info("Creating exporter:" + threadNumber);
-                KafkaPeriodicBindingSetExporter exporter = new KafkaPeriodicBindingSetExporter(producer, threadNumber, bindingSets);
-                exporters.add(exporter);
-                executor.submit(exporter);
-            }
-            running = true;
-        }
-    }
-
-    @Override
-    public void stop() {
-        if (executor != null) {
-            executor.shutdown();
-        }
-
-        if (exporters != null && exporters.size() > 0) {
-            exporters.forEach(x -> x.shutdown());
-        }
-
-        if (producer != null) {
-            producer.close();
-        }
-
-        running = false;
-        try {
-            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
-                log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
-                executor.shutdownNow();
-            }
-        } catch (InterruptedException e) {
-            log.info("Interrupted during shutdown, exiting uncleanly");
-        }
-    }
-
-    @Override
-    public boolean currentlyRunning() {
-        return running;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java
deleted file mode 100644
index 8a0322f..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.exporter;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
-import org.apache.rya.periodic.notification.api.BindingSetExporter;
-import org.apache.rya.periodic.notification.api.BindingSetRecord;
-import org.apache.rya.periodic.notification.api.BindingSetRecordExportException;
-import org.openrdf.model.Literal;
-import org.openrdf.query.BindingSet;
-
-import jline.internal.Preconditions;
-
-/**
- * Object that exports {@link BindingSet}s to the Kafka topic indicated by
- * the {@link BindingSetRecord}.
- * 
- */
-public class KafkaPeriodicBindingSetExporter implements BindingSetExporter, Runnable {
-
-    private static final Logger log = Logger.getLogger(BindingSetExporter.class);
-    private KafkaProducer<String, BindingSet> producer;
-    private BlockingQueue<BindingSetRecord> bindingSets;
-    private AtomicBoolean closed = new AtomicBoolean(false);
-    private int threadNumber;
-
-    public KafkaPeriodicBindingSetExporter(KafkaProducer<String, BindingSet> producer, int threadNumber,
-            BlockingQueue<BindingSetRecord> bindingSets) {
-        Preconditions.checkNotNull(producer);
-        Preconditions.checkNotNull(bindingSets);
-        this.threadNumber = threadNumber;
-        this.producer = producer;
-        this.bindingSets = bindingSets;
-    }
-
-    /**
-     * Exports BindingSets to Kafka.  The BindingSet and topic are extracted from
-     * the indicated BindingSetRecord and the BindingSet is then exported to the topic.
-     */
-    @Override
-    public void exportNotification(BindingSetRecord record) throws BindingSetRecordExportException {
-        String bindingName = IncrementalUpdateConstants.PERIODIC_BIN_ID;
-        BindingSet bindingSet = record.getBindingSet();
-        String topic = record.getTopic();
-        long binId = ((Literal) bindingSet.getValue(bindingName)).longValue();
-        final Future<RecordMetadata> future = producer
-                .send(new ProducerRecord<String, BindingSet>(topic, Long.toString(binId), bindingSet));
-        try {
-            //wait for confirmation that results have been received
-            future.get(5, TimeUnit.SECONDS);
-        } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            throw new BindingSetRecordExportException(e.getMessage());
-        }
-    }
-
-    @Override
-    public void run() {
-        try {
-            while (!closed.get()) {
-                exportNotification(bindingSets.take());
-            }
-        } catch (InterruptedException | BindingSetRecordExportException e) {
-            log.trace("Thread " + threadNumber + " is unable to process message.");
-        }
-    }
-    
-    
-    public void shutdown() {
-        closed.set(true);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java
deleted file mode 100644
index a9a5ad1..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java
+++ /dev/null
@@ -1,114 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */package org.apache.rya.periodic.notification.processor;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
-import org.apache.rya.periodic.notification.api.BindingSetRecord;
-import org.apache.rya.periodic.notification.api.LifeCycle;
-import org.apache.rya.periodic.notification.api.NodeBin;
-import org.apache.rya.periodic.notification.notification.TimestampedNotification;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Executor service that runs {@link TimestampedNotificationProcessor}s with basic
- * functionality for starting, stopping, and determining whether notification processors are
- * being executed. 
- *
- */
-public class NotificationProcessorExecutor implements LifeCycle {
-
-    private static final Logger log = Logger.getLogger(TimestampedNotificationProcessor.class);
-    private BlockingQueue<TimestampedNotification> notifications; // notifications
-    private BlockingQueue<NodeBin> bins; // entries to delete from Fluo
-    private BlockingQueue<BindingSetRecord> bindingSets; // query results to
-                                                         // export
-    private PeriodicQueryResultStorage periodicStorage;
-    private List<TimestampedNotificationProcessor> processors;
-    private int numberThreads;
-    private ExecutorService executor;
-    private boolean running = false;
-
-    /**
-     * Creates NotificationProcessorExecutor.
-     * @param periodicStorage - storage layer that periodic results are read from
-     * @param notifications - notifications are pulled from this queue, and the timestamp indicates which bin of results to query for
-     * @param bins - after notifications are processed, they are added to the bin to be deleted
-     * @param bindingSets - results read from the storage layer to be exported
-     * @param numberThreads - number of threads used for processing
-     */
-    public NotificationProcessorExecutor(PeriodicQueryResultStorage periodicStorage, BlockingQueue<TimestampedNotification> notifications,
-            BlockingQueue<NodeBin> bins, BlockingQueue<BindingSetRecord> bindingSets, int numberThreads) {
-        this.notifications = Preconditions.checkNotNull(notifications);
-        this.bins = Preconditions.checkNotNull(bins);
-        this.bindingSets = Preconditions.checkNotNull(bindingSets);
-        this.periodicStorage = periodicStorage;
-        this.numberThreads = numberThreads;
-        processors = new ArrayList<>();
-    }
-
-    @Override
-    public void start() {
-        if (!running) {
-            executor = Executors.newFixedThreadPool(numberThreads);
-            for (int threadNumber = 0; threadNumber < numberThreads; threadNumber++) {
-                log.info("Creating exporter:" + threadNumber);
-                TimestampedNotificationProcessor processor = TimestampedNotificationProcessor.builder().setBindingSets(bindingSets)
-                        .setBins(bins).setPeriodicStorage(periodicStorage).setNotifications(notifications).setThreadNumber(threadNumber)
-                        .build();
-                processors.add(processor);
-                executor.submit(processor);
-            }
-            running = true;
-        }
-    }
-
-    @Override
-    public void stop() {
-        if (processors != null && processors.size() > 0) {
-            processors.forEach(x -> x.shutdown());
-        }
-        if (executor != null) {
-            executor.shutdown();
-        }
-        running = false;
-        try {
-            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
-                log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
-                executor.shutdownNow();
-            }
-        } catch (InterruptedException e) {
-            log.info("Interrupted during shutdown, exiting uncleanly");
-        }
-    }
-
-    @Override
-    public boolean currentlyRunning() {
-        return running;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
deleted file mode 100644
index 8b65683..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.processor;
-
-import java.util.Optional;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
-import org.apache.rya.periodic.notification.api.BinPruner;
-import org.apache.rya.periodic.notification.api.BindingSetRecord;
-import org.apache.rya.periodic.notification.api.NodeBin;
-import org.apache.rya.periodic.notification.api.NotificationProcessor;
-import org.apache.rya.periodic.notification.exporter.KafkaPeriodicBindingSetExporter;
-import org.apache.rya.periodic.notification.notification.TimestampedNotification;
-import org.openrdf.query.BindingSet;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Implementation of {@link NotificationProcessor} that uses the id indicated by
- * the {@link TimestampedNotification} to obtain results from the
- * {@link PeriodicQueryResultStorage} layer containing the results of the
- * Periodic Query. The TimestampedNotificationProcessor then parses the results
- * and adds them to work queues to be processed by the {@link BinPruner} and the
- * {@link KafkaPeriodicBindingSetExporter}.
- *
- */
-public class TimestampedNotificationProcessor implements NotificationProcessor, Runnable {
-
-    private static final Logger log = Logger.getLogger(TimestampedNotificationProcessor.class);
-    private PeriodicQueryResultStorage periodicStorage;
-    private BlockingQueue<TimestampedNotification> notifications; // notifications
-                                                                  // to process
-    private BlockingQueue<NodeBin> bins; // entries to delete from Fluo
-    private BlockingQueue<BindingSetRecord> bindingSets; // query results to export
-    private AtomicBoolean closed = new AtomicBoolean(false);
-    private int threadNumber;
-    
-
-    public TimestampedNotificationProcessor(PeriodicQueryResultStorage periodicStorage,
-            BlockingQueue<TimestampedNotification> notifications, BlockingQueue<NodeBin> bins, BlockingQueue<BindingSetRecord> bindingSets,
-            int threadNumber) {
-        this.notifications = Preconditions.checkNotNull(notifications);
-        this.bins = Preconditions.checkNotNull(bins);
-        this.bindingSets = Preconditions.checkNotNull(bindingSets);
-        this.periodicStorage = periodicStorage;
-        this.threadNumber = threadNumber;
-    }
-
-    /**
-     * Processes the TimestampNotifications by scanning the PCJ tables for
-     * entries in the bin corresponding to
-     * {@link TimestampedNotification#getTimestamp()} and adding them to the
-     * export BlockingQueue. The TimestampNotification is then used to form a
-     * {@link NodeBin} that is passed to the BinPruner BlockingQueue so that the
-     * bins can be deleted from Fluo and Accumulo.
-     */
-    @Override
-    public void processNotification(TimestampedNotification notification) {
-
-        String id = notification.getId();
-        long ts = notification.getTimestamp().getTime();
-        long period = notification.getPeriod();
-        long bin = getBinFromTimestamp(ts, period);
-        NodeBin nodeBin = new NodeBin(id, bin);
-
-        try (CloseableIterator<BindingSet> iter = periodicStorage.listResults(id, Optional.of(bin));) {
-
-            while(iter.hasNext()) {
-                bindingSets.add(new BindingSetRecord(iter.next(), id));
-            }
-            // add NodeBin to BinPruner queue so that bin can be deleted from
-            // Fluo and Accumulo
-            bins.add(nodeBin);
-        } catch (Exception e) {
-            log.debug("Encountered error: " + e.getMessage() + " while accessing periodic results for bin: " + bin + " for query: " + id);
-        }
-    }
-
-    /**
-     * Computes left bin end point containing event time ts
-     * 
-     * @param ts - event time
-     * @param start - time that periodic event began
-     * @param period - length of period
-     * @return left bin end point containing event time ts
-     */
-    private long getBinFromTimestamp(long ts, long period) {
-        Preconditions.checkArgument(period > 0);
-        return (ts / period) * period;
-    }
-
-    @Override
-    public void run() {
-        try {
-            while(!closed.get()) {
-                processNotification(notifications.take());
-            }
-        } catch (Exception e) {
-            log.trace("Thread_" + threadNumber + " is unable to process next notification.");
-            throw new RuntimeException(e);
-        }
-
-    }
-    
-    public void shutdown() {
-        closed.set(true);
-    }
-
-    public static Builder builder() {
-        return new Builder();
-    }
-
-  
-
-    public static class Builder {
-
-        private PeriodicQueryResultStorage periodicStorage;
-        private BlockingQueue<TimestampedNotification> notifications; // notifications to process
-        private BlockingQueue<NodeBin> bins; // entries to delete from Fluo
-        private BlockingQueue<BindingSetRecord> bindingSets; // query results to export
-                                                       
-        private int threadNumber;
-
-        /**
-         * Set notification queue
-         * @param notifications - work queue containing notifications to be processed
-         * @return this Builder for chaining method calls
-         */
-        public Builder setNotifications(BlockingQueue<TimestampedNotification> notifications) {
-            this.notifications = notifications;
-            return this;
-        }
-
-        /**
-         * Set nodeBin queue
-         * @param bins - work queue containing NodeBins to be pruned
-         * @return this Builder for chaining method calls
-         */
-        public Builder setBins(BlockingQueue<NodeBin> bins) {
-            this.bins = bins;
-            return this;
-        }
-
-        /**
-         * Set BindingSet queue
-         * @param bindingSets - work queue containing BindingSets to be exported
-         * @return this Builder for chaining method calls
-         */
-        public Builder setBindingSets(BlockingQueue<BindingSetRecord> bindingSets) {
-            this.bindingSets = bindingSets;
-            return this;
-        }
-
-        /**
-         * Sets the number of threads used by this processor
-         * @param threadNumber - number of threads used by this processor
-         * @return - number of threads used by this processor
-         */
-        public Builder setThreadNumber(int threadNumber) {
-            this.threadNumber = threadNumber;
-            return this;
-        }
-        
-        /**
-         * Set the PeriodicStorage layer
-         * @param periodicStorage - periodic storage layer that periodic results are read from
-         * @return - this Builder for chaining method calls
-         */
-        public Builder setPeriodicStorage(PeriodicQueryResultStorage periodicStorage) {
-            this.periodicStorage = periodicStorage;
-            return this;
-        }
-
-        /**
-         * Builds a TimestampedNotificationProcessor
-         * @return - TimestampedNotificationProcessor built from arguments passed to this Builder
-         */
-        public TimestampedNotificationProcessor build() {
-            return new TimestampedNotificationProcessor(periodicStorage, notifications, bins, bindingSets, threadNumber);
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java
deleted file mode 100644
index 4dac64c..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.pruner;
-
-import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
-import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
-import org.apache.rya.periodic.notification.api.BinPruner;
-import org.apache.rya.periodic.notification.api.NodeBin;
-
-import jline.internal.Preconditions;
-
-/**
- * Deletes BindingSets from time bins in the indicated PCJ table
- */
-public class AccumuloBinPruner implements BinPruner {
-
-    private static final Logger log = Logger.getLogger(AccumuloBinPruner.class);
-    private PeriodicQueryResultStorage periodicStorage;
-
-    public AccumuloBinPruner(PeriodicQueryResultStorage periodicStorage) {
-        Preconditions.checkNotNull(periodicStorage);
-        this.periodicStorage = periodicStorage;
-    }
-
-    /**
-     * This method deletes all BindingSets in the indicated bin from the PCJ
-     * table indicated by the id. It is assumed that all BindingSet entries for
-     * the corresponding bin are written to the PCJ table so that the bin Id
-     * occurs first.
-     * 
-     * @param id
-     *            - pcj table id
-     * @param bin
-     *            - temporal bin the BindingSets are contained in
-     */
-    @Override
-    public void pruneBindingSetBin(NodeBin nodeBin) {
-        Preconditions.checkNotNull(nodeBin);
-        String id = nodeBin.getNodeId();
-        long bin = nodeBin.getBin();
-        try {
-            periodicStorage.deletePeriodicQueryResults(id, bin);
-        } catch (PeriodicQueryStorageException e) {
-            log.trace("Unable to delete results from Peroidic Table: " + id + " for bin: " + bin);
-            throw new RuntimeException(e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
deleted file mode 100644
index bee9c02..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.pruner;
-
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.Transaction;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.Span;
-import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
-import org.apache.rya.indexing.pcj.fluo.app.NodeType;
-import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO;
-import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
-import org.apache.rya.periodic.notification.api.BinPruner;
-import org.apache.rya.periodic.notification.api.NodeBin;
-
-import com.google.common.base.Optional;
-
-/**
- * Deletes {@link BindingSet}s from the indicated Fluo table.
- */
-public class FluoBinPruner implements BinPruner {
-
-    private static final Logger log = Logger.getLogger(FluoBinPruner.class);
-    private FluoClient client;
-
-    public FluoBinPruner(FluoClient client) {
-        this.client = client;
-    }
-
-    /**
-     * This method deletes BindingSets in the specified bin from the BindingSet
-     * Column of the indicated Fluo nodeId
-     * 
-     * @param id
-     *            - Fluo nodeId
-     * @param bin
-     *            - bin id
-     */
-    @Override
-    public void pruneBindingSetBin(NodeBin nodeBin) {
-        String id = nodeBin.getNodeId();
-        long bin = nodeBin.getBin();
-        try (Transaction tx = client.newTransaction()) {
-            Optional<NodeType> type = NodeType.fromNodeId(id);
-            if (!type.isPresent()) {
-                log.trace("Unable to determine NodeType from id: " + id);
-                throw new RuntimeException();
-            }
-            Column batchInfoColumn = type.get().getResultColumn();
-            String batchInfoSpanPrefix = id + IncrementalUpdateConstants.NODEID_BS_DELIM + bin;
-            SpanBatchDeleteInformation batchInfo = SpanBatchDeleteInformation.builder().setColumn(batchInfoColumn)
-                    .setSpan(Span.prefix(Bytes.of(batchInfoSpanPrefix))).build();
-            BatchInformationDAO.addBatch(tx, id, batchInfo);
-            tx.commit();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
deleted file mode 100644
index 516690e..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.pruner;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.SnapshotBase;
-import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.NodeType;
-import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
-import org.apache.rya.periodic.notification.api.BinPruner;
-import org.apache.rya.periodic.notification.api.NodeBin;
-
-import jline.internal.Preconditions;
-
-/**
- * Implementation of {@link BinPruner} that deletes old, already processed
- * Periodic Query results from Fluo and the PCJ table to which the Fluo results
- * are exported.
- *
- */
-public class PeriodicQueryPruner implements BinPruner, Runnable {
-
-    private static final Logger log = Logger.getLogger(PeriodicQueryPruner.class);
-    private FluoClient client;
-    private AccumuloBinPruner accPruner;
-    private FluoBinPruner fluoPruner;
-    private BlockingQueue<NodeBin> bins;
-    private AtomicBoolean closed = new AtomicBoolean(false);
-    private int threadNumber;
-
-    public PeriodicQueryPruner(FluoBinPruner fluoPruner, AccumuloBinPruner accPruner, FluoClient client, BlockingQueue<NodeBin> bins, int threadNumber) {
-        this.fluoPruner = Preconditions.checkNotNull(fluoPruner);
-        this.accPruner = Preconditions.checkNotNull(accPruner);
-        this.client = Preconditions.checkNotNull(client);
-        this.bins = Preconditions.checkNotNull(bins);
-        this.threadNumber = threadNumber;
-    }
-    
-    @Override
-    public void run() {
-        try {
-            while (!closed.get()) {
-                pruneBindingSetBin(bins.take());
-            }
-        } catch (InterruptedException e) {
-            log.trace("Thread " + threadNumber + " is unable to prune the next message.");
-            throw new RuntimeException(e);
-        }
-    }
-    
-    /**
-     * Prunes BindingSet bins from the Rya Fluo Application in addition to the BindingSet
-     * bins created in the PCJ tables associated with the give query id.
-     * @param id - QueryResult Id for the Rya Fluo application 
-     * @param bin - bin id for bins to be deleted
-     */
-    @Override
-    public void pruneBindingSetBin(NodeBin nodeBin) {
-        String pcjId = nodeBin.getNodeId();
-        long bin = nodeBin.getBin();
-        try(Snapshot sx = client.newSnapshot()) {
-            String queryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId);
-            Set<String> fluoIds = getNodeIdsFromResultId(sx, queryId);
-            accPruner.pruneBindingSetBin(nodeBin);
-            for(String fluoId: fluoIds) {
-                fluoPruner.pruneBindingSetBin(new NodeBin(fluoId, bin));
-            }
-        } catch (Exception e) {
-            log.trace("Could not successfully initialize PeriodicQueryBinPruner.");
-        }
-    }
-    
-    
-    public void shutdown() {
-        closed.set(true);
-    }
-
-    private Set<String> getNodeIdsFromResultId(SnapshotBase sx, String id) {
-        Set<String> ids = new HashSet<>();
-        PeriodicQueryUtil.getPeriodicQueryNodeAncestorIds(sx, id, ids);
-        return ids;
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java
deleted file mode 100644
index 1c11f96..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.pruner;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
-import org.apache.rya.periodic.notification.api.LifeCycle;
-import org.apache.rya.periodic.notification.api.NodeBin;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Executor service that runs {@link PeriodicQueryPruner}s with added functionality
- * for starting, stopping, and determining if the query pruners are running.
- */
-public class PeriodicQueryPrunerExecutor implements LifeCycle {
-
-    private static final Logger log = Logger.getLogger(PeriodicQueryPrunerExecutor.class);
-    private FluoClient client;
-    private int numThreads;
-    private ExecutorService executor;
-    private BlockingQueue<NodeBin> bins;
-    private PeriodicQueryResultStorage periodicStorage;
-    private List<PeriodicQueryPruner> pruners;
-    private boolean running = false;
-
-    public PeriodicQueryPrunerExecutor(PeriodicQueryResultStorage periodicStorage, FluoClient client, int numThreads,
-            BlockingQueue<NodeBin> bins) {
-        Preconditions.checkArgument(numThreads > 0);
-        this.periodicStorage = periodicStorage;
-        this.numThreads = numThreads;
-        executor = Executors.newFixedThreadPool(numThreads);
-        this.bins = bins;
-        this.client = client;
-        this.pruners = new ArrayList<>();
-    }
-
-    @Override
-    public void start() {
-        if (!running) {
-            AccumuloBinPruner accPruner = new AccumuloBinPruner(periodicStorage);
-            FluoBinPruner fluoPruner = new FluoBinPruner(client);
-
-            for (int threadNumber = 0; threadNumber < numThreads; threadNumber++) {
-                PeriodicQueryPruner pruner = new PeriodicQueryPruner(fluoPruner, accPruner, client, bins, threadNumber);
-                pruners.add(pruner);
-                executor.submit(pruner);
-            }
-            running = true;
-        }
-    }
-
-    @Override
-    public void stop() {
-        if (pruners != null && pruners.size() > 0) {
-            pruners.forEach(x -> x.shutdown());
-        }
-        if(client != null) {
-            client.close();
-        }
-        if (executor != null) {
-            executor.shutdown();
-            running = false;
-        }
-        try {
-            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
-                log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
-                executor.shutdownNow();
-            }
-        } catch (InterruptedException e) {
-            log.info("Interrupted during shutdown, exiting uncleanly");
-        }
-    }
-
-    @Override
-    public boolean currentlyRunning() {
-        return running;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java
deleted file mode 100644
index 69bd39c..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.recovery;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.scanner.ColumnScanner;
-import org.apache.fluo.api.client.scanner.RowScanner;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.ColumnValue;
-import org.apache.fluo.api.data.Span;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
-import org.apache.rya.indexing.pcj.fluo.app.NodeType;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
-import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
-import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
-import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
-import org.apache.rya.periodic.notification.notification.CommandNotification;
-import org.apache.rya.periodic.notification.notification.CommandNotification.Command;
-import org.apache.rya.periodic.notification.notification.PeriodicNotification;
-
-/**
- * This class is used by the {@link PeriodicNotificationCoordinatorExecutor}
- * to add all existing {@link PeriodicNotification}s stored in Fluo when it is
- * initialized.  This enables the the {@link PeriodicServiceApplication} to be 
- * recovered from failure by restoring it original state.
- *
- */
-public class PeriodicNotificationProvider {
-
-    private FluoQueryMetadataDAO dao;
-    
-    public PeriodicNotificationProvider() {
-        this.dao = new FluoQueryMetadataDAO();
-    }
-    
-    /**
-     * Retrieve all of the information about Periodic Query results already registered
-     * with Fluo.  This is returned in the form of {@link CommandNotification}s that
-     * can be registered with the {@link NotificationCoordinatorExecutor}.
-     * @param sx - snapshot for reading results from Fluo
-     * @return - collection of CommandNotifications that indicate Periodic Query information registered with system
-     */
-    public Collection<CommandNotification> getNotifications(Snapshot sx) {
-        Set<PeriodicQueryMetadata> periodicMetadata = new HashSet<>();
-        RowScanner scanner = sx.scanner().fetch(FluoQueryColumns.PERIODIC_QUERY_NODE_ID)
-                .over(Span.prefix(IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX)).byRow().build();
-        Iterator<ColumnScanner> colScannerIter = scanner.iterator();
-        while (colScannerIter.hasNext()) {
-            ColumnScanner colScanner = colScannerIter.next();
-            Iterator<ColumnValue> values = colScanner.iterator();
-            while (values.hasNext()) {
-                PeriodicQueryMetadata metadata = dao.readPeriodicQueryMetadata(sx, values.next().getsValue());
-                periodicMetadata.add(metadata);
-            }
-        }
-        return getCommandNotifications(sx, periodicMetadata);
-    }
-    
-    /**
-     * Registers all of Periodic Query information already contained within Fluo to the 
-     * {@link NotificationCoordinatorExecutor}.
-     * @param coordinator - coordinator that periodic info will be registered with
-     * @param sx - snapshot for reading results from Fluo
-     */
-    public void processRegisteredNotifications(NotificationCoordinatorExecutor coordinator, Snapshot sx) {
-        coordinator.start();
-        Collection<CommandNotification> notifications = getNotifications(sx);
-        for(CommandNotification notification: notifications) {
-            coordinator.processNextCommandNotification(notification);
-        }
-    }
-    
-    private Collection<CommandNotification> getCommandNotifications(Snapshot sx, Collection<PeriodicQueryMetadata> metadata) {
-        Set<CommandNotification> notifications = new HashSet<>();
-        int i = 1;
-        for(PeriodicQueryMetadata meta:metadata) {
-            //offset initial wait to avoid overloading system
-            PeriodicNotification periodic = new PeriodicNotification(getQueryId(meta.getNodeId(), sx), meta.getPeriod(),TimeUnit.MILLISECONDS,i*5000);
-            notifications.add(new CommandNotification(Command.ADD, periodic));
-            i++;
-        }
-        return notifications;
-    }
-    
-    private String getQueryId(String periodicNodeId, Snapshot sx) {
-        return getQueryIdFromPeriodicId(sx, periodicNodeId);
-    }
-    
-    private String getQueryIdFromPeriodicId(Snapshot sx, String nodeId) {
-        NodeType nodeType = NodeType.fromNodeId(nodeId).orNull();
-        String id = null;
-        switch (nodeType) {
-        case FILTER:
-            id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.FILTER_PARENT_NODE_ID).toString());
-            break;
-        case PERIODIC_QUERY:
-            id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.PERIODIC_QUERY_PARENT_NODE_ID).toString());
-            break;
-        case QUERY:
-            id = FluoQueryUtils.convertFluoQueryIdToPcjId(nodeId);
-            break;
-        case AGGREGATION: 
-            id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.AGGREGATION_PARENT_NODE_ID).toString());
-            break;
-        case CONSTRUCT:
-            id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID).toString());
-            break;
-        case PROJECTION:
-            id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.PROJECTION_PARENT_NODE_ID).toString());
-            break;
-        default:
-            throw new IllegalArgumentException("Invalid node type");
-        
-        }
-        return id;
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java
deleted file mode 100644
index f5cd13a..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.registration.kafka;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.rya.periodic.notification.api.LifeCycle;
-import org.apache.rya.periodic.notification.api.Notification;
-import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
-import org.apache.rya.periodic.notification.notification.CommandNotification;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Consumer group to pull all requests for adding and deleting {@link Notification}s
- * from Kafka.  This Object executes {@link PeriodicNotificationConsumer}s that retrieve
- * the {@link CommandNotification}s and register them with the {@link NotificationCoordinatorExecutor}.
- *
- */
-public class KafkaNotificationProvider implements LifeCycle {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaNotificationProvider.class);
-    private String topic;
-    private ExecutorService executor;
-    private NotificationCoordinatorExecutor coord;
-    private Properties props;
-    private int numThreads;
-    private boolean running = false;
-    Deserializer<String> keyDe;
-    Deserializer<CommandNotification> valDe;
-    List<PeriodicNotificationConsumer> consumers;
-
-    /**
-     * Create KafkaNotificationProvider for reading new notification requests form Kafka
-     * @param topic - notification topic    
-     * @param keyDe - Kafka message key deserializer
-     * @param valDe - Kafka message value deserializer
-     * @param props - properties used to creates a {@link KafkaConsumer}
-     * @param coord - {@link NotificationCoordinatorExecutor} for managing and generating notifications
-     * @param numThreads - number of threads used by this notification provider
-     */
-    public KafkaNotificationProvider(String topic, Deserializer<String> keyDe, Deserializer<CommandNotification> valDe, Properties props,
-            NotificationCoordinatorExecutor coord, int numThreads) {
-        this.coord = coord;
-        this.numThreads = numThreads;
-        this.topic = topic;
-        this.props = props;
-        this.consumers = new ArrayList<>();
-        this.keyDe = keyDe;
-        this.valDe = valDe;
-    }
-
-    @Override
-    public void stop() {
-        if (consumers != null && consumers.size() > 0) {
-            for (PeriodicNotificationConsumer consumer : consumers) {
-                consumer.shutdown();
-            }
-        }
-        if (executor != null) {
-            executor.shutdown();
-        }
-        running = false;
-        try {
-            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
-                LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
-                executor.shutdownNow();
-            }
-        } catch (InterruptedException e) {
-            LOG.info("Interrupted during shutdown, exiting uncleanly");
-        }
-    }
-
-    public void start() {
-        if (!running) {
-            if (!coord.currentlyRunning()) {
-                coord.start();
-            }
-            // now launch all the threads
-            executor = Executors.newFixedThreadPool(numThreads);
-
-            // now create consumers to consume the messages
-            int threadNumber = 0;
-            for (int i = 0; i < numThreads; i++) {
-                LOG.info("Creating consumer:" + threadNumber);
-                KafkaConsumer<String, CommandNotification> consumer = new KafkaConsumer<String, CommandNotification>(props, keyDe, valDe);
-                PeriodicNotificationConsumer periodicConsumer = new PeriodicNotificationConsumer(topic, consumer, threadNumber, coord);
-                consumers.add(periodicConsumer);
-                executor.submit(periodicConsumer);
-                threadNumber++;
-            }
-            running = true;
-        }
-    }
-
-    @Override
-    public boolean currentlyRunning() {
-        return running;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java
deleted file mode 100644
index 6785ce8..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.registration.kafka;
-
-import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.log4j.Logger;
-import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
-import org.apache.rya.periodic.notification.notification.CommandNotification;
-
-/**
- * Consumer for the {@link KafkaNotificationProvider}.  This consumer pull messages
- * from Kafka and registers them with the {@link NotificationCoordinatorExecutor}.
- *
- */
-public class PeriodicNotificationConsumer implements Runnable {
-    private KafkaConsumer<String, CommandNotification> consumer;
-    private int m_threadNumber;
-    private String topic;
-    private final AtomicBoolean closed = new AtomicBoolean(false);
-    private NotificationCoordinatorExecutor coord;
-    private static final Logger LOG = Logger.getLogger(PeriodicNotificationConsumer.class);
-
-    /**
-     * Creates a new PeriodicNotificationConsumer for consuming new notification requests from
-     * Kafka.
-     * @param topic - new notification topic
-     * @param consumer - consumer for pulling new requests from Kafka
-     * @param a_threadNumber - number of consumer threads to be used
-     * @param coord - notification coordinator for managing and generating notifications
-     */
-    public PeriodicNotificationConsumer(String topic, KafkaConsumer<String, CommandNotification> consumer, int a_threadNumber,
-            NotificationCoordinatorExecutor coord) {
-        this.topic = topic;
-        m_threadNumber = a_threadNumber;
-        this.consumer = consumer;
-        this.coord = coord;
-    }
-
-    public void run() {
-        
-        try {
-            LOG.info("Creating kafka stream for consumer:" + m_threadNumber);
-            consumer.subscribe(Arrays.asList(topic));
-            while (!closed.get()) {
-                ConsumerRecords<String, CommandNotification> records = consumer.poll(10000);
-                // Handle new records
-                for(ConsumerRecord<String, CommandNotification> record: records) {
-                    CommandNotification notification = record.value();
-                    LOG.info("Thread " + m_threadNumber + " is adding notification " + notification + " to queue.");
-                    LOG.info("Message: " + notification);
-                    coord.processNextCommandNotification(notification);
-                }
-            }
-        } catch (WakeupException e) {
-            // Ignore exception if closing
-            if (!closed.get()) throw e;
-        } finally {
-            consumer.close();
-        }
-    }
-    
-    public void shutdown() {
-        closed.set(true);
-        consumer.wakeup();
-    }
-}