You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/09/04 13:14:08 UTC

[4/4] incubator-rya git commit: RYA-319-Integration of Periodic Query with CLI. Closes #220.

RYA-319-Integration of Periodic Query with CLI. Closes #220.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/63f87b86
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/63f87b86
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/63f87b86

Branch: refs/heads/master
Commit: 63f87b868f33c718f085f5a7907d22b823dcd5d3
Parents: ad6ab01
Author: Caleb Meier <ca...@parsons.com>
Authored: Mon Aug 7 21:22:00 2017 -0700
Committer: Caleb Meier <ca...@parsons.com>
Committed: Mon Sep 4 06:11:12 2017 -0700

----------------------------------------------------------------------
 .../org/apache/rya/api/client/CreatePCJ.java    |   4 +-
 .../rya/api/client/CreatePeriodicPCJ.java       |  40 ++++
 .../rya/api/client/DeletePeriodicPCJ.java       |  38 ++++
 .../rya/api/client/ListIncrementalQueries.java  |  38 ++++
 .../org/apache/rya/api/client/RyaClient.java    |  31 +++
 extras/indexing/pom.xml                         |   5 +-
 .../accumulo/AccumuloCreatePeriodicPCJ.java     | 145 ++++++++++++
 .../accumulo/AccumuloDeletePeriodicPCJ.java     | 135 +++++++++++
 .../AccumuloListIncrementalQueries.java         | 101 +++++++++
 .../accumulo/AccumuloRyaClientFactory.java      |   3 +
 extras/rya.pcj.fluo/pcj.fluo.api/pom.xml        |   4 +
 .../indexing/pcj/fluo/api/CreateFluoPcj.java    |  24 +-
 .../pcj/fluo/api/CreatePeriodicQuery.java       | 215 ++++++++++++++++++
 .../pcj/fluo/api/DeletePeriodicQuery.java       |  92 ++++++++
 .../indexing/pcj/fluo/api/ListFluoQueries.java  | 149 ++++++++++++
 .../fluo/app/IncrementalUpdateConstants.java    |   1 +
 .../pcj/fluo/app/export/ExporterManager.java    |  23 +-
 .../pcj/fluo/app/export/NoOpExporter.java       |  59 -----
 .../KafkaBindingSetExporterParameters.java      |   1 -
 .../export/rya/PeriodicBindingSetExporter.java  |   2 +-
 .../fluo/app/observers/QueryResultObserver.java |   4 +-
 .../indexing/pcj/fluo/app/query/FluoQuery.java  |   6 +-
 .../pcj/fluo/app/query/FluoQueryColumns.java    |   1 +
 .../fluo/app/query/FluoQueryMetadataDAO.java    |   6 +-
 .../pcj/fluo/api/ListFluoQueriesIT.java         |  96 ++++++++
 .../indexing/pcj/fluo/integration/BatchIT.java  |  12 +-
 .../pcj/fluo/integration/CreateDeleteIT.java    |   3 +-
 .../integration/CreateDeletePeriodicPCJ.java    | 227 +++++++++++++++++++
 .../pcj/fluo/integration/KafkaExportIT.java     |  24 +-
 .../indexing/pcj/fluo/integration/QueryIT.java  |  40 ++--
 .../pcj/fluo/test/base/KafkaExportITBase.java   |  17 +-
 .../periodic.service.api/.gitignore             |   1 +
 .../periodic.service.api/pom.xml                |  52 +++++
 .../periodic/notification/api/BinPruner.java    |  40 ++++
 .../notification/api/BindingSetExporter.java    |  37 +++
 .../notification/api/BindingSetRecord.java      |  80 +++++++
 .../api/BindingSetRecordExportException.java    |  45 ++++
 .../periodic/notification/api/LifeCycle.java    |  45 ++++
 .../rya/periodic/notification/api/NodeBin.java  |  77 +++++++
 .../periodic/notification/api/Notification.java |  34 +++
 .../api/NotificationCoordinatorExecutor.java    |  41 ++++
 .../notification/api/NotificationProcessor.java |  41 ++++
 .../api/PeriodicNotificationClient.java         |  64 ++++++
 .../notification/BasicNotification.java         |  76 +++++++
 .../notification/CommandNotification.java       |  99 ++++++++
 .../notification/PeriodicNotification.java      | 178 +++++++++++++++
 .../notification/TimestampedNotification.java   |  69 ++++++
 .../KafkaNotificationRegistrationClient.java    |  80 +++++++
 .../BasicNotificationTypeAdapter.java           |  55 +++++
 .../serialization/BindingSetSerDe.java          | 105 +++++++++
 .../CommandNotificationSerializer.java          |  76 +++++++
 .../CommandNotificationTypeAdapter.java         |  89 ++++++++
 .../PeriodicNotificationTypeAdapter.java        |  73 ++++++
 .../periodic.service.integration.tests/pom.xml  |  29 +--
 .../PeriodicNotificationApplicationIT.java      | 102 ++++-----
 .../PeriodicNotificationProviderIT.java         |   5 +-
 .../PeriodicNotificationExporterIT.java         |   1 +
 .../PeriodicNotificationProcessorIT.java        |   2 +-
 .../pruner/PeriodicNotificationBinPrunerIT.java |   7 +-
 .../PeriodicCommandNotificationConsumerIT.java  |  31 ++-
 .../periodic.service.notification/pom.xml       | 201 ++++++++--------
 .../periodic/notification/api/BinPruner.java    |  40 ----
 .../notification/api/BindingSetExporter.java    |  38 ----
 .../notification/api/CreatePeriodicQuery.java   | 124 ----------
 .../periodic/notification/api/LifeCycle.java    |  45 ----
 .../rya/periodic/notification/api/NodeBin.java  |  77 -------
 .../periodic/notification/api/Notification.java |  34 ---
 .../api/NotificationCoordinatorExecutor.java    |  41 ----
 .../notification/api/NotificationProcessor.java |  41 ----
 .../api/PeriodicNotificationClient.java         |  64 ------
 .../PeriodicNotificationApplication.java        |   2 +-
 .../PeriodicNotificationApplicationFactory.java |   2 +-
 .../notification/exporter/BindingSetRecord.java |  80 -------
 .../exporter/KafkaExporterExecutor.java         |   1 +
 .../KafkaPeriodicBindingSetExporter.java        |   9 +-
 .../notification/BasicNotification.java         |  76 -------
 .../notification/CommandNotification.java       |  99 --------
 .../notification/PeriodicNotification.java      | 178 ---------------
 .../notification/TimestampedNotification.java   |  69 ------
 .../NotificationProcessorExecutor.java          |   2 +-
 .../TimestampedNotificationProcessor.java       |   2 +-
 .../KafkaNotificationRegistrationClient.java    |  80 -------
 .../BasicNotificationTypeAdapter.java           |  55 -----
 .../serialization/BindingSetSerDe.java          | 105 ---------
 .../CommandNotificationSerializer.java          |  76 -------
 .../CommandNotificationTypeAdapter.java         |  89 --------
 .../PeriodicNotificationTypeAdapter.java        |  73 ------
 extras/rya.periodic.service/pom.xml             |   1 +
 .../org/apache/rya/shell/RyaAdminCommands.java  |  81 ++++++-
 .../apache/rya/shell/RyaAdminCommandsTest.java  |  65 ++++++
 pom.xml                                         |  20 ++
 91 files changed, 3235 insertions(+), 1815 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java b/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java
