You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/18 18:51:29 UTC

[4/5] incubator-rya git commit: RYA-282-Nested-Query. Closes #192.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
index 2084907..4b6f44e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
@@ -35,9 +35,13 @@ public class IncrementalUpdateConstants {
     public static final String FILTER_PREFIX = "FILTER";
     public static final String AGGREGATION_PREFIX = "AGGREGATION";
     public static final String QUERY_PREFIX = "QUERY";
+    public static final String PROJECTION_PREFIX = "PROJECTION";
     public static final String CONSTRUCT_PREFIX = "CONSTRUCT";
     public static final String PERIODIC_QUERY_PREFIX = "PERIODIC_QUERY";
     
+    public static enum QueryType{Construct, Projection, Periodic};
+    public static enum ExportStrategy{Rya, Kafka};
+    
     public static final String PERIODIC_BIN_ID = PeriodicQueryResultStorage.PeriodicBinId;
 
     public static final String URI_TYPE = "http://www.w3.org/2001/XMLSchema#anyURI";

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
index 9b65b34..0f448a6 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
@@ -139,9 +139,9 @@ public class JoinResultUpdater {
 
             // Create the Row Key for the emitted binding set. It does not contain visibilities.
             final Bytes resultRow = RowKeyUtil.makeRowKey(joinMetadata.getNodeId(), joinVarOrder, newJoinResult);
-
-            // Only insert the join Binding Set if it is new.
-            if(tx.get(resultRow, FluoQueryColumns.JOIN_BINDING_SET) == null) {
+            
+            // Only insert the join Binding Set if it is new or BindingSet contains values not used in resultRow.
+            if(tx.get(resultRow, FluoQueryColumns.JOIN_BINDING_SET) == null || joinVarOrder.getVariableOrders().size() < newJoinResult.size()) {
                 // Create the Node Value. It does contain visibilities.
                 final Bytes nodeValueBytes = BS_SERDE.serialize(newJoinResult);
 
@@ -210,18 +210,28 @@ public class JoinResultUpdater {
         final NodeType nodeType = NodeType.fromNodeId(nodeId).get();
         switch(nodeType) {
             case STATEMENT_PATTERN:
-                return queryDao.readStatementPatternMetadata(tx, nodeId).getVariableOrder();
-
+                return removeBinIdFromVarOrder(queryDao.readStatementPatternMetadata(tx, nodeId).getVariableOrder());
             case FILTER:
-                return queryDao.readFilterMetadata(tx, nodeId).getVariableOrder();
-
+                return removeBinIdFromVarOrder(queryDao.readFilterMetadata(tx, nodeId).getVariableOrder());
             case JOIN:
-                return queryDao.readJoinMetadata(tx, nodeId).getVariableOrder();
-
+                return removeBinIdFromVarOrder(queryDao.readJoinMetadata(tx, nodeId).getVariableOrder());
+            case PROJECTION: 
+                return removeBinIdFromVarOrder(queryDao.readProjectionMetadata(tx, nodeId).getVariableOrder());
             default:
                 throw new IllegalArgumentException("Could not figure out the variable order for node with ID: " + nodeId);
         }
     }
+    
+    private VariableOrder removeBinIdFromVarOrder(VariableOrder varOrder) {
+        List<String> varOrderList = varOrder.getVariableOrders();
+        if(varOrderList.get(0).equals(IncrementalUpdateConstants.PERIODIC_BIN_ID)) {
+            List<String> updatedVarOrderList = Lists.newArrayList(varOrderList);
+            updatedVarOrderList.remove(0);
+            return new VariableOrder(updatedVarOrderList);
+        } else {
+            return varOrder;
+        }
+    }
 
     /**
      * Assuming that the common variables between two children are already
@@ -285,6 +295,9 @@ public class JoinResultUpdater {
             case JOIN:
                 column = FluoQueryColumns.JOIN_BINDING_SET;
                 break;
+            case PROJECTION:
+                column = FluoQueryColumns.PROJECTION_BINDING_SET;
+                break;
             default:
                 throw new IllegalArgumentException("The child node's sibling is not of type StatementPattern, Join, Left Join, or Filter.");
         }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
index b8fc2d9..a6fc5ea 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
@@ -24,10 +24,12 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.CO
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PROJECTION_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX;
 
 import java.util.List;
+import java.util.UUID;
 
 import org.apache.fluo.api.data.Column;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
@@ -39,13 +41,14 @@ import com.google.common.base.Optional;
  * Represents the different types of nodes that a Query may have.
  */
 public enum NodeType {
-    PERIODIC_QUERY(QueryNodeMetadataColumns.PERIODIC_QUERY_COLUMNS, FluoQueryColumns.PERIODIC_QUERY_BINDING_SET),
-    FILTER (QueryNodeMetadataColumns.FILTER_COLUMNS, FluoQueryColumns.FILTER_BINDING_SET),
-    JOIN(QueryNodeMetadataColumns.JOIN_COLUMNS, FluoQueryColumns.JOIN_BINDING_SET),
-    STATEMENT_PATTERN(QueryNodeMetadataColumns.STATEMENTPATTERN_COLUMNS, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET),
-    QUERY(QueryNodeMetadataColumns.QUERY_COLUMNS, FluoQueryColumns.QUERY_BINDING_SET),
-    AGGREGATION(QueryNodeMetadataColumns.AGGREGATION_COLUMNS, FluoQueryColumns.AGGREGATION_BINDING_SET),
-    CONSTRUCT(QueryNodeMetadataColumns.CONSTRUCT_COLUMNS, FluoQueryColumns.CONSTRUCT_STATEMENTS);
+    PERIODIC_QUERY(IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX, QueryNodeMetadataColumns.PERIODIC_QUERY_COLUMNS, FluoQueryColumns.PERIODIC_QUERY_BINDING_SET),
+    FILTER (IncrementalUpdateConstants.FILTER_PREFIX, QueryNodeMetadataColumns.FILTER_COLUMNS, FluoQueryColumns.FILTER_BINDING_SET),
+    JOIN(IncrementalUpdateConstants.JOIN_PREFIX, QueryNodeMetadataColumns.JOIN_COLUMNS, FluoQueryColumns.JOIN_BINDING_SET),
+    STATEMENT_PATTERN(IncrementalUpdateConstants.SP_PREFIX, QueryNodeMetadataColumns.STATEMENTPATTERN_COLUMNS, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET),
+    QUERY(IncrementalUpdateConstants.QUERY_PREFIX, QueryNodeMetadataColumns.QUERY_COLUMNS, FluoQueryColumns.QUERY_BINDING_SET),
+    AGGREGATION(IncrementalUpdateConstants.AGGREGATION_PREFIX, QueryNodeMetadataColumns.AGGREGATION_COLUMNS, FluoQueryColumns.AGGREGATION_BINDING_SET),
+    PROJECTION(IncrementalUpdateConstants.PROJECTION_PREFIX, QueryNodeMetadataColumns.PROJECTION_COLUMNS, FluoQueryColumns.PROJECTION_BINDING_SET),
+    CONSTRUCT(IncrementalUpdateConstants.CONSTRUCT_PREFIX, QueryNodeMetadataColumns.CONSTRUCT_COLUMNS, FluoQueryColumns.CONSTRUCT_STATEMENTS);
 
     //Metadata Columns associated with given NodeType
     private QueryNodeMetadataColumns metadataColumns;
@@ -53,15 +56,25 @@ public enum NodeType {
     //Column where results are stored for given NodeType
     private Column resultColumn;
 
+    //Prefix for the given node type
+    private String nodePrefix;
     /**
      * Constructs an instance of {@link NodeType}.
      *
      * @param metadataColumns - Metadata {@link Column}s associated with this {@link NodeType}. (not null)
      * @param resultColumn - The {@link Column} used to store this {@link NodeType}'s results. (not null)
      */
-    private NodeType(QueryNodeMetadataColumns metadataColumns, Column resultColumn) {
+    private NodeType(String nodePrefix, QueryNodeMetadataColumns metadataColumns, Column resultColumn) {
     	this.metadataColumns = requireNonNull(metadataColumns);
     	this.resultColumn = requireNonNull(resultColumn);
+    	this.nodePrefix = requireNonNull(nodePrefix);
+    }
+    
+    /**
+     * @return the node ID prefix this {@link NodeType} was constructed with (never null)
+     */
+    public String getNodeTypePrefix() {
+        return nodePrefix;
     }
 
     /**
@@ -103,10 +116,38 @@ public enum NodeType {
             type = AGGREGATION;
         } else if(nodeId.startsWith(CONSTRUCT_PREFIX)) {
             type = CONSTRUCT;
+        } else if(nodeId.startsWith(PROJECTION_PREFIX)) {
+            type = PROJECTION;
         } else if(nodeId.startsWith(PERIODIC_QUERY_PREFIX)) {
             type = PERIODIC_QUERY;
         }
 
         return Optional.fromNullable(type);
     }
+    
+    /**
+     * Creates an id for a given NodeType that is of the form {@link NodeType#getNodeTypePrefix()} + "_" + uuid,
+     * where the uuid is a randomly generated {@link UUID} with all dashes removed.
+     * @param type {@link NodeType} whose prefix starts the id
+     * @return id for the given NodeType
+     */
+    public static String generateNewFluoIdForType(NodeType type) {
+        // A literal replace is enough to drop the dashes; no regex is needed.
+        final String uuidNoDashes = UUID.randomUUID().toString().replace("-", "");
+        return type.getNodeTypePrefix() + "_" + uuidNoDashes;
+    }
+    
+    /**
+     * Builds the id for a node of the given type: {@link NodeType#getNodeTypePrefix()} + "_" + pcjId.
+     * @param type {@link NodeType} whose prefix starts the id
+     * @param pcjId suffix appended after the underscore
+     * @return id for the given NodeType
+     */
+    public static String generateNewIdForType(NodeType type, String pcjId) {
+        final String prefix = type.getNodeTypePrefix();
+        return prefix + "_" + pcjId;
+    }
+    
+    
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java
index ae4912b..cb331cf 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java
@@ -34,7 +34,6 @@ import org.openrdf.model.Literal;
 import org.openrdf.model.Value;
 import org.openrdf.model.ValueFactory;
 import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.Binding;
 import org.openrdf.query.algebra.evaluation.QueryBindingSet;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java
new file mode 100644
index 0000000..f9d8257
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingSetUtil;
+import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
+import org.openrdf.query.BindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Updates the results of a Projection node when one of its children has added a
+ * new Binding Set to its results.
+ */
+@DefaultAnnotation(NonNull.class)
+public class ProjectionResultUpdater {
+    private static final Logger log = Logger.getLogger(ProjectionResultUpdater.class);
+
+    private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
+
+    /**
+     * Builds the Projection node's Binding Set from a child's Binding Set and writes
+     * it to the {@link FluoQueryColumns#PROJECTION_BINDING_SET} column.
+     *
+     * @param tx - The transaction all Fluo queries will use. (not null)
+     * @param childBindingSet - A binding set that the projection's child node has emitted. (not null)
+     * @param projectionMetadata - The metadata of the Projection node whose results will be updated. (not null)
+     * @throws Exception A problem caused the update to fail.
+     */
+    public void updateProjectionResults(
+            final TransactionBase tx,
+            final VisibilityBindingSet childBindingSet,
+            final ProjectionMetadata projectionMetadata) throws Exception {
+        checkNotNull(tx);
+        checkNotNull(childBindingSet);
+        checkNotNull(projectionMetadata);
+
+        log.trace(
+                "Transaction ID: " + tx.getStartTimestamp() + "\n" +
+                "Node ID: " + projectionMetadata.getNodeId() + "\n" +
+                "Parent Node ID: " + projectionMetadata.getParentNodeId() + "\n" +
+                "Child Node ID: " + projectionMetadata.getChildNodeId() + "\n" +
+                "Child Binding Set:\n" + childBindingSet + "\n");
+
+        // Build the projection's Binding Set from the child's binding set using the node's projected variables.
+        final VariableOrder queryVarOrder = projectionMetadata.getVariableOrder();
+        final VariableOrder projectionVarOrder = projectionMetadata.getProjectedVars();
+        final BindingSet queryBindingSet = BindingSetUtil.keepBindings(projectionVarOrder, childBindingSet);
+
+        // Create the Row Key for the result from the node's variable order and the projected Binding Set.
+        final Bytes resultRow = RowKeyUtil.makeRowKey(projectionMetadata.getNodeId(), queryVarOrder, queryBindingSet);
+
+        // Create the Binding Set that goes in the Node Value. It carries over the child's visibility.
+        final Bytes nodeValueBytes = BS_SERDE.serialize(new VisibilityBindingSet(queryBindingSet, childBindingSet.getVisibility()));
+
+        log.trace(
+                "Transaction ID: " + tx.getStartTimestamp() + "\n" +
+                "New Binding Set: " + childBindingSet + "\n");
+
+        tx.set(resultRow, FluoQueryColumns.PROJECTION_BINDING_SET, nodeValueBytes);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
index 44fc9bd..37d7256 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
@@ -23,16 +23,12 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.util.BindingSetUtil;
 import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
-import org.openrdf.query.BindingSet;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -45,7 +41,6 @@ import edu.umd.cs.findbugs.annotations.NonNull;
 public class QueryResultUpdater {
     private static final Logger log = Logger.getLogger(QueryResultUpdater.class);
 
-    private static final FluoQueryMetadataDAO METADATA_DA0 = new FluoQueryMetadataDAO();
     private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
 
     /**
@@ -73,23 +68,12 @@ public class QueryResultUpdater {
 
         // Create the query's Binding Set from the child node's binding set.
         final VariableOrder queryVarOrder = queryMetadata.getVariableOrder();
-        final BindingSet queryBindingSet = BindingSetUtil.keepBindings(queryVarOrder, childBindingSet);
 
         // Create the Row Key for the result. If the child node groups results, then the key must only contain the Group By variables.
-        final Bytes resultRow;
-
-        final String childNodeId = queryMetadata.getChildNodeId();
-        final boolean isGrouped = childNodeId.startsWith( IncrementalUpdateConstants.AGGREGATION_PREFIX );
-        if(isGrouped) {
-            final AggregationMetadata aggMetadata = METADATA_DA0.readAggregationMetadata(tx, childNodeId);
-            final VariableOrder groupByVars = aggMetadata.getGroupByVariableOrder();
-            resultRow = RowKeyUtil.makeRowKey(queryMetadata.getNodeId(), groupByVars, queryBindingSet);
-        } else {
-            resultRow = RowKeyUtil.makeRowKey(queryMetadata.getNodeId(), queryVarOrder, queryBindingSet);
-        }
+        final Bytes resultRow = RowKeyUtil.makeRowKey(queryMetadata.getNodeId(), queryVarOrder, childBindingSet);
 
         // Create the Binding Set that goes in the Node Value. It does contain visibilities.
-        final Bytes nodeValueBytes = BS_SERDE.serialize(new VisibilityBindingSet(queryBindingSet,childBindingSet.getVisibility()));
+        final Bytes nodeValueBytes = BS_SERDE.serialize(childBindingSet);
 
         log.trace(
                 "Transaction ID: " + tx.getStartTimestamp() + "\n" +

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
index a15743f..fa27b46 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
@@ -30,8 +30,6 @@ import org.apache.rya.api.domain.RyaSubGraph;
 import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
 import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
 
-import com.google.common.base.Preconditions;
-
 /**
  * Exports {@link RyaSubGraph}s to Kafka from Rya Fluo Application 
  *

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
index 3a731c2..7d0fd5e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
@@ -31,6 +31,7 @@ import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.PeriodicQueryUpdater;
+import org.apache.rya.indexing.pcj.fluo.app.ProjectionResultUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.QueryResultUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
@@ -38,6 +39,7 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 
@@ -61,6 +63,7 @@ public abstract class BindingSetUpdater extends AbstractObserver {
     private final QueryResultUpdater queryUpdater = new QueryResultUpdater();
     private final AggregationResultUpdater aggregationUpdater = new AggregationResultUpdater();
     private final ConstructQueryResultUpdater constructUpdater = new ConstructQueryResultUpdater();
+    private final ProjectionResultUpdater projectionUpdater = new ProjectionResultUpdater();
     private final PeriodicQueryUpdater periodicQueryUpdater = new PeriodicQueryUpdater();
 
     @Override
@@ -107,6 +110,15 @@ public abstract class BindingSetUpdater extends AbstractObserver {
                 }
                 break;
 
+            case PROJECTION:
+                final ProjectionMetadata projectionQuery = queryDao.readProjectionMetadata(tx, parentNodeId);
+                try {
+                    projectionUpdater.updateProjectionResults(tx, observedBindingSet, projectionQuery);
+                } catch (final Exception e) {
+                    throw new RuntimeException("Could not process a Query node.", e);
+                }
+                break;    
+                
             case CONSTRUCT: 
                 final ConstructQueryMetadata constructQuery = queryDao.readConstructQueryMetadata(tx, parentNodeId);
                 try{
@@ -154,7 +166,7 @@ public abstract class BindingSetUpdater extends AbstractObserver {
 
 
             default:
-                throw new IllegalArgumentException("The parent node's NodeType must be of type Filter, Join, PeriodicBin or Query, but was " + parentNodeType);
+                throw new IllegalArgumentException("The parent node's NodeType must be of type Aggregation, Projection, ConstructQuery, Filter, Join, PeriodicBin or Query, but was " + parentNodeType);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
new file mode 100644
index 0000000..b712606
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.observers;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
+
+/**
+ * Observes the Projection binding set column and reports each new result along with the projection's parent node ID.
+ */
+public class ProjectionObserver extends BindingSetUpdater {
+    private static final Logger log = Logger.getLogger(ProjectionObserver.class);
+
+    private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
+    private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
+
+    @Override
+    public ObservedColumn getObservedColumn() {
+        return new ObservedColumn(FluoQueryColumns.PROJECTION_BINDING_SET, NotificationType.STRONG);
+    }
+
+    @Override
+    public Observation parseObservation(final TransactionBase tx, final Bytes row) throws Exception {
+        requireNonNull(tx);
+        requireNonNull(row);
+
+        // Read the Projection metadata for the node ID encoded in the row.
+        final String projectionNodeId = BindingSetRow.make(row).getNodeId();
+        final ProjectionMetadata projectionMetadata = queryDao.readProjectionMetadata(tx, projectionNodeId);
+
+        // Read the Visibility Binding Set from the value.
+        final Bytes valueBytes = tx.get(row, FluoQueryColumns.PROJECTION_BINDING_SET);
+        final VisibilityBindingSet projectionBindingSet = BS_SERDE.deserialize(valueBytes);
+
+        // Figure out which node needs to handle the new binding set.
+        final String parentNodeId = projectionMetadata.getParentNodeId();
+
+        return new Observation(projectionNodeId, projectionBindingSet, parentNodeId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index fbdca08..e6368ba 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -107,7 +107,7 @@ public class QueryResultObserver extends AbstractObserver {
         // Read the Child Binding Set that will be exported.
         final Bytes valueBytes = tx.get(brow, col);
         final VisibilityBindingSet result = BS_SERDE.deserialize(valueBytes);
-
+        
         // Simplify the result's visibilities.
         final String visibility = result.getVisibility();
         if(!simplifiedVisibilities.containsKey(visibility)) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
index ff42a0f..eaa072f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
@@ -287,7 +287,7 @@ public class AggregationMetadata extends CommonNodeMetadata {
      * Builds instances of {@link AggregationMetadata}.
      */
     @DefaultAnnotation(NonNull.class)
-    public static final class Builder {
+    public static final class Builder implements CommonNodeMetadata.Builder {
 
         private final String nodeId;
         private VariableOrder varOrder;
@@ -317,7 +317,7 @@ public class AggregationMetadata extends CommonNodeMetadata {
          *   single variable because aggregations are only able to emit the aggregated value.
          * @return This builder so that method invocations may be chained.
          */
-        public Builder setVariableOrder(@Nullable final VariableOrder varOrder) {
+        public Builder setVarOrder(@Nullable final VariableOrder varOrder) {
             this.varOrder = varOrder;
             return this;
         }
@@ -350,6 +350,10 @@ public class AggregationMetadata extends CommonNodeMetadata {
             this.childNodeId = childNodeId;
             return this;
         }
+        
+        public String getChildNodeId() {
+            return childNodeId;
+        }
 
         /**
          * @param aggregation - An aggregation that will be performed over the BindingSets that are emitted from the child node.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadata.java
index e54acf1..a20fe4d 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadata.java
@@ -99,4 +99,18 @@ public abstract class CommonNodeMetadata {
                 .append("}")
                 .toString();
     }
+    
+    /**
+     * Base interface for all metadata Builders.  Using this type def
+     * allows for the implementation of a Builder visitor for navigating
+     * the Builder tree.
+     *
+     */
+    public static interface Builder {
+        
+        public String getNodeId();
+        
+        public VariableOrder getVariableOrder();
+    }
+    
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java
index e836c5d..6bf968e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java
@@ -38,7 +38,7 @@ public class ConstructQueryMetadata extends CommonNodeMetadata {
 
     private String childNodeId;
     private ConstructGraph graph;
-    private String sparql;
+    private String parentNodeId;
 
     /**
      * Creates ConstructQueryMetadata object from the provided metadata arguments.
@@ -47,21 +47,11 @@ public class ConstructQueryMetadata extends CommonNodeMetadata {
      * @param graph - {@link ConstructGraph} used to project {@link BindingSet}s onto sets of statement representing construct graph
      * @param sparql - SPARQL query containing construct graph
      */
-    public ConstructQueryMetadata(String nodeId, String childNodeId, ConstructGraph graph, String sparql) {
-        super(nodeId, new VariableOrder("subject", "predicate", "object"));
-        Preconditions.checkNotNull(childNodeId);
-        Preconditions.checkNotNull(graph);
-        Preconditions.checkNotNull(sparql);
-        this.childNodeId = childNodeId;
-        this.graph = graph;
-        this.sparql = sparql;
-    }
-
-    /**
-     * @return sparql query string representing this construct query
-     */
-    public String getSparql() {
-        return sparql;
+    public ConstructQueryMetadata(String nodeId, String parentNodeId, String childNodeId, VariableOrder varOrder, ConstructGraph graph) {
+        super(nodeId, varOrder);
+        this.childNodeId = Preconditions.checkNotNull(childNodeId);
+        this.parentNodeId = Preconditions.checkNotNull(parentNodeId);
+        this.graph = Preconditions.checkNotNull(graph);
     }
 
     /**
@@ -71,6 +61,13 @@ public class ConstructQueryMetadata extends CommonNodeMetadata {
     public String getChildNodeId() {
         return childNodeId;
     }
+    
+    /**
+     * @return The parent of this construct node
+     */
+    public String getParentNodeId() {
+        return parentNodeId;
+    }
 
     /**
      * @return The ConstructGraph used to form statement {@link BindingSet}s for
@@ -82,7 +79,7 @@ public class ConstructQueryMetadata extends CommonNodeMetadata {
 
     @Override
     public int hashCode() {
-        return Objects.hashCode(super.getNodeId(), super.getVariableOrder(), childNodeId, graph, sparql);
+        return Objects.hashCode(super.getNodeId(), super.getVariableOrder(), parentNodeId, childNodeId, graph);
     }
 
     @Override
@@ -94,8 +91,8 @@ public class ConstructQueryMetadata extends CommonNodeMetadata {
         if (o instanceof ConstructQueryMetadata) {
             ConstructQueryMetadata queryMetadata = (ConstructQueryMetadata) o;
             if (super.equals(queryMetadata)) {
-                return new EqualsBuilder().append(childNodeId, queryMetadata.childNodeId).append(graph, queryMetadata.graph)
-                        .append(sparql, queryMetadata.sparql).isEquals();
+                return new EqualsBuilder().append(parentNodeId, queryMetadata.parentNodeId).append(childNodeId, queryMetadata.childNodeId).append(graph, queryMetadata.graph)
+                        .isEquals();
             }
             return false;
         }
@@ -105,7 +102,7 @@ public class ConstructQueryMetadata extends CommonNodeMetadata {
     @Override
     public String toString() {
         return new StringBuilder().append("Construct Query Metadata {\n").append("    Node ID: " + super.getNodeId() + "\n")
-                .append("    SPARQL QUERY: " + sparql + "\n").append("    Variable Order: " + super.getVariableOrder() + "\n")
+                .append("    Variable Order: " + super.getVariableOrder() + "\n")
                 .append("    Child Node ID: " + childNodeId + "\n").append("    Construct Graph: " + graph.getProjections() + "\n")
                 .append("}").toString();
     }
@@ -123,13 +120,14 @@ public class ConstructQueryMetadata extends CommonNodeMetadata {
      * Builds instances of {@link QueryMetadata}.
      */
     @DefaultAnnotation(NonNull.class)
-    public static final class Builder {
+    public static final class Builder implements CommonNodeMetadata.Builder {
 
 
         private String nodeId;
         private ConstructGraph graph;
+        private String parentNodeId;
         private String childNodeId;
-        private String sparql;
+        private VariableOrder varOrder;
 
         /**
          * Set the node Id that identifies this Construct Query Node
@@ -144,21 +142,31 @@ public class ConstructQueryMetadata extends CommonNodeMetadata {
         }
         
         /**
-         * Set the SPARQL String representing this construct query
-         * @param SPARQL string representing this construct query
+         * @return the node id for this construct query
+         */
+        public String getNodeId() {
+            return nodeId;
+        }
+        
+        /**
+         * Sets the VariableOrder that determines how results will be written
+         * @param varOrder
+         * @return This builder so that method invocations may be chained.
          */
-        public Builder setSparql(String sparql) {
-            this.sparql = sparql;
+        public Builder setVarOrder(VariableOrder varOrder) {
+            this.varOrder = varOrder;
             return this;
         }
+        
+        @Override
+        public VariableOrder getVariableOrder() {
+            return varOrder;
+        }
 
         /**
-         * Set the ConstructGraph used to form statement {@link BindingSet}s for
-         * this Construct Query
+         * Set the ConstructGraph used to form statement {@link BindingSet}s for this Construct Query
          *
-         * @param varOrder
-         *            - ConstructGraph to project {@link BindingSet}s onto RDF
-         *            statements
+         * @param graph - ConstructGraph to project {@link BindingSet}s onto RDF statements
          * @return This builder so that method invocations may be chained.
          */
         public Builder setConstructGraph(ConstructGraph graph) {
@@ -167,25 +175,37 @@ public class ConstructQueryMetadata extends CommonNodeMetadata {
         }
 
         /**
-         * Set the node whose results are projected onto the given
-         * {@link ConstructGraph}.
+         * Set the node whose results are projected onto the given {@link ConstructGraph}.
          *
-         * @param childNodeId
-         *            - The node whose results are projected onto the given
-         *            {@link ConstructGraph}.
+         * @param childNodeId - The node whose results are projected onto the given {@link ConstructGraph}.
          * @return This builder so that method invocations may be chained.
          */
         public Builder setChildNodeId(String childNodeId) {
             this.childNodeId = childNodeId;
             return this;
         }
+        
+        public String getChildNodeId() {
+            return childNodeId;
+        }
+        
+        /**
+         * Set the parent node of this {@link ConstructGraph}.
+         *
+         * @param parentNodeId - The parent node of this {@link ConstructGraph}.
+         * @return This builder so that method invocations may be chained.
+         */
+        public Builder setParentNodeId(String parentNodeId) {
+            this.parentNodeId = parentNodeId;
+            return this;
+        }
 
         /**
          * @return An instance of {@link ConstructQueryMetadata} build using
          *         this builder's values.
          */
         public ConstructQueryMetadata build() {
-            return new ConstructQueryMetadata(nodeId, childNodeId, graph, sparql);
+            return new ConstructQueryMetadata(nodeId, parentNodeId, childNodeId, varOrder, graph);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java
index 7e2e995..a821d8c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java
@@ -145,7 +145,7 @@ public class FilterMetadata extends CommonNodeMetadata {
      * Builds instances of {@link FilterMetadata}.
      */
     @DefaultAnnotation(NonNull.class)
-    public static final class Builder {
+    public static final class Builder implements CommonNodeMetadata.Builder{
 
         private final String nodeId;
         private VariableOrder varOrder;
@@ -179,6 +179,11 @@ public class FilterMetadata extends CommonNodeMetadata {
             this.varOrder = varOrder;
             return this;
         }
+        
+        @Override
+        public VariableOrder getVariableOrder() {
+            return varOrder;
+        }
 
         /**
          * Set the original SPARQL query the filter is derived from.
@@ -212,6 +217,10 @@ public class FilterMetadata extends CommonNodeMetadata {
             this.childNodeId = childNodeId;
             return this;
         }
+        
+        public String getChildNodeId() {
+            return childNodeId;
+        }
 
         /**
          * @return Returns an instance of {@link FilterMetadata} using this builder's values.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
index 8d218af..65db02c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType;
 
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
@@ -44,7 +45,8 @@ import net.jcip.annotations.Immutable;
 @DefaultAnnotation(NonNull.class)
 public class FluoQuery {
 
-    private final Optional<QueryMetadata> queryMetadata;
+    private final QueryMetadata queryMetadata;
+    private final ImmutableMap<String, ProjectionMetadata> projectionMetadata;
     private final Optional<ConstructQueryMetadata> constructMetadata;
     private final Optional<PeriodicQueryMetadata> periodicQueryMetadata;
     private final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata;
@@ -52,13 +54,15 @@ public class FluoQuery {
     private final ImmutableMap<String, JoinMetadata> joinMetadata;
     private final ImmutableMap<String, AggregationMetadata> aggregationMetadata;
     private final QueryType type;
-    public static enum QueryType {Projection, Construct};
+    private final String queryId;
 
     /**
      * Constructs an instance of {@link FluoQuery}. Private because applications
      * must use {@link Builder} instead.
      *
-     * @param queryMetadata - The root node of a query that is updated in Fluo. (not null)
+     * @param queryMetadata - metadata for the query for handling results (not null)
+     * @param projectionMetadata - projection nodes of query that project results (not null)
+     * @param constructMetadata - construct node of query that creates subgraphs
      * @param periodicQueryMetadata - The periodic query node that is updated in Fluo.
      * @param statementPatternMetadata - A map from Node ID to Statement Pattern metadata as
      *   it is represented within the Fluo app. (not null)
@@ -71,52 +75,27 @@ public class FluoQuery {
      */
     private FluoQuery(
             final QueryMetadata queryMetadata,
+            final ImmutableMap<String, ProjectionMetadata> projectionMetadata,
+            final Optional<ConstructQueryMetadata> constructMetadata,
             final Optional<PeriodicQueryMetadata> periodicQueryMetadata,
             final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata,
             final ImmutableMap<String, FilterMetadata> filterMetadata,
             final ImmutableMap<String, JoinMetadata> joinMetadata, 
             final ImmutableMap<String, AggregationMetadata> aggregationMetadata) {
                 this.aggregationMetadata = requireNonNull(aggregationMetadata);
-        this.queryMetadata = Optional.of(requireNonNull(queryMetadata));
-        this.constructMetadata = Optional.absent();
+        this.queryMetadata = requireNonNull(queryMetadata);
+        this.queryId = queryMetadata.getNodeId();
+        this.projectionMetadata = requireNonNull(projectionMetadata);
+        this.constructMetadata = constructMetadata;
         this.periodicQueryMetadata = periodicQueryMetadata;
         this.statementPatternMetadata = requireNonNull(statementPatternMetadata);
         this.filterMetadata = requireNonNull(filterMetadata);
         this.joinMetadata = requireNonNull(joinMetadata);
-        this.type = QueryType.Projection;
-    }
-    
-    
-    /**
-     * Constructs an instance of {@link FluoQuery}. Private because applications
-     * must use {@link Builder} instead.
-     *
-     * @param constructMetadata - The root node of a query that is updated in Fluo. (not null)
-     * @param periodicQueryMetadata - The periodic query node that is updated in Fluo.
-     * @param statementPatternMetadata - A map from Node ID to Statement Pattern metadata as
-     *   it is represented within the Fluo app. (not null)
-     * @param filterMetadata A map from Node ID to Filter metadata as it is represented
-     *   within the Fluo app. (not null)
-     * @param joinMetadata - A map from Node ID to Join metadata as it is represented
-     *   within the Fluo app. (not null)
-     * @param aggregationMetadata - A map from Node ID to Aggregation metadata as it is
-     *   represented within the Fluo app. (not null)
-     */
-    private FluoQuery(
-            final ConstructQueryMetadata constructMetadata,
-            final Optional<PeriodicQueryMetadata> periodicQueryMetadata,
-            final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata,
-            final ImmutableMap<String, FilterMetadata> filterMetadata,
-            final ImmutableMap<String, JoinMetadata> joinMetadata,
-            final ImmutableMap<String, AggregationMetadata> aggregationMetadata) {
-        this.constructMetadata = Optional.of(requireNonNull(constructMetadata));
-        this.queryMetadata = Optional.absent();
-        this.periodicQueryMetadata = periodicQueryMetadata;
-        this.statementPatternMetadata = requireNonNull(statementPatternMetadata);
-        this.filterMetadata = requireNonNull(filterMetadata);
-        this.joinMetadata = requireNonNull(joinMetadata);
-        this.aggregationMetadata = aggregationMetadata;
-        this.type = QueryType.Construct;
+        if(constructMetadata.isPresent()) {
+            this.type = QueryType.Construct;
+        } else {
+            this.type = QueryType.Projection;
+        }
     }
     
     /**
@@ -126,24 +105,86 @@ public class FluoQuery {
     public QueryType getQueryType() {
         return type;
     }
+    
+    /**
+     * @return the unique id of this query
+     */
+    public String getQueryId() {
+        return queryId;
+    }
 
     /**
      * @return Metadata about the root node of a query that is updated within the Fluo app.
      */
-    public Optional<QueryMetadata> getQueryMetadata() {
+    public QueryMetadata getQueryMetadata() {
         return queryMetadata;
     }
     
+    /**
+     * @param nodeId - node id of the query metadata
+     * @return Optional containing the queryMetadata if it matches the specified nodeId
+     */
+    public Optional<QueryMetadata> getQueryMetadata(String nodeId) {
+        if(queryMetadata.getNodeId().equals(nodeId)) {
+            return Optional.of(queryMetadata);
+        } else {
+            return Optional.absent();
+        }
+    }
+    
+    /**
+     * @return construct query metadata for generating subgraphs
+     */
     public Optional<ConstructQueryMetadata> getConstructQueryMetadata() {
         return constructMetadata;
     }
     
     /**
+     * @param nodeId - node id of the ConstructMetadata
+     * @return Optional containing the ConstructMetadata if it is present and has the given nodeId
+     */
+    public Optional<ConstructQueryMetadata> getConstructQueryMetadata(String nodeId) {
+        if(constructMetadata.isPresent() && constructMetadata.get().getNodeId().equals(nodeId)) {
+            return constructMetadata;
+        } else {
+            return Optional.absent();
+        }
+    }
+    
+    /**
+     * @param nodeId - id of the Projection metadata you want (not null)
+     * @return projection metadata corresponding to the given nodeId
+     */
+    public Optional<ProjectionMetadata> getProjectionMetadata(String nodeId) {
+        return Optional.fromNullable(projectionMetadata.get(nodeId));
+    }
+    
+    /**
+     * @return All of the projection metadata that is stored for the query
+     */
+    public Collection<ProjectionMetadata> getProjectionMetadata() {
+        return projectionMetadata.values();
+    }
+    
+    /**
      * @return All of the Periodic Query metadata that is stored for the query.
      */
     public Optional<PeriodicQueryMetadata> getPeriodicQueryMetadata() {
         return periodicQueryMetadata;
     }
+    
+    /**
+     * @param nodeId - id of the PeriodicQueryMetadata
+     * @return Optional containing the PeriodicQueryMetadata if it is present and has the given nodeId
+     */
+    public Optional<PeriodicQueryMetadata> getPeriodicQueryMetadata(String nodeId) {
+        
+        if(periodicQueryMetadata.isPresent() && periodicQueryMetadata.get().getNodeId().equals(nodeId)) {
+            return periodicQueryMetadata;
+        } else {
+            return Optional.absent();
+        }
+    }
 
     /**
      * Get a Statement Pattern node's metadata.
@@ -254,8 +295,11 @@ public class FluoQuery {
     public String toString() {
         final StringBuilder builder = new StringBuilder();
 
-        if(queryMetadata.isPresent()) {
-            builder.append( queryMetadata.get().toString() );
+        builder.append(queryMetadata.toString());
+        builder.append("\n");
+        
+        for(final ProjectionMetadata metadata : projectionMetadata.values()) {
+            builder.append(metadata);
             builder.append("\n");
         }
         
@@ -305,9 +349,10 @@ public class FluoQuery {
     @DefaultAnnotation(NonNull.class)
     public static final class Builder {
 
-        private QueryMetadata.Builder queryBuilder = null;
-        private ConstructQueryMetadata.Builder constructBuilder = null;
-        private PeriodicQueryMetadata.Builder periodicQueryBuilder = null;
+        private QueryMetadata.Builder queryBuilder;
+        private ConstructQueryMetadata.Builder constructBuilder;
+        private PeriodicQueryMetadata.Builder periodicQueryBuilder;
+        private final Map<String, ProjectionMetadata.Builder> projectionBuilders = new HashMap<>();
         private final Map<String, StatementPatternMetadata.Builder> spBuilders = new HashMap<>();
         private final Map<String, FilterMetadata.Builder> filterBuilders = new HashMap<>();
         private final Map<String, JoinMetadata.Builder> joinBuilders = new HashMap<>();
@@ -319,23 +364,55 @@ public class FluoQuery {
          * @param queryBuilder - The builder representing the query's results.
          * @return This builder so that method invocation may be chained.
          */
-        public Builder setQueryMetadata(@Nullable final QueryMetadata.Builder queryBuilder) {
-            this.queryBuilder = queryBuilder;
+        public Builder setQueryMetadata(final QueryMetadata.Builder queryBuilder) {
+            this.queryBuilder = requireNonNull(queryBuilder);
             return this;
         }
 
         /**
          * @return The Query metadata builder if one has been set.
          */
-        public Optional<QueryMetadata.Builder> getQueryBuilder() {
-            return Optional.fromNullable( queryBuilder );
+        public QueryMetadata.Builder getQueryBuilder() {
+            return queryBuilder;
+        }
+        
+        /**
+         * @param nodeId - id of the QueryMetadata.Builder
+         * @return Optional containing the QueryMetadata.Builder if it has the specified nodeId
+         */
+        public Optional<QueryMetadata.Builder> getQueryBuilder(String nodeId) {
+            if(queryBuilder.getNodeId().equals(nodeId)) {
+                return Optional.of(queryBuilder);
+            } else {
+                return Optional.absent();
+            }
+            
+        }
+        
+        /**
+         * Adds a new {@link ProjectionMetadata.Builder} to this builder.
+         *
+         * @param projectionBuilder - The builder representing this query's projection
+         * @return This builder so that method invocation may be chained.
+         */
+        public Builder addProjectionBuilder(@Nullable final ProjectionMetadata.Builder projectionBuilder) {
+            requireNonNull(projectionBuilder);
+            projectionBuilders.put(projectionBuilder.getNodeId(), projectionBuilder);
+            return this;
+        }
+
+        /**
+         * @return The ProjectionMetadata builder if one has been set.
+         */
+        public Optional<ProjectionMetadata.Builder> getProjectionBuilder(String nodeId) {
+            requireNonNull(nodeId);
+            return Optional.fromNullable( projectionBuilders.get(nodeId) );
         }
         
         /**
          * Sets the {@link ConstructQueryMetadata.Builder} that is used by this builder.
          *
-         * @param constructBuilder
-         *            - The builder representing the query's results.
+         * @param constructBuilder - The builder representing the query's results.
          * @return This builder so that method invocation may be chained.
          */
         public Builder setConstructQueryMetadata(@Nullable final ConstructQueryMetadata.Builder constructBuilder) {
@@ -344,6 +421,18 @@ public class FluoQuery {
         }
 
         /**
+         * @param nodeId - id of the ConstructQueryMetadata.Builder
+         * @return Optional containing the ConstructQueryMetadata.Builder if it has been set and has the given nodeId.
+         */
+        public Optional<ConstructQueryMetadata.Builder> getConstructQueryBuilder(String nodeId) {
+            if(constructBuilder != null && constructBuilder.getNodeId().equals(nodeId)) {
+                return Optional.of(constructBuilder);
+            } else {
+                return Optional.absent();
+            }
+        }
+        
+        /**
          * @return The Construct Query metadata builder if one has been set.
          */
         public Optional<ConstructQueryMetadata.Builder> getConstructQueryBuilder() {
@@ -442,8 +531,6 @@ public class FluoQuery {
             this.aggregationBuilders.put(aggregationBuilder.getNodeId(), aggregationBuilder);
             return this;
         }
-
-        
         
         /**
          * Adds a new {@link PeriodicQueryMetadata.Builder} to this builder.
@@ -457,7 +544,6 @@ public class FluoQuery {
             return this;
         }
 
-        
         /**
          * Get a PeriodicQuery builder from this builder.
          *
@@ -467,24 +553,35 @@ public class FluoQuery {
             return Optional.fromNullable( periodicQueryBuilder);
         }
         
-
         /**
-         * @return Creates a {@link FluoQuery} using the values that have been supplied to this builder.
+         * @param nodeId - id of the PeriodicQueryMetadata.Builder
+         * @return Optional containing the PeriodicQueryMetadata.Builder if one has been set and it has the given nodeId
          */
-        public FluoQuery build() {
-            checkArgument((queryBuilder != null && constructBuilder == null) || (queryBuilder == null && constructBuilder != null));
+        public Optional<PeriodicQueryMetadata.Builder> getPeriodicQueryBuilder(String nodeId) {
             
-            Optional<QueryMetadata.Builder> optionalQueryBuilder = getQueryBuilder();
-            QueryMetadata queryMetadata = null;
-            if(optionalQueryBuilder.isPresent()) {
-                queryMetadata = optionalQueryBuilder.get().build();
+            if(periodicQueryBuilder != null && periodicQueryBuilder.getNodeId().equals(nodeId)) {
+                return Optional.of(periodicQueryBuilder);
+            } else {
+                return Optional.absent();
             }
+        }
+        
+        /**
+         * @return Creates a {@link FluoQuery} using the values that have been supplied to this builder.
+         */
+        public FluoQuery build() {
+            checkArgument((projectionBuilders.size() > 0 || constructBuilder != null));
             
             Optional<PeriodicQueryMetadata.Builder> optionalPeriodicQueryBuilder = getPeriodicQueryBuilder();
             PeriodicQueryMetadata periodicQueryMetadata = null;
             if(optionalPeriodicQueryBuilder.isPresent()) {
                 periodicQueryMetadata = optionalPeriodicQueryBuilder.get().build();
             }
+            
+            final ImmutableMap.Builder<String, ProjectionMetadata> projectionMetadata = ImmutableMap.builder();
+            for(final Entry<String, ProjectionMetadata.Builder> entry : projectionBuilders.entrySet()) {
+                projectionMetadata.put(entry.getKey(), entry.getValue().build());
+            }
 
             final ImmutableMap.Builder<String, StatementPatternMetadata> spMetadata = ImmutableMap.builder();
             for(final Entry<String, StatementPatternMetadata.Builder> entry : spBuilders.entrySet()) {
@@ -506,12 +603,13 @@ public class FluoQuery {
                 aggregateMetadata.put(entry.getKey(), entry.getValue().build());
             }
 
-            if(queryBuilder != null) {
-                return new FluoQuery(queryBuilder.build(), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
-            }
-            //constructBuilder non-null in this case, but no need to check
-            else {
-                return new FluoQuery(constructBuilder.build(), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
+            if(constructBuilder != null) {
+                if(periodicQueryMetadata != null) {
+                    throw new IllegalArgumentException("Queries containing sliding window filters and construct query patterns are not supported.");
+                }
+                return new FluoQuery(queryBuilder.build(), projectionMetadata.build(), Optional.of(constructBuilder.build()), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
+            } else {
+                return new FluoQuery(queryBuilder.build(), projectionMetadata.build(), Optional.absent(), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
             }
             
         }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
index ed18d49..8cd25d0 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
@@ -42,6 +42,20 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  *     <tr> <td>Node ID</td> <td>queryMetadata:variableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr>
  *     <tr> <td>Node ID</td> <td>queryMetadata:sparql</td> <td>The original SPARQL query that is being computed by this query.</td> </tr>
  *     <tr> <td>Node ID</td> <td>queryMetadata:childNodeId</td> <td>The Node ID of the child who feeds this node.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>queryMetadata:queryType</td> <td>The {@link QueryType} of this query.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>queryMetadata:exportStrategies</td> <td>Strategies for exporting results from Rya Fluo app</td> </tr>
+ *     <tr> <td>Node ID + DELIM + Binding Set String</td> <td>queryMetadata:bindingSet</td> <td>A {@link VisibilityBindingSet} object.</td> </tr>
+ *   </table>
+ * </p>
+ * <p>
+ *   <b>Projection Metadata</b>
+ *   <table border="1" style="width:100%">
+ *     <tr> <th>Fluo Row</th> <th>Fluo Column</th> <th>Fluo Value</th> </tr>
+ *     <tr> <td>Node ID</td> <td>projectionMetadata:nodeId</td> <td>The Node ID of the Query.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>projectionMetadata:projectedVars</td> <td>The variables that results are projected onto.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>projectionMetadata:variableOrder</td> <td>The Variable Order in which Binding values are written in the Row to identify solutions.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>projectionMetadata:childNodeId</td> <td>The Node ID of the child who feeds this node.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>projectionMetadata:parentNodeId</td> <td>The Node ID of the parent of this node.</td> </tr>
  *     <tr> <td>Node ID + DELIM + Binding Set String</td> <td>queryMetadata:bindingSet</td> <td>A {@link VisibilityBindingSet} object.</td> </tr>
  *   </table>
  * </p>
@@ -50,11 +64,11 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  *   <table border="1" style="width:100%">
  *     <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr>
  *     <tr> <td>Node ID</td> <td>constructMetadata:nodeId</td> <td>The Node ID of the Query.</td> </tr>
- *     <tr> <td>Node ID</td> <td>constructMetadata:sparql</td> <td>The original SPARQL query that is being computed by this query.</td> </tr>
  *     <tr> <td>Node ID</td> <td>constructMetadata:variableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr>
  *     <tr> <td>Node ID</td> <td>constructMetadata:graph</td> <td>The construct graph used to project BindingSets to statements.</td> </tr>
  *     <tr> <td>Node ID</td> <td>constructMetadata:childNodeId</td> <td>The Node ID of the child who feeds this node.</td> </tr>
- *     <tr> <td>Node ID</td> <td>constructMetadata:statements</td> <td>The RDF statements produced by this construct query node.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>constructMetadata:parentNodeId</td> <td>The Node ID of the parent that this node feeds.</td> </tr>
+ *     <tr> <td>Node ID + DELIM + Binding Set String</td> <td>constructMetadata:statements</td> <td>The RDF statements produced by this construct query node.</td> </tr>
  *   </table>
  * </p>
  * <p>
@@ -131,6 +145,7 @@ public class FluoQueryColumns {
     public static final String STATEMENT_PATTERN_METADATA_CF = "statementPatternMetadata";
     public static final String AGGREGATION_METADATA_CF = "aggregationMetadata";
     public static final String CONSTRUCT_METADATA_CF = "constructMetadata";
+    public static final String PROJECTION_METADATA_CF = "projectionMetadata";
     public static final String PERIODIC_QUERY_METADATA_CF = "periodicQueryMetadata";
 
     /**
@@ -178,14 +193,24 @@ public class FluoQueryColumns {
     public static final Column QUERY_SPARQL = new Column(QUERY_METADATA_CF, "sparql");
     public static final Column QUERY_CHILD_NODE_ID = new Column(QUERY_METADATA_CF, "childNodeId");
     public static final Column QUERY_BINDING_SET = new Column(QUERY_METADATA_CF, "bindingSet");
+    public static final Column QUERY_EXPORT_STRATEGIES = new Column(QUERY_METADATA_CF, "exportStrategies");
+    public static final Column QUERY_TYPE = new Column(QUERY_METADATA_CF, "queryType");
+    
+    // Projection Metadata columns.
+    public static final Column PROJECTION_NODE_ID = new Column(PROJECTION_METADATA_CF, "nodeId");
+    public static final Column PROJECTION_PROJECTED_VARS = new Column(PROJECTION_METADATA_CF, "projectedVars");
+    public static final Column PROJECTION_VARIABLE_ORDER = new Column(PROJECTION_METADATA_CF, "variableOrder");
+    public static final Column PROJECTION_CHILD_NODE_ID = new Column(PROJECTION_METADATA_CF, "childNodeId");
+    public static final Column PROJECTION_PARENT_NODE_ID = new Column(PROJECTION_METADATA_CF, "parentNodeId");
+    public static final Column PROJECTION_BINDING_SET = new Column(PROJECTION_METADATA_CF, "bindingSet");
 
  // Construct Query Metadata columns.
     public static final Column CONSTRUCT_NODE_ID = new Column(CONSTRUCT_METADATA_CF, "nodeId");
     public static final Column CONSTRUCT_VARIABLE_ORDER = new Column(CONSTRUCT_METADATA_CF, "variableOrder");
     public static final Column CONSTRUCT_GRAPH = new Column(CONSTRUCT_METADATA_CF, "graph");
     public static final Column CONSTRUCT_CHILD_NODE_ID = new Column(CONSTRUCT_METADATA_CF, "childNodeId");
+    public static final Column CONSTRUCT_PARENT_NODE_ID = new Column(CONSTRUCT_METADATA_CF, "parentNodeId");
     public static final Column CONSTRUCT_STATEMENTS = new Column(CONSTRUCT_METADATA_CF, "statements");
-    public static final Column CONSTRUCT_SPARQL = new Column(CONSTRUCT_METADATA_CF, "sparql");
 
     // Filter Metadata columns.
     public static final Column FILTER_NODE_ID = new Column(FILTER_METADATA_CF, "nodeId");
@@ -256,8 +281,20 @@ public class FluoQueryColumns {
                 Arrays.asList(QUERY_NODE_ID,
                         QUERY_VARIABLE_ORDER,
                         QUERY_SPARQL,
+                        QUERY_TYPE,
+                        QUERY_EXPORT_STRATEGIES,
                         QUERY_CHILD_NODE_ID)),
         
+        /**
+         * The columns a {@link ProjectionMetadata} object's fields are stored within.
+         */
+        PROJECTION_COLUMNS(
+                Arrays.asList(PROJECTION_NODE_ID,
+                        PROJECTION_PROJECTED_VARS,
+                        PROJECTION_VARIABLE_ORDER,
+                        PROJECTION_PARENT_NODE_ID,
+                        PROJECTION_CHILD_NODE_ID)),
+        
         
         /**
          * The columns a {@link PeriodicBinMetadata} object's fields are stored within.
@@ -280,7 +317,7 @@ public class FluoQueryColumns {
                         CONSTRUCT_VARIABLE_ORDER,
                         CONSTRUCT_GRAPH,
                         CONSTRUCT_CHILD_NODE_ID,
-                        CONSTRUCT_SPARQL,
+                        CONSTRUCT_PARENT_NODE_ID,
                         CONSTRUCT_STATEMENTS)),
 
         

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index 8675b80..5ba7383 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -25,7 +25,9 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.fluo.api.client.SnapshotBase;
@@ -34,6 +36,9 @@ import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
 import org.apache.rya.indexing.pcj.fluo.app.ConstructGraphSerializer;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.ExportStrategy;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
@@ -42,7 +47,6 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -64,10 +68,14 @@ public class FluoQueryMetadataDAO {
         requireNonNull(tx);
         requireNonNull(metadata);
 
+        Joiner joiner = Joiner.on(IncrementalUpdateConstants.VAR_DELIM);
+        
         final String rowId = metadata.getNodeId();
         tx.set(rowId, FluoQueryColumns.QUERY_NODE_ID, rowId);
         tx.set(rowId, FluoQueryColumns.QUERY_VARIABLE_ORDER, metadata.getVariableOrder().toString());
         tx.set(rowId, FluoQueryColumns.QUERY_SPARQL, metadata.getSparql() );
+        tx.set(rowId, FluoQueryColumns.QUERY_EXPORT_STRATEGIES, joiner.join(metadata.getExportStrategies()));
+        tx.set(rowId, FluoQueryColumns.QUERY_TYPE, metadata.getQueryType().toString());
         tx.set(rowId, FluoQueryColumns.QUERY_CHILD_NODE_ID, metadata.getChildNodeId() );
     }
 
@@ -91,6 +99,8 @@ public class FluoQueryMetadataDAO {
         final Map<Column, String> values = sx.gets(rowId,
                 FluoQueryColumns.QUERY_VARIABLE_ORDER,
                 FluoQueryColumns.QUERY_SPARQL,
+                FluoQueryColumns.QUERY_TYPE,
+                FluoQueryColumns.QUERY_EXPORT_STRATEGIES,
                 FluoQueryColumns.QUERY_CHILD_NODE_ID);
 
         // Return an object holding them.
@@ -99,13 +109,81 @@ public class FluoQueryMetadataDAO {
 
         final String sparql = values.get(FluoQueryColumns.QUERY_SPARQL);
         final String childNodeId = values.get(FluoQueryColumns.QUERY_CHILD_NODE_ID);
+        final String queryType = values.get(FluoQueryColumns.QUERY_TYPE);
+        final String[] exportStrategies = values.get(FluoQueryColumns.QUERY_EXPORT_STRATEGIES).split(IncrementalUpdateConstants.VAR_DELIM);
+        
+        Set<ExportStrategy> strategies = new HashSet<>();
+        for(String strategy: exportStrategies) {
+            strategies.add(ExportStrategy.valueOf(strategy));
+        }
 
         return QueryMetadata.builder(nodeId)
-                .setVariableOrder( varOrder )
+                .setVarOrder( varOrder )
                 .setSparql( sparql )
+                .setExportStrategies(strategies)
+                .setQueryType(QueryType.valueOf(queryType))
                 .setChildNodeId( childNodeId );
     }
+    
+    
+    /**
+     * Write an instance of {@link ProjectionMetadata} to the Fluo table.
+     *
+     * @param tx - The transaction that will be used to commit the metadata. (not null)
+     * @param metadata - The Projection node metadata that will be written to the table. (not null)
+     */
+    public void write(final TransactionBase tx, final ProjectionMetadata metadata) {
+        requireNonNull(tx);
+        requireNonNull(metadata);
+
+        final String rowId = metadata.getNodeId();
+        tx.set(rowId, FluoQueryColumns.PROJECTION_NODE_ID, rowId);
+        tx.set(rowId, FluoQueryColumns.PROJECTION_VARIABLE_ORDER, metadata.getVariableOrder().toString());
+        tx.set(rowId, FluoQueryColumns.PROJECTION_PROJECTED_VARS, metadata.getProjectedVars().toString());
+        tx.set(rowId, FluoQueryColumns.PROJECTION_PARENT_NODE_ID, metadata.getParentNodeId());
+        tx.set(rowId, FluoQueryColumns.PROJECTION_CHILD_NODE_ID, metadata.getChildNodeId() );
+    }
+
+    /**
+     * Read an instance of {@link ProjectionMetadata} from the Fluo table.
+     *
+     * @param sx - The snapshot that will be used to read the metadata. (not null)
+     * @param nodeId - The nodeId of the Projection node that will be read. (not null)
+     * @return The {@link ProjectionMetadata} that was read from the table.
+     */
+    public ProjectionMetadata readProjectionMetadata(final SnapshotBase sx, final String nodeId) {
+        return readProjectionMetadataBuilder(sx, nodeId).build();
+    }
+
+    private ProjectionMetadata.Builder readProjectionMetadataBuilder(final SnapshotBase sx, final String nodeId) {
+        requireNonNull(sx);
+        requireNonNull(nodeId);
+
+        // Fetch the values from the Fluo table.
+        final String rowId = nodeId;
+        final Map<Column, String> values = sx.gets(rowId,
+                FluoQueryColumns.PROJECTION_VARIABLE_ORDER,
+                FluoQueryColumns.PROJECTION_PROJECTED_VARS,
+                FluoQueryColumns.PROJECTION_PARENT_NODE_ID,
+                FluoQueryColumns.PROJECTION_CHILD_NODE_ID);
 
+        // Return an object holding them.
+        final String varOrderString = values.get(FluoQueryColumns.PROJECTION_VARIABLE_ORDER);
+        final String projectedVarString = values.get(FluoQueryColumns.PROJECTION_PROJECTED_VARS);
+        final VariableOrder varOrder = new VariableOrder(varOrderString);
+        final VariableOrder projectedVars = new VariableOrder(projectedVarString);
+        final String childNodeId = values.get(FluoQueryColumns.PROJECTION_CHILD_NODE_ID);
+        final String parentNodeId = values.get(FluoQueryColumns.PROJECTION_PARENT_NODE_ID);
+
+        
+        return ProjectionMetadata.builder(nodeId)
+                .setVarOrder( varOrder )
+                .setProjectedVars(projectedVars)
+                .setParentNodeId(parentNodeId)
+                .setChildNodeId( childNodeId );
+    }
+    
+    
     /**
      * Write an instance of {@link ConstructQueryMetadata} to the Fluo table.
      *
@@ -120,7 +198,7 @@ public class FluoQueryMetadataDAO {
         tx.set(rowId, FluoQueryColumns.CONSTRUCT_NODE_ID, rowId);
         tx.set(rowId, FluoQueryColumns.CONSTRUCT_VARIABLE_ORDER, metadata.getVariableOrder().toString());
         tx.set(rowId, FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID, metadata.getChildNodeId() );
-        tx.set(rowId, FluoQueryColumns.CONSTRUCT_SPARQL, metadata.getSparql());
+        tx.set(rowId, FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID, metadata.getParentNodeId() );
         tx.set(rowId, FluoQueryColumns.CONSTRUCT_GRAPH, ConstructGraphSerializer.toConstructString(metadata.getConstructGraph()));
     }
 
@@ -143,18 +221,22 @@ public class FluoQueryMetadataDAO {
         final String rowId = nodeId;
         final Map<Column, String> values = sx.gets(rowId, 
                 FluoQueryColumns.CONSTRUCT_GRAPH,
-                FluoQueryColumns.CONSTRUCT_SPARQL,
-                FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID);
+                FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID, 
+                FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID,
+                FluoQueryColumns.CONSTRUCT_VARIABLE_ORDER);
 
         final String graphString = values.get(FluoQueryColumns.CONSTRUCT_GRAPH);
         final ConstructGraph graph = ConstructGraphSerializer.toConstructGraph(graphString);
         final String childNodeId = values.get(FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID);
-        final String sparql = values.get(FluoQueryColumns.CONSTRUCT_SPARQL);
+        final String parentNodeId = values.get(FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID);
+        final String varOrderString = values.get(FluoQueryColumns.CONSTRUCT_VARIABLE_ORDER);
+        
 
         return ConstructQueryMetadata.builder()
                 .setNodeId(nodeId)
+                .setParentNodeId(parentNodeId)
                 .setConstructGraph(graph)
-                .setSparql(sparql)
+                .setVarOrder(new VariableOrder(varOrderString))
                 .setChildNodeId(childNodeId);
     }
     
@@ -342,7 +424,7 @@ public class FluoQueryMetadataDAO {
         final String rightChildNodeId = values.get(FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID);
 
         return JoinMetadata.builder(nodeId)
-                .setVariableOrder(varOrder)
+                .setVarOrder(varOrder)
                 .setJoinType(joinType)
                 .setParentNodeId(parentNodeId)
                 .setLeftChildNodeId(leftChildNodeId)
@@ -477,7 +559,7 @@ public class FluoQueryMetadataDAO {
         }
 
         final AggregationMetadata.Builder builder = AggregationMetadata.builder(nodeId)
-                .setVariableOrder(varOrder)
+                .setVarOrder(varOrder)
                 .setParentNodeId(parentNodeId)
                 .setChildNodeId(childNodeId)
                 .setGroupByVariableOrder(groupByVars);
@@ -498,27 +580,28 @@ public class FluoQueryMetadataDAO {
     public void write(final TransactionBase tx, final FluoQuery query) {
         requireNonNull(tx);
         requireNonNull(query);
+        
+        QueryMetadata queryMetadata = query.getQueryMetadata();
+        final String sparql = queryMetadata.getSparql();
+        final String queryId = queryMetadata.getNodeId();
+        final String pcjId = queryMetadata.getExportId();
+        
+        // The results of the query are eventually exported to an instance
+        // of Rya, so store the Rya ID for the PCJ.
+        tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId);
+        tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId);
+        tx.set(Bytes.of(sparql), FluoQueryColumns.QUERY_ID, Bytes.of(queryId));
+        write(tx, queryMetadata);
 
         // Write the rest of the metadata objects.
-        switch (query.getQueryType()) {
-        case Construct:
+        
+        if (query.getQueryType() == QueryType.Construct) {
             ConstructQueryMetadata constructMetadata = query.getConstructQueryMetadata().get();
-            // Store the Query ID so that it may be looked up from the original
-            // SPARQL string.
-            final String constructSparql = constructMetadata.getSparql();
-            final String constructQueryId = constructMetadata.getNodeId();
-            tx.set(Bytes.of(constructSparql), FluoQueryColumns.QUERY_ID, Bytes.of(constructQueryId));
             write(tx, constructMetadata);
-            break;
-        case Projection:
-            QueryMetadata queryMetadata = query.getQueryMetadata().get();
-            // Store the Query ID so that it may be looked up from the original
-            // SPARQL string.
-            final String sparql = queryMetadata.getSparql();
-            final String queryId = queryMetadata.getNodeId();
-            tx.set(Bytes.of(sparql), FluoQueryColumns.QUERY_ID, Bytes.of(queryId));
-            write(tx, queryMetadata);
-            break;
+        }
+        
+        for(final ProjectionMetadata projection : query.getProjectionMetadata()) {
+            write(tx, projection);
         }
         
         Optional<PeriodicQueryMetadata> periodicMetadata = query.getPeriodicQueryMetadata();
@@ -569,16 +652,23 @@ public class FluoQueryMetadataDAO {
         case QUERY:
             // Add this node's metadata.
             final QueryMetadata.Builder queryBuilder = readQueryMetadataBuilder(sx, childNodeId);
-            Preconditions.checkArgument(!builder.getQueryBuilder().isPresent());
             builder.setQueryMetadata(queryBuilder);
 
             // Add it's child's metadata.
             addChildMetadata(sx, builder, queryBuilder.build().getChildNodeId());
             break;
-
+            
+        case PROJECTION:
+            //Add this node's metadata
+            final ProjectionMetadata.Builder projectionBuilder = readProjectionMetadataBuilder(sx, childNodeId);
+            builder.addProjectionBuilder(projectionBuilder);
+            
+            //Add its child's metadata
+            addChildMetadata(sx, builder, projectionBuilder.build().getChildNodeId());
+            break; 
+            
         case CONSTRUCT:
             final ConstructQueryMetadata.Builder constructBuilder = readConstructQueryMetadataBuilder(sx, childNodeId);
-            Preconditions.checkArgument(!builder.getQueryBuilder().isPresent());
             builder.setConstructQueryMetadata(constructBuilder);
             
             // Add it's child's metadata.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
index 7bad9a7..aa79daf 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
@@ -163,7 +163,7 @@ public class JoinMetadata extends CommonNodeMetadata {
      * Builds instances of {@link JoinMetadata}.
      */
     @DefaultAnnotation(NonNull.class)
-    public static final class Builder {
+    public static final class Builder implements CommonNodeMetadata.Builder {
 
         private final String nodeId;
         private VariableOrder varOrder;
@@ -194,11 +194,16 @@ public class JoinMetadata extends CommonNodeMetadata {
          * @param varOrder - The variable order of the binding sets that are emitted by this node.
          * @return This builder so that method invocation could be chained.
          */
-        public Builder setVariableOrder(@Nullable final VariableOrder varOrder) {
+        public Builder setVarOrder(@Nullable final VariableOrder varOrder) {
             this.varOrder = varOrder;
             return this;
         }
 
+        @Override
+        public VariableOrder getVariableOrder() {
+            return varOrder;
+        }
+        
         /**
          * Sets the node id of this node's parent.
          *
@@ -242,6 +247,14 @@ public class JoinMetadata extends CommonNodeMetadata {
             this.rightChildNodeId = rightChildNodeId;
             return this;
         }
+        
+        public String getLeftChildNodeId() {
+            return leftChildNodeId;
+        }
+        
+        public String getRightChildNodeId() {
+            return rightChildNodeId;
+        }
 
         /**
          * @return An instance of {@link JoinMetadata} built using this builder's values.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java
index 33253f2..ae4b10e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java
@@ -166,7 +166,7 @@ public class PeriodicQueryMetadata extends CommonNodeMetadata {
     /**
      * Builder for chaining method calls to construct an instance of PeriodicQueryMetadata.
      */
-    public static class Builder {
+    public static class Builder implements CommonNodeMetadata.Builder {
 
         private String nodeId;
         private VariableOrder varOrder;
@@ -200,11 +200,12 @@ public class PeriodicQueryMetadata extends CommonNodeMetadata {
             return this;
         }
         
+        
         /**
          * Returns {@link VariableOrder} 
          * @return VariableOrder that indicates order that results are written in 
          */
-        public VariableOrder getVarOrder() {
+        public VariableOrder getVariableOrder() {
             return varOrder;
         }
         
@@ -235,6 +236,10 @@ public class PeriodicQueryMetadata extends CommonNodeMetadata {
             return this;
         }
         
+        public String getChildNodeId() {
+            return childNodeId;
+        }
+        
         /**
          * Sets window size for periodic query
          * @param windowSize