You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/18 18:51:27 UTC

[2/5] incubator-rya git commit: RYA-282-Nested-Query. Closes #192.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
index 7a73b41..f44db6c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
@@ -25,10 +25,12 @@ import edu.umd.cs.findbugs.annotations.NonNull;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType;
 import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
 import org.apache.rya.indexing.pcj.fluo.client.util.Report.ReportItem;
@@ -56,26 +58,41 @@ public class QueryReportRenderer {
 
         final FluoQuery metadata = queryReport.getFluoQuery();
 
-        switch (metadata.getQueryType()) {
-        case Projection:
-            final QueryMetadata queryMetadata = metadata.getQueryMetadata().get();
-            builder.appendItem(new ReportItem("QUERY NODE"));
-            builder.appendItem(new ReportItem("Node ID", queryMetadata.getNodeId()));
-            builder.appendItem(new ReportItem("Variable Order", queryMetadata.getVariableOrder().toString()));
-            builder.appendItem(new ReportItem("SPARQL", prettyFormatSparql(queryMetadata.getSparql())));
-            builder.appendItem(new ReportItem("Child Node ID", queryMetadata.getChildNodeId()));
-            builder.appendItem(new ReportItem("Count", "" + queryReport.getCount(queryMetadata.getNodeId())));
-            break;
-        case Construct:
+        QueryMetadata queryMetadata = metadata.getQueryMetadata();
+        builder.appendItem( new ReportItem("") );
+        
+        builder.appendItem(new ReportItem("QUERY NODE"));
+        builder.appendItem(new ReportItem("Node ID", queryMetadata.getNodeId()));
+        builder.appendItem(new ReportItem("Variable Order", queryMetadata.getVariableOrder().toString()));
+        builder.appendItem( new ReportItem("SPARQL", queryMetadata.getSparql()) );
+        builder.appendItem(new ReportItem("Child Node ID", queryMetadata.getChildNodeId()));
+        builder.appendItem(new ReportItem("Count", "" + queryReport.getCount(queryMetadata.getNodeId())));
+        
+        
+        
+        if (metadata.getQueryType() == QueryType.Construct) {
+            builder.appendItem( new ReportItem("") );
+            
             final ConstructQueryMetadata constructMetadata = metadata.getConstructQueryMetadata().get();
             builder.appendItem(new ReportItem("CONSTRUCT QUERY NODE"));
             builder.appendItem(new ReportItem("Node ID", constructMetadata.getNodeId()));
             builder.appendItem(new ReportItem("Variable Order", constructMetadata.getVariableOrder().toString()));
-            builder.appendItem(new ReportItem("SPARQL", prettyFormatSparql(constructMetadata.getSparql())));
+            builder.appendItem( new ReportItem("Parent Node ID", constructMetadata.getParentNodeId()) );
             builder.appendItem(new ReportItem("Child Node ID", constructMetadata.getChildNodeId()));
             builder.appendItem(new ReportItem("Construct Graph", constructMetadata.getConstructGraph().toString()));
             builder.appendItem(new ReportItem("Count", "" + queryReport.getCount(constructMetadata.getNodeId())));
         }
+        
+        for (ProjectionMetadata projectionMetadata : metadata.getProjectionMetadata()) {
+            builder.appendItem( new ReportItem("") );
+            
+            builder.appendItem(new ReportItem("PROJECTION NODE"));
+            builder.appendItem(new ReportItem("Node ID", projectionMetadata.getNodeId()));
+            builder.appendItem(new ReportItem("Variable Order", projectionMetadata.getVariableOrder().toString()));
+            builder.appendItem( new ReportItem("Parent Node ID", projectionMetadata.getParentNodeId()) );
+            builder.appendItem(new ReportItem("Child Node ID", projectionMetadata.getChildNodeId()));
+            builder.appendItem(new ReportItem("Count", "" + queryReport.getCount(projectionMetadata.getNodeId())));
+        }
 
         for(final FilterMetadata filterMetadata : metadata.getFilterMetadata()) {
             builder.appendItem( new ReportItem("") );

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
index c8dc737..f25b573 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
@@ -33,7 +33,7 @@ import org.apache.rya.api.domain.RyaType;
 import org.apache.rya.api.domain.RyaURI;
 import org.apache.rya.api.persist.RyaDAOException;
 import org.apache.rya.api.resolver.RyaToRdfConversions;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.storage.PcjException;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
@@ -179,7 +179,7 @@ public class FluoAndHistoricPcjsDemo implements Demo {
             pcjId = pcjStorage.createPcj(sparql);
 
             // Tell the Fluo app to maintain it.
-            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, ryaTablePrefix);
+            new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, ryaTablePrefix);
 
         } catch (MalformedQueryException | PcjException | RyaDAOException e) {
             throw new DemoExecutionException("Error while using Fluo to compute and export historic matches, so the demo can not continue. Exiting.", e);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java
index 124569b..bbdcfec 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java
@@ -30,6 +30,7 @@ import org.junit.Assert;
 import org.openrdf.model.Statement;
 
 import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
 
 public class ConstructGraphTestUtils {
 
@@ -47,8 +48,18 @@ public class ConstructGraphTestUtils {
     
     public static void subGraphsEqualIgnoresBlankNode(Set<RyaSubGraph> subgraph1, Set<RyaSubGraph> subgraph2) {
         Map<Integer, RyaSubGraph> subGraphMap = new HashMap<>();
-        subgraph1.forEach(x->subGraphMap.put(getKey(x), x));
-        subgraph2.forEach(x->ryaStatementsEqualIgnoresBlankNode(x.getStatements(), subGraphMap.get(getKey(x)).getStatements()));
+        for(RyaSubGraph subgraph: subgraph1) {
+            int key = getKey(subgraph);
+            subGraphMap.put(key, subgraph);
+        }
+        
+        for(RyaSubGraph subgraph: subgraph2) {
+            int key = getKey(subgraph);
+            RyaSubGraph sub = subGraphMap.get(key);
+            Preconditions.checkNotNull(sub);
+            Set<RyaStatement> statements = sub.getStatements();
+            ryaStatementsEqualIgnoresBlankNode(subgraph.getStatements(), statements);
+        }
     }
     
     private static int getKey(RyaSubGraph subgraph) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
index d5c0e5f..263a19e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
@@ -68,7 +68,7 @@ public class GetPcjMetadataIT extends RyaExportITBase {
 
         try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
             // Tell the Fluo app to maintain the PCJ.
-            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
+            new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
 
             // Fetch the PCJ's Metadata through the GetPcjMetadata interactor.
             final String queryId = new ListQueryIds().listQueryIds(fluoClient).get(0);
@@ -95,7 +95,7 @@ public class GetPcjMetadataIT extends RyaExportITBase {
                             "?x <http://worksAt> <http://Chipotle>." +
                             "}";
             final String q1PcjId = pcjStorage.createPcj(q1Sparql);
-            final CreatePcj createPcj = new CreatePcj();
+            final CreateFluoPcj createPcj = new CreateFluoPcj();
             createPcj.withRyaIntegration(q1PcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
 
             final String q2Sparql =

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
index 965a7b9..c0f0f16 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
@@ -85,7 +85,7 @@ public class GetQueryReportIT extends RyaExportITBase {
 
         try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
             // Tell the Fluo app to maintain the PCJ.
-            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
+            new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
 
             // Stream the data into Fluo.
             new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
@@ -106,7 +106,7 @@ public class GetQueryReportIT extends RyaExportITBase {
 
             final FluoQuery fluoQuery = report.getFluoQuery();
 
-            final String queryNodeId = fluoQuery.getQueryMetadata().get().getNodeId();
+            final String queryNodeId = fluoQuery.getQueryMetadata().getNodeId();
             expectedCounts.put(queryNodeId, BigInteger.valueOf(8));
 
             final String filterNodeId = fluoQuery.getFilterMetadata().iterator().next().getNodeId();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
index d403404..315dddb 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
@@ -20,6 +20,8 @@ package org.apache.rya.indexing.pcj.fluo.app.query;
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -28,11 +30,12 @@ import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.client.Transaction;
 import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.ExportStrategy;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery.QueryType;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
-import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
 import org.junit.Test;
@@ -113,7 +116,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
 
         // Create the object that will be serialized.
         final JoinMetadata.Builder builder = JoinMetadata.builder("nodeId");
-        builder.setVariableOrder(new VariableOrder("g;y;s"));
+        builder.setVarOrder(new VariableOrder("g;y;s"));
         builder.setJoinType(JoinType.NATURAL_JOIN);
         builder.setParentNodeId("parentNodeId");
         builder.setLeftChildNodeId("leftChildNodeId");
@@ -143,10 +146,13 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
         final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
 
         // Create the object that will be serialized.
-        final QueryMetadata.Builder builder = QueryMetadata.builder("nodeId");
-        builder.setVariableOrder(new VariableOrder("y;s;d"));
+        String queryId = NodeType.generateNewFluoIdForType(NodeType.QUERY);
+        final QueryMetadata.Builder builder = QueryMetadata.builder(queryId);
+        builder.setQueryType(QueryType.Projection);
+        builder.setVarOrder(new VariableOrder("y;s;d"));
         builder.setSparql("sparql string");
         builder.setChildNodeId("childNodeId");
+        builder.setExportStrategies(new HashSet<>(Arrays.asList(ExportStrategy.Kafka)));
         final QueryMetadata originalMetadata = builder.build();
 
         try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
@@ -159,7 +165,37 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
             // Read it from the Fluo table.
             QueryMetadata storedMetdata = null;
             try(Snapshot sx = fluoClient.newSnapshot()) {
-                storedMetdata = dao.readQueryMetadata(sx, "nodeId");
+                storedMetdata = dao.readQueryMetadata(sx, queryId);
+            }
+
+            // Ensure the deserialized object is the same as the serialized one.
+            assertEquals(originalMetadata, storedMetdata);
+        }
+    }
+    
+    @Test
+    public void projectionMetadataTest() {
+        final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+        // Create the object that will be serialized.
+        final ProjectionMetadata.Builder builder = ProjectionMetadata.builder("nodeId");
+        builder.setVarOrder(new VariableOrder("y;s;d"));
+        builder.setProjectedVars(new VariableOrder("x;y;z"));
+        builder.setChildNodeId("childNodeId");
+        builder.setParentNodeId("parentNodeId");
+        final ProjectionMetadata originalMetadata = builder.build();
+
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Write it to the Fluo table.
+            try(Transaction tx = fluoClient.newTransaction()) {
+                dao.write(tx, originalMetadata);
+                tx.commit();
+            }
+
+            // Read it from the Fluo table.
+            ProjectionMetadata storedMetdata = null;
+            try(Snapshot sx = fluoClient.newSnapshot()) {
+                storedMetdata = dao.readProjectionMetadata(sx, "nodeId");
             }
 
             // Ensure the deserialized object is the same as the serialized one.
@@ -180,8 +216,9 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
         // Create the object that will be serialized.
         final ConstructQueryMetadata.Builder builder = ConstructQueryMetadata.builder();
         builder.setNodeId("nodeId");
-        builder.setSparql(query);
         builder.setChildNodeId("childNodeId");
+        builder.setParentNodeId("parentNodeId");
+        builder.setVarOrder(new VariableOrder("a;b;c"));
         builder.setConstructGraph(new ConstructGraph(patterns));
         final ConstructQueryMetadata originalMetadata = builder.build();
 
@@ -209,7 +246,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
 
         // Create the object that will be serialized.
         final AggregationMetadata originalMetadata = AggregationMetadata.builder("nodeId")
-                .setVariableOrder(new VariableOrder("totalCount"))
+                .setVarOrder(new VariableOrder("totalCount"))
                 .setParentNodeId("parentNodeId")
                 .setChildNodeId("childNodeId")
                 .setGroupByVariableOrder(new VariableOrder("a", "b", "c"))
@@ -240,7 +277,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
 
         // Create the object that will be serialized.
         final AggregationMetadata originalMetadata = AggregationMetadata.builder("nodeId")
-                .setVariableOrder(new VariableOrder("totalCount"))
+                .setVarOrder(new VariableOrder("totalCount"))
                 .setParentNodeId("parentNodeId")
                 .setChildNodeId("childNodeId")
                 .addAggregation(new AggregationElement(AggregationType.COUNT, "count", "totalCount"))
@@ -315,8 +352,10 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
                   "?worker <http://worksAt> <http://Chipotle>. " +
                 "}";
 
-        final ParsedQuery query = new SPARQLParser().parseQuery(sparql, null);
-        final FluoQuery originalQuery = new SparqlFluoQueryBuilder().make(query, new NodeIds());
+        SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder();
+        builder.setSparql(sparql);
+        builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY));
+        final FluoQuery originalQuery = builder.build();
 
         assertEquals(QueryType.Projection, originalQuery.getQueryType());
         assertEquals(false, originalQuery.getConstructQueryMetadata().isPresent());
@@ -331,7 +370,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
         // Read it from the Fluo table.
         FluoQuery storedQuery = null;
         try(Snapshot sx = fluoClient.newSnapshot()) {
-            storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().get().getNodeId());
+            storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().getNodeId());
         }
 
             // Ensure the deserialized object is the same as the serialized one.
@@ -354,11 +393,102 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
                   "?worker <http://worksAt> <http://Chipotle>. " +
                 "}";
 
-        final ParsedQuery query = new SPARQLParser().parseQuery(sparql, null);
-        final FluoQuery originalQuery = new SparqlFluoQueryBuilder().make(query, new NodeIds());
+        SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder();
+        builder.setSparql(sparql);
+        builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY));
+        final FluoQuery originalQuery = builder.build();
+        
+        assertEquals(QueryType.Construct, originalQuery.getQueryType());
+        assertEquals(true, originalQuery.getConstructQueryMetadata().isPresent());
+
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Write it to the Fluo table.
+            try(Transaction tx = fluoClient.newTransaction()) {
+                dao.write(tx, originalQuery);
+                tx.commit();
+            }
+
+        // Read it from the Fluo table.
+        FluoQuery storedQuery = null;
+        try(Snapshot sx = fluoClient.newSnapshot()) {
+            storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().getNodeId());
+        }
+
+            // Ensure the deserialized object is the same as the serialized one.
+            assertEquals(originalQuery, storedQuery);
+        }
+    }
+    
+    
+    @Test
+    public void fluoNestedQueryTest() throws MalformedQueryException {
+        final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+        // Create the object that will be serialized.
+        final String sparql =
+                "SELECT ?id ?type ?location ?averagePrice ?vendor {" +
+                "FILTER(?averagePrice > 4) " +
+                "?type <urn:purchasedFrom> ?vendor ." +
+                "{SELECT ?type ?location (avg(?price) as ?averagePrice) {" +
+                    "?id <urn:type> ?type . " +
+                    "?id <urn:location> ?location ." +
+                    "?id <urn:price> ?price ." +
+                "} " +
+                "GROUP BY ?type ?location }}";
+        
+        
+        SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder();
+        builder.setSparql(sparql);
+        builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY));
+        final FluoQuery originalQuery = builder.build();
+        
+        assertEquals(QueryType.Projection, originalQuery.getQueryType());
+
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Write it to the Fluo table.
+            try(Transaction tx = fluoClient.newTransaction()) {
+                dao.write(tx, originalQuery);
+                tx.commit();
+            }
+
+        // Read it from the Fluo table.
+        FluoQuery storedQuery = null;
+        try(Snapshot sx = fluoClient.newSnapshot()) {
+            storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().getNodeId());
+        }
+
+            // Ensure the deserialized object is the same as the serialized one.
+            assertEquals(originalQuery, storedQuery);
+        }
+    }
+    
+    @Test
+    public void fluoNestedConstructQueryTest() throws MalformedQueryException {
+        final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+        // Create the object that will be serialized.
+        final String sparql = "CONSTRUCT { "
+                + "_:b a <urn:highSpeedTrafficArea> . "
+                + "_:b <urn:hasCount> ?obsCount . "
+                + "_:b <urn:hasLocation> ?location ."
+                + "_:b <urn:hasAverageVelocity> ?avgVelocity ."
+                + "} WHERE { "
+                + "FILTER(?obsCount > 1) "
+                + "{ "
+                + "SELECT ?location (count(?obs) AS ?obsCount) (avg(?velocity) AS ?avgVelocity) "
+                + "WHERE { "
+                + "FILTER(?velocity > 75) "
+                + "?obs <urn:hasVelocity> ?velocity. " 
+                + "?obs <urn:hasLocation> ?location. " 
+                + "}GROUP BY ?location }}";
+        
+        
+        SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder();
+        builder.setSparql(sparql);
+        builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY));
+        final FluoQuery originalQuery = builder.build();
         
         assertEquals(QueryType.Construct, originalQuery.getQueryType());
