You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/25 19:35:34 UTC

[3/5] incubator-rya git commit: RYA-246-Query-Export-Strategy. Closes #213.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
index 65db02c..17ab14f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
@@ -27,7 +27,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
 
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
@@ -92,9 +93,9 @@ public class FluoQuery {
         this.filterMetadata = requireNonNull(filterMetadata);
         this.joinMetadata = requireNonNull(joinMetadata);
         if(constructMetadata.isPresent()) {
-            this.type = QueryType.Construct;
+            this.type = QueryType.CONSTRUCT;
         } else {
-            this.type = QueryType.Projection;
+            this.type = QueryType.PROJECTION;
         }
     }
     
@@ -568,8 +569,9 @@ public class FluoQuery {
         
         /**
          * @return Creates a {@link FluoQuery} using the values that have been supplied to this builder.
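+         * @throws UnsupportedQueryException if the query pairs a construct pattern with a sliding window filter, or if a projection query containing aggregations is exported to Rya PCJ tables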
          */
-        public FluoQuery build() {
+        public FluoQuery build() throws UnsupportedQueryException {
             checkArgument((projectionBuilders.size() > 0 || constructBuilder != null));
             
             Optional<PeriodicQueryMetadata.Builder> optionalPeriodicQueryBuilder = getPeriodicQueryBuilder();
@@ -603,12 +605,18 @@ public class FluoQuery {
                 aggregateMetadata.put(entry.getKey(), entry.getValue().build());
             }
 
+            QueryMetadata qMetadata = queryBuilder.build();
+            
             if(constructBuilder != null) {
                 if(periodicQueryMetadata != null) {
-                    throw new IllegalArgumentException("Queries containing sliding window filters and construct query patterns are not supported.");
+                    throw new UnsupportedQueryException("Queries containing sliding window filters and construct query patterns are not supported.");
                 }
-                return new FluoQuery(queryBuilder.build(), projectionMetadata.build(), Optional.of(constructBuilder.build()), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
+                return new FluoQuery(qMetadata, projectionMetadata.build(), Optional.of(constructBuilder.build()), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
             } else {
+                if(aggregationBuilders.size() > 0 && qMetadata.getQueryType() == QueryType.PROJECTION && qMetadata.getExportStrategies().contains(ExportStrategy.RYA)) {
+                    throw new UnsupportedQueryException("Exporting to Rya PCJ tables is currently not supported for queries containing aggregations.");
+                }
+                
                 return new FluoQuery(queryBuilder.build(), projectionMetadata.build(), Optional.absent(), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
             }
             

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
index 2eae4ff..8569a48 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
@@ -162,32 +162,6 @@ public class FluoQueryColumns {
      */
     public static final Column TRIPLES = new Column("triples", "SPO");
 
-    /**
-     * Stores the Rya assigned PCJ ID that the query's results reflect. This
-     * value defines where the results will be exported to.
-     * <p>
-     *   <table border="1" style="width:100%">
-     *     <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr>
-     *     <tr> <td>Query ID</td> <td>query:ryaPcjId</td> <td>Identifies which PCJ the results of this query will be exported to.</td> </tr>
-     *   </table>
-     * </p>
-     */
-    public static final Column RYA_PCJ_ID = new Column("query", "ryaPcjId");
-
-    /**
-     * Associates a PCJ ID with a Query ID. This enables a quick lookup of the Query ID from the PCJ ID and is useful of Deleting PCJs.
-     * <p>
-     *   <table border="1" style="width:100%">
-     *     <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr>
-     *     <tr> <td>PCJ ID</td> <td>ryaPcjId:queryId</td> <td>Identifies which Query ID is associated with the given PCJ ID.</td> </tr>
-     *   </table>
-     * </p>
-     */
-    public static final Column PCJ_ID_QUERY_ID = new Column("ryaPcjId", "queryId");
-
-    // Sparql to Query ID used to list all queries that are in the system.
-    public static final Column QUERY_ID = new Column("sparql", "queryId");
-
     // Query Metadata columns.
     public static final Column QUERY_NODE_ID = new Column(QUERY_METADATA_CF, "nodeId");
     public static final Column QUERY_VARIABLE_ORDER = new Column(QUERY_METADATA_CF, "variableOrder");

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index 1c34836..d5d9fe7 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -34,11 +34,11 @@ import org.apache.fluo.api.client.SnapshotBase;
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
 import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
 import org.apache.rya.indexing.pcj.fluo.app.ConstructGraphSerializer;
 import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.ExportStrategy;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
@@ -585,21 +585,13 @@ public class FluoQueryMetadataDAO {
         requireNonNull(tx);
         requireNonNull(query);
         
-        QueryMetadata queryMetadata = query.getQueryMetadata();
-        final String sparql = queryMetadata.getSparql();
-        final String queryId = queryMetadata.getNodeId();
-        final String pcjId = queryMetadata.getExportId();
-        
         // The results of the query are eventually exported to an instance
         // of Rya, so store the Rya ID for the PCJ.
-        tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId);
-        tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId);
-        tx.set(Bytes.of(sparql), FluoQueryColumns.QUERY_ID, Bytes.of(queryId));
-        write(tx, queryMetadata);
+        write(tx, query.getQueryMetadata());
 
         // Write the rest of the metadata objects.
         
-        if (query.getQueryType() == QueryType.Construct) {
+        if (query.getQueryType() == QueryType.CONSTRUCT) {
             ConstructQueryMetadata constructMetadata = query.getConstructQueryMetadata().get();
             write(tx, constructMetadata);
         }
@@ -636,8 +628,9 @@ public class FluoQueryMetadataDAO {
      * @param sx - The snapshot that will be used to read the metadata from the Fluo table. (not null)
      * @param queryId - The ID of the query whose nodes will be read. (not null)
      * @return The {@link FluoQuery} that was read from table.
+     * @throws UnsupportedQueryException if the query read from the table is not supported by the Rya Fluo Application
      */
-    public FluoQuery readFluoQuery(final SnapshotBase sx, final String queryId) {
+    public FluoQuery readFluoQuery(final SnapshotBase sx, final String queryId) throws UnsupportedQueryException {
         requireNonNull(sx);
         requireNonNull(queryId);
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
index e46b405..40c9e03 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
@@ -24,12 +24,11 @@ import java.util.Optional;
 import java.util.Set;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.ExportStrategy;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 
 import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
index 7bf6f45..7b21575 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
@@ -40,12 +40,12 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
 import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
 import org.apache.rya.indexing.pcj.fluo.app.ConstructProjection;
 import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
 import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.ExportStrategy;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
@@ -106,7 +106,7 @@ public class SparqlFluoQueryBuilder {
   
     //Default behavior is to export to Kafka - subject to change when user can 
     //specify their own export strategy
-    private Set<ExportStrategy> exportStrategies = new HashSet<>(Arrays.asList(ExportStrategy.Kafka));
+    private Set<ExportStrategy> exportStrategies = new HashSet<>(Arrays.asList(ExportStrategy.KAFKA));
     
     public SparqlFluoQueryBuilder setSparql(String sparql) {
         this.sparql = Preconditions.checkNotNull(sparql);
@@ -145,7 +145,7 @@ public class SparqlFluoQueryBuilder {
         return this;
     }
     
-    public FluoQuery build() {
+    public FluoQuery build() throws UnsupportedQueryException {
         Preconditions.checkNotNull(sparql);
         Preconditions.checkNotNull(queryId);
         Preconditions.checkNotNull(exportStrategies);
@@ -172,10 +172,12 @@ public class SparqlFluoQueryBuilder {
         QueryMetadata.Builder queryBuilder = QueryMetadata.builder(queryId);
         //sets {@link QueryType} and VariableOrder
         setVarOrderAndQueryType(queryBuilder, te);
-        queryBuilder.setSparql(sparql);
-        queryBuilder.setChildNodeId(childNodeId);
-        queryBuilder.setExportStrategies(exportStrategies);
-        queryBuilder.setJoinBatchSize(joinBatchSize);
+        queryBuilder
+            .setSparql(sparql)
+            .setChildNodeId(childNodeId)
+            .setExportStrategies(exportStrategies)
+            .setJoinBatchSize(joinBatchSize);
+        
         fluoQueryBuilder.setQueryMetadata(queryBuilder);
         
         setChildMetadata(fluoQueryBuilder, childNodeId, queryBuilder.getVariableOrder(), queryId);
@@ -800,7 +802,7 @@ public class SparqlFluoQueryBuilder {
             }
             
             if(queryType == null) {
-                queryType = QueryType.Projection;
+                queryType = QueryType.PROJECTION;
             }
             super.meet(node);
         }
@@ -811,14 +813,14 @@ public class SparqlFluoQueryBuilder {
             }
             
             if(queryType == null) {
-                queryType = QueryType.Construct;
+                queryType = QueryType.CONSTRUCT;
             }
             super.meet(node);
         }
         
         public void meetOther(final QueryModelNode node) throws Exception {
             if (node instanceof PeriodicQueryNode) {
-                queryType = QueryType.Periodic;
+                queryType = QueryType.PERIODIC;
             } else {
                 super.meetOther(node);
             }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/UnsupportedQueryException.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/UnsupportedQueryException.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/UnsupportedQueryException.java
new file mode 100644
index 0000000..155b8da
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/UnsupportedQueryException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+
+/**
+ * This Exception is thrown if the Rya Fluo Application does not support
+ * the given SPARQL query.  This could happen for a number of reasons. The
+ * two most common reasons are that the query possesses some combination of query nodes
+ * that the application can't evaluate, or that the {@link ExportStrategy} of the query
+ * is incompatible with one of its query nodes.  
+ *
+ */
+public class UnsupportedQueryException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public UnsupportedQueryException(final String message) {
+        super(message);
+    }
+
+    public UnsupportedQueryException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java
index ac41160..7a5b439 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java
@@ -19,6 +19,7 @@
 package org.apache.rya.indexing.pcj.fluo.app.util;
 
 import java.util.List;
+import java.util.UUID;
 
 import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
@@ -60,6 +61,13 @@ public class FluoQueryUtils {
     }
     
     /**
+     * @return - A new pcjId, which is a UUID with all dashes removed
+     */
+    public static String createNewPcjId() {
+        return UUID.randomUUID().toString().replaceAll("-", "");
+    }
+    
+    /**
      * Uses a {@link NodeIdCollector} visitor to do a pre-order traverse of the
      * FluoQuery and gather the nodeIds of the metadata nodes.
      * @param query - FluoQuery to be traversed

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
index b9c10d4..cd21ed6 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
@@ -27,12 +27,13 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameterBase;
 import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterParameters;
 import org.junit.Test;
 
 /**
- * Tests the methods of {@link KafkaExportParameters}.
+ * Tests the methods of {@link KafkaExportParameterBase}.
  */
 public class KafkaExportParametersTest {
 
@@ -41,19 +42,19 @@ public class KafkaExportParametersTest {
         final Map<String, String> params = new HashMap<>();
 
         // Load some values into the params using the wrapper.
-        final KafkaExportParameters kafkaParams = new KafkaExportParameters(params);
-        kafkaParams.setExportToKafka(true);
+        final KafkaBindingSetExporterParameters kafkaParams = new KafkaBindingSetExporterParameters(params);
+        kafkaParams.setUseKafkaBindingSetExporter(true);
 
         // Ensure the params map has the expected values.
         final Map<String, String> expectedParams = new HashMap<>();
-        expectedParams.put(KafkaExportParameters.CONF_EXPORT_TO_KAFKA, "true");
-        assertTrue(kafkaParams.isExportToKafka());
+        expectedParams.put(KafkaBindingSetExporterParameters.CONF_USE_KAFKA_BINDING_SET_EXPORTER, "true");
+        assertTrue(kafkaParams.getUseKafkaBindingSetExporter());
         assertEquals(expectedParams, params);
 
         // now go the other way.
-        expectedParams.put(KafkaExportParameters.CONF_EXPORT_TO_KAFKA, "false");
-        kafkaParams.setExportToKafka(false);
-        assertFalse(kafkaParams.isExportToKafka());
+        expectedParams.put(KafkaBindingSetExporterParameters.CONF_USE_KAFKA_BINDING_SET_EXPORTER, "false");
+        kafkaParams.setUseKafkaBindingSetExporter(false);
+        assertFalse(kafkaParams.getUseKafkaBindingSetExporter());
         assertEquals(expectedParams, params);
     }
     @Test
@@ -68,7 +69,7 @@ public class KafkaExportParametersTest {
         // Make sure export key1 is NOT kept separate from producer config key1
         // This is a change, originally they were kept separate.
         params.put(key1, value1First);
-        final KafkaExportParameters kafkaParams = new KafkaExportParameters(params);
+        final KafkaExportParameterBase kafkaParams = new KafkaExportParameterBase(params);
         // Load some values into the properties using the wrapper.
         Properties props = new Properties();
         props.put(key1, value1Second);
@@ -87,8 +88,8 @@ public class KafkaExportParametersTest {
         final Map<String, String> params = new HashMap<>();
 
         // Ensure an unconfigured parameters map will say kafka export is disabled.
-        final KafkaExportParameters kafkaParams = new KafkaExportParameters(params);
-        assertFalse(kafkaParams.isExportToKafka());
+        final KafkaBindingSetExporterParameters kafkaParams = new KafkaBindingSetExporterParameters(params);
+        assertFalse(kafkaParams.getUseKafkaBindingSetExporter());
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParametersTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParametersTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParametersTest.java
index 9ac5139..5653312 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParametersTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParametersTest.java
@@ -37,7 +37,7 @@ public class RyaExportParametersTest {
 
         // Load some values into the params using the wrapper.
         final RyaExportParameters ryaParams = new RyaExportParameters(params);
-        ryaParams.setExportToRya(true);
+        ryaParams.setUseRyaBindingSetExporter(true);
         ryaParams.setAccumuloInstanceName("demoAccumulo");
         ryaParams.setZookeeperServers("zoo1;zoo2");
         ryaParams.setExporterUsername("fluo");
@@ -45,7 +45,7 @@ public class RyaExportParametersTest {
 
         // Ensure the params map has the expected values.
         final Map<String, String> expectedParams = new HashMap<>();
-        expectedParams.put(RyaExportParameters.CONF_EXPORT_TO_RYA, "true");
+        expectedParams.put(RyaExportParameters.CONF_USE_RYA_BINDING_SET_EXPORTER, "true");
         expectedParams.put(RyaExportParameters.CONF_ACCUMULO_INSTANCE_NAME, "demoAccumulo");
         expectedParams.put(RyaExportParameters.CONF_ZOOKEEPER_SERVERS, "zoo1;zoo2");
         expectedParams.put(RyaExportParameters.CONF_EXPORTER_USERNAME, "fluo");
@@ -60,6 +60,6 @@ public class RyaExportParametersTest {
 
         // Ensure an unconfigured parameters map will say rya export is disabled.
         final RyaExportParameters ryaParams = new RyaExportParameters(params);
-        assertFalse(ryaParams.isExportToRya());
+        assertFalse(ryaParams.getUseRyaBindingSetExporter());
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java
index b40ba3f..55455a7 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java
@@ -154,7 +154,7 @@ public class PeriodicQueryUtilTest {
     }
     
     @Test
-    public void testFluoQueryVarOrders() throws MalformedQueryException {
+    public void testFluoQueryVarOrders() throws MalformedQueryException, UnsupportedQueryException {
         String query = "prefix function: <http://org.apache.rya/function#> " //n
                 + "prefix time: <http://www.w3.org/2006/time#> " //n
                 + "select (count(?obs) as ?total) where {" //n

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java
index 5c89a75..48f2f39 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java
@@ -29,7 +29,7 @@ import org.junit.Test;
 public class QueryMetadataVisitorTest {
 
     @Test
-    public void builderTest() {
+    public void builderTest() throws UnsupportedQueryException {
         String query = "prefix function: <http://org.apache.rya/function#> " // n
                 + "prefix time: <http://www.w3.org/2006/time#> " // n
                 + "select ?id (count(?obs) as ?total) where {" // n

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClient.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClient.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClient.java
index 901f39d..cc74f6b 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClient.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClient.java
@@ -43,6 +43,7 @@ import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
 import org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand.ArgumentsException;
 import org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand.ExecutionException;
 import org.apache.rya.indexing.pcj.fluo.client.command.CountUnprocessedStatementsCommand;
@@ -152,6 +153,9 @@ public class PcjAdminClient {
             System.err.println("Could not execute the command.");
             e.printStackTrace();
             System.exit(-1);
+        } catch (UnsupportedQueryException e) {
+            System.err.println("Could not execute the command because the query is invalid.");
+            e.printStackTrace();
         } finally {
             log.trace("Shutting down the PCJ Admin Client.");
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClientCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClientCommand.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClientCommand.java
index 2b3b105..a944b33 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClientCommand.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClientCommand.java
@@ -24,6 +24,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
 import org.apache.accumulo.core.client.Connector;
 
 import org.apache.fluo.api.client.FluoClient;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
 import org.apache.rya.rdftriplestore.RyaSailRepository;
 
 /**
@@ -57,13 +58,14 @@ public interface PcjAdminClientCommand {
      * @param rya - A connection to the Rya instance used to search for historic PCJ matches. (not null)
      * @param client - A connection to the Fluo app that is updating the PCJs. (not null)
      * @param args - Command line arguments that configure how the command will execute. (not null)
+     * @throws UnsupportedQueryException if the command involves a query that the Rya Fluo Application does not support
      */
     public void execute(
             final Connector accumulo,
             final String ryaTablePrefix,
             final RyaSailRepository rya,
             final FluoClient fluo,
-            final String[] args) throws ArgumentsException, ExecutionException;
+            final String[] args) throws ArgumentsException, ExecutionException, UnsupportedQueryException;
 
     /**
      * A {@link PcjAdminClientCommand} could not be executed because of a problem with

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
index 3f335f4..78515d9 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
@@ -42,6 +42,7 @@ import org.apache.rya.accumulo.AccumuloRdfConfiguration;
 import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine;
 import org.apache.rya.api.persist.RyaDAOException;
 import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
 import org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand;
 import org.apache.rya.indexing.pcj.fluo.client.util.ParsedQueryRequest;
 import org.apache.rya.indexing.pcj.storage.PcjException;
@@ -94,7 +95,7 @@ public class NewQueryCommand implements PcjAdminClientCommand {
     }
 
     @Override
-    public void execute(final Connector accumulo, final String ryaTablePrefix, final RyaSailRepository rya, final FluoClient fluo, final String[] args) throws ArgumentsException, ExecutionException {
+    public void execute(final Connector accumulo, final String ryaTablePrefix, final RyaSailRepository rya, final FluoClient fluo, final String[] args) throws ArgumentsException, ExecutionException, UnsupportedQueryException {
         checkNotNull(accumulo);
         checkNotNull(fluo);
         checkNotNull(args);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/QueryReportCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/QueryReportCommand.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/QueryReportCommand.java
index 675a844..2a7f787 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/QueryReportCommand.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/QueryReportCommand.java
@@ -26,6 +26,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand;
 import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport;
 import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
 import org.apache.rya.indexing.pcj.fluo.client.util.QueryReportRenderer;
 
 import com.beust.jcommander.JCommander;
@@ -69,7 +70,7 @@ public class QueryReportCommand implements PcjAdminClientCommand {
     }
 
     @Override
-    public void execute(final Connector accumulo, final String ryaTablePrefix, final RyaSailRepository rya, final FluoClient fluo, final String[] args) throws ArgumentsException, ExecutionException {
+    public void execute(final Connector accumulo, final String ryaTablePrefix, final RyaSailRepository rya, final FluoClient fluo, final String[] args) throws ArgumentsException, ExecutionException, UnsupportedQueryException {
         checkNotNull(accumulo);
         checkNotNull(ryaTablePrefix);
         checkNotNull(rya);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
index f44db6c..d1b3e25 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
@@ -20,12 +20,9 @@ package org.apache.rya.indexing.pcj.fluo.client.util;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
 import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType;
 import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
@@ -38,6 +35,9 @@ import org.openrdf.query.parser.ParsedQuery;
 import org.openrdf.query.parser.sparql.SPARQLParser;
 import org.openrdf.queryrender.sparql.SPARQLQueryRenderer;
 
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
 /**
  * Pretty renders a {@link QueryReport}.
  */
@@ -70,7 +70,7 @@ public class QueryReportRenderer {
         
         
         
-        if (metadata.getQueryType() == QueryType.Construct) {
+        if (metadata.getQueryType() == QueryType.CONSTRUCT) {
             builder.appendItem( new ReportItem("") );
             
             final ConstructQueryMetadata constructMetadata = metadata.getConstructQueryMetadata().get();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/DemoDriver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/DemoDriver.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/DemoDriver.java
index e8f10b8..1ae02dd 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/DemoDriver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/DemoDriver.java
@@ -304,7 +304,7 @@ public class DemoDriver {
         // Provide export parameters child test classes may provide to the export observer.
         final HashMap<String, String> exportParams = new HashMap<>();
         final RyaExportParameters ryaParams = new RyaExportParameters(exportParams);
-        ryaParams.setExportToRya(true);
+        ryaParams.setUseRyaBindingSetExporter(true);
         ryaParams.setAccumuloInstanceName(accumulo.getInstanceName());
         ryaParams.setZookeeperServers(accumulo.getZooKeepers());
         ryaParams.setExporterUsername("root");

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
index f25b573..4070849 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
@@ -18,7 +18,6 @@
  */
 package org.apache.rya.indexing.pcj.fluo.demo;
 
-import java.io.IOException;
 import java.util.Set;
 
 import org.apache.accumulo.core.client.Connector;
@@ -35,22 +34,20 @@ import org.apache.rya.api.persist.RyaDAOException;
 import org.apache.rya.api.resolver.RyaToRdfConversions;
 import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
 import org.apache.rya.indexing.pcj.storage.PcjException;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.rdftriplestore.RyaSailRepository;
 import org.openrdf.model.Statement;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.QueryEvaluationException;
 import org.openrdf.query.parser.ParsedQuery;
 import org.openrdf.query.parser.sparql.SPARQLParser;
 import org.openrdf.queryrender.sparql.SPARQLQueryRenderer;
 import org.openrdf.repository.RepositoryConnection;
 import org.openrdf.repository.RepositoryException;
-import org.openrdf.sail.SailException;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
@@ -181,7 +178,7 @@ public class FluoAndHistoricPcjsDemo implements Demo {
             // Tell the Fluo app to maintain it.
             new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, ryaTablePrefix);
 
-        } catch (MalformedQueryException | PcjException | RyaDAOException e) {
+        } catch (MalformedQueryException | PcjException | RyaDAOException | UnsupportedQueryException e) {
             throw new DemoExecutionException("Error while using Fluo to compute and export historic matches, so the demo can not continue. Exiting.", e);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
index 263a19e..7676657 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
@@ -32,6 +32,7 @@ import org.apache.fluo.api.client.FluoFactory;
 import org.apache.rya.api.persist.RyaDAOException;
 import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInAccumuloException;
 import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInFluoException;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
 import org.apache.rya.indexing.pcj.storage.PcjException;
 import org.apache.rya.indexing.pcj.storage.PcjMetadata;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
@@ -53,7 +54,7 @@ import com.google.common.collect.Sets;
 public class GetPcjMetadataIT extends RyaExportITBase {
 
     @Test
-    public void getMetadataByQueryId() throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException, RyaDAOException {
+    public void getMetadataByQueryId() throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException, RyaDAOException, UnsupportedQueryException {
         final String sparql =
                 "SELECT ?x " +
                   "WHERE { " +
@@ -82,7 +83,7 @@ public class GetPcjMetadataIT extends RyaExportITBase {
     }
 
     @Test
-    public void getAllMetadata() throws MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException, AccumuloException, AccumuloSecurityException, RyaDAOException {
+    public void getAllMetadata() throws MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException, AccumuloException, AccumuloSecurityException, RyaDAOException, UnsupportedQueryException {
         final Connector accumuloConn = super.getAccumuloConnector();
         final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName());
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
index e3914bd..3310690 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
@@ -18,7 +18,7 @@
  */
 package org.apache.rya.indexing.pcj.fluo.api;
 
-import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.QUERY_ID;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.QUERY_NODE_ID;
 import static org.junit.Assert.assertEquals;
 
 import java.util.List;
@@ -49,10 +49,10 @@ public class ListQueryIdsIT extends RyaExportITBase {
         try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
             // Store a few SPARQL/Query ID pairs in the Fluo table.
             try(Transaction tx = fluoClient.newTransaction()) {
-                tx.set("SPARQL_3", QUERY_ID, "ID_3");
-                tx.set("SPARQL_1", QUERY_ID, "ID_1");
-                tx.set("SPARQL_4", QUERY_ID, "ID_4");
-                tx.set("SPARQL_2", QUERY_ID, "ID_2");
+                tx.set("SPARQL_3", QUERY_NODE_ID, "ID_3");
+                tx.set("SPARQL_1", QUERY_NODE_ID, "ID_1");
+                tx.set("SPARQL_4", QUERY_NODE_ID, "ID_4");
+                tx.set("SPARQL_2", QUERY_NODE_ID, "ID_2");
                 tx.commit();
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
index 315dddb..45492de 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
@@ -29,9 +29,9 @@ import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.client.Transaction;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
 import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.ExportStrategy;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
@@ -148,11 +148,11 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
         // Create the object that will be serialized.
         String queryId = NodeType.generateNewFluoIdForType(NodeType.QUERY);
         final QueryMetadata.Builder builder = QueryMetadata.builder(queryId);
-        builder.setQueryType(QueryType.Projection);
+        builder.setQueryType(QueryType.PROJECTION);
         builder.setVarOrder(new VariableOrder("y;s;d"));
         builder.setSparql("sparql string");
         builder.setChildNodeId("childNodeId");
-        builder.setExportStrategies(new HashSet<>(Arrays.asList(ExportStrategy.Kafka)));
+        builder.setExportStrategies(new HashSet<>(Arrays.asList(ExportStrategy.KAFKA)));
         final QueryMetadata originalMetadata = builder.build();
 
         try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
@@ -338,7 +338,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
     }
 
     @Test
-    public void fluoQueryTest() throws MalformedQueryException {
+    public void fluoQueryTest() throws MalformedQueryException, UnsupportedQueryException {
         final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
 
         // Create the object that will be serialized.
@@ -357,7 +357,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
         builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY));
         final FluoQuery originalQuery = builder.build();
 
-        assertEquals(QueryType.Projection, originalQuery.getQueryType());
+        assertEquals(QueryType.PROJECTION, originalQuery.getQueryType());
         assertEquals(false, originalQuery.getConstructQueryMetadata().isPresent());
         
         try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
@@ -379,7 +379,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
     }
     
     @Test
-    public void fluoConstructQueryTest() throws MalformedQueryException {
+    public void fluoConstructQueryTest() throws MalformedQueryException, UnsupportedQueryException {
         final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
 
         // Create the object that will be serialized.
@@ -398,7 +398,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
         builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY));
         final FluoQuery originalQuery = builder.build();
         
-        assertEquals(QueryType.Construct, originalQuery.getQueryType());
+        assertEquals(QueryType.CONSTRUCT, originalQuery.getQueryType());
         assertEquals(true, originalQuery.getConstructQueryMetadata().isPresent());
 
         try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
@@ -421,7 +421,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
     
     
     @Test
-    public void fluoNestedQueryTest() throws MalformedQueryException {
+    public void fluoNestedQueryTest() throws MalformedQueryException, UnsupportedQueryException {
         final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
 
         // Create the object that will be serialized.
@@ -442,7 +442,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
         builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY));
         final FluoQuery originalQuery = builder.build();
         
-        assertEquals(QueryType.Projection, originalQuery.getQueryType());
+        assertEquals(QueryType.PROJECTION, originalQuery.getQueryType());
 
         try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
             // Write it to the Fluo table.
@@ -463,7 +463,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
     }
     
     @Test
-    public void fluoNestedConstructQueryTest() throws MalformedQueryException {
+    public void fluoNestedConstructQueryTest() throws MalformedQueryException, UnsupportedQueryException {
         final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
 
         // Create the object that will be serialized.
@@ -488,7 +488,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
         builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY));
         final FluoQuery originalQuery = builder.build();
         
-        assertEquals(QueryType.Construct, originalQuery.getQueryType());
+        assertEquals(QueryType.CONSTRUCT, originalQuery.getQueryType());
 
         try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
             // Write it to the Fluo table.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
index 32d0e41..47a2f29 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
@@ -53,6 +53,7 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
 import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
@@ -343,7 +344,7 @@ public class BatchIT extends RyaExportITBase {
         return statements;
     }
 
-    private List<String> getNodeIdStrings(FluoClient fluoClient, String queryId) {
+    private List<String> getNodeIdStrings(FluoClient fluoClient, String queryId) throws UnsupportedQueryException {
         List<String> nodeStrings;
         try (Snapshot sx = fluoClient.newSnapshot()) {
             FluoQuery query = dao.readFluoQuery(sx, queryId);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
index 7c4caa4..a1d76cb 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
@@ -33,6 +33,7 @@ import org.apache.fluo.api.client.scanner.ColumnScanner;
 import org.apache.fluo.api.client.scanner.RowScanner;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Span;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
 import org.apache.rya.api.client.RyaClient;
 import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
 import org.apache.rya.indexing.pcj.fluo.api.DeleteFluoPcj;
@@ -79,7 +80,7 @@ public class CreateDeleteIT extends RyaExportITBase {
         try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
             // Ensure the data was loaded.
             final List<Bytes> rows = getFluoTableEntries(fluoClient);
-            assertEquals(20, rows.size());
+            assertEquals(18, rows.size());
 
             // Delete the PCJ from the Fluo application.
             new DeleteFluoPcj(1).deletePcj(fluoClient, pcjId);
@@ -111,7 +112,7 @@ public class CreateDeleteIT extends RyaExportITBase {
         try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
             // Ensure the data was loaded.
             final List<Bytes> rows = getFluoTableEntries(fluoClient);
-            assertEquals(12, rows.size());
+            assertEquals(10, rows.size());
 
             // Delete the PCJ from the Fluo application.
             new DeleteFluoPcj(1).deletePcj(fluoClient, pcjId);
@@ -130,7 +131,7 @@ public class CreateDeleteIT extends RyaExportITBase {
         // Register the PCJ with Rya.
         final RyaClient ryaClient = AccumuloRyaClientFactory.build(createConnectionDetails(), getAccumuloConnector());
 
-        final String pcjId = ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql);
+        final String pcjId = ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql, Sets.newHashSet(ExportStrategy.NO_OP_EXPORT));
 
         // Write the data to Rya.
         final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
index f9f55d0..8911f56 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
@@ -92,6 +92,8 @@ public class KafkaExportIT extends KafkaExportITBase {
         // Create the PCJ in Fluo and load the statements into Rya.
         final String pcjId = loadData(sparql, statements);
 
+        FluoITHelper.printFluoTable(super.getFluoConfiguration());
+        
         // The expected results of the SPARQL query once the PCJ has been computed.
         final Set<BindingSet> expectedResult = new HashSet<>();
 
@@ -590,9 +592,9 @@ public class KafkaExportIT extends KafkaExportITBase {
         // Read all of the results from the Kafka topic.
         final Set<VisibilityBindingSet> results = new HashSet<>();
 
-        try(final KafkaConsumer<Integer, VisibilityBindingSet> consumer = makeConsumer(pcjId)) {
-            final ConsumerRecords<Integer, VisibilityBindingSet> records = consumer.poll(5000);
-            final Iterator<ConsumerRecord<Integer, VisibilityBindingSet>> recordIterator = records.iterator();
+        try(final KafkaConsumer<String, VisibilityBindingSet> consumer = makeConsumer(pcjId)) {
+            final ConsumerRecords<String, VisibilityBindingSet> records = consumer.poll(5000);
+            final Iterator<ConsumerRecord<String, VisibilityBindingSet>> recordIterator = records.iterator();
             while (recordIterator.hasNext()) {
                 results.add( recordIterator.next().value() );
             }
@@ -607,9 +609,9 @@ public class KafkaExportIT extends KafkaExportITBase {
         // Read the results from the Kafka topic. The last one has the final aggregation result.
         VisibilityBindingSet result = null;
 
-        try(final KafkaConsumer<Integer, VisibilityBindingSet> consumer = makeConsumer(pcjId)) {
-            final ConsumerRecords<Integer, VisibilityBindingSet> records = consumer.poll(5000);
-            final Iterator<ConsumerRecord<Integer, VisibilityBindingSet>> recordIterator = records.iterator();
+        try(final KafkaConsumer<String, VisibilityBindingSet> consumer = makeConsumer(pcjId)) {
+            final ConsumerRecords<String, VisibilityBindingSet> records = consumer.poll(5000);
+            final Iterator<ConsumerRecord<String, VisibilityBindingSet>> recordIterator = records.iterator();
             while (recordIterator.hasNext()) {
                 result = recordIterator.next().value();
             }
@@ -625,9 +627,9 @@ public class KafkaExportIT extends KafkaExportITBase {
         // The key in this map is a Binding Set containing only the group by variables.
         final Map<BindingSet, VisibilityBindingSet> results = new HashMap<>();
 
-        try(final KafkaConsumer<Integer, VisibilityBindingSet> consumer = makeConsumer(pcjId)) {
-            final ConsumerRecords<Integer, VisibilityBindingSet> records = consumer.poll(5000);
-            final Iterator<ConsumerRecord<Integer, VisibilityBindingSet>> recordIterator = records.iterator();
+        try(final KafkaConsumer<String, VisibilityBindingSet> consumer = makeConsumer(pcjId)) {
+            final ConsumerRecords<String, VisibilityBindingSet> records = consumer.poll(5000);
+            final Iterator<ConsumerRecord<String, VisibilityBindingSet>> recordIterator = records.iterator();
             while (recordIterator.hasNext()) {
                 final VisibilityBindingSet visBindingSet = recordIterator.next().value();
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
index ca8de0d..b2944ca 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
@@ -33,13 +33,12 @@ import java.util.stream.Collectors;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.fluo.recipes.test.FluoITHelper;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.accumulo.AccumuloRyaDAO;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaSubGraph;
@@ -48,13 +47,14 @@ import org.apache.rya.api.domain.RyaURI;
 import org.apache.rya.api.resolver.RdfToRyaConversions;
 import org.apache.rya.indexing.pcj.fluo.ConstructGraphTestUtils;
 import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
-import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaSubGraphExporterParameters;
 import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
 import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.ProjectionObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
 import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase;
@@ -88,22 +88,18 @@ public class KafkaRyaSubGraphExportIT extends KafkaExportITBase {
         observers.add(new ObserverSpecification(FilterObserver.class.getName()));
         observers.add(new ObserverSpecification(AggregationObserver.class.getName()));
         observers.add(new ObserverSpecification(ProjectionObserver.class.getName()));
+        observers.add(new ObserverSpecification(ConstructQueryResultObserver.class.getName()));
+        
 
         // Configure the export observer to export new PCJ results to the mini
         // accumulo cluster.
         final HashMap<String, String> exportParams = new HashMap<>();
 
-        final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams);
-        kafkaParams.setExportToKafka(true);
-
-        // Configure the Kafka Producer
-        final Properties producerConfig = new Properties();
-        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
-        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, RyaSubGraphKafkaSerDe.class.getName());
-        kafkaParams.addAllProducerConfig(producerConfig);
+        final KafkaSubGraphExporterParameters kafkaParams = new KafkaSubGraphExporterParameters(exportParams);
+        kafkaParams.setUseKafkaSubgraphExporter(true);
+        kafkaParams.setKafkaBootStrapServers(BROKERHOST + ":" + BROKERPORT);
 
-        final ObserverSpecification exportObserverConfig = new ObserverSpecification(ConstructQueryResultObserver.class.getName(),
+        final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(),
                 exportParams);
         observers.add(exportObserverConfig);
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
index 6ecec02..0aefaca 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
@@ -34,9 +34,11 @@ import javax.xml.datatype.DatatypeFactory;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
 import org.apache.rya.api.client.RyaClient;
 import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
 import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
@@ -877,7 +879,7 @@ public class QueryIT extends RyaExportITBase {
         runTest(query, statements, expectedResults, ExporterType.Periodic);
     }
 
-    @Test(expected= IllegalArgumentException.class)
+    @Test(expected= UnsupportedQueryException.class)
     public void nestedConstructPeriodicQueryWithAggregationAndGroupBy() throws Exception {
         String query = "prefix function: <http://org.apache.rya/function#> " // n
                 + "prefix time: <http://www.w3.org/2006/time#> " // n
@@ -924,7 +926,7 @@ public class QueryIT extends RyaExportITBase {
             PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(accumuloConn, getRyaInstanceName());
             String periodicId = periodicStorage.createPeriodicQuery(sparql);
             try (FluoClient fluo = new FluoClientImpl(super.getFluoConfiguration())) {
-                new CreateFluoPcj().createPcj(periodicId, sparql, fluo);
+                new CreateFluoPcj().createPcj(periodicId, sparql, Sets.newHashSet(ExportStrategy.RYA), fluo);
             }
             addStatementsAndWait(statements);
             

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
index c828a20..ed9ce60 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
@@ -37,25 +37,25 @@ import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.recipes.test.AccumuloExportITBase;
-import org.apache.fluo.recipes.test.FluoITHelper;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.rya.accumulo.AccumuloRdfConfiguration;
 import org.apache.rya.accumulo.AccumuloRyaDAO;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
 import org.apache.rya.api.client.Install.InstallConfiguration;
 import org.apache.rya.api.client.RyaClient;
 import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
 import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
 import org.apache.rya.indexing.accumulo.ConfigUtils;
 import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
-import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
-import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterParameters;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaSubGraphExporterParameters;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer;
 import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
@@ -74,6 +74,8 @@ import org.openrdf.model.Statement;
 import org.openrdf.repository.sail.SailRepositoryConnection;
 import org.openrdf.sail.Sail;
 
+import com.google.common.collect.Sets;
+
 import kafka.admin.AdminUtils;
 import kafka.admin.RackAwareMode;
 import kafka.server.KafkaConfig;
@@ -119,41 +121,20 @@ public class KafkaExportITBase extends AccumuloExportITBase {
         observers.add(new ObserverSpecification(FilterObserver.class.getName()));
         observers.add(new ObserverSpecification(AggregationObserver.class.getName()));
         observers.add(new ObserverSpecification(ProjectionObserver.class.getName()));
+        observers.add(new ObserverSpecification(ConstructQueryResultObserver.class.getName()));
 
         // Configure the export observer to export new PCJ results to the mini
         // accumulo cluster.
         final HashMap<String, String> exportParams = new HashMap<>();
-
-        final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams);
-        kafkaParams.setExportToKafka(true);
-
-        // Configure the Kafka Producer
-        final Properties producerConfig = new Properties();
-        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
-        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
-        kafkaParams.addAllProducerConfig(producerConfig);
+        final KafkaBindingSetExporterParameters kafkaParams = new KafkaBindingSetExporterParameters(exportParams);
+        kafkaParams.setUseKafkaBindingSetExporter(true);
+        kafkaParams.setKafkaBootStrapServers(BROKERHOST + ":" + BROKERPORT);
+        
+        final KafkaSubGraphExporterParameters kafkaConstructParams = new KafkaSubGraphExporterParameters(exportParams);
+        kafkaConstructParams.setUseKafkaSubgraphExporter(true);
 
         final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
         observers.add(exportObserverConfig);
-        
-        //create construct query observer and tell it not to export to Kafka
-        //it will only add results back into Fluo
-        HashMap<String, String> constructParams = new HashMap<>();
-        final KafkaExportParameters kafkaConstructParams = new KafkaExportParameters(constructParams);
-        kafkaConstructParams.setExportToKafka(true);
-        
-        // Configure the Kafka Producer
-        final Properties constructProducerConfig = new Properties();
-        constructProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
-        constructProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        constructProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, RyaSubGraphKafkaSerDe.class.getName());
-        kafkaConstructParams.addAllProducerConfig(constructProducerConfig);
-
-        final ObserverSpecification constructExportObserverConfig = new ObserverSpecification(ConstructQueryResultObserver.class.getName(),
-                constructParams);
-        observers.add(constructExportObserverConfig);
 
         // Add the observers to the Fluo Configuration.
         super.getFluoConfiguration().addObservers(observers);
@@ -323,21 +304,19 @@ public class KafkaExportITBase extends AccumuloExportITBase {
         consumer.close();
     }
 
-    protected KafkaConsumer<Integer, VisibilityBindingSet> makeConsumer(final String TopicName) {
+    protected KafkaConsumer<String, VisibilityBindingSet> makeConsumer(final String TopicName) {
         // setup consumer
         final Properties consumerProps = new Properties();
         consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
         consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
         consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
-        consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                "org.apache.kafka.common.serialization.IntegerDeserializer");
-        consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
+        consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KryoVisibilityBindingSetSerializer.class.getName());
 
         // to make sure the consumer starts from the beginning of the topic
         consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
-        final KafkaConsumer<Integer, VisibilityBindingSet> consumer = new KafkaConsumer<>(consumerProps);
+        final KafkaConsumer<String, VisibilityBindingSet> consumer = new KafkaConsumer<>(consumerProps);
         consumer.subscribe(Arrays.asList(TopicName));
         return consumer;
     }
@@ -353,7 +332,7 @@ public class KafkaExportITBase extends AccumuloExportITBase {
         final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails(ACCUMULO_USER,
                 ACCUMULO_PASSWORD.toCharArray(), accInstance.getInstanceName(), accInstance.getZooKeepers()), accumuloConn);
 
-        final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql);
+        final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql, Sets.newHashSet(ExportStrategy.KAFKA));
 
         // Write the data to Rya.
         final SailRepositoryConnection ryaConn = getRyaSailRepository().getConnection();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java
index 9c5732f..1c02db3 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java
@@ -65,7 +65,8 @@ public class RyaExportITBase extends FluoITBase {
         // Configure the export observer to export new PCJ results to the mini accumulo cluster.
         final HashMap<String, String> exportParams = new HashMap<>();
         final RyaExportParameters ryaParams = new RyaExportParameters(exportParams);
-        ryaParams.setExportToRya(true);
+        ryaParams.setUseRyaBindingSetExporter(true);
+        ryaParams.setUsePeriodicBindingSetExporter(true);
         ryaParams.setRyaInstanceName(getRyaInstanceName());
         ryaParams.setAccumuloInstanceName(super.getMiniAccumuloCluster().getInstanceName());
         ryaParams.setZookeeperServers(super.getMiniAccumuloCluster().getZooKeepers());

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
index 4d1bc75..cf24974 100644
--- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
@@ -26,6 +26,7 @@ import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.core.client.FluoClientImpl;
 import org.apache.fluo.recipes.test.AccumuloExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
 import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
 import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
 import org.apache.rya.periodic.notification.notification.TimestampedNotification;
@@ -38,7 +39,7 @@ import org.junit.Assert;
 public class PeriodicNotificationProviderIT extends AccumuloExportITBase {
 
     @Test
-    public void testProvider() throws MalformedQueryException, InterruptedException {
+    public void testProvider() throws MalformedQueryException, InterruptedException, UnsupportedQueryException {
         
         String sparql = "prefix function: <http://org.apache.rya/function#> " // n
                 + "prefix time: <http://www.w3.org/2006/time#> " // n

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
index 27acc9c..bb98b7f 100644
--- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
@@ -38,12 +38,10 @@ import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.ColumnValue;
 import org.apache.fluo.api.data.Span;
 import org.apache.fluo.core.client.FluoClientImpl;
-import org.apache.fluo.recipes.test.FluoITHelper;
 import org.apache.rya.api.resolver.RdfToRyaConversions;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
 import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
@@ -252,14 +250,14 @@ public class PeriodicNotificationBinPrunerIT extends RyaExportITBase {
         }
     }
     
-    private void compareFluoCounts(FluoClient client, String queryId, long bin) {
+    private void compareFluoCounts(FluoClient client, String pcjId, long bin) {
         QueryBindingSet bs = new QueryBindingSet();
         bs.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, new LiteralImpl(Long.toString(bin), XMLSchema.LONG));
         
         VariableOrder varOrder = new VariableOrder(IncrementalUpdateConstants.PERIODIC_BIN_ID);
         
         try(Snapshot sx = client.newSnapshot()) {
-            String fluoQueryId = sx.get(Bytes.of(queryId), FluoQueryColumns.PCJ_ID_QUERY_ID).toString();
+            String fluoQueryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId);
             Set<String> ids = new HashSet<>();
             PeriodicQueryUtil.getPeriodicQueryNodeAncestorIds(sx, fluoQueryId, ids);
             for(String id: ids) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java
index 6aade52..60a3e7c 100644
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java
@@ -21,8 +21,10 @@ package org.apache.rya.periodic.notification.api;
 import java.util.Optional;
 
 import org.apache.fluo.api.client.FluoClient;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
 import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
 import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryNode;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
 import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
 import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
@@ -32,7 +34,7 @@ import org.apache.rya.periodic.notification.notification.PeriodicNotification;
 import org.openrdf.query.MalformedQueryException;
 import org.openrdf.query.algebra.evaluation.function.Function;
 
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 
 /**
  * Object that creates a Periodic Query.  A Periodic Query is any query
@@ -82,17 +84,22 @@ public class CreatePeriodicQuery {
             Optional<PeriodicQueryNode> optNode = PeriodicQueryUtil.getPeriodicNode(sparql);
             if(optNode.isPresent()) {
                 PeriodicQueryNode periodicNode = optNode.get();
+                String pcjId = FluoQueryUtils.createNewPcjId();
+               
+                //register query with Fluo
                 CreateFluoPcj createPcj = new CreateFluoPcj();
-                String queryId = createPcj.createPcj(sparql, fluoClient).getQueryId();
-                queryId = FluoQueryUtils.convertFluoQueryIdToPcjId(queryId);
-                periodicStorage.createPeriodicQuery(queryId, sparql);
-                PeriodicNotification notification = PeriodicNotification.builder().id(queryId).period(periodicNode.getPeriod())
+                createPcj.createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.RYA), fluoClient);
+                
+                //register query with PeriodicQueryResultStorage table
+                periodicStorage.createPeriodicQuery(pcjId, sparql);
+                //create notification
+                PeriodicNotification notification = PeriodicNotification.builder().id(pcjId).period(periodicNode.getPeriod())
                         .timeUnit(periodicNode.getUnit()).build();
                 return notification;
             } else {
                 throw new RuntimeException("Invalid PeriodicQuery.  Query must possess a PeriodicQuery Filter.");
             }
-        } catch (MalformedQueryException | PeriodicQueryStorageException e) {
+        } catch (MalformedQueryException | PeriodicQueryStorageException | UnsupportedQueryException e) {
             throw new RuntimeException(e);
         }
     }