You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/25 19:35:36 UTC

[5/5] incubator-rya git commit: RYA-246-Query-Export-Strategy. Closes #213.

RYA-246-Query-Export-Strategy. Closes #213.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/05147266
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/05147266
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/05147266

Branch: refs/heads/master
Commit: 0514726604757c03e66015edee742b0fbdcf1ca2
Parents: 82df3ad
Author: Caleb Meier <ca...@parsons.com>
Authored: Mon Aug 7 21:22:00 2017 -0700
Committer: Caleb Meier <ca...@parsons.com>
Committed: Fri Aug 25 12:34:42 2017 -0700

----------------------------------------------------------------------
 .../org/apache/rya/api/client/CreatePCJ.java    |  37 ++++
 .../api/client/accumulo/AccumuloCreatePCJ.java  |  27 ++-
 .../api/client/accumulo/AccumuloDeletePCJ.java  |   6 +-
 .../rya/api/client/accumulo/FluoITBase.java     |   2 +-
 .../src/main/java/RyaClientExample.java         |   2 +-
 .../storage/accumulo/ShiftVarOrderFactory.java  |   1 +
 .../indexing/pcj/fluo/api/CreateFluoPcj.java    | 146 +++++++++----
 .../indexing/pcj/fluo/api/DeleteFluoPcj.java    | 127 +----------
 .../indexing/pcj/fluo/api/GetPcjMetadata.java   |  10 +-
 .../indexing/pcj/fluo/api/GetQueryReport.java   |  30 +--
 .../rya/indexing/pcj/fluo/api/ListQueryIds.java |   2 +-
 .../fluo/app/IncrementalUpdateConstants.java    |   3 -
 .../pcj/fluo/app/export/ExporterManager.java    | 216 +++++++++++++++++++
 .../export/IncrementalBindingSetExporter.java   |   8 +-
 .../IncrementalBindingSetExporterFactory.java   | 104 ---------
 .../app/export/IncrementalResultExporter.java   |  42 ++++
 .../IncrementalResultExporterFactory.java       | 104 +++++++++
 .../export/IncrementalRyaSubGraphExporter.java  |   2 +-
 .../IncrementalRyaSubGraphExporterFactory.java  |  47 ----
 .../pcj/fluo/app/export/NoOpExporter.java       |  59 +++++
 .../export/kafka/KafkaBindingSetExporter.java   |  25 ++-
 .../kafka/KafkaBindingSetExporterFactory.java   |  13 +-
 .../KafkaBindingSetExporterParameters.java      |  80 +++++++
 .../export/kafka/KafkaExportParameterBase.java  |  86 ++++++++
 .../app/export/kafka/KafkaExportParameters.java |  86 --------
 .../export/kafka/KafkaRyaSubGraphExporter.java  |  16 ++
 .../kafka/KafkaRyaSubGraphExporterFactory.java  |  17 +-
 .../kafka/KafkaSubGraphExporterParameters.java  |  81 +++++++
 .../export/rya/PeriodicBindingSetExporter.java  |  71 ++++++
 .../rya/PeriodicBindingSetExporterFactory.java  |  74 +++++++
 .../app/export/rya/RyaBindingSetExporter.java   |  43 ++--
 .../rya/RyaBindingSetExporterFactory.java       |  15 +-
 .../app/export/rya/RyaExportParameters.java     |  33 ++-
 .../export/rya/RyaSubGraphExportParameters.java | 120 +++++++++++
 .../app/export/rya/RyaSubGraphExporter.java     | 106 +++++++++
 .../export/rya/RyaSubGraphExporterFactory.java  |  58 +++++
 .../fluo/app/observers/AggregationObserver.java |   3 -
 .../fluo/app/observers/BindingSetUpdater.java   |   2 +-
 .../observers/ConstructQueryResultObserver.java | 167 ++------------
 .../pcj/fluo/app/observers/FilterObserver.java  |   3 -
 .../pcj/fluo/app/observers/JoinObserver.java    |   3 -
 .../app/observers/PeriodicQueryObserver.java    |   2 -
 .../fluo/app/observers/ProjectionObserver.java  |   2 -
 .../fluo/app/observers/QueryResultObserver.java |  94 ++++----
 .../app/observers/StatementPatternObserver.java |   4 -
 .../indexing/pcj/fluo/app/query/FluoQuery.java  |  20 +-
 .../pcj/fluo/app/query/FluoQueryColumns.java    |  26 ---
 .../fluo/app/query/FluoQueryMetadataDAO.java    |  19 +-
 .../pcj/fluo/app/query/QueryMetadata.java       |   5 +-
 .../fluo/app/query/SparqlFluoQueryBuilder.java  |  24 ++-
 .../app/query/UnsupportedQueryException.java    |  41 ++++
 .../pcj/fluo/app/util/FluoQueryUtils.java       |   8 +
 .../export/rya/KafkaExportParametersTest.java   |  25 +--
 .../app/export/rya/RyaExportParametersTest.java |   6 +-
 .../fluo/app/query/PeriodicQueryUtilTest.java   |   2 +-
 .../app/query/QueryMetadataVisitorTest.java     |   2 +-
 .../pcj/fluo/client/PcjAdminClient.java         |   4 +
 .../pcj/fluo/client/PcjAdminClientCommand.java  |   4 +-
 .../fluo/client/command/NewQueryCommand.java    |   3 +-
 .../fluo/client/command/QueryReportCommand.java |   3 +-
 .../fluo/client/util/QueryReportRenderer.java   |  10 +-
 .../rya/indexing/pcj/fluo/demo/DemoDriver.java  |   2 +-
 .../pcj/fluo/demo/FluoAndHistoricPcjsDemo.java  |   7 +-
 .../indexing/pcj/fluo/api/GetPcjMetadataIT.java |   5 +-
 .../indexing/pcj/fluo/api/ListQueryIdsIT.java   |  10 +-
 .../fluo/app/query/FluoQueryMetadataDAOIT.java  |  24 +--
 .../indexing/pcj/fluo/integration/BatchIT.java  |   3 +-
 .../pcj/fluo/integration/CreateDeleteIT.java    |   7 +-
 .../pcj/fluo/integration/KafkaExportIT.java     |  20 +-
 .../integration/KafkaRyaSubGraphExportIT.java   |  22 +-
 .../indexing/pcj/fluo/integration/QueryIT.java  |   6 +-
 .../pcj/fluo/test/base/KafkaExportITBase.java   |  59 ++---
 .../rya/pcj/fluo/test/base/RyaExportITBase.java |   3 +-
 .../PeriodicNotificationProviderIT.java         |   3 +-
 .../pruner/PeriodicNotificationBinPrunerIT.java |   6 +-
 .../notification/api/CreatePeriodicQuery.java   |  19 +-
 .../pruner/PeriodicQueryPruner.java             |   9 +-
 .../recovery/PeriodicNotificationProvider.java  |   3 +-
 .../org/apache/rya/shell/RyaAdminCommands.java  |  21 +-
 .../apache/rya/shell/RyaAdminCommandsTest.java  |  12 +-
 80 files changed, 1698 insertions(+), 924 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java b/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java
index e03a1f1..6e92b28 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java
@@ -18,6 +18,8 @@
  */
 package org.apache.rya.api.client;
 
+import java.util.Set;
+
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
 
@@ -28,7 +30,41 @@ import edu.umd.cs.findbugs.annotations.NonNull;
 public interface CreatePCJ {
 
     /**
+     * Metadata enum used to indicate the type of query that is registered.  If
+     * the topmost node is a Construct QueryNode, then the type is Construct.  If the
+     * topmost node is a Projection QueryNode, then the type is Projection.  If the
+     * query contains a PeriodicQuery Filter anywhere within the query, then it is of type
+     * Periodic. 
+     *
+     */
+    public static enum QueryType{CONSTRUCT, PROJECTION, PERIODIC};
+    
+    /**
+     * Specifies how results will be exported from the Rya Fluo
+     * Application.
+     *
+     */
+    public static enum ExportStrategy{RYA, KAFKA, NO_OP_EXPORT};
+
+    
+    /**
+     * Designate a new PCJ that will be maintained by the target instance of Rya.
+     * Results will be exported according to the specified export strategies.
+     *
+     * @param instanceName - Indicates which Rya instance will create and maintain
+     *   the PCJ. (not null)
+     * @param sparql - The SPARQL query that will be maintained. (not null)
+     * @param strategies - The export strategies used to export results for this query
+     * @return The ID that was assigned to this newly created PCJ.
+     * @throws InstanceDoesNotExistException No instance of Rya exists for the provided name.
+     * @throws RyaClientException Something caused the command to fail.
+     */
+    public String createPCJ(final String instanceName, String sparql, Set<ExportStrategy> strategies) throws InstanceDoesNotExistException, RyaClientException;
+    
+    
+    /**
      * Designate a new PCJ that will be maintained by the target instance of Rya.
+     * Results will be exported to a Rya PCJ table.
      *
      * @param instanceName - Indicates which Rya instance will create and maintain
      *   the PCJ. (not null)
@@ -38,4 +74,5 @@ public interface CreatePCJ {
      * @throws RyaClientException Something caused the command to fail.
      */
     public String createPCJ(final String instanceName, String sparql) throws InstanceDoesNotExistException, RyaClientException;
+    
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
index 644189a..6aef33c 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
@@ -20,6 +20,8 @@ package org.apache.rya.api.client.accumulo;
 
 import static java.util.Objects.requireNonNull;
 
+import java.util.Set;
+
 import org.apache.accumulo.core.client.Connector;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
@@ -39,6 +41,7 @@ import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator;
 import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
 import org.apache.rya.api.persist.RyaDAOException;
 import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
 import org.apache.rya.indexing.pcj.storage.PcjException;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
@@ -49,6 +52,7 @@ import org.openrdf.repository.RepositoryException;
 import org.openrdf.sail.SailException;
 
 import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -73,7 +77,7 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ {
     }
 
     @Override
-    public String createPCJ(final String instanceName, final String sparql) throws InstanceDoesNotExistException, RyaClientException {
+    public String createPCJ(final String instanceName, final String sparql, Set<ExportStrategy> strategies) throws InstanceDoesNotExistException, RyaClientException {
         requireNonNull(instanceName);
         requireNonNull(sparql);
 
@@ -99,9 +103,14 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ {
             if(fluoDetailsHolder.isPresent()) {
                 final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName();
                 try {
-                    updateFluoApp(instanceName, fluoAppName, pcjStorage, pcjId);
+                    updateFluoApp(instanceName, fluoAppName, pcjId, sparql, strategies);
                 } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException | RyaDAOException e) {
                     throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e);
+                } catch (UnsupportedQueryException e) {
+                    throw new RyaClientException("The new PCJ could not be initialized because it either contains an unsupported query node "
+                            + "or an invalid ExportStrategy for the given QueryType.  Projection queries can be exported to either Rya or Kafka,"
+                            + "unless they contain an aggregation, in which case they can only be exported to Kafka.  Construct queries can be exported"
+                            + "to Rya and Kafka, and Periodic queries can only be exported to Rya.");
                 }
 
                 // Update the Rya Details to indicate the PCJ is being updated incrementally.
@@ -133,9 +142,16 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ {
         }
     }
 
-    private void updateFluoApp(final String ryaInstance, final String fluoAppName, final PrecomputedJoinStorage pcjStorage, final String pcjId) throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, RyaDAOException {
-        requireNonNull(pcjStorage);
+    @Override
+    public String createPCJ(String instanceName, String sparql) throws InstanceDoesNotExistException, RyaClientException {
+        return createPCJ(instanceName, sparql, Sets.newHashSet(ExportStrategy.RYA));
+    }
+    
+    
+    private void updateFluoApp(final String ryaInstance, final String fluoAppName, final String pcjId, String sparql, Set<ExportStrategy> strategies) throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, RyaDAOException, UnsupportedQueryException {
+        requireNonNull(sparql);
         requireNonNull(pcjId);
+        requireNonNull(strategies);
 
         // Connect to the Fluo application that is updating this instance's PCJs.
         final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails();
@@ -147,7 +163,8 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ {
                 fluoAppName);) {
             // Initialize the PCJ within the Fluo application.
             final CreateFluoPcj fluoCreatePcj = new CreateFluoPcj();
-            fluoCreatePcj.withRyaIntegration(pcjId, pcjStorage, fluoClient, getConnector(), ryaInstance);
+            fluoCreatePcj.withRyaIntegration(pcjId, sparql, strategies, fluoClient, getConnector(), ryaInstance);
         }
     }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java
index eb2b2d7..547254d 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java
@@ -123,7 +123,11 @@ public class AccumuloDeletePCJ extends AccumuloCommand implements DeletePCJ {
                 cd.getZookeepers(),
                 fluoAppName)) {
             // Delete the PCJ from the Fluo App.
-            new DeleteFluoPcj(1000).deletePcj(fluoClient, pcjId);
+            try {
+                new DeleteFluoPcj(1000).deletePcj(fluoClient, pcjId);
+            } catch (Exception e) {
+                log.warn("PcjId corresponds to an invalid PCJ. The query cannot be deleted.");
+            }
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java
index 113b397..695704b 100644
--- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java
@@ -199,7 +199,7 @@ public abstract class FluoITBase {
         final HashMap<String, String> params = new HashMap<>();
 
         final RyaExportParameters ryaParams = new RyaExportParameters(params);
-        ryaParams.setExportToRya(true);
+        ryaParams.setUseRyaBindingSetExporter(true);
         ryaParams.setAccumuloInstanceName(instanceName);
         ryaParams.setZookeeperServers(zookeepers);
         ryaParams.setExporterUsername(clusterInstance.getUsername());

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/indexingExample/src/main/java/RyaClientExample.java
----------------------------------------------------------------------
diff --git a/extras/indexingExample/src/main/java/RyaClientExample.java b/extras/indexingExample/src/main/java/RyaClientExample.java
index 1b0450f..b0afd5a 100644
--- a/extras/indexingExample/src/main/java/RyaClientExample.java
+++ b/extras/indexingExample/src/main/java/RyaClientExample.java
@@ -249,7 +249,7 @@ public class RyaClientExample {
         // export observer.
         final HashMap<String, String> params = new HashMap<>();
         final RyaExportParameters ryaParams = new RyaExportParameters(params);
-        ryaParams.setExportToRya(true);
+        ryaParams.setUseRyaBindingSetExporter(true);
         ryaParams.setAccumuloInstanceName(instanceName);
         ryaParams.setZookeeperServers(zookeepers);
         ryaParams.setExporterUsername(username);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java
index 26c4339..e297ec9 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java
@@ -46,6 +46,7 @@ public class ShiftVarOrderFactory implements PcjVarOrderFactory {
         final Set<String> bindingNames = new SPARQLParser().parseQuery(sparql, null)
                 .getTupleExpr()
                 .getBindingNames();
+        
 
         return makeVarOrders( new VariableOrder(bindingNames) );
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
index 150a256..501f1f5 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
@@ -27,7 +27,6 @@ import java.io.UnsupportedEncodingException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.UUID;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -38,6 +37,7 @@ import org.apache.fluo.api.client.Transaction;
 import org.apache.log4j.Logger;
 import org.apache.rya.accumulo.AccumuloRdfConfiguration;
 import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaType;
 import org.apache.rya.api.domain.RyaURI;
@@ -50,6 +50,8 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder;
 import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
 import org.apache.rya.indexing.pcj.storage.PcjException;
 import org.apache.rya.indexing.pcj.storage.PcjMetadata;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
@@ -61,6 +63,7 @@ import org.openrdf.query.MalformedQueryException;
 import org.openrdf.query.algebra.StatementPattern;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -128,60 +131,92 @@ public class CreateFluoPcj {
     /**
      * Tells the Fluo PCJ Updater application to maintain a new PCJ. This method
      * creates the FluoQuery (metadata) inside of Fluo so that results can be incrementally generated
-     * inside of Fluo.  This method assumes that the user will export the results to Kafka or
-     * some other external resource.  The export id is equivalent to the queryId that is returned,
-     * which is in contrast to the other createPcj methods in this class which accept an external pcjId
-     * that is used to identify the Accumulo table or Kafka topic for exporting results.
+     * inside of Fluo.  This method assumes that the user will export the results to Kafka 
+     * according to the Kafka {@link ExportStrategy}.  
      *
      * @param sparql - sparql query String to be registered with Fluo
      * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
      * @return The metadata that was written to the Fluo application for the PCJ.
      * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
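+     * @throws UnsupportedQueryException The query contains an unsupported query node or an ExportStrategy that is invalid for its query type.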
      * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
      */
-    public FluoQuery createPcj(String sparql, FluoClient fluo) throws MalformedQueryException {
+    public FluoQuery createPcj(String sparql, FluoClient fluo) throws MalformedQueryException, UnsupportedQueryException {
         Preconditions.checkNotNull(sparql);
         Preconditions.checkNotNull(fluo);
         
-        String pcjId = UUID.randomUUID().toString().replaceAll("-", "");
-        return createPcj(pcjId, sparql, fluo);
+        String pcjId = FluoQueryUtils.createNewPcjId();
+        return createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.KAFKA), fluo);
     }
     
     /**
      * Tells the Fluo PCJ Updater application to maintain a new PCJ.  This method provides
      * no guarantees that a PCJ with the given pcjId exists outside of Fluo. This method merely
-     * creates the FluoQuery (metadata) inside of Fluo so that results and be incrementally generated
-     * inside of Fluo.  This method assumes that the user will export the results to Kafka or
-     * some other external resource.
+     * creates the FluoQuery (metadata) inside of Fluo so that results can be incrementally generated
+     * inside of Fluo.  Results are exported according to the Set of {@link ExportStrategy} enums.  If
+     * the Rya ExportStrategy is specified, care should be taken to verify that the PCJ table exists.
      *
      * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
      * @param sparql - sparql query String to be registered with Fluo
+     * @param strategies - ExportStrategies used to specify how final results will be handled
      * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
      * @return The metadata that was written to the Fluo application for the PCJ.
-     * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
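+     * @throws UnsupportedQueryException The query contains an unsupported query node or an ExportStrategy that is invalid for its query type.
+     * @throws MalformedQueryException The provided SPARQL query is malformed.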
      */
     public FluoQuery createPcj(
             final String pcjId,
             final String sparql,
-            final FluoClient fluo) throws MalformedQueryException {
+            final Set<ExportStrategy> strategies,
+            final FluoClient fluo) throws MalformedQueryException, UnsupportedQueryException {
         requireNonNull(pcjId);
         requireNonNull(sparql);
+        requireNonNull(strategies);
         requireNonNull(fluo);
 
-        FluoQuery fluoQuery = makeFluoQuery(sparql, pcjId);
+        FluoQuery fluoQuery = makeFluoQuery(sparql, pcjId, strategies);
         writeFluoQuery(fluo, fluoQuery, pcjId);
 
         return fluoQuery;
     }
     
-    private FluoQuery makeFluoQuery(String sparql, String pcjId) throws MalformedQueryException {
+    /**
+     * Tells the Fluo PCJ Updater application to maintain a new PCJ.  The method takes in an
+     * instance of {@link PrecomputedJoinStorage} to verify that a PCJ with the given pcjId exists.
+     * Results are exported to a PCJ table with the provided pcjId according to the Rya
+     * {@link ExportStrategy}.
+     *
+     * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
+     * @param pcjStorage - Provides access to the PCJ index. (not null)
+     * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
+     * @return The metadata that was written to the Fluo application for the PCJ.
+     * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
+     * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
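+     * @throws UnsupportedQueryException The query contains an unsupported query node or an ExportStrategy that is invalid for its query type.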
+     */
+    public FluoQuery createPcj(
+            final String pcjId,
+            final PrecomputedJoinStorage pcjStorage,
+            final FluoClient fluo) throws MalformedQueryException, PcjException, UnsupportedQueryException {
+        requireNonNull(pcjId);
+        requireNonNull(pcjStorage);
+        requireNonNull(fluo);
+
+        // Parse the query's structure for the metadata that will be written to fluo.
+        final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
+        final String sparql = pcjMetadata.getSparql();
+        return createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.RYA), fluo);
+    }
+    
+    private FluoQuery makeFluoQuery(String sparql, String pcjId, Set<ExportStrategy> strategies) throws MalformedQueryException, UnsupportedQueryException {
         
         String queryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId);
         
-        SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder();
-        builder.setFluoQueryId(queryId);
-        builder.setSparql(sparql);
-        builder.setJoinBatchSize(joinBatchSize);
+        SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder()
+                .setExportStrategies(strategies)
+                .setFluoQueryId(queryId)
+                .setSparql(sparql)
+                .setJoinBatchSize(joinBatchSize);
         
         return builder.build();
     }
@@ -195,56 +230,72 @@ public class CreateFluoPcj {
             tx.commit();
         }
     }
-
     
     /**
-     * Tells the Fluo PCJ Updater application to maintain a new PCJ.  The method takes in an
-     * instance of {@link PrecomputedJoinStorage} to verify that a PCJ with the given pcjId exists.
+     * Tells the Fluo PCJ Updater application to maintain a new PCJ.
+     * <p>
+     * This call scans Rya for Statement Pattern matches and inserts them into
+     * the Fluo application. It is assumed that results for any query registered
+     * using this method will be exported to Kafka according to the Kafka {@link ExportStrategy}.
      *
-     * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
-     * @param pcjStorage - Provides access to the PCJ index. (not null)
+     * @param sparql - sparql query that will be registered with Fluo. (not null)
      * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
-     * @return The metadata that was written to the Fluo application for the PCJ.
+     * @param accumulo - Accumulo connector for connecting with Accumulo
+     * @param ryaInstance - Name of Rya instance to connect to
+     * @return The Fluo application's Query ID of the query that was created.
      * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
      * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
+     * @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}.
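+     * @throws UnsupportedQueryException The query contains an unsupported query node or an ExportStrategy that is invalid for its query type.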
      */
-    public FluoQuery createPcj(
-            final String pcjId,
-            final PrecomputedJoinStorage pcjStorage,
-            final FluoClient fluo) throws MalformedQueryException, PcjException {
-        requireNonNull(pcjId);
-        requireNonNull(pcjStorage);
+    public String withRyaIntegration(
+            final String sparql,
+            final FluoClient fluo,
+            final Connector accumulo,
+            final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException, UnsupportedQueryException {
+        requireNonNull(sparql);
         requireNonNull(fluo);
+        requireNonNull(accumulo);
+        requireNonNull(ryaInstance);
 
-        // Parse the query's structure for the metadata that will be written to fluo.
-        final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
-        final String sparql = pcjMetadata.getSparql();
-        return createPcj(pcjId, sparql, fluo);
+        
+        // Write the SPARQL query's structure to the Fluo Application.
+        final FluoQuery fluoQuery = createPcj(sparql, fluo);
+        //import results already ingested into Rya that match query
+        importHistoricResultsIntoFluo(fluo, fluoQuery, accumulo, ryaInstance);
+        // return queryId to the caller for later monitoring from the export.
+        return fluoQuery.getQueryMetadata().getNodeId();
     }
     
+    
     /**
      * Tells the Fluo PCJ Updater application to maintain a new PCJ.
      * <p>
      * This call scans Rya for Statement Pattern matches and inserts them into
      * the Fluo application. This method does not verify that a PcjTable with the
-     * the given pcjId actually exists. It is assumed that results for any query registered
-     * using this method will be exported to Kafka or some other external service.
+     * given pcjId actually exists, so one should verify that the table exists before
+     * using the Rya ExportStrategy. Results will be exported according to the Set of
+     * {@link ExportStrategy} enums.
      *
      * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
      * @param sparql - sparql query that will registered with Fluo. (not null)
+     * @param strategies - ExportStrategies used to specify how final results will be handled
      * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
-     * @param queryEngine - QueryEngine for a given Rya Instance, (not null)
+     * @param accumulo - Accumulo connector for connecting with Accumulo
+     * @param ryaInstance - name of Rya instance to connect to
      * @return The Fluo application's Query ID of the query that was created.
      * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
      * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
      * @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}.
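+     * @throws UnsupportedQueryException The query contains an unsupported query node or an ExportStrategy that is invalid for its query type.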
      */
     public String withRyaIntegration(
             final String pcjId,
             final String sparql,
+            final Set<ExportStrategy> strategies,
             final FluoClient fluo,
             final Connector accumulo,
-            final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException {
+            final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException, UnsupportedQueryException {
         requireNonNull(pcjId);
         requireNonNull(sparql);
         requireNonNull(fluo);
@@ -253,14 +304,13 @@ public class CreateFluoPcj {
 
         
         // Write the SPARQL query's structure to the Fluo Application.
-        final FluoQuery fluoQuery = createPcj(pcjId, sparql, fluo);
+        final FluoQuery fluoQuery = createPcj(pcjId, sparql, strategies, fluo);
         //import results already ingested into Rya that match query
         importHistoricResultsIntoFluo(fluo, fluoQuery, accumulo, ryaInstance);
         // return queryId to the caller for later monitoring from the export.
         return fluoQuery.getQueryMetadata().getNodeId();
     }
     
-
     /**
      * Tells the Fluo PCJ Updater application to maintain a new PCJ.
      * <p>
@@ -268,24 +318,26 @@ public class CreateFluoPcj {
      * the Fluo application. The Fluo application will then maintain the intermediate
      * results as new triples are inserted and export any new query results to the
      * {@code pcjId} within the provided {@code pcjStorage}.  This method requires that a
-     * PCJ table already exist for the query corresponding to the pcjId.  Results will be exported
-     * to this table.
+     * PCJ table already exist for the query corresponding to the pcjId.  By default, results will be exported
+     * to this table according to the Rya {@link ExportStrategy}.
      *
      * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
      * @param pcjStorage - Provides access to the PCJ index. (not null)
      * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
-     * @param queryEngine - QueryEngine for a given Rya Instance, (not null)
+     * @param accumulo - Accumulo connector for connecting to Accumulo
+     * @param ryaInstance - name of Rya instance to connect to
      * @return The Fluo application's Query ID of the query that was created.
      * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
      * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
      * @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}.
+     * @throws UnsupportedQueryException 
      */
     public String withRyaIntegration(
             final String pcjId,
             final PrecomputedJoinStorage pcjStorage,
             final FluoClient fluo,
             final Connector accumulo,
-            final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException {
+            final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException, UnsupportedQueryException {
         requireNonNull(pcjId);
         requireNonNull(pcjStorage);
         requireNonNull(fluo);
@@ -296,9 +348,11 @@ public class CreateFluoPcj {
         final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
         final String sparql = pcjMetadata.getSparql();
         
-        return withRyaIntegration(pcjId, sparql, fluo, accumulo, ryaInstance);
+        return withRyaIntegration(pcjId, sparql, Sets.newHashSet(ExportStrategy.RYA), fluo, accumulo, ryaInstance);
     }
     
+    
+    
     private void importHistoricResultsIntoFluo(FluoClient fluo, FluoQuery fluoQuery, Connector accumulo, String ryaInstance)
             throws RyaDAOException {
         // Reuse the same set object while performing batch inserts.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java
index 58a52fb..0d97b2f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java
@@ -21,7 +21,6 @@ package org.apache.rya.indexing.pcj.fluo.api;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
@@ -33,15 +32,10 @@ import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumnValue;
 import org.apache.fluo.api.data.Span;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
-import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
-import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
 import org.openrdf.query.BindingSet;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -85,8 +79,9 @@ public class DeleteFluoPcj {
      *            Index. (not null)
      * @param pcjId - The PCJ ID for the query that will removed from the Fluo
      *            application. (not null)
+     * @throws UnsupportedQueryException 
      */
-    public void deletePcj(final FluoClient client, final String pcjId) {
+    public void deletePcj(final FluoClient client, final String pcjId) throws UnsupportedQueryException {
         requireNonNull(client);
         requireNonNull(pcjId);
 
@@ -109,84 +104,17 @@ public class DeleteFluoPcj {
      * @param tx - Transaction of a given Fluo table. (not null)
      * @param pcjId - Id of query. (not null)
      * @return list of Node IDs associated with the query {@code pcjId}.
+     * @throws UnsupportedQueryException 
      */
-    private List<String> getNodeIds(Transaction tx, String pcjId) {
+    private List<String> getNodeIds(Transaction tx, String pcjId) throws UnsupportedQueryException {
         requireNonNull(tx);
         requireNonNull(pcjId);
 
-        // Get the ID that tracks the query within the Fluo application.
-        final String queryId = getQueryIdFromPcjId(tx, pcjId);
-
-        // Get the query's children nodes.
-        final List<String> nodeIds = new ArrayList<>();
-        nodeIds.add(queryId);
-        getChildNodeIds(tx, queryId, nodeIds);
-        return nodeIds;
+        String queryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId);
+        FluoQuery fluoQuery = dao.readFluoQuery(tx, queryId);
+        return FluoQueryUtils.collectNodeIds(fluoQuery);
     }
 
-    /**
-     * Recursively navigate query tree to extract all of the nodeIds.
-     *
-     * @param tx - Transaction of a given Fluo table. (not null)
-     * @param nodeId - Current node in query tree. (not null)
-     * @param nodeIds - The Node IDs extracted from query tree. (not null)
-     */
-    private void getChildNodeIds(final Transaction tx, final String nodeId, final List<String> nodeIds) {
-        requireNonNull(tx);
-        requireNonNull(nodeId);
-        requireNonNull(nodeIds);
-
-        final NodeType type = NodeType.fromNodeId(nodeId).get();
-        switch (type) {
-            case QUERY:
-                final QueryMetadata queryMeta = dao.readQueryMetadata(tx, nodeId);
-                final String queryChild = queryMeta.getChildNodeId();
-                nodeIds.add(queryChild);
-                getChildNodeIds(tx, queryChild, nodeIds);
-                break;
-            case CONSTRUCT:
-                final ConstructQueryMetadata constructMeta = dao.readConstructQueryMetadata(tx, nodeId);
-                final String constructChild = constructMeta.getChildNodeId();
-                nodeIds.add(constructChild);
-                getChildNodeIds(tx, constructChild, nodeIds);
-                break;
-            case JOIN:
-                final JoinMetadata joinMeta = dao.readJoinMetadata(tx, nodeId);
-                final String lchild = joinMeta.getLeftChildNodeId();
-                final String rchild = joinMeta.getRightChildNodeId();
-                nodeIds.add(lchild);
-                nodeIds.add(rchild);
-                getChildNodeIds(tx, lchild, nodeIds);
-                getChildNodeIds(tx, rchild, nodeIds);
-                break;
-            case FILTER:
-                final FilterMetadata filterMeta = dao.readFilterMetadata(tx, nodeId);
-                final String filterChild = filterMeta.getChildNodeId();
-                nodeIds.add(filterChild);
-                getChildNodeIds(tx, filterChild, nodeIds);
-                break;
-            case AGGREGATION:
-                final AggregationMetadata aggMeta = dao.readAggregationMetadata(tx, nodeId);
-                final String aggChild = aggMeta.getChildNodeId();
-                nodeIds.add(aggChild);
-                getChildNodeIds(tx, aggChild, nodeIds);
-                break;
-            case PERIODIC_QUERY:
-                final PeriodicQueryMetadata periodicMeta = dao.readPeriodicQueryMetadata(tx, nodeId);
-                final String periodicChild = periodicMeta.getChildNodeId();
-                nodeIds.add(periodicChild);
-                getChildNodeIds(tx, periodicChild, nodeIds);
-                break;
-            case PROJECTION:
-                final ProjectionMetadata projectionMetadata = dao.readProjectionMetadata(tx, nodeId);
-                final String projectionChild = projectionMetadata.getChildNodeId();
-                nodeIds.add(projectionChild);
-                getChildNodeIds(tx, projectionChild, nodeIds);
-                break;
-            case STATEMENT_PATTERN:
-                break;
-        }
-    }
 
     /**
      * Deletes metadata for all nodeIds associated with a given queryId in a
@@ -203,8 +131,6 @@ public class DeleteFluoPcj {
         requireNonNull(pcjId);
 
         try (final Transaction typeTx = tx) {
-            deletePcjIdAndSparqlMetadata(typeTx, pcjId);
-
             for (final String nodeId : nodeIds) {
                 final NodeType type = NodeType.fromNodeId(nodeId).get();
                 deleteMetadataColumns(typeTx, nodeId, type.getMetaDataColumns());
@@ -232,24 +158,6 @@ public class DeleteFluoPcj {
     }
 
     /**
-     * Deletes high level query meta for converting from queryId to pcjId and
-     * vice versa, as well as converting from sparql to queryId.
-     *
-     * @param tx - Transaction the deletes will be performed with. (not null)
-     * @param pcjId - The PCJ whose metadata will be deleted. (not null)
-     */
-    private void deletePcjIdAndSparqlMetadata(final Transaction tx, final String pcjId) {
-        requireNonNull(tx);
-        requireNonNull(pcjId);
-
-        final String queryId = getQueryIdFromPcjId(tx, pcjId);
-        final String sparql = getSparqlFromQueryId(tx, queryId);
-        tx.delete(queryId, FluoQueryColumns.RYA_PCJ_ID);
-        tx.delete(sparql, FluoQueryColumns.QUERY_ID);
-        tx.delete(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID);
-    }
-
-    /**
      * Deletes all results (BindingSets or Statements) associated with the specified nodeId.
      *
      * @param nodeId - nodeId whose {@link BindingSet}s will be deleted. (not null)
@@ -294,19 +202,4 @@ public class DeleteFluoPcj {
         }
     }
 
-    private String getQueryIdFromPcjId(final Transaction tx, final String pcjId) {
-        requireNonNull(tx);
-        requireNonNull(pcjId);
-
-        final Bytes queryIdBytes = tx.get(Bytes.of(pcjId), FluoQueryColumns.PCJ_ID_QUERY_ID);
-        return queryIdBytes.toString();
-    }
-
-    private String getSparqlFromQueryId(final Transaction tx, final String queryId) {
-        requireNonNull(tx);
-        requireNonNull(queryId);
-
-        final QueryMetadata metadata = dao.readQueryMetadata(tx, queryId);
-        return metadata.getSparql();
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java
index 061a1d5..d08cb73 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java
@@ -24,15 +24,13 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
 import org.apache.rya.indexing.pcj.storage.PcjException;
 import org.apache.rya.indexing.pcj.storage.PcjMetadata;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.data.Bytes;
-
 /**
  * Get {@link PcjMetadata} for queries that are managed by the Fluo app.
  */
@@ -87,7 +85,7 @@ public class GetPcjMetadata {
         // Lookup the Rya PCJ ID associated with the query.
         String pcjId = null;
         try(Snapshot snap = fluo.newSnapshot() ) {
-            pcjId = snap.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
+            pcjId = FluoQueryUtils.convertFluoQueryIdToPcjId(queryId);
             if(pcjId == null) {
                 throw new NotInFluoException("Could not get the PcjMetadata for queryId '" + queryId +
                         "' because a Rya PCJ ID not stored in the Fluo table.");

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java
index 1fb1485..ddbaaaf 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java
@@ -25,27 +25,27 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import edu.umd.cs.findbugs.annotations.Nullable;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-import net.jcip.annotations.Immutable;
-
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
 import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
 
 import com.google.common.collect.ImmutableMap;
 
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.SnapshotBase;
-import org.apache.fluo.api.client.scanner.ColumnScanner;
-import org.apache.fluo.api.client.scanner.RowScanner;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.Span;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import net.jcip.annotations.Immutable;
 
 /**
  * Get a reports that indicates how many binding sets have been emitted for
@@ -63,8 +63,9 @@ public class GetQueryReport {
      * @param fluo - The connection to Fluo that will be used to fetch the metadata. (not null)
      * @return A map from Query ID to QueryReport that holds a report for all of
      *   the queries that are being managed within the fluo app.
+     * @throws UnsupportedQueryException 
      */
-    public Map<String, QueryReport> getAllQueryReports(final FluoClient fluo) {
+    public Map<String, QueryReport> getAllQueryReports(final FluoClient fluo) throws UnsupportedQueryException {
         checkNotNull(fluo);
 
         // Fetch the queries that are being managed by the Fluo.
@@ -85,8 +86,9 @@ public class GetQueryReport {
      * @param fluo - The connection to Fluo that will be used to fetch the metadata. (not null)
      * @param queryId - The ID of the query to fetch. (not null)
      * @return A report that was built for the query.
+     * @throws UnsupportedQueryException 
      */
-    public QueryReport getReport(final FluoClient fluo, final String queryId) {
+    public QueryReport getReport(final FluoClient fluo, final String queryId) throws UnsupportedQueryException {
         checkNotNull(fluo);
         checkNotNull(queryId);
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java
index df1648b..e09d0c6 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java
@@ -52,7 +52,7 @@ public class ListQueryIds {
 
         try(Snapshot snap = fluo.newSnapshot() ) {
             // Create an iterator that iterates over the QUERY_ID column.
-            final CellScanner cellScanner = snap.scanner().fetch( FluoQueryColumns.QUERY_ID).build();
+            final CellScanner cellScanner = snap.scanner().fetch( FluoQueryColumns.QUERY_NODE_ID).build();
 
             for (RowColumnValue rcv : cellScanner) {
             	queryIds.add(rcv.getsValue());

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
index 4b6f44e..c090d37 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
@@ -39,9 +39,6 @@ public class IncrementalUpdateConstants {
     public static final String CONSTRUCT_PREFIX = "CONSTRUCT";
     public static final String PERIODIC_QUERY_PREFIX = "PERIODIC_QUERY";
     
-    public static enum QueryType{Construct, Projection, Periodic};
-    public static enum ExportStrategy{Rya, Kafka};
-    
     public static final String PERIODIC_BIN_ID = PeriodicQueryResultStorage.PeriodicBinId;
 
     public static final String URI_TYPE = "http://www.w3.org/2001/XMLSchema#anyURI";

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java
new file mode 100644
index 0000000..62f1271
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.accumulo.utils.VisibilitySimplifier;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class manages all of the {@link IncrementalResultExporter}s for the Rya Fluo Application.
+ * It maps the {@link FluoQuery}'s {@link QueryType} and Set of {@link ExportStrategy} objects
+ * to the correct IncrementalResultExporter. 
+ *
+ */
+public class ExporterManager implements AutoCloseable {
+
+    private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
+    private static final RyaSubGraphKafkaSerDe SG_SERDE = new RyaSubGraphKafkaSerDe();
+    private Map<String, String> simplifiedVisibilities = new HashMap<>();
+    
+    private Map<QueryType, Map<ExportStrategy, IncrementalResultExporter>> exporters;
+    
+    private ExporterManager(Map<QueryType, Map<ExportStrategy, IncrementalResultExporter>> exporters) {
+        this.exporters = Preconditions.checkNotNull(exporters);
+    }
+    
+    /**
+     * @return {@link Builder} for constructing an instance of an ExporterManager.
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+    
+    /**
+     * Maps the data to the correct {@link IncrementalResultExporter} using the provided
+     * QueryType and ExportStrategies to be exported.
+     * @param type - QueryType that produced the result
+     * @param strategies - ExportStrategies used to export the result
+     * @param queryId - Fluo Query Id for the query that produced the result
+     * @param data - Serialized result to be exported
+     * @throws ResultExportException 
+     */
+    public void export(QueryType type, Set<ExportStrategy> strategies, String queryId, Bytes data) throws ResultExportException {
+        
+        String pcjId = FluoQueryUtils.convertFluoQueryIdToPcjId(queryId);
+        
+        if(type == QueryType.CONSTRUCT) {
+            exportSubGraph(exporters.get(type), strategies, pcjId, data);
+        } else {
+            exportBindingSet(exporters.get(type), strategies, pcjId, data);
+        }
+        
+    }
+    
+    /**
+     * Exports BindingSet using the exporters for a given {@link QueryType}.
+     * @param exporters - exporters corresponding to a given queryType
+     * @param strategies - export strategies used to export results (possibly a subset of those in the exporters map)
+     * @param pcjId - id of the query whose results are being exported
+     * @param data - serialized BindingSet result
+     * @throws ResultExportException
+     */
+    private void exportBindingSet(Map<ExportStrategy, IncrementalResultExporter> exporters, Set<ExportStrategy> strategies, String pcjId, Bytes data) throws ResultExportException {
+        try {
+            VisibilityBindingSet bs = BS_SERDE.deserialize(data);
+            simplifyVisibilities(bs);
+            
+            for(ExportStrategy strategy: strategies) {
+                IncrementalBindingSetExporter exporter = (IncrementalBindingSetExporter) exporters.get(strategy);
+                exporter.export(pcjId, bs);
+            }
+        } catch (Exception e) {
+            throw new ResultExportException("Unable to deserialize the provided BindingSet", e);
+        }
+    }
+    
+    /**
+     * Exports RyaSubGraph using the exporters for a given {@link QueryType}.
+     * @param exporters - exporters corresponding to a given queryType
+     * @param strategies - export strategies used to export results (possibly a subset of those in the exporters map)
+     * @param pcjId - id of the query whose results are being exported
+     * @param data - serialized RyaSubGraph result
+     * @throws ResultExportException if the visibilities of the deserialized RyaSubGraph cannot be simplified
+     */
+    private void exportSubGraph(Map<ExportStrategy, IncrementalResultExporter> exporters, Set<ExportStrategy> strategies, String pcjId, Bytes data) throws ResultExportException {
+        RyaSubGraph subGraph = SG_SERDE.fromBytes(data.toArray());
+        
+        try {
+            simplifyVisibilities(subGraph);
+        } catch (UnsupportedEncodingException e) {
+            throw new ResultExportException("Unable to simplify the visibilities of the provided RyaSubGraph", e);
+        }
+        
+        for(ExportStrategy strategy: strategies) {
+            IncrementalRyaSubGraphExporter exporter = (IncrementalRyaSubGraphExporter) exporters.get(strategy);
+            exporter.export(pcjId, subGraph);
+        }
+    }
+    
+    private void simplifyVisibilities(VisibilityBindingSet result) {
+        // Simplify the result's visibilities.
+        final String visibility = result.getVisibility();
+        if(!simplifiedVisibilities.containsKey(visibility)) {
+            final String simplified = VisibilitySimplifier.simplify( visibility );
+            simplifiedVisibilities.put(visibility, simplified);
+        }
+        result.setVisibility( simplifiedVisibilities.get(visibility) );
+    }
+    
+    private void simplifyVisibilities(RyaSubGraph subgraph) throws UnsupportedEncodingException {
+        Set<RyaStatement> statements = subgraph.getStatements();
+        if (statements.size() > 0) {
+            byte[] visibilityBytes = statements.iterator().next().getColumnVisibility();
+            // Simplify the result's visibilities and cache new simplified
+            // visibilities
+            String visibility = new String(visibilityBytes, "UTF-8");
+            if (!simplifiedVisibilities.containsKey(visibility)) {
+                String simplified = VisibilitySimplifier.simplify(visibility);
+                simplifiedVisibilities.put(visibility, simplified);
+            }
+
+            for (RyaStatement statement : statements) {
+                statement.setColumnVisibility(simplifiedVisibilities.get(visibility).getBytes("UTF-8"));
+            }
+            
+            subgraph.setStatements(statements);
+        }
+    }
+    
+    public static class Builder {
+        
+        private Map<QueryType, Map<ExportStrategy, IncrementalResultExporter>> exporters = new HashMap<>();
+        
+        /**
+         * Add an {@link IncrementalResultExporter} to be used by this ExporterManager for exporting results
+         * @param exporter - IncrementalResultExporter for exporting query results
+         * @return - Builder for chaining method calls
+         */
+        public Builder addIncrementalResultExporter(IncrementalResultExporter exporter) {
+            
+            Set<QueryType> types = exporter.getQueryTypes();
+            ExportStrategy strategy = exporter.getExportStrategy();
+            
+            for (QueryType type : types) {
+                if (!exporters.containsKey(type)) {
+                    Map<ExportStrategy, IncrementalResultExporter> exportMap = new HashMap<>();
+                    exportMap.put(strategy, exporter);
+                    exporters.put(type, exportMap);
+                } else {
+                    Map<ExportStrategy, IncrementalResultExporter> exportMap = exporters.get(type);
+                    if (!exportMap.containsKey(strategy)) {
+                        exportMap.put(strategy, exporter);
+                    }
+                }
+            }
+            
+            return this;
+        }
+        
+        /**
+         * @return - ExporterManager for managing IncrementalResultExporters and exporting results
+         */
+        public ExporterManager build() {
+            // Adds a NoOpExporter in the event that the user does not want to export results.
+            addIncrementalResultExporter(new NoOpExporter());
+            return new ExporterManager(exporters);
+        }
+        
+    }
+
+    @Override
+    public void close() throws Exception {
+        
+        Collection<Map<ExportStrategy, IncrementalResultExporter>> values = exporters.values();
+        
+        for(Map<ExportStrategy, IncrementalResultExporter> map: values) {
+            for(IncrementalResultExporter exporter: map.values()) {
+                exporter.close();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java
index c2f4cb4..9877671 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java
@@ -18,7 +18,6 @@
  */
 package org.apache.rya.indexing.pcj.fluo.app.export;
 
-import org.apache.fluo.api.client.TransactionBase;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -29,17 +28,16 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  * other location.
  */
 @DefaultAnnotation(NonNull.class)
-public interface IncrementalBindingSetExporter extends AutoCloseable {
+public interface IncrementalBindingSetExporter extends IncrementalResultExporter {
 
     /**
      * Export a Binding Set that is a result of a SPARQL query that does not include a Group By clause.
      *
-     * @param tx - The Fluo transaction this export is a part of. (not null)
-     * @param queryId - The Fluo ID of the SPARQL query the binding set is a result of. (not null)
+     * @param queryId - The PCJ ID of the SPARQL query the binding set is a result of. (not null)
      * @param bindingSetString - The Binding Set as it was represented within the Fluo application. (not null)
      * @throws ResultExportException The result could not be exported.
      */
-    public void export(TransactionBase tx, String queryId, VisibilityBindingSet result) throws ResultExportException;
+    public void export(String queryId, VisibilityBindingSet result) throws ResultExportException;
 
     /**
      * A result could not be exported.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporterFactory.java
deleted file mode 100644
index 1bf492a..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporterFactory.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
-import com.google.common.base.Optional;
-
-import org.apache.fluo.api.observer.Observer.Context;
-
-/**
- * Builds instances of {@link IncrementalBindingSetExporter} using the provided
- * configurations.
- */
-@DefaultAnnotation(NonNull.class)
-public interface IncrementalBindingSetExporterFactory {
-
-    /**
-     * Builds an instance of {@link IncrementalBindingSetExporter} using the
-     * configurations that are provided.
-     *
-     * @param context - Contains the host application's configuration values
-     *   and any parameters that were provided at initialization. (not null)
-     * @return An exporter if configurations were found in the context; otherwise absent.
-     * @throws IncrementalExporterFactoryException A non-configuration related
-     *   problem has occurred and the exporter could not be created as a result.
-     * @throws ConfigurationException Thrown if configuration values were
-     *   provided, but an instance of the exporter could not be initialized
-     *   using them. This could be because they were improperly formatted,
-     *   a required field was missing, or some other configuration based problem.
-     */
-    public Optional<IncrementalBindingSetExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException;
-
-    /**
-     * Indicates a {@link IncrementalBindingSetExporter} could not be created by a
-     * {@link IncrementalBindingSetExporterFactory}.
-     */
-    public static class IncrementalExporterFactoryException extends Exception {
-        private static final long serialVersionUID = 1L;
-
-        /**
-         * Constructs an instance of {@link }.
-         *
-         * @param message - Explains why this exception is being thrown.
-         */
-        public IncrementalExporterFactoryException(final String message) {
-            super(message);
-        }
-
-        /**
-         * Constructs an instance of {@link }.
-         *
-         * @param message - Explains why this exception is being thrown.
-         * @param cause - The exception that caused this one to be thrown.
-         */
-        public IncrementalExporterFactoryException(final String message, final Throwable t) {
-            super(message, t);
-        }
-    }
-
-    /**
-     * The configuration could not be interpreted because required fields were
-     * missing or a value wasn't properly formatted.
-     */
-    public static class ConfigurationException extends IncrementalExporterFactoryException {
-        private static final long serialVersionUID = 1L;
-
-        /**
-         * Constructs an instance of {@link ConfigurationException}.
-         *
-         * @param message - Explains why this exception is being thrown.
-         */
-        public ConfigurationException(final String message) {
-            super(message);
-        }
-
-        /**
-         * Constructs an instance of {@link ConfigurationException}.
-         *
-         * @param message - Explains why this exception is being thrown.
-         * @param cause - The exception that caused this one to be thrown.
-         */
-        public ConfigurationException(final String message, final Throwable cause) {
-            super(message, cause);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
new file mode 100644
index 0000000..e49a777
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export;
+
+import java.util.Set;
+
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
+
+/**
+ * Common interface for the different incremental exporters used in the Rya Fluo Application.
+ *
+ */
+public interface IncrementalResultExporter extends AutoCloseable {
+
+    /**
+     * @return - A Set of {@link QueryType}s whose results this exporter handles
+     */
+    public Set<QueryType> getQueryTypes();
+    
+    /**
+     * @return - The {@link ExportStrategy} indicating where results are exported
+     */
+    public ExportStrategy getExportStrategy();
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java
new file mode 100644
index 0000000..5bba4ab
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+import com.google.common.base.Optional;
+
+import org.apache.fluo.api.observer.Observer.Context;
+
+/**
+ * Builds instances of {@link IncrementalResultExporter} using the provided
+ * configurations.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface IncrementalResultExporterFactory {
+
+    /**
+     * Builds an instance of {@link IncrementalResultExporter} using the
+     * configurations that are provided.
+     *
+     * @param context - Contains the host application's configuration values
+     *   and any parameters that were provided at initialization. (not null)
+     * @return An exporter if configurations were found in the context; otherwise absent.
+     * @throws IncrementalExporterFactoryException A non-configuration related
+     *   problem has occurred and the exporter could not be created as a result.
+     * @throws ConfigurationException Thrown if configuration values were
+     *   provided, but an instance of the exporter could not be initialized
+     *   using them. This could be because they were improperly formatted,
+     *   a required field was missing, or some other configuration based problem.
+     */
+    public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException;
+
+    /**
+     * Indicates a {@link IncrementalResultExporter} could not be created by a
+     * {@link IncrementalResultExporterFactory}.
+     */
+    public static class IncrementalExporterFactoryException extends Exception {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * Constructs an instance of {@link IncrementalExporterFactoryException}.
+         *
+         * @param message - Explains why this exception is being thrown.
+         */
+        public IncrementalExporterFactoryException(final String message) {
+            super(message);
+        }
+
+        /**
+         * Constructs an instance of {@link IncrementalExporterFactoryException}.
+         *
+         * @param message - Explains why this exception is being thrown.
+         * @param t - The exception that caused this one to be thrown.
+         */
+        public IncrementalExporterFactoryException(final String message, final Throwable t) {
+            super(message, t);
+        }
+    }
+
+    /**
+     * The configuration could not be interpreted because required fields were
+     * missing or a value wasn't properly formatted.
+     */
+    public static class ConfigurationException extends IncrementalExporterFactoryException {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * Constructs an instance of {@link ConfigurationException}.
+         *
+         * @param message - Explains why this exception is being thrown.
+         */
+        public ConfigurationException(final String message) {
+            super(message);
+        }
+
+        /**
+         * Constructs an instance of {@link ConfigurationException}.
+         *
+         * @param message - Explains why this exception is being thrown.
+         * @param cause - The exception that caused this one to be thrown.
+         */
+        public ConfigurationException(final String message, final Throwable cause) {
+            super(message, cause);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java
index 797502c..7b7f084 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java
@@ -25,7 +25,7 @@ import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter
  * from the Rya-Fluo application to the core Rya tables.
  *
  */
-public interface IncrementalRyaSubGraphExporter extends AutoCloseable {
+public interface IncrementalRyaSubGraphExporter extends IncrementalResultExporter {
 
     /**
      * Export a RyaSubGraph that is the result of SPARQL Construct Query.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java
deleted file mode 100644
index ecbec09..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package org.apache.rya.indexing.pcj.fluo.app.export;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import org.apache.fluo.api.observer.Observer.Context;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.ConfigurationException;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException;
-
-import com.google.common.base.Optional;
-
-/**
- * Builds instances of {@link IncrementalRyaSubGraphExporter} using the provided
- * configurations.
- */
-public interface IncrementalRyaSubGraphExporterFactory {
-
-    /**
-     * Builds an instance of {@link IncrementalRyaSubGraphExporter} using the
-     * configurations that are provided.
-     *
-     * @param context - Contains the host application's configuration values
-     *   and any parameters that were provided at initialization. (not null)
-     * @return An exporter if configurations were found in the context; otherwise absent.
-     * @throws IncrementalExporterFactoryException A non-configuration related
-     *   problem has occurred and the exporter could not be created as a result.
-     * @throws ConfigurationException Thrown if configuration values were
-     *   provided, but an instance of the exporter could not be initialized
-     *   using them. This could be because they were improperly formatted,
-     *   a required field was missing, or some other configuration based problem.
-     */
-    public Optional<IncrementalRyaSubGraphExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/NoOpExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/NoOpExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/NoOpExporter.java
new file mode 100644
index 0000000..ab7f2ed
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/NoOpExporter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export;
+
+import java.util.Set;
+
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+import com.google.common.collect.Sets;
+
+/**
+ * This class is a NoOpExporter that can be specified if a user does not
+ * want their results exported from Fluo.
+ *
+ */
+public class NoOpExporter implements IncrementalBindingSetExporter, IncrementalRyaSubGraphExporter {
+
+    @Override
+    public Set<QueryType> getQueryTypes() {
+        return Sets.newHashSet(QueryType.CONSTRUCT, QueryType.PROJECTION);
+    }
+
+    @Override
+    public ExportStrategy getExportStrategy() {
+        return ExportStrategy.NO_OP_EXPORT;
+    }
+
+    @Override
+    public void close() throws Exception {
+    }
+
+    @Override
+    public void export(String constructID, RyaSubGraph subgraph) throws ResultExportException {
+    }
+
+    @Override
+    public void export(String queryId, VisibilityBindingSet result) throws ResultExportException {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
index 7c4b3cc..0c26d65 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
@@ -20,18 +20,21 @@ package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.fluo.api.client.TransactionBase;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.log4j.Logger;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
 import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 
+import com.google.common.collect.Sets;
+
 /**
  * Incrementally exports SPARQL query results to Kafka topics.
  */
@@ -57,17 +60,15 @@ public class KafkaBindingSetExporter implements IncrementalBindingSetExporter {
      * Send the results to the topic using the queryID as the topicname
      */
     @Override
-    public void export(final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) throws ResultExportException {
-        checkNotNull(fluoTx);
+    public void export(final String queryId, final VisibilityBindingSet result) throws ResultExportException {
         checkNotNull(queryId);
         checkNotNull(result);
         try {
-            final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
-            final String msg = "out to kafta topic: queryId=" + queryId + " pcjId=" + pcjId + " result=" + result;
+            final String msg = "Out to Kafka topic: " + queryId + ", Result: " + result;
             log.trace(msg);
 
             // Send the result to the topic whose name matches the PCJ ID.
-            final ProducerRecord<String, VisibilityBindingSet> rec = new ProducerRecord<>(pcjId, result);
+            final ProducerRecord<String, VisibilityBindingSet> rec = new ProducerRecord<>(queryId, result);
             final Future<RecordMetadata> future = producer.send(rec);
 
             // Don't let the export return until the result has been written to the topic. Otherwise we may lose results.
@@ -84,4 +85,14 @@ public class KafkaBindingSetExporter implements IncrementalBindingSetExporter {
     public void close() throws Exception {
         producer.close(5, TimeUnit.SECONDS);
     }
+
+    @Override
+    public Set<QueryType> getQueryTypes() {
+        return Sets.newHashSet(QueryType.PROJECTION);
+    }
+
+    @Override
+    public ExportStrategy getExportStrategy() {
+        return ExportStrategy.KAFKA;
+    }
 }
\ No newline at end of file