You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/06/22 18:05:14 UTC

[2/4] incubator-rya git commit: RYA-273-Construct Query Support. Closes #161.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
index 77d6a49..3396114 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
@@ -46,6 +46,18 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  *   </table>
  * </p>
  * <p>
+ *   <b>Construct Query Metadata</b>
+ *   <table border="1" style="width:100%">
+ *     <tr> <th>Fluo Row</th> <th>Fluo Column</th> <th>Fluo Value</th> </tr>
+ *     <tr> <td>Node ID</td> <td>constructMetadata:nodeId</td> <td>The Node ID of the Query.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>constructMetadata:sparql</td> <td>The original SPARQL query that is being computed by this query.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>constructMetadata:variableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>constructMetadata:graph</td> <td>The construct graph used to project BindingSets to statements.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>constructMetadata:childNodeId</td> <td>The Node ID of the child who feeds this node.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>constructMetadata:statements</td> <td>The RDF statements produced by this construct query node.</td> </tr>
+ *   </table>
+ * </p>
+ * <p>
  *   <b>Filter Metadata</b>
  *   <table border="1" style="width:100%">
  *     <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr>
@@ -104,6 +116,7 @@ public class FluoQueryColumns {
     public static final String JOIN_METADATA_CF = "joinMetadata";
     public static final String STATEMENT_PATTERN_METADATA_CF = "statementPatternMetadata";
     public static final String AGGREGATION_METADATA_CF = "aggregationMetadata";
+    public static final String CONSTRUCT_METADATA_CF = "constructMetadata";
 
     /**
      * New triples that have been added to Rya are written as a row in this
@@ -151,6 +164,14 @@ public class FluoQueryColumns {
     public static final Column QUERY_CHILD_NODE_ID = new Column(QUERY_METADATA_CF, "childNodeId");
     public static final Column QUERY_BINDING_SET = new Column(QUERY_METADATA_CF, "bindingSet");
 
+ // Construct Query Metadata columns.
+    public static final Column CONSTRUCT_NODE_ID = new Column(CONSTRUCT_METADATA_CF, "nodeId");
+    public static final Column CONSTRUCT_VARIABLE_ORDER = new Column(CONSTRUCT_METADATA_CF, "variableOrder");
+    public static final Column CONSTRUCT_GRAPH = new Column(CONSTRUCT_METADATA_CF, "graph");
+    public static final Column CONSTRUCT_CHILD_NODE_ID = new Column(CONSTRUCT_METADATA_CF, "childNodeId");
+    public static final Column CONSTRUCT_STATEMENTS = new Column(CONSTRUCT_METADATA_CF, "statements");
+    public static final Column CONSTRUCT_SPARQL = new Column(CONSTRUCT_METADATA_CF, "sparql");
+
     // Filter Metadata columns.
     public static final Column FILTER_NODE_ID = new Column(FILTER_METADATA_CF, "nodeId");
     public static final Column FILTER_VARIABLE_ORDER = new Column(FILTER_METADATA_CF, "veriableOrder");
@@ -201,6 +222,18 @@ public class FluoQueryColumns {
                         QUERY_CHILD_NODE_ID)),
 
         /**
+         * The columns a {@link ConstructQueryMetadata} object's fields are stored within.
+         */
+        CONSTRUCT_COLUMNS(
+                Arrays.asList(CONSTRUCT_NODE_ID,
+                        CONSTRUCT_VARIABLE_ORDER,
+                        CONSTRUCT_GRAPH,
+                        CONSTRUCT_CHILD_NODE_ID,
+                        CONSTRUCT_SPARQL,
+                        CONSTRUCT_STATEMENTS)),
+
+        
+        /**
          * The columns a {@link FilterMetadata} object's fields are stored within.
          */
         FILTER_COLUMNS(

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index dfc3333..5e9d654 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -31,7 +31,8 @@ import org.apache.fluo.api.client.SnapshotBase;
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
-import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
+import org.apache.rya.indexing.pcj.fluo.app.ConstructGraphSerializer;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
@@ -39,6 +40,7 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -103,6 +105,59 @@ public class FluoQueryMetadataDAO {
     }
 
     /**
+     * Write an instance of {@link ConstructQueryMetadata} to the Fluo table.
+     *
+     * @param tx - The transaction that will be used to commit the metadata. (not null)
+     * @param metadata - The Construct Query node metadata that will be written to the table. (not null)
+     */
+    public void write(final TransactionBase tx, final ConstructQueryMetadata metadata) {
+        requireNonNull(tx);
+        requireNonNull(metadata);
+
+        final String rowId = metadata.getNodeId();
+        tx.set(rowId, FluoQueryColumns.CONSTRUCT_NODE_ID, rowId);
+        tx.set(rowId, FluoQueryColumns.CONSTRUCT_VARIABLE_ORDER, metadata.getVariableOrder().toString());
+        tx.set(rowId, FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID, metadata.getChildNodeId() );
+        tx.set(rowId, FluoQueryColumns.CONSTRUCT_SPARQL, metadata.getSparql());
+        tx.set(rowId, FluoQueryColumns.CONSTRUCT_GRAPH, ConstructGraphSerializer.toConstructString(metadata.getConstructGraph()));
+    }
+
+    /**
+     * Read an instance of {@link ConstructQueryMetadata} from the Fluo table.
+     *
+     * @param sx - The snapshot that will be used to read the metadata. (not null)
+     * @param nodeId - The nodeId of the Construct Query node that will be read. (not null)
+     * @return The {@link ConstructQueryMetadata} that was read from the table.
+     */
+    public ConstructQueryMetadata readConstructQueryMetadata(final SnapshotBase sx, final String nodeId) {
+        return readConstructQueryMetadataBuilder(sx, nodeId).build();
+    }
+
+    private ConstructQueryMetadata.Builder readConstructQueryMetadataBuilder(final SnapshotBase sx, final String nodeId) {
+        requireNonNull(sx);
+        requireNonNull(nodeId);
+
+        // Fetch the values from the Fluo table.
+        final String rowId = nodeId;
+        final Map<Column, String> values = sx.gets(rowId, 
+                FluoQueryColumns.CONSTRUCT_GRAPH,
+                FluoQueryColumns.CONSTRUCT_SPARQL,
+                FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID);
+
+        final String graphString = values.get(FluoQueryColumns.CONSTRUCT_GRAPH);
+        final ConstructGraph graph = ConstructGraphSerializer.toConstructGraph(graphString);
+        final String childNodeId = values.get(FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID);
+        final String sparql = values.get(FluoQueryColumns.CONSTRUCT_SPARQL);
+
+        return ConstructQueryMetadata.builder()
+                .setNodeId(nodeId)
+                .setConstructGraph(graph)
+                .setSparql(sparql)
+                .setChildNodeId(childNodeId);
+    }
+    
+    
+    /**
      * Write an instance of {@link FilterMetadata} to the Fluo table.
      *
      * @param tx - The transaction that will be used to commit the metadata. (not null)
@@ -376,13 +431,25 @@ public class FluoQueryMetadataDAO {
         requireNonNull(tx);
         requireNonNull(query);
 
-        // Store the Query ID so that it may be looked up from the original SPARQL string.
-        final String sparql = query.getQueryMetadata().getSparql();
-        final String queryId = query.getQueryMetadata().getNodeId();
-        tx.set(Bytes.of(sparql), FluoQueryColumns.QUERY_ID, Bytes.of(queryId));
-
         // Write the rest of the metadata objects.
-        write(tx, query.getQueryMetadata());
+        switch(query.getQueryType()) {
+        case Construct:
+            ConstructQueryMetadata constructMetadata = query.getConstructQueryMetadata().get();
+            // Store the Query ID so that it may be looked up from the original SPARQL string.
+            final String constructSparql = constructMetadata.getSparql();
+            final String constructQueryId = constructMetadata.getNodeId();
+            tx.set(Bytes.of(constructSparql), FluoQueryColumns.QUERY_ID, Bytes.of(constructQueryId));
+            write(tx, constructMetadata);
+            break;
+        case Projection:
+            QueryMetadata queryMetadata = query.getQueryMetadata().get();
+            // Store the Query ID so that it may be looked up from the original SPARQL string.
+            final String sparql = queryMetadata.getSparql();
+            final String queryId = queryMetadata.getNodeId();
+            tx.set(Bytes.of(sparql), FluoQueryColumns.QUERY_ID, Bytes.of(queryId));
+            write(tx, queryMetadata);
+            break;
+        }
 
         for(final FilterMetadata filter : query.getFilterMetadata()) {
             write(tx, filter);
@@ -423,50 +490,62 @@ public class FluoQueryMetadataDAO {
         requireNonNull(childNodeId);
 
         final NodeType childType = NodeType.fromNodeId(childNodeId).get();
-        switch(childType) {
-            case QUERY:
-                // Add this node's metadata.
-                final QueryMetadata.Builder queryBuilder = readQueryMetadataBuilder(sx, childNodeId);
-                builder.setQueryMetadata(queryBuilder);
-
-                // Add it's child's metadata.
-                addChildMetadata(sx, builder, queryBuilder.build().getChildNodeId());
-                break;
-
-            case JOIN:
-                // Add this node's metadata.
-                final JoinMetadata.Builder joinBuilder = readJoinMetadataBuilder(sx, childNodeId);
-                builder.addJoinMetadata(joinBuilder);
-
-                // Add it's children's metadata.
-                final JoinMetadata joinMetadata = joinBuilder.build();
-                addChildMetadata(sx, builder, joinMetadata.getLeftChildNodeId());
-                addChildMetadata(sx, builder, joinMetadata.getRightChildNodeId());
-                break;
-
-            case FILTER:
-                // Add this node's metadata.
-                final FilterMetadata.Builder filterBuilder = readFilterMetadataBuilder(sx, childNodeId);
-                builder.addFilterMetadata(filterBuilder);
-
-                // Add it's child's metadata.
-                addChildMetadata(sx, builder, filterBuilder.build().getChildNodeId());
-                break;
-
-            case STATEMENT_PATTERN:
-                // Add this node's metadata.
-                final StatementPatternMetadata.Builder spBuilder = readStatementPatternMetadataBuilder(sx, childNodeId);
-                builder.addStatementPatternBuilder(spBuilder);
-                break;
-
-            case AGGREGATION:
-                // Add this node's metadata.
-                final AggregationMetadata.Builder aggregationBuilder = readAggregationMetadataBuilder(sx, childNodeId);
-                builder.addAggregateMetadata(aggregationBuilder);
-
-                // Add it's child's metadata.
-                addChildMetadata(sx, builder, aggregationBuilder.build().getChildNodeId());
-                break;
+        switch (childType) {
+        case QUERY:
+            // Add this node's metadata.
+            final QueryMetadata.Builder queryBuilder = readQueryMetadataBuilder(sx, childNodeId);
+            Preconditions.checkArgument(!builder.getQueryBuilder().isPresent());
+            builder.setQueryMetadata(queryBuilder);
+
+            // Add its child's metadata.
+            addChildMetadata(sx, builder, queryBuilder.build().getChildNodeId());
+            break;
+
+        case CONSTRUCT:
+            final ConstructQueryMetadata.Builder constructBuilder = readConstructQueryMetadataBuilder(sx, childNodeId);
+            Preconditions.checkArgument(!builder.getQueryBuilder().isPresent());
+            builder.setConstructQueryMetadata(constructBuilder);
+            
+            // Add its child's metadata.
+            addChildMetadata(sx, builder, constructBuilder.build().getChildNodeId());
+            break;
+
+        case AGGREGATION:
+            // Add this node's metadata.
+            final AggregationMetadata.Builder aggregationBuilder = readAggregationMetadataBuilder(sx, childNodeId);
+            builder.addAggregateMetadata(aggregationBuilder);
+            
+            // Add its child's metadata.
+            addChildMetadata(sx, builder, aggregationBuilder.build().getChildNodeId());
+            break;
+            
+        case JOIN:
+            // Add this node's metadata.
+            final JoinMetadata.Builder joinBuilder = readJoinMetadataBuilder(sx, childNodeId);
+            builder.addJoinMetadata(joinBuilder);
+
+            // Add its children's metadata.
+            final JoinMetadata joinMetadata = joinBuilder.build();
+            addChildMetadata(sx, builder, joinMetadata.getLeftChildNodeId());
+            addChildMetadata(sx, builder, joinMetadata.getRightChildNodeId());
+            break;
+
+        case FILTER:
+            // Add this node's metadata.
+            final FilterMetadata.Builder filterBuilder = readFilterMetadataBuilder(sx, childNodeId);
+            builder.addFilterMetadata(filterBuilder);
+
+            // Add its child's metadata.
+            addChildMetadata(sx, builder, filterBuilder.build().getChildNodeId());
+            break;
+
+        case STATEMENT_PATTERN:
+            // Add this node's metadata.
+            final StatementPatternMetadata.Builder spBuilder = readStatementPatternMetadataBuilder(sx, childNodeId);
+            builder.addStatementPatternBuilder(spBuilder);
+            break;
+        default:
+            break;
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
index 064cfe8..23ac286 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
@@ -129,7 +129,7 @@ public class QueryMetadata extends CommonNodeMetadata {
     @DefaultAnnotation(NonNull.class)
     public static final class Builder {
 
-        private final String nodeId;
+        private String nodeId;
         private VariableOrder varOrder;
         private String sparql;
         private String childNodeId;
@@ -143,6 +143,7 @@ public class QueryMetadata extends CommonNodeMetadata {
             this.nodeId = checkNotNull(nodeId);
         }
 
+        
         /**
          * Set the variable order of binding sets that are emitted by this node.
          *

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
index 562470a..631ce60 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
@@ -19,7 +19,9 @@
 package org.apache.rya.indexing.pcj.fluo.app.query;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.AGGREGATION_PREFIX;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.CONSTRUCT_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX;
@@ -28,6 +30,7 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -35,6 +38,8 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
+import org.apache.rya.indexing.pcj.fluo.app.ConstructProjection;
 import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
@@ -42,21 +47,33 @@ import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.Aggregatio
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.openrdf.model.Value;
+import org.openrdf.model.impl.BNodeImpl;
 import org.openrdf.query.algebra.AggregateOperator;
+import org.openrdf.query.algebra.BNodeGenerator;
 import org.openrdf.query.algebra.Extension;
+import org.openrdf.query.algebra.ExtensionElem;
 import org.openrdf.query.algebra.Filter;
 import org.openrdf.query.algebra.Group;
 import org.openrdf.query.algebra.GroupElem;
 import org.openrdf.query.algebra.Join;
 import org.openrdf.query.algebra.LeftJoin;
+import org.openrdf.query.algebra.MultiProjection;
 import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.ProjectionElem;
+import org.openrdf.query.algebra.ProjectionElemList;
 import org.openrdf.query.algebra.QueryModelNode;
+import org.openrdf.query.algebra.Reduced;
 import org.openrdf.query.algebra.StatementPattern;
 import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.UnaryTupleOperator;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.ValueExpr;
 import org.openrdf.query.algebra.Var;
 import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
 import org.openrdf.query.parser.ParsedQuery;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -158,16 +175,18 @@ public class SparqlFluoQueryBuilder {
 
             // Create the prefix of the id. This makes it a little bit more human readable.
             String prefix;
-            if(node instanceof StatementPattern) {
+            if (node instanceof StatementPattern) {
                 prefix = SP_PREFIX;
-            } else if(node instanceof Filter) {
+            } else if (node instanceof Filter) {
                 prefix = FILTER_PREFIX;
-            } else if(node instanceof Join || node instanceof LeftJoin) {
+            } else if (node instanceof Join || node instanceof LeftJoin) {
                 prefix = JOIN_PREFIX;
-            } else if(node instanceof Projection) {
+            } else if (node instanceof Projection) {
                 prefix = QUERY_PREFIX;
             } else if(node instanceof Extension) {
                 prefix = AGGREGATION_PREFIX;
+            }  else if (node instanceof Reduced) {
+                prefix = CONSTRUCT_PREFIX;
             } else {
                 throw new IllegalArgumentException("Node must be of type {StatementPattern, Join, Filter, Extension, Projection} but was " + node.getClass());
             }
@@ -402,7 +421,7 @@ public class SparqlFluoQueryBuilder {
 
             final QueryModelNode child = node.getArg();
             if(child == null) {
-                throw new IllegalArgumentException("Filter arg connot be null.");
+                throw new IllegalArgumentException("Projection arg connot be null.");
             }
 
             final String childNodeId = nodeIds.getOrMakeId(child);
@@ -417,6 +436,60 @@ public class SparqlFluoQueryBuilder {
             // Walk to the next node.
             super.meet(node);
         }
+        
+        
+        public void meet(Reduced node) {
+            //create id, initialize ConstructQueryMetadata builder, register ConstructQueryMetadata 
+            //builder with FluoQueryBuilder, and add metadata that we currently have
+            final String constructId = nodeIds.getOrMakeId(node);
+            final ConstructQueryMetadata.Builder constructBuilder = ConstructQueryMetadata.builder();
+            constructBuilder.setNodeId(constructId);
+            fluoQueryBuilder.setConstructQueryMetadata(constructBuilder);
+            constructBuilder.setSparql(sparql);
+            
+            //get child node
+            QueryModelNode child = node.getArg();
+            Preconditions.checkArgument(child instanceof Projection || child instanceof MultiProjection);
+            UnaryTupleOperator unary = (UnaryTupleOperator) child;
+            
+            //get ProjectionElemList to build ConstructGraph
+            final List<ProjectionElemList> projections = new ArrayList<>();
+            if(unary instanceof Projection) {
+                projections.add(((Projection) unary).getProjectionElemList());
+            } else {
+                projections.addAll(((MultiProjection)unary).getProjections());
+            }
+            
+            //get ExtensionElems to build ConstructGraph
+            QueryModelNode grandChild = unary.getArg();
+            Preconditions.checkArgument(grandChild instanceof Extension);
+            Extension extension = (Extension) grandChild;
+            final List<ExtensionElem> extensionElems = extension.getElements();
+            final ConstructGraph graph = getConstructGraph(projections, extensionElems);
+            constructBuilder.setConstructGraph(graph);
+            
+            //set child to the next node we care about in Fluo
+            //if Extension's arg is a Group node, then it is an Aggregation, so set child to Extension
+            //otherwise set child to Extension's child (only care about Extensions if they are Aggregations)
+            if(extension.getArg() instanceof Group) {
+                child = extension;
+            } else {
+                child = extension.getArg();
+            }
+            
+            //Set the child node in the ConstructQueryMetadataBuilder
+            String childNodeId = nodeIds.getOrMakeId(child);
+            constructBuilder.setChildNodeId(childNodeId);
+            
+            // Update the child node's metadata.
+            final Set<String> childVars = getVars((TupleExpr)child);
+            final VariableOrder childVarOrder = new VariableOrder(childVars);
+            setChildMetadata(childNodeId, childVarOrder, constructId);
+            
+            //fast forward visitor to next node we care about
+            child.visit(this);
+        }
+        
 
         /**
          * Update a query node's metadata to include it's binding set variable order
@@ -433,57 +506,102 @@ public class SparqlFluoQueryBuilder {
             checkNotNull(parentNodeId);
 
             final NodeType childType = NodeType.fromNodeId(childNodeId).get();
-            switch(childType) {
-                case STATEMENT_PATTERN:
-                    StatementPatternMetadata.Builder spBuilder = fluoQueryBuilder.getStatementPatternBuilder(childNodeId).orNull();
-                    if(spBuilder == null) {
-                        spBuilder = StatementPatternMetadata.builder(childNodeId);
-                        fluoQueryBuilder.addStatementPatternBuilder(spBuilder);
-                    }
+            switch (childType) {
+            case STATEMENT_PATTERN:
+                StatementPatternMetadata.Builder spBuilder = fluoQueryBuilder.getStatementPatternBuilder(childNodeId).orNull();
+                if (spBuilder == null) {
+                    spBuilder = StatementPatternMetadata.builder(childNodeId);
+                    fluoQueryBuilder.addStatementPatternBuilder(spBuilder);
+                }
 
-                    spBuilder.setVarOrder(childVarOrder);
-                    spBuilder.setParentNodeId(parentNodeId);
-                    break;
+                spBuilder.setVarOrder(childVarOrder);
+                spBuilder.setParentNodeId(parentNodeId);
+                break;
 
-                case JOIN:
-                    JoinMetadata.Builder joinBuilder = fluoQueryBuilder.getJoinBuilder(childNodeId).orNull();
-                    if(joinBuilder == null) {
-                        joinBuilder = JoinMetadata.builder(childNodeId);
-                        fluoQueryBuilder.addJoinMetadata(joinBuilder);
-                    }
+            case JOIN:
+                JoinMetadata.Builder joinBuilder = fluoQueryBuilder.getJoinBuilder(childNodeId).orNull();
+                if (joinBuilder == null) {
+                    joinBuilder = JoinMetadata.builder(childNodeId);
+                    fluoQueryBuilder.addJoinMetadata(joinBuilder);
+                }
 
-                    joinBuilder.setVariableOrder(childVarOrder);
-                    joinBuilder.setParentNodeId(parentNodeId);
-                    break;
+                joinBuilder.setVariableOrder(childVarOrder);
+                joinBuilder.setParentNodeId(parentNodeId);
+                break;
 
-                case FILTER:
-                    FilterMetadata.Builder filterBuilder = fluoQueryBuilder.getFilterBuilder(childNodeId).orNull();
-                    if(filterBuilder == null) {
-                        filterBuilder = FilterMetadata.builder(childNodeId);
-                        fluoQueryBuilder.addFilterMetadata(filterBuilder);
-                    }
+            case FILTER:
+                FilterMetadata.Builder filterBuilder = fluoQueryBuilder.getFilterBuilder(childNodeId).orNull();
+                if (filterBuilder == null) {
+                    filterBuilder = FilterMetadata.builder(childNodeId);
+                    fluoQueryBuilder.addFilterMetadata(filterBuilder);
+                }
 
-                    filterBuilder.setVarOrder(childVarOrder);
-                    filterBuilder.setParentNodeId(parentNodeId);
-                    break;
+                filterBuilder.setVarOrder(childVarOrder);
+                filterBuilder.setParentNodeId(parentNodeId);
+                break;
 
-                case AGGREGATION:
-                    AggregationMetadata.Builder aggregationBuilder = fluoQueryBuilder.getAggregateBuilder(childNodeId).orNull();
-                    if(aggregationBuilder == null) {
-                        aggregationBuilder = AggregationMetadata.builder(childNodeId);
-                        fluoQueryBuilder.addAggregateMetadata(aggregationBuilder);
-                    }
+            case AGGREGATION:
+                AggregationMetadata.Builder aggregationBuilder = fluoQueryBuilder.getAggregateBuilder(childNodeId).orNull();
+                if (aggregationBuilder == null) {
+                    aggregationBuilder = AggregationMetadata.builder(childNodeId);
+                    fluoQueryBuilder.addAggregateMetadata(aggregationBuilder);
+                }
 
-                    aggregationBuilder.setVariableOrder(childVarOrder);
-                    aggregationBuilder.setParentNodeId(parentNodeId);
-                    break;
+                aggregationBuilder.setVariableOrder(childVarOrder);
+                aggregationBuilder.setParentNodeId(parentNodeId);
+                break;
 
-                case QUERY:
-                    throw new IllegalArgumentException("QUERY nodes do not have children.");
-                default:
-                    throw new IllegalArgumentException("Unsupported NodeType: " + childType);
+            case QUERY:
+                throw new IllegalArgumentException("A QUERY node cannot be the child of another node.");
+            case CONSTRUCT:
+                throw new IllegalArgumentException("A CONSTRUCT node cannot be the child of another node.");
+            default:
+                throw new IllegalArgumentException("Unsupported NodeType: " + childType);
+            }
+        }
+        
+        private ConstructGraph getConstructGraph(List<ProjectionElemList> projections, List<ExtensionElem> extensionElems) {
+            Map<String, Value> valueMap = new HashMap<>();
+            //create valueMap to associate source names with Values
+            for(ExtensionElem elem: extensionElems) {
+                String name = elem.getName();
+                ValueExpr expr = elem.getExpr();
+                if(expr instanceof ValueConstant) {
+                    Value value = ((ValueConstant) expr).getValue();
+                    valueMap.put(name, value);
+                } else if(expr instanceof BNodeGenerator) {
+                    valueMap.put(name, new BNodeImpl(UUID.randomUUID().toString()));
+                }
+            }
+            
+            Set<ConstructProjection> constructProj = new HashSet<>();
+            //build ConstructProjection for each ProjectionElemList
+            for(ProjectionElemList list: projections) {
+                validateProjectionElemList(list);
+                List<Var> vars = new ArrayList<>();
+                for(ProjectionElem elem: list.getElements()) {
+                    String sourceName = elem.getSourceName();
+                    Var var = new Var(sourceName);
+                    if(valueMap.containsKey(sourceName)) {
+                        var.setValue(valueMap.get(sourceName));
+                    }
+                    vars.add(var);
+                }
+                constructProj.add(new ConstructProjection(vars.get(0), vars.get(1), vars.get(2)));
             }
+            
+            return new ConstructGraph(constructProj);
+        }
+        
+        private void validateProjectionElemList(ProjectionElemList list) {
+            List<ProjectionElem> elements = list.getElements();
+            checkArgument(elements.size() == 3);
+            checkArgument(elements.get(0).getTargetName().equals("subject"));
+            checkArgument(elements.get(1).getTargetName().equals("predicate"));
+            checkArgument(elements.get(2).getTargetName().equals("object"));
         }
+        
+        
 
         /**
          * Get the non-constant variables from a {@link TupleExpr}.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTest.java
new file mode 100644
index 0000000..94c8571
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTest.java
@@ -0,0 +1,145 @@
+package org.apache.rya.indexing.pcj.fluo.app;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.algebra.helpers.StatementPatternCollector;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.collect.Sets;
+
+public class ConstructGraphTest {
+
+    private ValueFactory vf = new ValueFactoryImpl();
+    /** Projects one binding set through two statement patterns and checks the statements and visibility produced. */
+    @Test
+    public void testConstructGraph() throws MalformedQueryException, UnsupportedEncodingException {
+        String query = "select ?x where { ?x <uri:talksTo> <uri:Bob>. ?y <uri:worksAt> ?z }";
+
+        SPARQLParser parser = new SPARQLParser();
+        ParsedQuery pq = parser.parseQuery(query, null);
+        List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr());
+        ConstructGraph graph = new ConstructGraph(patterns);
+
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("x", vf.createURI("uri:Joe"));
+        bs.addBinding("y", vf.createURI("uri:Bob"));
+        bs.addBinding("z", vf.createURI("uri:BurgerShack"));
+        VisibilityBindingSet vBs = new VisibilityBindingSet(bs,"FOUO");
+        Set<RyaStatement> statements = graph.createGraphFromBindingSet(vBs);
+        final byte[] visibility = "FOUO".getBytes("UTF-8"); // explicit charset; the platform default may differ
+        RyaStatement statement1 = new RyaStatement(new RyaURI("uri:Joe"), new RyaURI("uri:talksTo"), new RyaURI("uri:Bob"));
+        RyaStatement statement2 = new RyaStatement(new RyaURI("uri:Bob"), new RyaURI("uri:worksAt"), new RyaURI("uri:BurgerShack"));
+        Set<RyaStatement> expected = Sets.newHashSet(Arrays.asList(statement1, statement2));
+        expected.forEach(x -> x.setColumnVisibility(visibility));
+        ConstructGraphTestUtils.ryaStatementSetsEqualIgnoresTimestamp(expected, statements);
+    }
+    /** Checks that a blank node is shared within one projected graph but differs between two projections. */
+    @Test
+    public void testConstructGraphBNode() throws MalformedQueryException {
+        String query = "select ?x where { _:b <uri:talksTo> ?x. _:b <uri:worksAt> ?z }";
+
+        SPARQLParser parser = new SPARQLParser();
+        ParsedQuery pq = parser.parseQuery(query, null);
+        List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr());
+        ConstructGraph graph = new ConstructGraph(patterns);
+
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("x", vf.createURI("uri:Joe"));
+        bs.addBinding("z", vf.createURI("uri:BurgerShack"));
+        VisibilityBindingSet vBs = new VisibilityBindingSet(bs, "FOUO");
+        Set<RyaStatement> statements = graph.createGraphFromBindingSet(vBs);
+        Set<RyaStatement> statements2 = graph.createGraphFromBindingSet(vBs);
+        // Within one projected graph, every statement must share the same subject.
+        RyaURI subject = null;
+        for(RyaStatement statement: statements) {
+            RyaURI subjURI = statement.getSubject();
+            if(subject == null) {
+                subject = subjURI;
+            } else {
+                assertEquals(subject, subjURI);
+            }
+        }
+        RyaURI subject2 = null;
+        for(RyaStatement statement: statements2) {
+            RyaURI subjURI = statement.getSubject();
+            if(subject2 == null) {
+                subject2 = subjURI;
+            } else {
+                assertEquals(subject2, subjURI);
+            }
+        }
+        // The null guard makes an empty result fail the assertion instead of throwing a NullPointerException.
+        assertTrue("Each projection should mint its own blank node.", subject != null && !subject.equals(subject2));
+        // The helper below overwrites the subjects of statements2 in place, so it has to run last.
+        ConstructGraphTestUtils.ryaStatementsEqualIgnoresBlankNode(statements, statements2);
+    }
+    
+    
+    @Test
+    public void testConstructGraphSerializer() throws MalformedQueryException {
+        
+        String query = "select ?x where { ?x <uri:talksTo> <uri:Bob>. ?y <uri:worksAt> ?z }";
+
+        SPARQLParser parser = new SPARQLParser();
+        ParsedQuery pq = parser.parseQuery(query, null);
+        List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr());
+        ConstructGraph graph = new ConstructGraph(patterns);
+        
+        String constructString = ConstructGraphSerializer.toConstructString(graph);
+        ConstructGraph deserialized = ConstructGraphSerializer.toConstructGraph(constructString);
+        
+        assertEquals(graph, deserialized);
+        
+    }
+    
+    @Test
+    public void testConstructGraphSerializerBlankNode() throws MalformedQueryException {
+        
+        String query = "select ?x where { _:b <uri:talksTo> ?x. _:b <uri:worksAt> ?y }";
+
+        SPARQLParser parser = new SPARQLParser();
+        ParsedQuery pq = parser.parseQuery(query, null);
+        List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr());
+        ConstructGraph graph = new ConstructGraph(patterns);
+        
+        String constructString = ConstructGraphSerializer.toConstructString(graph);
+        ConstructGraph deserialized = ConstructGraphSerializer.toConstructGraph(constructString);
+        
+        assertEquals(graph, deserialized);
+        
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTestUtils.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTestUtils.java
new file mode 100644
index 0000000..a12b6de
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTestUtils.java
@@ -0,0 +1,126 @@
+package org.apache.rya.indexing.pcj.fluo.app;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
+import org.junit.Assert;
+import org.openrdf.model.Statement;
+
+import com.google.common.base.Objects;
+
+public class ConstructGraphTestUtils {
+    /** Asserts that two statement sets are equal when each is wrapped in a {@link VisibilityStatementSet}. */
+    public static void ryaStatementSetsEqualIgnoresTimestamp(Set<RyaStatement> statements1, Set<RyaStatement> statements2) {
+        Assert.assertEquals(new VisibilityStatementSet(statements1), new VisibilityStatementSet(statements2));
+    }
+    /** Asserts that two sets of sub graphs are equal after each sub graph becomes a {@link VisibilityStatementSet}. */
+    public static void subGraphsEqualIgnoresTimestamp(Set<RyaSubGraph> subgraph1, Set<RyaSubGraph> subgraph2) {
+        Set<VisibilityStatementSet> set1 = new HashSet<>();
+        Set<VisibilityStatementSet> set2 = new HashSet<>();
+        subgraph1.forEach(x->set1.add(new VisibilityStatementSet(x.getStatements())));
+        subgraph2.forEach(x->set2.add(new VisibilityStatementSet(x.getStatements())));
+        Assert.assertEquals(set1, set2);
+    }
+    /** Pairs sub graphs by {@code getKey} and compares each pair while ignoring differences in statement subjects. */
+    public static void subGraphsEqualIgnoresBlankNode(Set<RyaSubGraph> subgraph1, Set<RyaSubGraph> subgraph2) {
+        Map<Integer, RyaSubGraph> subGraphMap = new HashMap<>(); // only subgraph2 is iterated; extras in subgraph1 go unnoticed
+        subgraph1.forEach(x->subGraphMap.put(getKey(x), x)); // a key missing from this map surfaces below as a NullPointerException
+        subgraph2.forEach(x->ryaStatementsEqualIgnoresBlankNode(x.getStatements(), subGraphMap.get(getKey(x)).getStatements()));
+    }
+    /** Builds a lookup key by summing the hash codes of every statement's object; different sub graphs can collide. */
+    private static int getKey(RyaSubGraph subgraph) {
+        int key = 0;
+        for(RyaStatement statement: subgraph.getStatements()) {
+            key += statement.getObject().hashCode();
+        }
+        return key;
+    }
+    /** Asserts two statement sets are equal once the subjects of statements2 are replaced by those of statements1. */
+    public static void ryaStatementsEqualIgnoresBlankNode(Set<RyaStatement> statements1, Set<RyaStatement> statements2) {
+        Map<String, RyaURI> bNodeMap = new HashMap<>(); // predicate data -> subject seen in statements1 (last one wins)
+        statements1.forEach(x-> bNodeMap.put(x.getPredicate().getData(), x.getSubject()));
+        statements2.forEach(x -> x.setSubject(bNodeMap.get(x.getPredicate().getData()))); // mutates the caller's statements
+        ryaStatementSetsEqualIgnoresTimestamp(statements1, statements2);
+    }
+    
+    
+    /**
+     *  Class used for comparing Sets of RyaStatements while ignoring timestamps.
+     *  It is assumed that all RyaStatements in the Set used to construct this class
+     *  have the same visibility.
+     */
+    public static class VisibilityStatementSet {
+        
+        private Set<Statement> statements;
+        private String visibility;
+        
+        public VisibilityStatementSet(Set<RyaStatement> statements) {
+            this.statements = new HashSet<>();
+            statements.forEach(x -> {
+                this.statements.add(RyaToRdfConversions.convertStatement(x));
+                if (visibility == null) {
+                    if (x.getColumnVisibility() != null) {
+                        visibility = new String(x.getColumnVisibility());
+                    } else {
+                        this.visibility = "";
+                    }
+                }
+            });
+        }
+        
+        public VisibilityStatementSet(RyaSubGraph subgraph) {
+            this(subgraph.getStatements());
+        }
+        
+        @Override
+        public boolean equals(Object o) {
+            if(this == o) {
+                return true;
+            }
+            
+            if(o instanceof VisibilityStatementSet) {
+                VisibilityStatementSet that = (VisibilityStatementSet) o;
+                return Objects.equal(this.visibility, that.visibility) && Objects.equal(this.statements, that.statements);
+            }
+            
+            return false;
+        }
+        
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(visibility, statements);
+        }
+        
+        @Override
+        public String toString() {
+            // Human-readable form; JUnit prints it when an assertEquals on this type fails.
+            return "Visibility Statement Set \n" + "   Statements: " + statements + "\n"
+                    + "   Visibilities: " + visibility + " \n";
+        }
+        
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjectionTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjectionTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjectionTest.java
new file mode 100644
index 0000000..080031e
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjectionTest.java
@@ -0,0 +1,112 @@
+package org.apache.rya.indexing.pcj.fluo.app;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import static org.junit.Assert.assertEquals;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.junit.Test;
+import org.openrdf.model.BNode;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.algebra.helpers.StatementPatternCollector;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+public class ConstructProjectionTest {
+
+    private static final ValueFactory vf = new ValueFactoryImpl();
+    
+    @Test
+    public void testConstructProjectionProjectSubj() throws MalformedQueryException, UnsupportedEncodingException {
+        String query = "select ?x where { ?x <uri:talksTo> <uri:Bob> }";
+        
+        SPARQLParser parser = new SPARQLParser();
+        ParsedQuery pq = parser.parseQuery(query, null);
+        List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr());
+        ConstructProjection projection = new ConstructProjection(patterns.get(0));
+        
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("x", vf.createURI("uri:Joe"));
+        VisibilityBindingSet vBs = new VisibilityBindingSet(bs, "FOUO");
+        RyaStatement statement = projection.projectBindingSet(vBs, new HashMap<>());
+        
+        RyaStatement expected = new RyaStatement(new RyaURI("uri:Joe"), new RyaURI("uri:talksTo"), new RyaURI("uri:Bob"));
+        expected.setColumnVisibility("FOUO".getBytes("UTF-8"));
+        expected.setTimestamp(statement.getTimestamp());
+        
+        assertEquals(expected, statement);
+    }
+    
+    @Test
+    public void testConstructProjectionProjPred() throws MalformedQueryException {
+        String query = "select ?p where { <uri:Joe> ?p <uri:Bob> }";
+        
+        SPARQLParser parser = new SPARQLParser();
+        ParsedQuery pq = parser.parseQuery(query, null);
+        List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr());
+        ConstructProjection projection = new ConstructProjection(patterns.get(0));
+        
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("p", vf.createURI("uri:worksWith"));
+        VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
+        RyaStatement statement = projection.projectBindingSet(vBs, new HashMap<>());
+        
+        RyaStatement expected = new RyaStatement(new RyaURI("uri:Joe"), new RyaURI("uri:worksWith"), new RyaURI("uri:Bob"));
+        expected.setTimestamp(statement.getTimestamp());
+        expected.setColumnVisibility(new byte[0]);
+        
+        assertEquals(expected, statement);
+    }
+    /** Projects a pattern whose subject is a blank node, supplying that node through the blank-node map. */
+    @Test
+    public void testConstructProjectionBNodes() throws MalformedQueryException {
+        String query = "select ?o where { _:b <uri:talksTo> ?o }";
+        
+        SPARQLParser parser = new SPARQLParser();
+        ParsedQuery pq = parser.parseQuery(query, null);
+        List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr());
+        ConstructProjection projection = new ConstructProjection(patterns.get(0));
+        
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("o", vf.createURI("uri:Bob"));
+        VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
+        BNode bNode = vf.createBNode();
+        Map<String, BNode> bNodeMap = new HashMap<>();
+        bNodeMap.put("-anon-1", bNode); // NOTE(review): presumably the variable name the parser assigns to _:b - confirm
+        RyaStatement statement = projection.projectBindingSet(vBs,bNodeMap);
+        
+        RyaStatement expected = new RyaStatement(RdfToRyaConversions.convertResource(bNode), new RyaURI("uri:talksTo"), new RyaURI("uri:Bob"));
+        expected.setTimestamp(statement.getTimestamp());
+        expected.setColumnVisibility(new byte[0]);
+        
+        assertEquals(expected, statement);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java
index 4ad5189..60e1bc1 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java
@@ -38,10 +38,8 @@ public class FluoStringConverterTest {
         // Setup a StatementPattern that represents "?x <http://worksAt> <http://Chipotle>."
         final Var subject = new Var("x");
         final Var predicate = new Var("-const-http://worksAt", new URIImpl("http://worksAt"));
-        predicate.setAnonymous(true);
         predicate.setConstant(true);
         final Var object = new Var("-const-http://Chipotle", new URIImpl("http://Chipotle"));
-        object.setAnonymous(true);
         object.setConstant(true);
         final StatementPattern pattern = new StatementPattern(subject, predicate, object);
 
@@ -69,10 +67,8 @@ public class FluoStringConverterTest {
         // Enusre it converted to the expected result.
         final Var subject = new Var("x");
         final Var predicate = new Var("-const-http://worksAt", new URIImpl("http://worksAt"));
-        predicate.setAnonymous(true);
         predicate.setConstant(true);
         final Var object = new Var("-const-http://Chipotle", new URIImpl("http://Chipotle"));
-        object.setAnonymous(true);
         object.setConstant(true);
         final StatementPattern expected = new StatementPattern(subject, predicate, object);
 
@@ -89,7 +85,6 @@ public class FluoStringConverterTest {
 
         // Ensure it converted to the expected result.
         final Var expected = new Var("-const-http://Chipotle", new URIImpl("http://Chipotle"));
-        expected.setAnonymous(true);
         expected.setConstant(true);
 
         assertEquals(expected, var);
@@ -105,7 +100,6 @@ public class FluoStringConverterTest {
 
         // Ensure it converted to the expected result.
         final Var expected = new Var("-const-5", new LiteralImpl("5", XMLSchema.INTEGER));
-        expected.setAnonymous(true);
         expected.setConstant(true);
 
         assertEquals(expected, result);
@@ -121,7 +115,6 @@ public class FluoStringConverterTest {
 
         // Ensure it converted to the expected result.
         final Var expected = new Var("-const-Chipotle", new LiteralImpl("Chipotle", XMLSchema.STRING));
-        expected.setAnonymous(true);
         expected.setConstant(true);
 
         assertEquals(expected, result);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/RyaSubGraphKafkaSerDeTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/RyaSubGraphKafkaSerDeTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/RyaSubGraphKafkaSerDeTest.java
new file mode 100644
index 0000000..8b9feaf
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/RyaSubGraphKafkaSerDeTest.java
@@ -0,0 +1,57 @@
+package org.apache.rya.indexing.pcj.fluo.app;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import static org.junit.Assert.assertEquals;
+
+import java.util.UUID;
+
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
+import org.junit.Test;
+import org.openrdf.model.vocabulary.XMLSchema;
+
+public class RyaSubGraphKafkaSerDeTest {
+
+    private static final RyaSubGraphKafkaSerDe serializer = new RyaSubGraphKafkaSerDe();
+    
+    @Test
+    public void serializationTestWithURI() {
+        RyaSubGraph bundle = new RyaSubGraph(UUID.randomUUID().toString());
+        bundle.addStatement(new RyaStatement(new RyaURI("uri:123"), new RyaURI("uri:234"), new RyaURI("uri:345")));
+        bundle.addStatement(new RyaStatement(new RyaURI("uri:345"), new RyaURI("uri:567"), new RyaURI("uri:789")));
+        byte[] bundleBytes = serializer.toBytes(bundle);
+        RyaSubGraph deserializedBundle = serializer.fromBytes(bundleBytes);
+        assertEquals(bundle, deserializedBundle);
+    }
+    
+    
+    @Test
+    public void serializationTestWithLiteral() {
+        RyaSubGraph bundle = new RyaSubGraph(UUID.randomUUID().toString());
+        bundle.addStatement(new RyaStatement(new RyaURI("uri:123"), new RyaURI("uri:234"), new RyaType(XMLSchema.INTEGER, "345")));
+        bundle.addStatement(new RyaStatement(new RyaURI("uri:345"), new RyaURI("uri:567"), new RyaType(XMLSchema.INTEGER, "789")));
+        byte[] bundleBytes = serializer.toBytes(bundle);
+        RyaSubGraph deserializedBundle = serializer.fromBytes(bundleBytes);
+        assertEquals(bundle, deserializedBundle);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
index 74193cf..b9c10d4 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
@@ -28,7 +28,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
-import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaResultExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterFactory;
 import org.junit.Test;
 
 /**
@@ -93,7 +93,7 @@ public class KafkaExportParametersTest {
 
     @Test
     public void testKafkaResultExporterFactory() {
-        KafkaResultExporterFactory factory = new KafkaResultExporterFactory();
+        KafkaBindingSetExporterFactory factory = new KafkaBindingSetExporterFactory();
         assertNotNull(factory);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
index e1c386d..99ccc58 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
@@ -25,6 +25,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport;
+import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
@@ -55,13 +56,26 @@ public class QueryReportRenderer {
 
         final FluoQuery metadata = queryReport.getFluoQuery();
 
-        final QueryMetadata queryMetadata = metadata.getQueryMetadata();
-        builder.appendItem( new ReportItem("QUERY NODE") );
-        builder.appendItem( new ReportItem("Node ID", queryMetadata.getNodeId()) );
-        builder.appendItem( new ReportItem("Variable Order", queryMetadata.getVariableOrder().toString()) );
-        builder.appendItem( new ReportItem("SPARQL", prettyFormatSparql( queryMetadata.getSparql()) ) );
-        builder.appendItem( new ReportItem("Child Node ID", queryMetadata.getChildNodeId()) );
-        builder.appendItem( new ReportItem("Count", "" + queryReport.getCount(queryMetadata.getNodeId())) );
+        switch (metadata.getQueryType()) {
+        case Projection:
+            final QueryMetadata queryMetadata = metadata.getQueryMetadata().get();
+            builder.appendItem(new ReportItem("QUERY NODE"));
+            builder.appendItem(new ReportItem("Node ID", queryMetadata.getNodeId()));
+            builder.appendItem(new ReportItem("Variable Order", queryMetadata.getVariableOrder().toString()));
+            builder.appendItem(new ReportItem("SPARQL", prettyFormatSparql(queryMetadata.getSparql())));
+            builder.appendItem(new ReportItem("Child Node ID", queryMetadata.getChildNodeId()));
+            builder.appendItem(new ReportItem("Count", "" + queryReport.getCount(queryMetadata.getNodeId())));
+            break;
+        case Construct:
+            final ConstructQueryMetadata constructMetadata = metadata.getConstructQueryMetadata().get();
+            builder.appendItem(new ReportItem("CONSTRUCT QUERY NODE"));
+            builder.appendItem(new ReportItem("Node ID", constructMetadata.getNodeId()));
+            builder.appendItem(new ReportItem("Variable Order", constructMetadata.getVariableOrder().toString()));
+            builder.appendItem(new ReportItem("SPARQL", prettyFormatSparql(constructMetadata.getSparql())));
+            builder.appendItem(new ReportItem("Child Node ID", constructMetadata.getChildNodeId()));
+            builder.appendItem(new ReportItem("Construct Graph", constructMetadata.getConstructGraph().toString()));
+            builder.appendItem(new ReportItem("Count", "" + queryReport.getCount(constructMetadata.getNodeId())));
+        }
 
         for(final FilterMetadata filterMetadata : metadata.getFilterMetadata()) {
             builder.appendItem( new ReportItem("") );

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
index 9263362..85edb11 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
@@ -1,100 +1,98 @@
 <?xml version="1.0" encoding="utf-8"?>
 <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
-    license agreements. See the NOTICE file distributed with this work for additional 
-    information regarding copyright ownership. The ASF licenses this file to 
-    you under the Apache License, Version 2.0 (the "License"); you may not use 
-    this file except in compliance with the License. You may obtain a copy of 
-    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
-    by applicable law or agreed to in writing, software distributed under the 
-    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
-    OF ANY KIND, either express or implied. See the License for the specific 
-    language governing permissions and limitations under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	license agreements. See the NOTICE file distributed with this work for additional 
+	information regarding copyright ownership. The ASF licenses this file to 
+	you under the Apache License, Version 2.0 (the "License"); you may not use 
+	this file except in compliance with the License. You may obtain a copy of 
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+	by applicable law or agreed to in writing, software distributed under the 
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+	OF ANY KIND, either express or implied. See the License for the specific 
+	language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
-    <parent>
-        <groupId>org.apache.rya</groupId>
-        <artifactId>rya.pcj.fluo.parent</artifactId>
-        <version>3.2.11-incubating-SNAPSHOT</version>
-    </parent>
+	<parent>
+		<groupId>org.apache.rya</groupId>
+		<artifactId>rya.pcj.fluo.parent</artifactId>
+		<version>3.2.11-incubating-SNAPSHOT</version>
+	</parent>
 
-    <modelVersion>4.0.0</modelVersion>
-    <artifactId>rya.pcj.fluo.integration</artifactId>
+	<modelVersion>4.0.0</modelVersion>
+	<artifactId>rya.pcj.fluo.integration</artifactId>
 
-    <name>Apache Rya PCJ Fluo Integration Tests</name>
-    <description>Integration tests for the Rya Fluo application.</description>
+	<name>Apache Rya PCJ Fluo Integration Tests</name>
+	<description>Integration tests for the Rya Fluo application.</description>
 
-    <dependencies>
-        <!-- Rya Runtime Dependencies. -->
-        <dependency>
-            <groupId>org.apache.rya</groupId>
-            <artifactId>rya.api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rya</groupId>
-            <artifactId>rya.pcj.fluo.api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rya</groupId>
-            <artifactId>rya.pcj.fluo.client</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rya</groupId>
-            <artifactId>rya.indexing</artifactId>
-        </dependency>
-         <dependency>
-            <groupId>org.apache.fluo</groupId>
-            <artifactId>fluo-api</artifactId>
-        </dependency>
+	<dependencies>
+		<!-- Rya Runtime Dependencies. -->
+		<dependency>
+			<groupId>org.apache.rya</groupId>
+			<artifactId>rya.api</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.rya</groupId>
+			<artifactId>rya.pcj.fluo.api</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.rya</groupId>
+			<artifactId>rya.pcj.fluo.client</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.rya</groupId>
+			<artifactId>rya.indexing</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.fluo</groupId>
+			<artifactId>fluo-api</artifactId>
+		</dependency>
 
-        <!-- Testing dependencies. -->
-        <dependency>
-            <groupId>org.apache.fluo</groupId>
-            <artifactId>fluo-mini</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-         <dependency>
-            <groupId>org.apache.fluo</groupId>
-            <artifactId>fluo-api</artifactId>
-        </dependency>
+		<!-- Testing dependencies. -->
+		<dependency>
+			<groupId>org.apache.fluo</groupId>
+			<artifactId>fluo-mini</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<scope>test</scope>
+		</dependency>
 
-        <dependency>
-          <groupId>org.apache.kafka</groupId>
-          <artifactId>kafka-clients</artifactId>
-          <version>0.10.1.0</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.11</artifactId>
-            <version>0.10.1.0</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>slf4j-log4j12</artifactId>
-                    <groupId>org.slf4j</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <!-- Testing dependencies. -->
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.11</artifactId>
-            <version>0.10.1.0</version>
-            <classifier>test</classifier>
-            <exclusions>
-                <exclusion>
-                    <artifactId>slf4j-log4j12</artifactId>
-                    <groupId>org.slf4j</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-             <groupId>org.apache.fluo</groupId>
-            <artifactId>fluo-recipes-test</artifactId>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka-clients</artifactId>
+			<version>0.10.1.0</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_2.11</artifactId>
+			<version>0.10.1.0</version>
+			<exclusions>
+				<exclusion>
+					<artifactId>slf4j-log4j12</artifactId>
+					<groupId>org.slf4j</groupId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<!-- Testing dependencies. -->
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_2.11</artifactId>
+			<version>0.10.1.0</version>
+			<classifier>test</classifier>
+			<exclusions>
+				<exclusion>
+					<artifactId>slf4j-log4j12</artifactId>
+					<groupId>org.slf4j</groupId>
+				</exclusion>
+			</exclusions>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.fluo</groupId>
+			<artifactId>fluo-recipes-test</artifactId>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java
new file mode 100644
index 0000000..124569b
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java
@@ -0,0 +1,126 @@
+package org.apache.rya.indexing.pcj.fluo;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
+import org.junit.Assert;
+import org.openrdf.model.Statement;
+
+import com.google.common.base.Objects;
+
+/**
+ * Test helpers for asserting equality of collections of {@link RyaStatement}s
+ * and {@link RyaSubGraph}s while ignoring statement timestamps and, for the
+ * "IgnoresBlankNode" variants, the subjects of the statements.
+ */
+public class ConstructGraphTestUtils {
+
+    /**
+     * Asserts that two sets of statements are equal after each has been wrapped
+     * in a {@link VisibilityStatementSet}.
+     *
+     * @param statements1 - the expected statements
+     * @param statements2 - the actual statements
+     */
+    public static void ryaStatementSetsEqualIgnoresTimestamp(Set<RyaStatement> statements1, Set<RyaStatement> statements2) {
+        Assert.assertEquals(new VisibilityStatementSet(statements1), new VisibilityStatementSet(statements2));
+    }
+
+    /**
+     * Asserts that two sets of subgraphs contain the same statement sets, where
+     * each subgraph is compared through a {@link VisibilityStatementSet}.
+     *
+     * @param subgraph1 - the expected subgraphs
+     * @param subgraph2 - the actual subgraphs
+     */
+    public static void subGraphsEqualIgnoresTimestamp(Set<RyaSubGraph> subgraph1, Set<RyaSubGraph> subgraph2) {
+        Set<VisibilityStatementSet> set1 = new HashSet<>();
+        Set<VisibilityStatementSet> set2 = new HashSet<>();
+        for (RyaSubGraph subgraph : subgraph1) {
+            set1.add(new VisibilityStatementSet(subgraph.getStatements()));
+        }
+        for (RyaSubGraph subgraph : subgraph2) {
+            set2.add(new VisibilityStatementSet(subgraph.getStatements()));
+        }
+        Assert.assertEquals(set1, set2);
+    }
+
+    /**
+     * Asserts that every subgraph in {@code subgraph2} has a counterpart in
+     * {@code subgraph1} whose statements are equal when subjects are ignored.
+     * Subgraphs are paired using {@link #getKey(RyaSubGraph)}.
+     * <p>
+     * The statements of the subgraphs in {@code subgraph1} are modified in
+     * place (see {@link #ryaStatementsEqualIgnoresBlankNode(Set, Set)}).
+     *
+     * @param subgraph1 - the first set of subgraphs; its statements are mutated
+     * @param subgraph2 - the second set of subgraphs
+     */
+    public static void subGraphsEqualIgnoresBlankNode(Set<RyaSubGraph> subgraph1, Set<RyaSubGraph> subgraph2) {
+        // Without this check, extra subgraphs in subgraph1 would go unnoticed
+        // because only the elements of subgraph2 are iterated below.
+        Assert.assertEquals("Number of subgraphs differs.", subgraph1.size(), subgraph2.size());
+
+        Map<Integer, RyaSubGraph> subGraphMap = new HashMap<>();
+        for (RyaSubGraph subgraph : subgraph1) {
+            subGraphMap.put(getKey(subgraph), subgraph);
+        }
+
+        for (RyaSubGraph subgraph : subgraph2) {
+            RyaSubGraph match = subGraphMap.get(getKey(subgraph));
+            // Fail with a readable message instead of a NullPointerException
+            // when no subgraph with the same key exists.
+            Assert.assertNotNull("No subgraph with matching statement objects found for: " + subgraph, match);
+            ryaStatementsEqualIgnoresBlankNode(subgraph.getStatements(), match.getStatements());
+        }
+    }
+
+    /**
+     * Computes a pairing key for a subgraph as the sum of the hash codes of
+     * the objects of its statements. Distinct subgraphs may share a key.
+     *
+     * @param subgraph - the subgraph to compute a key for
+     * @return the sum of the statement object hash codes
+     */
+    private static int getKey(RyaSubGraph subgraph) {
+        int key = 0;
+        for (RyaStatement statement : subgraph.getStatements()) {
+            key += statement.getObject().hashCode();
+        }
+        return key;
+    }
+
+    /**
+     * Asserts that two sets of statements are equal when subjects are ignored.
+     * This is done by overwriting the subject of every statement in
+     * {@code statements2} with the subject of the statement in
+     * {@code statements1} that has the same predicate, and then comparing the
+     * sets with {@link #ryaStatementSetsEqualIgnoresTimestamp(Set, Set)}.
+     * <p>
+     * The statements in {@code statements2} are modified in place.
+     * NOTE(review): if {@code RyaStatement#hashCode()} depends on the subject,
+     * {@code statements2} should not be used for hash based lookups after this
+     * call -- confirm.
+     *
+     * @param statements1 - the statements that supply the subjects
+     * @param statements2 - the statements whose subjects are overwritten
+     */
+    public static void ryaStatementsEqualIgnoresBlankNode(Set<RyaStatement> statements1, Set<RyaStatement> statements2) {
+        Map<String, RyaURI> bNodeMap = new HashMap<>();
+        for (RyaStatement statement : statements1) {
+            bNodeMap.put(statement.getPredicate().getData(), statement.getSubject());
+        }
+
+        for (RyaStatement statement : statements2) {
+            String predicate = statement.getPredicate().getData();
+            RyaURI subject = bNodeMap.get(predicate);
+            // A missing predicate means the sets cannot be equal; report it
+            // directly rather than assigning a null subject.
+            Assert.assertNotNull("No statement with predicate " + predicate + " found in the first statement set.", subject);
+            statement.setSubject(subject);
+        }
+        ryaStatementSetsEqualIgnoresTimestamp(statements1, statements2);
+    }
+
+
+    /**
+     *  Class used for comparing Sets of RyaStatements while ignoring timestamps.
+     *  It is assumed that all RyaStatements in the Set used to construct this class
+     *  have the same visibility.
+     */
+    public static class VisibilityStatementSet {
+
+        private final Set<Statement> statements;
+        private final String visibility;
+
+        /**
+         * Converts each {@link RyaStatement} to a {@link Statement} and records
+         * the visibility of the first statement encountered. A statement with
+         * no column visibility is recorded as the empty String; an empty input
+         * set leaves the visibility {@code null}.
+         *
+         * @param statements - the statements to wrap
+         */
+        public VisibilityStatementSet(Set<RyaStatement> statements) {
+            this.statements = new HashSet<>();
+            String firstVisibility = null;
+            for (RyaStatement statement : statements) {
+                this.statements.add(RyaToRdfConversions.convertStatement(statement));
+                if (firstVisibility == null) {
+                    if (statement.getColumnVisibility() != null) {
+                        // Decode with an explicit charset so the result does not
+                        // depend on the platform default.
+                        firstVisibility = new String(statement.getColumnVisibility(), java.nio.charset.StandardCharsets.UTF_8);
+                    } else {
+                        firstVisibility = "";
+                    }
+                }
+            }
+            this.visibility = firstVisibility;
+        }
+
+        /**
+         * Wraps the statements of the provided subgraph.
+         *
+         * @param subgraph - the subgraph whose statements are wrapped
+         */
+        public VisibilityStatementSet(RyaSubGraph subgraph) {
+            this(subgraph.getStatements());
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+
+            if (o instanceof VisibilityStatementSet) {
+                VisibilityStatementSet that = (VisibilityStatementSet) o;
+                return Objects.equal(this.visibility, that.visibility) && Objects.equal(this.statements, that.statements);
+            }
+
+            return false;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(visibility, statements);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            return builder.append("Visiblity Statement Set \n").append("   Statements: " + statements + "\n")
+                    .append("   Visibilities: " + visibility + " \n").toString();
+        }
+
+    }
+
+}