-        assertEquals(false, originalQuery.getQueryMetadata().isPresent());
 
         try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
             // Write it to the Fluo table.
@@ -370,11 +500,12 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
         // Read it from the Fluo table.
         FluoQuery storedQuery = null;
         try(Snapshot sx = fluoClient.newSnapshot()) {
-            storedQuery = dao.readFluoQuery(sx, originalQuery.getConstructQueryMetadata().get().getNodeId());
+            storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().getNodeId());
         }
 
             // Ensure the deserialized object is the same as the serialized one.
             assertEquals(originalQuery, storedQuery);
         }
     }
+    
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java
index 0cd7cfb..1707308 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java
@@ -41,7 +41,7 @@ import org.apache.fluo.core.client.FluoClientImpl;
 import org.apache.log4j.Logger;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaURI;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
 import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
@@ -91,7 +91,7 @@ public class BatchDeleteIT extends RyaExportITBase {
             final String pcjId = pcjStorage.createPcj(sparql);
 
             // Tell the Fluo app to maintain the PCJ.
-            String queryId = new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName());
+            String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName());
 
             List<String> ids = getNodeIdStrings(fluoClient, queryId);
             List<String> prefixes = Arrays.asList("urn:subject_1", "urn:object", "urn:subject_1", "urn:subject_1");