index 6e92b28..3c369d8 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java
@@ -28,7 +28,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  */
 @DefaultAnnotation(NonNull.class)
 public interface CreatePCJ {
-
+    
     /**
      * Metadata enum used to indicate the type of query that is registered.  If
      * the topmost node is a Construct QueryNode, then the type is Construct.  If the
@@ -44,7 +44,7 @@ public interface CreatePCJ {
      * Application.
      *
      */
-    public static enum ExportStrategy{RYA, KAFKA, NO_OP_EXPORT};
+    public static enum ExportStrategy{RYA, KAFKA, PERIODIC};
 
     
     /**

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePeriodicPCJ.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePeriodicPCJ.java b/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePeriodicPCJ.java
new file mode 100644
index 0000000..7c006d0
--- /dev/null
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePeriodicPCJ.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client;
+
+/**
+ * This interface creates a new PeriodicPCJ for a given Rya instance.
+ */
+public interface CreatePeriodicPCJ {
+
+    /**
+     * Creates a new PeriodicPCJ for a given Rya instance. The provided periodicTopic and bootStrapServers are used for
+     * registering new PeriodicNotifications with the underlying notification registration service. Typically, the
+     * bootStrapServers are the IP for the KafkaBrokers.
+     * 
+     * @param instanceName - Rya instance to connect to
+     * @param sparql - SPARQL query registered with the Periodic Service
+     * @param periodicTopic - Kafka topic that new PeriodicNotifications are exported to for registration with the
+     *            PeriodicService
+     * @param bootStrapServers - Connection string for Kafka brokers
+     * @return Fluo Query Id of the registered Periodic Query
+     */
+    public String createPeriodicPCJ(String instanceName, String sparql, String periodicTopic, String bootStrapServers) throws RyaClientException;
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/common/rya.api/src/main/java/org/apache/rya/api/client/DeletePeriodicPCJ.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/DeletePeriodicPCJ.java b/common/rya.api/src/main/java/org/apache/rya/api/client/DeletePeriodicPCJ.java
new file mode 100644
index 0000000..c30afd2
--- /dev/null
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/DeletePeriodicPCJ.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client;
+
+/**
+ * Deletes an instance of a Periodic PCJ from Rya.
+ */
+public interface DeletePeriodicPCJ {
+
+    /**
+     * Deletes a PCJ from an instance of Rya.
+     *
+     * @param instanceName - Indicates which Rya instance is maintaining the Periodic PCJ. (not null)
+     * @param pcjId - The ID of the Periodic PCJ that will be deleted. (not null)
+     * @param topic - Kafka topic for deleting PeriodicNotifications
+     * @param brokers - Comma delimited host/port pairs for connecting to Kafka brokers.
+     * @throws InstanceDoesNotExistException No instance of Rya exists for the provided name.
+     * @throws RyaClientException Something caused the command to fail.
+     */
+    public void deletePeriodicPCJ(String instanceName, final String pcjId, String topic, String brokers) throws InstanceDoesNotExistException, RyaClientException;
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/common/rya.api/src/main/java/org/apache/rya/api/client/ListIncrementalQueries.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/ListIncrementalQueries.java b/common/rya.api/src/main/java/org/apache/rya/api/client/ListIncrementalQueries.java
new file mode 100644
index 0000000..75e1297
--- /dev/null
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/ListIncrementalQueries.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client;
+
+/**
+ * Verifies that Rya instance has Fluo application enabled and lists
+ * all SPARQL queries maintained by the application.
+ */
+public interface ListIncrementalQueries {
+
+    /**
+     * Lists all SPARQL queries maintained by the Fluo Application for a given rya instance and associated information,
+     * including the Fluo Query Id, the QueryType, the ExportStrategy, and the pretty-printed SPARQL query.
+     * 
+     * @param ryaInstance - Rya instance whose queries are incrementally maintained by Fluo
+     * @return String comprised of new line delimited Strings that provide information about each query registered in
+     * Fluo, including the query Id, the query type, the export strategies, and the SPARQL query
+     * @throws RyaClientException
+     */
+    public String listIncrementalQueries(String ryaInstance) throws RyaClientException;
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java b/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
index d1481dc..c04bd86 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
@@ -34,6 +34,9 @@ public class RyaClient {
     private final Install install;
     private final CreatePCJ createPcj;
     private final DeletePCJ deletePcj;
+    private final CreatePeriodicPCJ createPeriodicPcj;
+    private final DeletePeriodicPCJ deletePeriodicPcj;
+    private final ListIncrementalQueries listIncrementalQueries;
     private final BatchUpdatePCJ bactchUpdatePCJ;
     private final GetInstanceDetails getInstanceDetails;
     private final InstanceExists instanceExists;
@@ -51,6 +54,9 @@ public class RyaClient {
             final Install install,
             final CreatePCJ createPcj,
             final DeletePCJ deletePcj,
+            final CreatePeriodicPCJ createPeriodicPcj,
+            final DeletePeriodicPCJ deletePeriodicPcj,
+            final ListIncrementalQueries listIncrementalQueries,
             final BatchUpdatePCJ batchUpdatePcj,
             final GetInstanceDetails getInstanceDetails,
             final InstanceExists instanceExists,
@@ -63,6 +69,9 @@ public class RyaClient {
         this.install = requireNonNull(install);
         this.createPcj = requireNonNull(createPcj);
         this.deletePcj = requireNonNull(deletePcj);
+        this.createPeriodicPcj = createPeriodicPcj;
+        this.deletePeriodicPcj = deletePeriodicPcj;
+        this.listIncrementalQueries = listIncrementalQueries;
         this.bactchUpdatePCJ = requireNonNull(batchUpdatePcj);
         this.getInstanceDetails = requireNonNull(getInstanceDetails);
         this.instanceExists = requireNonNull(instanceExists);
@@ -96,8 +105,30 @@ public class RyaClient {
     public DeletePCJ getDeletePCJ() {
         return deletePcj;
     }
+    
+    /**
+     * @return An instance of {@link CreatePeriodicPCJ} that is connected to a Rya Periodic Storage
+     */ 
+    public CreatePeriodicPCJ getCreatePeriodicPCJ() {
+        return createPeriodicPcj;
+    }
 
     /**
+     * @return An instance of {@link DeletePeriodicPCJ} that is connected to a Rya Periodic Storage
+     */
+    public DeletePeriodicPCJ getDeletePeriodicPCJ() {
+        return deletePeriodicPcj;
+    }
+    
+    /**
+     * @return An instance of {@link ListIncrementalQueries} for displaying queries that are incrementally
+     * maintained by the Rya instance
+     */
+    public ListIncrementalQueries getListIncrementalQueries() {
+        return listIncrementalQueries;
+    }
+    
+    /**
      * @return An instance of {@link BatchUpdatePCJ} that is connect to a Rya storage
      *   if the Rya instance supports PCJ indexing.
      */

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/indexing/pom.xml
----------------------------------------------------------------------
diff --git a/extras/indexing/pom.xml b/extras/indexing/pom.xml
index 7961b9f..16a205f 100644
--- a/extras/indexing/pom.xml
+++ b/extras/indexing/pom.xml
@@ -81,7 +81,10 @@
             <groupId>org.apache.rya</groupId>
             <artifactId>rya.pcj.fluo.api</artifactId>
         </dependency>
-        
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.periodic.service.api</artifactId>
+        </dependency>
         <!-- OpenRDF -->
         <dependency>
             <groupId>org.openrdf.sesame</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePeriodicPCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePeriodicPCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePeriodicPCJ.java
new file mode 100644
index 0000000..26a25da
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePeriodicPCJ.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.api.client.CreatePeriodicPCJ;
+import org.apache.rya.api.client.GetInstanceDetails;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery.PeriodicQueryCreationException;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
+import org.apache.rya.indexing.pcj.storage.PcjException;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
+import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
+import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.sail.SailException;
+
+import com.google.common.base.Optional;
+
+/**
+ * Class used by the RyaClient for creating Periodic PCJ.
+ *
+ */
+public class AccumuloCreatePeriodicPCJ extends AccumuloCommand implements CreatePeriodicPCJ {
+
+    private final GetInstanceDetails getInstanceDetails;
+    
+    /**
+     * Constructs an instance of {@link AccumuloCreatePeriodicPCJ}.
+     *
+     * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null)
+     * @param connector - Provides programmatic access to the instance of Accumulo that hosts Rya instance. (not null)
+     */
+    public AccumuloCreatePeriodicPCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) {
+        super(connectionDetails, connector);
+        getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, connector);
+    }
+    
+    @Override
+    public String createPeriodicPCJ(String instanceName, String sparql, String periodicTopic, String bootStrapServers) throws RyaClientException {
+        requireNonNull(instanceName);
+        requireNonNull(sparql);
+
+        final Optional<RyaDetails> ryaDetailsHolder = getInstanceDetails.getDetails(instanceName);
+        final boolean ryaInstanceExists = ryaDetailsHolder.isPresent();
+        if (!ryaInstanceExists) {
+            throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", instanceName));
+        }
+
+        final PCJIndexDetails pcjIndexDetails = ryaDetailsHolder.get().getPCJIndexDetails();
+        final boolean pcjIndexingEnabeld = pcjIndexDetails.isEnabled();
+        if (!pcjIndexingEnabeld) {
+            throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName));
+        }
+
+        // If a Fluo application is being used, task it with updating the PCJ.
+        final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails();
+        if (fluoDetailsHolder.isPresent()) {
+            final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName();
+            try {
+                return updateFluoAppAndRegisterWithKafka(instanceName, fluoAppName, sparql, periodicTopic, bootStrapServers);
+            } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException
+                    | RyaDAOException | PeriodicQueryCreationException e) {
+                throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e);
+            } catch (UnsupportedQueryException e) {
+                throw new RyaClientException("The new PCJ could not be initialized because it either contains an unsupported query node "
+                        + "or an invalid ExportStrategy for the given QueryType.  Projection queries can be exported to either Rya or Kafka,"
+                        + "unless they contain an aggregation, in which case they can only be exported to Kafka.  Construct queries can be exported"
+                        + "to Rya and Kafka, and Periodic queries can only be exported to Rya.");
+            } 
+        } else {
+            throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName));
+        }
+    }
+
+
+    
+    
+    private String updateFluoAppAndRegisterWithKafka(final String ryaInstance, final String fluoAppName, String sparql, String periodicTopic, String bootStrapServers) throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, RyaDAOException, UnsupportedQueryException, PeriodicQueryCreationException {
+        requireNonNull(sparql);
+        requireNonNull(periodicTopic);
+        requireNonNull(bootStrapServers);
+
+        final PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(getConnector(), ryaInstance);
+        
+        // Connect to the Fluo application that is updating this instance's PCJs.
+        final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails();
+        try(final FluoClient fluoClient = new FluoClientFactory().connect(
+                cd.getUsername(),
+                new String(cd.getPassword()),
+                cd.getInstanceName(),
+                cd.getZookeepers(),
+                fluoAppName);) {
+            // Initialize the PCJ within the Fluo application.
+            final CreatePeriodicQuery periodicPcj = new CreatePeriodicQuery(fluoClient, periodicStorage);
+            PeriodicNotificationClient periodicClient = new KafkaNotificationRegistrationClient(periodicTopic, createProducer(bootStrapServers));
+            return periodicPcj.withRyaIntegration(sparql, periodicClient, getConnector(), ryaInstance).getQueryId();
+        }
+    }
+    
+    
+    private static KafkaProducer<String, CommandNotification> createProducer(String bootStrapServers) {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CommandNotificationSerializer.class.getName());
+        return new KafkaProducer<>(props);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePeriodicPCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePeriodicPCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePeriodicPCJ.java
new file mode 100644
index 0000000..18e49dc
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePeriodicPCJ.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.api.client.DeletePeriodicPCJ;
+import org.apache.rya.api.client.GetInstanceDetails;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
+import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery;
+import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery.QueryDeletionException;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
+import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
+import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
+import org.openrdf.query.MalformedQueryException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+
+/**
+ * Class used by the RyaClient and Rya Shell for deleting Periodic PCJ.
+ *
+ */
+public class AccumuloDeletePeriodicPCJ extends AccumuloCommand implements DeletePeriodicPCJ {
+    private static final Logger log = LoggerFactory.getLogger(AccumuloDeletePCJ.class);
+
+    private final GetInstanceDetails getInstanceDetails;
+
+    /**
+     * Constructs an instance of {@link AccumuloDeletePeriodicPCJ}.
+     *
+     * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null)
+     * @param connector - Provides programmatic access to the instance of Accumulo that hosts Rya instance. (not null)
+     */
+    public AccumuloDeletePeriodicPCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) {
+        super(connectionDetails, connector);
+        getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, connector);
+    }
+
+    @Override
+    public void deletePeriodicPCJ(final String instanceName, final String pcjId, String topic, String brokers) throws InstanceDoesNotExistException, RyaClientException {
+        requireNonNull(instanceName);
+        requireNonNull(pcjId);
+
+        final Optional<RyaDetails> originalDetails = getInstanceDetails.getDetails(instanceName);
+        final boolean ryaInstanceExists = originalDetails.isPresent();
+        if(!ryaInstanceExists) {
+            throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", instanceName));
+        }
+
+        final boolean pcjIndexingEnabled = originalDetails.get().getPCJIndexDetails().isEnabled();
+        if(!pcjIndexingEnabled) {
+            throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName));
+        }
+
+        // If the PCJ was being maintained by a Fluo application, then stop that process.
+        final PCJIndexDetails pcjIndexDetails = originalDetails.get().getPCJIndexDetails();
+        final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails();
+
+        if (fluoDetailsHolder.isPresent()) {
+            final String fluoAppName = pcjIndexDetails.getFluoDetails().get().getUpdateAppName();
+            try {
+                stopUpdatingPCJ(instanceName, fluoAppName, pcjId, topic, brokers);
+            } catch (MalformedQueryException | UnsupportedQueryException | QueryDeletionException e) {
+                throw new RyaClientException(String.format("Unable to delete Periodic Query with id: %s", pcjId), e);
+            }
+        } else {
+            log.error(String.format("Could not stop the Fluo application from updating the PCJ because the Fluo Details are "
+                    + "missing for the Rya instance named '%s'.", instanceName));
+        }
+        
+    }
+
+
+    private void stopUpdatingPCJ(final String ryaInstance, final String fluoAppName, final String pcjId, final String topic, final String brokers) throws UnsupportedQueryException, MalformedQueryException, QueryDeletionException {
+        requireNonNull(fluoAppName);
+        requireNonNull(pcjId);
+
+        // Connect to the Fluo application that is updating this instance's PCJs.
+        final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails();
+        try (final FluoClient fluoClient = new FluoClientFactory().connect(cd.getUsername(), new String(cd.getPassword()),
+                cd.getInstanceName(), cd.getZookeepers(), fluoAppName)) {
+            // Delete the PCJ from the Fluo App.
+            PeriodicQueryResultStorage periodic = new AccumuloPeriodicQueryResultStorage(getConnector(), ryaInstance);
+            DeletePeriodicQuery deletePeriodic = new DeletePeriodicQuery(fluoClient, periodic);
+            deletePeriodic.deletePeriodicQuery(pcjId, getPeriodicNotificationClient(topic, brokers));
+        }
+    }
+    
+    
+    private static PeriodicNotificationClient getPeriodicNotificationClient(String topic, String brokers) throws MalformedQueryException {
+        return new KafkaNotificationRegistrationClient(topic, createProducer(brokers));
+    }
+
+    private static KafkaProducer<String, CommandNotification> createProducer(String brokers) {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CommandNotificationSerializer.class.getName());
+        return new KafkaProducer<>(props);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloListIncrementalQueries.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloListIncrementalQueries.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloListIncrementalQueries.java
new file mode 100644
index 0000000..51e7d6a
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloListIncrementalQueries.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.List;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.rya.api.client.GetInstanceDetails;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.ListIncrementalQueries;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
+import org.apache.rya.indexing.pcj.fluo.api.ListFluoQueries;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+@DefaultAnnotation(NonNull.class)
+public class AccumuloListIncrementalQueries extends AccumuloCommand implements ListIncrementalQueries {
+
+    private final GetInstanceDetails getInstanceDetails;
+
+    /**
+     * @param connectionDetails - Connection details; also used to connect to the Fluo application.
+     * @param connector - Accumulo connector handed to the parent command and the details lookup.
+     */
+    public AccumuloListIncrementalQueries(final AccumuloConnectionDetails connectionDetails, final Connector connector) {
+        super(connectionDetails, connector);
+        getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, connector);
+    }
+
+    /**
+     * Lists the queries maintained by the Fluo application of a Rya instance.
+     *
+     * @param instanceName - Name of the Rya instance. (not null)
+     * @return One formatted entry per query, joined with newlines.
+     * @throws RyaClientException The instance does not exist, lacks PCJ indexing or a
+     *   Fluo application, or the queries could not be read.
+     */
+    @Override
+    public String listIncrementalQueries(String instanceName) throws RyaClientException {
+        requireNonNull(instanceName);
+
+        final Optional<RyaDetails> details = getInstanceDetails.getDetails(instanceName);
+        if (!details.isPresent()) {
+            throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", instanceName));
+        }
+
+        final PCJIndexDetails pcjDetails = details.get().getPCJIndexDetails();
+        if (!pcjDetails.isEnabled()) {
+            throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName));
+        }
+
+        // The queries are read from the Fluo application, so one must be configured.
+        final Optional<FluoDetails> fluoDetails = pcjDetails.getFluoDetails();
+        if (!fluoDetails.isPresent()) {
+            throw new RyaClientException(String.format("The '%s' instance of Rya does not have Fluo incremental updating enabled.", instanceName));
+        }
+
+        final String fluoAppName = fluoDetails.get().getUpdateAppName();
+        try {
+            return getFluoQueryString(instanceName, fluoAppName);
+        } catch (Exception e) {
+            throw new RyaClientException("Problem while creating Fluo Query Strings.", e);
+        }
+    }
+
+    /** Connects to the named Fluo application and renders every query it maintains. */
+    private String getFluoQueryString(final String ryaInstance, final String fluoAppName) throws Exception {
+        final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails();
+        try (final FluoClient fluoClient = new FluoClientFactory().connect(
+                cd.getUsername(), new String(cd.getPassword()), cd.getInstanceName(), cd.getZookeepers(), fluoAppName)) {
+            final List<String> queries = new ListFluoQueries().listFluoQueries(fluoClient);
+            return Joiner.on("\n").join(queries);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
index 5ee02f9..d9bf644 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
@@ -52,6 +52,9 @@ public class AccumuloRyaClientFactory {
                 new AccumuloInstall(connectionDetails, connector),
                 new AccumuloCreatePCJ(connectionDetails, connector),
                 new AccumuloDeletePCJ(connectionDetails, connector),
+                new AccumuloCreatePeriodicPCJ(connectionDetails, connector),
+                new AccumuloDeletePeriodicPCJ(connectionDetails, connector),
+                new AccumuloListIncrementalQueries(connectionDetails, connector),
                 new AccumuloBatchUpdatePCJ(connectionDetails, connector),
                 new AccumuloGetInstanceDetails(connectionDetails, connector),
                 new AccumuloInstanceExists(connectionDetails, connector),

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml
index 758c481..16d33b2 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml
@@ -42,6 +42,10 @@ under the License.
             <artifactId>rya.pcj.fluo.app</artifactId>
         </dependency>
         <dependency>
+		   <groupId>org.apache.rya</groupId>
+		   <artifactId>rya.periodic.service.api</artifactId>
+		</dependency>
+        <dependency>
             <groupId>org.apache.rya</groupId>
             <artifactId>rya.sail</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
index 501f1f5..a988bc7 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
@@ -38,6 +38,7 @@ import org.apache.log4j.Logger;
 import org.apache.rya.accumulo.AccumuloRdfConfiguration;
 import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine;
 import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaType;
 import org.apache.rya.api.domain.RyaURI;
@@ -135,6 +136,7 @@ public class CreateFluoPcj {
      * according to the Kafka {@link ExportStrategy}.  
      *
      * @param sparql - sparql query String to be registered with Fluo
+     * @param strategies - ExportStrategies used to specify how final results will be handled
      * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
      * @return The metadata that was written to the Fluo application for the PCJ.
      * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
@@ -218,7 +220,17 @@ public class CreateFluoPcj {
                 .setSparql(sparql)
                 .setJoinBatchSize(joinBatchSize);
         
-        return builder.build();
+        FluoQuery query = builder.build();
+        
+        if(query.getQueryType() == QueryType.PERIODIC && !Sets.newHashSet(ExportStrategy.PERIODIC).containsAll(strategies)) {
+            throw new UnsupportedQueryException("Periodic Queries must only utilize the PeriodicExport or the NoOpExport ExportStrategy.");
+        }
+        
+        if(query.getQueryType() != QueryType.PERIODIC && strategies.contains(ExportStrategy.PERIODIC)) {
+            throw new UnsupportedQueryException("Only Periodic Queries can utilize the PeriodicExport ExportStrategy.");
+        }
+        
+        return query;
     }
     
     private void writeFluoQuery(FluoClient fluo, FluoQuery fluoQuery, String pcjId) {
@@ -283,13 +295,13 @@ public class CreateFluoPcj {
      * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
      * @param accumulo - Accumulo connector for connecting with Accumulo
      * @param ryaInstance - name of Rya instance to connect to
-     * @return The Fluo application's Query ID of the query that was created.
+     * @return FluoQuery containing the metadata for the newly registered SPARQL query
      * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
      * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
      * @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}.
      * @throws UnsupportedQueryException 
      */
-    public String withRyaIntegration(
+    public FluoQuery withRyaIntegration(
             final String pcjId,
             final String sparql,
             final Set<ExportStrategy> strategies,
@@ -308,7 +320,7 @@ public class CreateFluoPcj {
         //import results already ingested into Rya that match query
         importHistoricResultsIntoFluo(fluo, fluoQuery, accumulo, ryaInstance);
         // return queryId to the caller for later monitoring from the export.
-        return fluoQuery.getQueryMetadata().getNodeId();
+        return fluoQuery;
     }
     
     /**
@@ -326,13 +338,13 @@ public class CreateFluoPcj {
      * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
      * @param accumulo - Accumuo connector for connecting to Accumulo
      * @param ryaInstance - name of Rya instance to connect to
-     * @return The Fluo application's Query ID of the query that was created.
+     * @return FluoQuery containing the metadata for the newly registered SPARQL query
      * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
      * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
      * @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}.
      * @throws UnsupportedQueryException 
      */
-    public String withRyaIntegration(
+    public FluoQuery withRyaIntegration(
             final String pcjId,
             final PrecomputedJoinStorage pcjStorage,
             final FluoClient fluo,

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePeriodicQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePeriodicQuery.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePeriodicQuery.java
new file mode 100644
index 0000000..24adde9
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePeriodicQuery.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.api;
+
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryNode;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.evaluation.function.Function;
+
+import com.google.common.collect.Sets;
+
+
+/**
+ * Object that creates a Periodic Query.  A Periodic Query is any query
+ * requesting periodic updates about events that occurred within a given
+ * window of time of this instant. This is also known as a rolling window
+ * query.  Periodic Queries can be expressed using SPARQL by including the
+ * {@link Function} indicated by the URI {@link PeriodicQueryUtil#PeriodicQueryURI}
+ * in the query.  The user must provide this Function with the following arguments:
+ * the temporal variable in the query that will be filtered on, the window of time
+ * that events must occur within, the period at which the user wants to receive updates,
+ * and the time unit.  The following query requests all observations that occurred
+ * within the last minute and requests updates every 15 seconds.  It also performs
+ * a count on those observations.
+ * <p>
+ * <pre>
+ *  prefix function: http://org.apache.rya/function#
+ *  prefix time: http://www.w3.org/2006/time#
+ *  select (count(?obs) as ?total) where {
+ *      Filter(function:periodic(?time, 1, .25, time:minutes))
+ *      ?obs uri:hasTime ?time.
+ *      ?obs uri:hasId ?id }
+ * </pre>
+ * <p>
+ * This class is responsible for taking a Periodic Query expressed as a SPARQL query and adding
+ * it to Fluo and Kafka so that it can be processed by the {@code PeriodicNotificationApplication}.
+ */
+public class CreatePeriodicQuery {
+
+    private final FluoClient fluoClient;
+    private final PeriodicQueryResultStorage periodicStorage;
+
+    /**
+     * Constructs an instance of CreatePeriodicQuery for creating periodic queries.  The instance
+     * holds no notification client of its own; PeriodicNotifications are only registered by the
+     * methods that are handed a {@link PeriodicNotificationClient}.
+     *
+     * @param fluoClient - Fluo client for interacting with Fluo
+     * @param periodicStorage - PeriodicQueryResultStorage storing periodic query results
+     */
+    public CreatePeriodicQuery(FluoClient fluoClient, PeriodicQueryResultStorage periodicStorage) {
+        this.fluoClient = fluoClient;
+        this.periodicStorage = periodicStorage;
+    }
+
+    /**
+     * Creates a Periodic Query by adding the query to Fluo under a new PCJ id and creating a
+     * {@link PeriodicQueryResultStorage} table for that id.  No PeriodicNotification is registered.
+     *
+     * @param sparql - sparql query registered to Fluo whose results are stored in PeriodicQueryResultStorage table
+     * @return FluoQuery indicating the metadata of the registered SPARQL query
+     * @throws PeriodicQueryCreationException the query is malformed or unsupported, or its storage could not be created
+     * @throws RuntimeException the query does not possess a PeriodicQuery Filter
+     */
+    public FluoQuery createPeriodicQuery(String sparql) throws PeriodicQueryCreationException {
+        try {
+            // reject queries that do not contain a periodic query node
+            if (!PeriodicQueryUtil.getPeriodicNode(sparql).isPresent()) {
+                throw new RuntimeException("Invalid PeriodicQuery.  Query must possess a PeriodicQuery Filter.");
+            }
+            final String newPcjId = FluoQueryUtils.createNewPcjId();
+
+            // register query with Fluo
+            final FluoQuery registered = new CreateFluoPcj().createPcj(newPcjId, sparql, Sets.newHashSet(ExportStrategy.PERIODIC), fluoClient);
+
+            // register query with PeriodicResultStorage table
+            periodicStorage.createPeriodicQuery(newPcjId, sparql);
+
+            return registered;
+        } catch (MalformedQueryException | PeriodicQueryStorageException | UnsupportedQueryException e) {
+            throw new PeriodicQueryCreationException(e);
+        }
+    }
+
+    /**
+     * Creates a Periodic Query by adding the query to Fluo under a new PCJ id and creating a
+     * {@link PeriodicQueryResultStorage} table for that id.  Additionally, the associated
+     * PeriodicNotification is registered with the Periodic Query Service.
+     *
+     * @param sparql - sparql query registered to Fluo whose results are stored in PeriodicQueryResultStorage table
+     * @param notificationClient - {@link PeriodicNotificationClient} for registering new PeriodicNotifications
+     * @return FluoQuery indicating the metadata of the registered SPARQL query
+     * @throws PeriodicQueryCreationException the query is malformed or unsupported, or its storage could not be created
+     * @throws RuntimeException the query does not possess a PeriodicQuery Filter
+     */
+    public FluoQuery createPeriodicQuery(String sparql, PeriodicNotificationClient notificationClient) throws PeriodicQueryCreationException {
+        try {
+            final Optional<PeriodicQueryNode> periodic = PeriodicQueryUtil.getPeriodicNode(sparql);
+            if (!periodic.isPresent()) {
+                throw new RuntimeException("Invalid PeriodicQuery.  Query must possess a PeriodicQuery Filter.");
+            }
+            final PeriodicQueryNode node = periodic.get();
+            final String newPcjId = FluoQueryUtils.createNewPcjId();
+
+            // register query with Fluo
+            final FluoQuery registered = new CreateFluoPcj().createPcj(newPcjId, sparql, Sets.newHashSet(ExportStrategy.PERIODIC), fluoClient);
+
+            // register query with PeriodicResultStorage table
+            periodicStorage.createPeriodicQuery(newPcjId, sparql);
+
+            // build the notification from the query's period and time unit, then register it
+            final PeriodicNotification notification = PeriodicNotification.builder()
+                    .id(newPcjId)
+                    .period(node.getPeriod())
+                    .timeUnit(node.getUnit())
+                    .build();
+            notificationClient.addNotification(notification);
+
+            return registered;
+        } catch (MalformedQueryException | PeriodicQueryStorageException | UnsupportedQueryException e) {
+            throw new PeriodicQueryCreationException(e);
+        }
+    }
+
+    /**
+     * Creates a Periodic Query by adding the query to Fluo through
+     * {@code CreateFluoPcj.withRyaIntegration}, creating a {@link PeriodicQueryResultStorage}
+     * table for its PCJ id and registering the associated PeriodicNotification.
+     *
+     * @param sparql - sparql query registered to Fluo whose results are stored in PeriodicQueryResultStorage table
+     * @param notificationClient - {@link PeriodicNotificationClient} for registering new PeriodicNotifications
+     * @param conn - Accumulo connector for connecting to the Rya instance
+     * @param ryaInstance - name of the Accumulo backed Rya instance
+     * @return FluoQuery indicating the metadata of the registered SPARQL query
+     * @throws PeriodicQueryCreationException wraps any exception raised here, including the one for a query without a PeriodicQuery Filter
+     */
+    public FluoQuery withRyaIntegration(String sparql, PeriodicNotificationClient notificationClient, Connector conn, String ryaInstance)
+            throws PeriodicQueryCreationException {
+        try {
+            final Optional<PeriodicQueryNode> periodic = PeriodicQueryUtil.getPeriodicNode(sparql);
+            if (!periodic.isPresent()) {
+                throw new RuntimeException("Invalid PeriodicQuery.  Query must possess a PeriodicQuery Filter.");
+            }
+            final PeriodicQueryNode node = periodic.get();
+            final String newPcjId = FluoQueryUtils.createNewPcjId();
+
+            // register query with Fluo
+            final FluoQuery registered = new CreateFluoPcj().withRyaIntegration(newPcjId, sparql,
+                    Sets.newHashSet(ExportStrategy.PERIODIC), fluoClient, conn, ryaInstance);
+
+            // register query with PeriodicResultStorage table
+            periodicStorage.createPeriodicQuery(newPcjId, sparql);
+
+            // build the notification from the query's period and time unit, then register it
+            final PeriodicNotification notification = PeriodicNotification.builder()
+                    .id(newPcjId)
+                    .period(node.getPeriod())
+                    .timeUnit(node.getUnit())
+                    .build();
+            notificationClient.addNotification(notification);
+
+            return registered;
+        } catch (Exception e) {
+            throw new PeriodicQueryCreationException(e);
+        }
+    }
+
+    /**
+     * This Exception gets thrown whenever there is an issue creating a PeriodicQuery.
+     */
+    public static class PeriodicQueryCreationException extends Exception {
+
+        private static final long serialVersionUID = 1L;
+
+        public PeriodicQueryCreationException(Exception e) {
+            super(e);
+        }
+
+        public PeriodicQueryCreationException(String message, Exception e) {
+            super(message, e);
+        }
+
+        public PeriodicQueryCreationException(String message) {
+            super(message);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePeriodicQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePeriodicQuery.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePeriodicQuery.java
new file mode 100644
index 0000000..4ff88da
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePeriodicQuery.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.api;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
+import org.apache.rya.periodic.notification.notification.BasicNotification;
+
+import com.google.common.base.Preconditions;
+
+public class DeletePeriodicQuery {
+
+    private final FluoClient fluo;
+    private final PeriodicQueryResultStorage periodicStorage;
+
+    public DeletePeriodicQuery(FluoClient fluo, PeriodicQueryResultStorage periodicStorage) {
+        this.fluo = fluo;
+        this.periodicStorage = periodicStorage;
+    }
+
+    /**
+     * Deletes the Periodic Query with the indicated pcjId from Fluo and {@link PeriodicQueryResultStorage}.
+     *
+     * @param pcjId - Id of the Periodic Query to be deleted (not null)
+     * @throws QueryDeletionException the query could not be deleted from Fluo or from the storage
+     */
+    public void deletePeriodicQuery(String pcjId) throws QueryDeletionException {
+        Preconditions.checkNotNull(pcjId);
+
+        // NOTE(review): 1000 is presumably a batch size for the Fluo delete -- confirm against DeleteFluoPcj
+        final DeleteFluoPcj fluoDeleter = new DeleteFluoPcj(1000);
+        try {
+            fluoDeleter.deletePcj(fluo, pcjId);
+            periodicStorage.deletePeriodicQuery(pcjId);
+        } catch (UnsupportedQueryException | PeriodicQueryStorageException e) {
+            throw new QueryDeletionException(String.format("Unable to delete the Periodic Query with Id: %s", pcjId), e);
+        }
+    }
+
+    /**
+     * Deletes the Periodic Query with the indicated pcjId from Fluo and {@link PeriodicQueryResultStorage}. In
+     * addition, this method also informs the Periodic Notification Service to stop generating PeriodicNotifications
+     * associated with the Periodic Query.
+     *
+     * @param pcjId - Id of the Periodic Query to be deleted (not null)
+     * @param periodicClient - Client used to inform the Periodic Notification Service to stop generating notifications (not null)
+     * @throws QueryDeletionException the query could not be deleted from Fluo or from the storage
+     */
+    public void deletePeriodicQuery(String pcjId, PeriodicNotificationClient periodicClient) throws QueryDeletionException {
+        Preconditions.checkNotNull(periodicClient);
+        // the notification is only removed once the query itself has been deleted
+        deletePeriodicQuery(pcjId);
+        final BasicNotification toRemove = new BasicNotification(pcjId);
+        periodicClient.deleteNotification(toRemove);
+    }
+
+    /**
+     *  This Exception is thrown when a problem is encountered while deleting a
+     *  query from the Fluo Application or the underlying storage layer.
+     */
+    public static class QueryDeletionException extends Exception {
+
+        private static final long serialVersionUID = 1L;
+
+        public QueryDeletionException(String message) {
+            super(message);
+        }
+
+        public QueryDeletionException(String message, Exception e) {
+            super(message, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueries.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueries.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueries.java
new file mode 100644
index 0000000..8f5bbfe
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueries.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.api;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+import org.openrdf.queryrender.sparql.SPARQLQueryRenderer;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Class for retrieving a List containing a String representation of each query maintained by Fluo.
+ */
+public class ListFluoQueries {
+
+    private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+    /**
+     * Retrieve a list of String representations of each query maintained by Fluo
+     *
+     * @param fluo - FluoClient for interacting with Fluo
+     * @return - List of String representations of queries maintained by Fluo.
+     * @throws Exception if the query metadata cannot be read or a query cannot be rendered
+     */
+    public List<String> listFluoQueries(FluoClient fluo) throws Exception {
+        List<String> queryStrings = new ArrayList<>();
+
+        // The Snapshot is AutoCloseable; release it once every query has been read.
+        try (Snapshot sx = fluo.newSnapshot()) {
+            for (String id : new ListQueryIds().listQueryIds(fluo)) {
+                queryStrings.add(extractString(dao.readQueryMetadata(sx, id)));
+            }
+        }
+
+        return queryStrings;
+    }
+
+    /** Renders the id, type, export strategies and SPARQL of a single query. */
+    private static String extractString(QueryMetadata metadata) throws Exception {
+        FluoQueryStringBuilder builder = new FluoQueryStringBuilder();
+        return builder.setQueryId(metadata.getNodeId()).setQueryType(metadata.getQueryType())
+                .setExportStrategies(metadata.getExportStrategies()).setQuery(metadata.getSparql()).build();
+    }
+
+    /**
+     * Parses and re-renders the query, then prefixes every line after the first with
+     * {@code indent} spaces.
+     */
+    private static String getPrettyPrintSparql(String sparql, int indent) throws Exception {
+        SPARQLParser parser = new SPARQLParser();
+        ParsedQuery pq = parser.parseQuery(sparql, null);
+        SPARQLQueryRenderer render = new SPARQLQueryRenderer();
+
+        // remove extra quotes generated by query renderer; replace() also copes with a
+        // rendered query that ends in (or consists only of) triple quotes
+        String renderedQuery = render.render(pq).replace("\"\"\"", "\"");
+
+        //add indent to all lines following newline char
+        String prefix = getVariableIndent(indent);
+        StringBuilder builder = new StringBuilder();
+        String[] lines = renderedQuery.split("\n");
+        for (int i = 0; i < lines.length; i++) {
+            if (i != 0) {
+                builder.append(prefix);
+            }
+            builder.append(lines[i]).append("\n");
+        }
+
+        return builder.toString();
+    }
+
+    /** Returns a String made of {@code len} spaces. */
+    private static String getVariableIndent(int len) {
+        return new String(new char[len]).replace('\0', ' ');
+    }
+
+    /** Builder for the multi-line String describing a single Fluo query. */
+    public static class FluoQueryStringBuilder {
+
+        private String queryId;
+        private String sparql;
+        private QueryType queryType;
+        private Set<ExportStrategy> strategies;
+
+        public FluoQueryStringBuilder setQueryId(String queryId) {
+            this.queryId = Preconditions.checkNotNull(queryId);
+            return this;
+        }
+
+        public FluoQueryStringBuilder setQuery(String query) {
+            this.sparql = Preconditions.checkNotNull(query);
+            return this;
+        }
+
+        public FluoQueryStringBuilder setExportStrategies(Set<ExportStrategy> strategies) {
+            this.strategies = Preconditions.checkNotNull(strategies);
+            return this;
+        }
+
+        public FluoQueryStringBuilder setQueryType(QueryType queryType) {
+            this.queryType = Preconditions.checkNotNull(queryType);
+            return this;
+        }
+
+        /** Renders one labelled line per field, each label padded to {@code valueAlign} characters. */
+        public String build() throws Exception {
+
+            int valueAlign = 20;
+            String sparqlHeader = "SPARQL: ";
+            String idHeader = "QUERY ID: ";
+            String typeHeader = "QUERY TYPE: ";
+            String strategiesHeader = "EXPORT STRATEGIES: ";
+            
+            StringBuilder builder = new StringBuilder();
+            builder.append(idHeader).append(getVariableIndent(valueAlign - idHeader.length())).append(queryId).append("\n")
+                   .append(typeHeader).append(getVariableIndent(valueAlign - typeHeader.length())).append(queryType).append("\n")
+                   .append(strategiesHeader).append(getVariableIndent(valueAlign - strategiesHeader.length())).append(strategies).append("\n")
+                   .append(sparqlHeader).append(getVariableIndent(valueAlign - sparqlHeader.length())).append(getPrettyPrintSparql(sparql, valueAlign)).append("\n");
+
+            return builder.toString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
index c090d37..5405837 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
@@ -39,6 +39,7 @@ public class IncrementalUpdateConstants {
     public static final String CONSTRUCT_PREFIX = "CONSTRUCT";
     public static final String PERIODIC_QUERY_PREFIX = "PERIODIC_QUERY";
     
+    // Binding name reserved for the periodic bin ID in periodic query results.
     public static final String PERIODIC_BIN_ID = PeriodicQueryResultStorage.PeriodicBinId;
 
     public static final String URI_TYPE = "http://www.w3.org/2001/XMLSchema#anyURI";

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java
index 62f1271..2cb7eff 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java
@@ -30,7 +30,6 @@ import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
 import org.apache.rya.api.client.CreatePCJ.QueryType;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaSubGraph;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
 import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
 import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
@@ -95,16 +94,21 @@ public class ExporterManager implements AutoCloseable {
      * @throws ResultExportException
      */
     private void exportBindingSet(Map<ExportStrategy, IncrementalResultExporter> exporters, Set<ExportStrategy> strategies, String pcjId, Bytes data) throws ResultExportException {
+        VisibilityBindingSet bs;
         try {
-            VisibilityBindingSet bs = BS_SERDE.deserialize(data);
+            bs = BS_SERDE.deserialize(data);
             simplifyVisibilities(bs);
+        } catch (Exception e) {
+            throw new ResultExportException("Unable to deserialize the given BindingSet.", e);
+        }
             
+        try{
             for(ExportStrategy strategy: strategies) {
                 IncrementalBindingSetExporter exporter = (IncrementalBindingSetExporter) exporters.get(strategy);
                 exporter.export(pcjId, bs);
             }
         } catch (Exception e) {
-            throw new ResultExportException("Unable to deserialize the provided BindingSet", e);
+            throw new ResultExportException("Unable to export the given BindingSet " + bs + " with the given set of ExportStrategies " + strategies, e);
         }
     }
     
@@ -125,9 +129,14 @@ public class ExporterManager implements AutoCloseable {
             throw new ResultExportException("Undable to deserialize provided RyaSubgraph", e);
         }
         
-        for(ExportStrategy strategy: strategies) {
-            IncrementalRyaSubGraphExporter exporter = (IncrementalRyaSubGraphExporter) exporters.get(strategy);
-            exporter.export(pcjId, subGraph);
+        try {
+            for (ExportStrategy strategy : strategies) {
+                IncrementalRyaSubGraphExporter exporter = (IncrementalRyaSubGraphExporter) exporters.get(strategy);
+                exporter.export(pcjId, subGraph);
+            }
+        } catch (Exception e) {
+            // Chain the original failure as the cause so the underlying export error is not lost to callers.
+            throw new ResultExportException("Unable to export the given subgraph " + subGraph + " using all of the ExportStrategies " + strategies, e);
         }
     }
     
@@ -195,8 +204,6 @@ public class ExporterManager implements AutoCloseable {
          * @return - ExporterManager for managing IncrementalResultExporters and exporting results
          */
         public ExporterManager build() {
-            //adds NoOpExporter in the event that users does not want to Export results
-            addIncrementalResultExporter(new NoOpExporter());
             return new ExporterManager(exporters);
         }
         

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/NoOpExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/NoOpExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/NoOpExporter.java
deleted file mode 100644
index ab7f2ed..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/NoOpExporter.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export;
-
-import java.util.Set;
-
-import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
-import org.apache.rya.api.client.CreatePCJ.QueryType;
-import org.apache.rya.api.domain.RyaSubGraph;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-
-import com.google.common.collect.Sets;
-
-/**
- * This class is a NoOpExporter that can be specified if a user does not
- * want their results exported from Fluo.
- *
- */
-public class NoOpExporter implements IncrementalBindingSetExporter, IncrementalRyaSubGraphExporter {
-
-    @Override
-    public Set<QueryType> getQueryTypes() {
-        return Sets.newHashSet(QueryType.CONSTRUCT, QueryType.PROJECTION);
-    }
-
-    @Override
-    public ExportStrategy getExportStrategy() {
-        return ExportStrategy.NO_OP_EXPORT;
-    }
-
-    @Override
-    public void close() throws Exception {
-    }
-
-    @Override
-    public void export(String constructID, RyaSubGraph subgraph) throws ResultExportException {
-    }
-
-    @Override
-    public void export(String queryId, VisibilityBindingSet result) throws ResultExportException {
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java
index 4550a50..3687c9f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
 
 import com.google.common.base.Preconditions;
 
-
 public class KafkaBindingSetExporterParameters extends KafkaExportParameterBase {
     
     public static final String CONF_USE_KAFKA_BINDING_SET_EXPORTER = "pcj.fluo.export.kafka.bindingset.enabled";

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java
index 604462b..5a8f01c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java
@@ -52,7 +52,7 @@ public class PeriodicBindingSetExporter implements IncrementalBindingSetExporter
 
     @Override
     public ExportStrategy getExportStrategy() {
-        return ExportStrategy.RYA;
+        return ExportStrategy.PERIODIC;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index ba7beee..e07c514 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -19,6 +19,7 @@
 package org.apache.rya.indexing.pcj.fluo.app.observers;
 
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.QUERY_BINDING_SET;
 
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
@@ -35,7 +36,6 @@ import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaRyaSubGraphExporte
 import org.apache.rya.indexing.pcj.fluo.app.export.rya.PeriodicBindingSetExporterFactory;
 import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaBindingSetExporterFactory;
 import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaSubGraphExporterFactory;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
 
@@ -66,7 +66,7 @@ public class QueryResultObserver extends AbstractObserver {
 
     @Override
     public ObservedColumn getObservedColumn() {
-        return new ObservedColumn(FluoQueryColumns.QUERY_BINDING_SET, NotificationType.STRONG);
+        return new ObservedColumn(QUERY_BINDING_SET, NotificationType.STRONG);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
index 17ab14f..a1c7c00 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
@@ -92,11 +92,7 @@ public class FluoQuery {
         this.statementPatternMetadata = requireNonNull(statementPatternMetadata);
         this.filterMetadata = requireNonNull(filterMetadata);
         this.joinMetadata = requireNonNull(joinMetadata);
-        if(constructMetadata.isPresent()) {
-            this.type = QueryType.CONSTRUCT;
-        } else {
-            this.type = QueryType.PROJECTION;
-        }
+        this.type = queryMetadata.getQueryType();
     }
     
     /**

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
index 8569a48..6ca0e8d 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.fluo.api.data.Column;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
 import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.AggregationState;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index d5d9fe7..1cf2825 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -113,8 +113,10 @@ public class FluoQueryMetadataDAO {
         final String[] exportStrategies = values.get(FluoQueryColumns.QUERY_EXPORT_STRATEGIES).split(IncrementalUpdateConstants.VAR_DELIM);
         
         Set<ExportStrategy> strategies = new HashSet<>();
-        for(String strategy: exportStrategies) {
-            strategies.add(ExportStrategy.valueOf(strategy));
+        for (String strategy : exportStrategies) {
+            if (!strategy.isEmpty()) {
+                strategies.add(ExportStrategy.valueOf(strategy));
+            }
         }
 
         return QueryMetadata.builder(nodeId)