@@ -130,7 +130,7 @@ public class BatchDeleteIT extends RyaExportITBase {
             final String pcjId = pcjStorage.createPcj(sparql);
 
             // Tell the Fluo app to maintain the PCJ.
-            String queryId = new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName());
+            String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName());
 
             List<String> ids = getNodeIdStrings(fluoClient, queryId);
             String joinId = ids.get(1);
@@ -176,7 +176,7 @@ public class BatchDeleteIT extends RyaExportITBase {
             final String pcjId = pcjStorage.createPcj(sparql);
 
             // Tell the Fluo app to maintain the PCJ.
-            String queryId = new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName());
+            String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName());
 
             List<String> ids = getNodeIdStrings(fluoClient, queryId);
             String joinId = ids.get(1);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
index 0f2d892..7c4caa4 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
@@ -35,7 +35,7 @@ import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Span;
 import org.apache.rya.api.client.RyaClient;
 import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
-import org.apache.rya.indexing.pcj.fluo.api.DeletePcj;
+import org.apache.rya.indexing.pcj.fluo.api.DeleteFluoPcj;
 import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
 import org.junit.Test;
 import org.openrdf.model.Statement;
@@ -79,10 +79,10 @@ public class CreateDeleteIT extends RyaExportITBase {
         try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
             // Ensure the data was loaded.
             final List<Bytes> rows = getFluoTableEntries(fluoClient);
-            assertEquals(17, rows.size());
+            assertEquals(20, rows.size());
 
             // Delete the PCJ from the Fluo application.
-            new DeletePcj(1).deletePcj(fluoClient, pcjId);
+            new DeleteFluoPcj(1).deletePcj(fluoClient, pcjId);
 
             // Ensure all data related to the query has been removed.
             final List<Bytes> empty_rows = getFluoTableEntries(fluoClient);
@@ -111,10 +111,10 @@ public class CreateDeleteIT extends RyaExportITBase {
         try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
             // Ensure the data was loaded.
             final List<Bytes> rows = getFluoTableEntries(fluoClient);
-            assertEquals(10, rows.size());
+            assertEquals(12, rows.size());
 
             // Delete the PCJ from the Fluo application.
-            new DeletePcj(1).deletePcj(fluoClient, pcjId);
+            new DeleteFluoPcj(1).deletePcj(fluoClient, pcjId);
 
             // Ensure all data related to the query has been removed.
             final List<Bytes> empty_rows = getFluoTableEntries(fluoClient);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
index f330825..d623043 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
@@ -29,7 +29,7 @@ import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaURI;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
@@ -102,7 +102,7 @@ public class InputIT extends RyaExportITBase {
 
         try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
             // Tell the Fluo app to maintain the PCJ.
-            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
+            new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
 
             // Verify the end results of the query match the expected results.
             super.getMiniFluo().waitForObservers();
@@ -162,7 +162,7 @@ public class InputIT extends RyaExportITBase {
 
         try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
             // Tell the Fluo app to maintain the PCJ.
-            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
+            new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
 
             // Ensure the query has no results yet.
             super.getMiniFluo().waitForObservers();
@@ -228,7 +228,7 @@ public class InputIT extends RyaExportITBase {
 
         try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
             // Tell the Fluo app to maintain the PCJ.
-            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
+            new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
 
             // Ensure Alice is a match.
             super.getMiniFluo().waitForObservers();
@@ -317,7 +317,7 @@ public class InputIT extends RyaExportITBase {
 
         try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
             // Tell the Fluo app to maintain the PCJ.
-            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
+            new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
 
             // Ensure Alice is a match.
             super.getMiniFluo().waitForObservers();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
index ab7610d..3ee07a7 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
@@ -29,6 +29,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.fluo.recipes.test.FluoITHelper;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -244,6 +247,10 @@ public class KafkaExportIT extends KafkaExportITBase {
 
         // Create the PCJ in Fluo and load the statements into Rya.
         final String pcjId = loadData(sparql, statements);
+        
+        try(FluoClient fluo = new FluoClientImpl(super.getFluoConfiguration())) {
+            FluoITHelper.printFluoTable(fluo);
+        }
 
         // Create the expected results of the SPARQL query once the PCJ has been computed.
         final MapBindingSet expectedResult = new MapBindingSet();
@@ -350,7 +357,7 @@ public class KafkaExportIT extends KafkaExportITBase {
     }
 
     @Test
-    public void groupByManyBindings_avaerages() throws Exception {
+    public void groupByManyBindings_averages() throws Exception {
         // A query that groups what is aggregated by two of the keys.
         final String sparql =
                 "SELECT ?type ?location (avg(?price) as ?averagePrice) {" +
@@ -425,6 +432,160 @@ public class KafkaExportIT extends KafkaExportITBase {
         assertEquals(expectedResults, results);
     }
 
+    
+    @Test
+    public void nestedGroupByManyBindings_averages() throws Exception {
+        // A query that groups what is aggregated by two of the keys.
+        final String sparql =
+                "SELECT ?type ?location ?averagePrice {" +
+                "FILTER(?averagePrice > 4) " +
+                "{SELECT ?type ?location (avg(?price) as ?averagePrice) {" +
+                    "?id <urn:type> ?type . " +
+                    "?id <urn:location> ?location ." +
+                    "?id <urn:price> ?price ." +
+                "} " +
+                "GROUP BY ?type ?location }}";
+
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Collection<Statement> statements = Sets.newHashSet(
+                // American items that will be averaged.
+                vf.createStatement(vf.createURI("urn:1"), vf.createURI("urn:type"), vf.createLiteral("apple")),
+                vf.createStatement(vf.createURI("urn:1"), vf.createURI("urn:location"), vf.createLiteral("USA")),
+                vf.createStatement(vf.createURI("urn:1"), vf.createURI("urn:price"), vf.createLiteral(2.50)),
+
+                vf.createStatement(vf.createURI("urn:2"), vf.createURI("urn:type"), vf.createLiteral("cheese")),
+                vf.createStatement(vf.createURI("urn:2"), vf.createURI("urn:location"), vf.createLiteral("USA")),
+                vf.createStatement(vf.createURI("urn:2"), vf.createURI("urn:price"), vf.createLiteral(4.25)),
+
+                vf.createStatement(vf.createURI("urn:3"), vf.createURI("urn:type"), vf.createLiteral("cheese")),
+                vf.createStatement(vf.createURI("urn:3"), vf.createURI("urn:location"), vf.createLiteral("USA")),
+                vf.createStatement(vf.createURI("urn:3"), vf.createURI("urn:price"), vf.createLiteral(5.25)),
+
+                // French items that will be averaged.
+                vf.createStatement(vf.createURI("urn:4"), vf.createURI("urn:type"), vf.createLiteral("cheese")),
+                vf.createStatement(vf.createURI("urn:4"), vf.createURI("urn:location"), vf.createLiteral("France")),
+                vf.createStatement(vf.createURI("urn:4"), vf.createURI("urn:price"), vf.createLiteral(8.5)),
+
+                vf.createStatement(vf.createURI("urn:5"), vf.createURI("urn:type"), vf.createLiteral("cigarettes")),
+                vf.createStatement(vf.createURI("urn:5"), vf.createURI("urn:location"), vf.createLiteral("France")),
+                vf.createStatement(vf.createURI("urn:5"), vf.createURI("urn:price"), vf.createLiteral(3.99)),
+
+                vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:type"), vf.createLiteral("cigarettes")),
+                vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:location"), vf.createLiteral("France")),
+                vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:price"), vf.createLiteral(4.99)));
+
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadData(sparql, statements);
+
+        // Create the expected results of the SPARQL query once the PCJ has been computed.
+        final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
+
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("type", vf.createLiteral("cheese", XMLSchema.STRING));
+        bs.addBinding("location", vf.createLiteral("France", XMLSchema.STRING));
+        bs.addBinding("averagePrice", vf.createLiteral("8.5", XMLSchema.DECIMAL));
+        expectedResults.add( new VisibilityBindingSet(bs));
+
+        bs = new MapBindingSet();
+        bs.addBinding("type", vf.createLiteral("cigarettes", XMLSchema.STRING));
+        bs.addBinding("location", vf.createLiteral("France", XMLSchema.STRING));
+        bs.addBinding("averagePrice", vf.createLiteral("4.49", XMLSchema.DECIMAL));
+        expectedResults.add( new VisibilityBindingSet(bs) );
+        
+        bs = new MapBindingSet();
+        bs.addBinding("type", vf.createLiteral("cheese", XMLSchema.STRING));
+        bs.addBinding("location", vf.createLiteral("USA", XMLSchema.STRING));
+        bs.addBinding("averagePrice", vf.createLiteral("4.75", XMLSchema.DECIMAL));
+        expectedResults.add( new VisibilityBindingSet(bs) );
+
+        // Verify the end results of the query match the expected results.
+        final Set<VisibilityBindingSet> results = readGroupedResults(pcjId, new VariableOrder("type", "location"));
+        System.out.println(results);
+        assertEquals(expectedResults, results);
+    }
+    
+    
+    @Test
+    public void nestedWithJoinGroupByManyBindings_averages() throws Exception {
+       
+        // A query that groups what is aggregated by two of the keys.
+        final String sparql =
+                "SELECT ?type ?location ?averagePrice ?milkType {" +
+                "FILTER(?averagePrice > 4) " +
+                "?type <urn:hasMilkType> ?milkType ." +
+                "{SELECT ?type ?location (avg(?price) as ?averagePrice) {" +
+                    "?id <urn:type> ?type . " +
+                    "?id <urn:location> ?location ." +
+                    "?id <urn:price> ?price ." +
+                "} " +
+                "GROUP BY ?type ?location }}";
+
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Collection<Statement> statements = Sets.newHashSet(
+               
+                vf.createStatement(vf.createURI("urn:1"), vf.createURI("urn:type"), vf.createURI("urn:blue")),
+                vf.createStatement(vf.createURI("urn:1"), vf.createURI("urn:location"), vf.createLiteral("France")),
+                vf.createStatement(vf.createURI("urn:1"), vf.createURI("urn:price"), vf.createLiteral(8.5)),
+                vf.createStatement(vf.createURI("urn:blue"), vf.createURI("urn:hasMilkType"), vf.createLiteral("cow", XMLSchema.STRING)),
+
+                vf.createStatement(vf.createURI("urn:2"), vf.createURI("urn:type"), vf.createURI("urn:american")),
+                vf.createStatement(vf.createURI("urn:2"), vf.createURI("urn:location"), vf.createLiteral("USA")),
+                vf.createStatement(vf.createURI("urn:2"), vf.createURI("urn:price"), vf.createLiteral(.99)),
+
+                vf.createStatement(vf.createURI("urn:3"), vf.createURI("urn:type"), vf.createURI("urn:cheddar")),
+                vf.createStatement(vf.createURI("urn:3"), vf.createURI("urn:location"), vf.createLiteral("USA")),
+                vf.createStatement(vf.createURI("urn:3"), vf.createURI("urn:price"), vf.createLiteral(5.25)),
+
+                // French items that will be averaged.
+                vf.createStatement(vf.createURI("urn:4"), vf.createURI("urn:type"), vf.createURI("urn:goat")),
+                vf.createStatement(vf.createURI("urn:4"), vf.createURI("urn:location"), vf.createLiteral("France")),
+                vf.createStatement(vf.createURI("urn:4"), vf.createURI("urn:price"), vf.createLiteral(6.5)),
+                vf.createStatement(vf.createURI("urn:goat"), vf.createURI("urn:hasMilkType"), vf.createLiteral("goat", XMLSchema.STRING)),
+                
+                vf.createStatement(vf.createURI("urn:5"), vf.createURI("urn:type"), vf.createURI("urn:fontina")),
+                vf.createStatement(vf.createURI("urn:5"), vf.createURI("urn:location"), vf.createLiteral("Italy")),
+                vf.createStatement(vf.createURI("urn:5"), vf.createURI("urn:price"), vf.createLiteral(3.99)),
+                vf.createStatement(vf.createURI("urn:fontina"), vf.createURI("urn:hasMilkType"), vf.createLiteral("cow", XMLSchema.STRING)),
+
+                vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:type"), vf.createURI("urn:fontina")),
+                vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:location"), vf.createLiteral("Italy")),
+                vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:price"), vf.createLiteral(4.99)));
+
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadData(sparql, statements);
+
+        // Create the expected results of the SPARQL query once the PCJ has been computed.
+        final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
+
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("type", vf.createURI("urn:blue"));
+        bs.addBinding("location", vf.createLiteral("France", XMLSchema.STRING));
+        bs.addBinding("averagePrice", vf.createLiteral("8.5", XMLSchema.DECIMAL));
+        bs.addBinding("milkType", vf.createLiteral("cow", XMLSchema.STRING));
+        expectedResults.add( new VisibilityBindingSet(bs));
+
+        bs = new MapBindingSet();
+        bs.addBinding("type", vf.createURI("urn:goat"));
+        bs.addBinding("location", vf.createLiteral("France", XMLSchema.STRING));
+        bs.addBinding("averagePrice", vf.createLiteral("6.5", XMLSchema.DECIMAL));
+        bs.addBinding("milkType", vf.createLiteral("goat", XMLSchema.STRING));
+        expectedResults.add( new VisibilityBindingSet(bs) );
+        
+        bs = new MapBindingSet();
+        bs.addBinding("type", vf.createURI("urn:fontina"));
+        bs.addBinding("location", vf.createLiteral("Italy", XMLSchema.STRING));
+        bs.addBinding("averagePrice", vf.createLiteral("4.49", XMLSchema.DECIMAL));
+        bs.addBinding("milkType", vf.createLiteral("cow", XMLSchema.STRING));
+        expectedResults.add( new VisibilityBindingSet(bs) );
+
+        // Verify the end results of the query match the expected results.
+        final Set<VisibilityBindingSet> results = readGroupedResults(pcjId, new VariableOrder("type", "location"));
+        System.out.println(results);
+        assertEquals(expectedResults, results);
+    }
+
 
     private Set<VisibilityBindingSet> readAllResults(final String pcjId) throws Exception {
         requireNonNull(pcjId);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
index 7a4ed8d..ca8de0d 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
@@ -43,24 +43,28 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.accumulo.AccumuloRyaDAO;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.api.domain.RyaType;
 import org.apache.rya.api.domain.RyaURI;
 import org.apache.rya.api.resolver.RdfToRyaConversions;
 import org.apache.rya.indexing.pcj.fluo.ConstructGraphTestUtils;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
 import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
 import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
 import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.ProjectionObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
 import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase;
+import org.junit.Assert;
 import org.junit.Test;
 import org.openrdf.model.Statement;
 import org.openrdf.model.ValueFactory;
 import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.RDF;
+import org.openrdf.model.vocabulary.XMLSchema;
 
 import com.google.common.collect.Sets;
 
@@ -83,6 +87,7 @@ public class KafkaRyaSubGraphExportIT extends KafkaExportITBase {
         observers.add(new ObserverSpecification(JoinObserver.class.getName()));
         observers.add(new ObserverSpecification(FilterObserver.class.getName()));
         observers.add(new ObserverSpecification(AggregationObserver.class.getName()));
+        observers.add(new ObserverSpecification(ProjectionObserver.class.getName()));
 
         // Configure the export observer to export new PCJ results to the mini
         // accumulo cluster.
@@ -285,6 +290,77 @@ public class KafkaRyaSubGraphExportIT extends KafkaExportITBase {
         ConstructGraphTestUtils.subGraphsEqualIgnoresBlankNode(expectedResults, results);
     }
     
+    
+    @Test
+    public void nestedConstructQuery() throws Exception {
+        // A query that groups what is aggregated by one of the keys.
+        final String sparql = "CONSTRUCT { "
+                + "_:b a <urn:highSpeedTrafficArea> . "
+                + "_:b <urn:hasCount> ?obsCount . "
+                + "_:b <urn:hasLocation> ?location ."
+                + "_:b <urn:hasAverageVelocity> ?avgVelocity ."
+                + "} WHERE { "
+                + "FILTER(?obsCount > 1) "
+                + "{ "
+                + "SELECT ?location (count(?obs) AS ?obsCount) (avg(?velocity) AS ?avgVelocity) "
+                + "WHERE { "
+                + "FILTER(?velocity > 75) "
+                + "?obs <urn:hasVelocity> ?velocity. " 
+                + "?obs <urn:hasLocation> ?location. " 
+                + "}GROUP BY ?location }}";
+
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:obs1"), vf.createURI("urn:hasVelocity"), vf.createLiteral(77)),
+                vf.createStatement(vf.createURI("urn:obs1"), vf.createURI("urn:hasLocation"), vf.createLiteral("OldTown")),
+                vf.createStatement(vf.createURI("urn:obs2"), vf.createURI("urn:hasVelocity"), vf.createLiteral(81)),
+                vf.createStatement(vf.createURI("urn:obs2"), vf.createURI("urn:hasLocation"), vf.createLiteral("OldTown")),
+                vf.createStatement(vf.createURI("urn:obs3"), vf.createURI("urn:hasVelocity"), vf.createLiteral(70)),
+                vf.createStatement(vf.createURI("urn:obs3"), vf.createURI("urn:hasLocation"), vf.createLiteral("OldTown")),
+                vf.createStatement(vf.createURI("urn:obs5"), vf.createURI("urn:hasVelocity"), vf.createLiteral(87)),
+                vf.createStatement(vf.createURI("urn:obs5"), vf.createURI("urn:hasLocation"), vf.createLiteral("Rosslyn")),
+                vf.createStatement(vf.createURI("urn:obs6"), vf.createURI("urn:hasVelocity"), vf.createLiteral(81)),
+                vf.createStatement(vf.createURI("urn:obs6"), vf.createURI("urn:hasLocation"), vf.createLiteral("Rosslyn")),
+                vf.createStatement(vf.createURI("urn:obs7"), vf.createURI("urn:hasVelocity"), vf.createLiteral(67)),
+                vf.createStatement(vf.createURI("urn:obs7"), vf.createURI("urn:hasLocation"), vf.createLiteral("Clarendon")),
+                vf.createStatement(vf.createURI("urn:obs8"), vf.createURI("urn:hasVelocity"), vf.createLiteral(77)),
+                vf.createStatement(vf.createURI("urn:obs8"), vf.createURI("urn:hasLocation"), vf.createLiteral("Ballston")),
+                vf.createStatement(vf.createURI("urn:obs9"), vf.createURI("urn:hasVelocity"), vf.createLiteral(87)),
+                vf.createStatement(vf.createURI("urn:obs9"), vf.createURI("urn:hasLocation"), vf.createLiteral("FallsChurch")));
+
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadStatements(sparql, statements);
+
+        // Verify the end results of the query match the expected results.
+        final Set<RyaSubGraph> results = readAllResults(pcjId);
+        
+        RyaStatement statement1 = new RyaStatement(new RyaURI("urn:obs1"), new RyaURI("urn:hasCount"), new RyaType(XMLSchema.INTEGER, "2"));
+        RyaStatement statement2 = new RyaStatement(new RyaURI("urn:obs1"), new RyaURI("urn:hasAverageVelocity"), new RyaType(XMLSchema.DECIMAL, "84"));
+        RyaStatement statement3 = new RyaStatement(new RyaURI("urn:obs1"), new RyaURI("urn:hasLocation"), new RyaType("Rosslyn"));
+        RyaStatement statement4 = new RyaStatement(new RyaURI("urn:obs1"), new RyaURI(RDF.TYPE.toString()), new RyaURI("urn:highSpeedTrafficArea"));
+        RyaStatement statement5 = new RyaStatement(new RyaURI("urn:obs2"), new RyaURI("urn:hasCount"), new RyaType(XMLSchema.INTEGER, "2"));
+        RyaStatement statement6 = new RyaStatement(new RyaURI("urn:obs2"), new RyaURI("urn:hasAverageVelocity"), new RyaType(XMLSchema.DECIMAL, "79"));
+        RyaStatement statement7 = new RyaStatement(new RyaURI("urn:obs2"), new RyaURI("urn:hasLocation"), new RyaType("OldTown"));
+        RyaStatement statement8 = new RyaStatement(new RyaURI("urn:obs2"), new RyaURI(RDF.TYPE.toString()), new RyaURI("urn:highSpeedTrafficArea"));
+
+        final Set<RyaSubGraph> expectedResults = new HashSet<>();
+
+        RyaSubGraph subGraph1 = new RyaSubGraph(pcjId);
+        Set<RyaStatement> stmnts1 = new HashSet<>(Arrays.asList(statement1, statement2, statement3, statement4));
+        subGraph1.setStatements(stmnts1);
+        expectedResults.add(subGraph1);
+        
+        RyaSubGraph subGraph2 = new RyaSubGraph(pcjId);
+        Set<RyaStatement> stmnts2 = new HashSet<>(Arrays.asList(statement5, statement6, statement7, statement8));
+        subGraph2.setStatements(stmnts2);
+        expectedResults.add(subGraph2);
+        
+        Assert.assertEquals(expectedResults.size(), results.size());
+        ConstructGraphTestUtils.subGraphsEqualIgnoresBlankNode(expectedResults, results);;
+    }
+    
+    
     protected KafkaConsumer<String, RyaSubGraph> makeRyaSubGraphConsumer(final String TopicName) {
         // setup consumer
         final Properties consumerProps = new Properties();
@@ -330,9 +406,9 @@ public class KafkaRyaSubGraphExportIT extends KafkaExportITBase {
         FluoClient client = null;
 
         try {
-            CreatePcj createPcj = new CreatePcj();
+            CreateFluoPcj createPcj = new CreateFluoPcj();
             client = new FluoClientImpl(super.getFluoConfiguration());
-            FluoQuery fluoQuery = createPcj.createFluoPcj(client, sparql);
+            String id = createPcj.createPcj(sparql, client).getQueryId();
 
             AccumuloRyaDAO dao = getRyaDAO();
             dao.add(statements.iterator());
@@ -341,7 +417,7 @@ public class KafkaRyaSubGraphExportIT extends KafkaExportITBase {
             super.getMiniFluo().waitForObservers();
 
             // FluoITHelper.printFluoTable(client);
-            return fluoQuery.getConstructQueryMetadata().get().getNodeId();
+            return id;
         } finally {
             if (client != null) {
                 client.close();