You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/18 18:51:28 UTC

[3/5] incubator-rya git commit: RYA-282-Nested-Query. Closes #192.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ProjectionMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ProjectionMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ProjectionMetadata.java
new file mode 100644
index 0000000..5a337c7
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ProjectionMetadata.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Objects;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import net.jcip.annotations.Immutable;
+
+/**
+ * Metadata that is specific to a Projection.
+ */
+@Immutable
+@DefaultAnnotation(NonNull.class)
+public class ProjectionMetadata extends CommonNodeMetadata {
+
+    private final String childNodeId;
+    private final String parentNodeId;
+    private final VariableOrder projectedVars;
+
+    /**
+     * Constructs an instance of {@link ProjectionMetadata}.
+     *
+     * @param nodeId - The ID the Fluo app uses to reference this node. (not null)
+     * @param varOrder - The order in which binding values are written in the row to identify this result. (not null)
+     * @param childNodeId - The node whose results are projected to the query's SELECT variables. (not null)
+     * @param parentNodeId - The parent node of this projection (not null)
+     * @param projectedVars - The variables that the results are projected onto (not null)
+     */
+    public ProjectionMetadata(
+            final String nodeId,
+            final VariableOrder varOrder,
+            final String childNodeId,
+            final String parentNodeId,
+            final VariableOrder projectedVars) {
+        super(nodeId, varOrder);
+        this.childNodeId = checkNotNull(childNodeId);
+        this.parentNodeId = checkNotNull(parentNodeId);
+        this.projectedVars = checkNotNull(projectedVars);
+    }
+
+    /**
+     * @return The node whose results are projected to the query's SELECT variables.
+     */
+    public String getChildNodeId() {
+        return childNodeId;
+    }
+    
+    /**
+     * @return The parent node of this projection node
+     */
+    public String getParentNodeId() {
+        return parentNodeId;
+    }
+    
+    /**
+     * @return The variables that results are projected onto
+     */
+    public VariableOrder getProjectedVars() {
+        return projectedVars;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(
+                super.getNodeId(),
+                super.getVariableOrder(),
+                projectedVars,
+                childNodeId,
+                parentNodeId);
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if(this == o) {
+            return true;
+        }
+
+        if(o instanceof ProjectionMetadata) {
+            if(super.equals(o)) {
+                final ProjectionMetadata projectionMetadata = (ProjectionMetadata)o;
+                return new EqualsBuilder()
+                        .append(childNodeId, projectionMetadata.childNodeId)
+                        .append(parentNodeId, projectionMetadata.parentNodeId)
+                        .append(projectedVars, projectionMetadata.projectedVars)
+                        .isEquals();
+            }
+            return false;
+        }
+
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder()
+                .append("ProjectionMetadata {\n")
+                .append("    Node ID: " + super.getNodeId() + "\n")
+                .append("    Projection Variables: " + projectedVars + "\n")
+                .append("    Variable Order: " + super.getVariableOrder() + "\n")
+                .append("    Child Node ID: " + childNodeId + "\n")
+                .append("    Parent Node ID: " + parentNodeId + "\n")
+                .append("}")
+                .toString();
+    }
+
+    /**
+     * Creates a new {@link Builder} for this class.
+     *
+     * @param nodeId - The ID the Fluo app uses to reference this node. (not null)
+     * @return A new {@link Builder} for this class.
+     */
+    public static Builder builder(final String nodeId) {
+        return new Builder(nodeId);
+    }
+
+    /**
+     * Builds instances of {@link ProjectionMetadata}.
+     */
+    @DefaultAnnotation(NonNull.class)
+    public static final class Builder implements CommonNodeMetadata.Builder {
+
+        private String nodeId;
+        private VariableOrder varOrder;
+        private String childNodeId;
+        private String parentNodeId;
+        private VariableOrder projectedVars;
+
+        /**
+         * Constructs an instance of {@link Builder}.
+         *
+         * @param nodeId - The ID the Fluo app uses to reference this node. (not null)
+         */
+        public Builder(final String nodeId) {
+            this.nodeId = checkNotNull(nodeId);
+        }
+        
+        public String getNodeId() {
+            return nodeId;
+        }
+        
+        /**
+         * Set the variable order of binding sets that are emitted by this node.
+         *
+         * @param varOrder - The order in which result values are written to the row to identify this result
+         * @return This builder so that method invocations may be chained.
+         */
+        public Builder setVarOrder(@Nullable final VariableOrder varOrder) {
+            this.varOrder = varOrder;
+            return this;
+        }
+        
+        /**
+         * @return the variable order of binding sets that are emitted by this node
+         */
+        public VariableOrder getVariableOrder() {
+            return varOrder;
+        }
+
+        /**
+         * Set the node whose results are projected to the query's SELECT variables.
+         *
+         * @param childNodeId - The node whose results are projected to the query's SELECT variables.
+         * @return This builder so that method invocations may be chained.
+         */
+        public Builder setChildNodeId(@Nullable final String childNodeId) {
+            this.childNodeId = childNodeId;
+            return this;
+        }
+        
+        public String getChildNodeId() {
+            return childNodeId;
+        }
+        
+        /**
+         * Set the parent node of this projection node.
+         *
+         * @param parentNodeId - The parent node of this projection node
+         * @return This builder so that method invocations may be chained.
+         */
+        public Builder setParentNodeId(@Nullable final String parentNodeId) {
+            this.parentNodeId = parentNodeId;
+            return this;
+        }
+        
+        public String getParentNodeId() {
+            return parentNodeId;
+        }
+        
+        /**
+         * @param projectedVars - Variables that results are projected onto
+         * @return This builder so that method invocations may be chained.
+         */
+        public Builder setProjectedVars(VariableOrder projectedVars) {
+            this.projectedVars = projectedVars;
+            return this;
+        }
+        
+        /**
+         * @return The variables that results are projected onto
+         */
+        public VariableOrder getProjectionVars() {
+            return projectedVars;
+        }
+
+        /**
+         * @return An instance of {@link ProjectionMetadata} built using this builder's values.
+         */
+        public ProjectionMetadata build() {
+            return new ProjectionMetadata(nodeId, varOrder, childNodeId, parentNodeId, projectedVars);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorBase.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorBase.java
new file mode 100644
index 0000000..b45c56c
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorBase.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+/**
+ * Base visitor class for navigating a {@link FluoQuery.Builder}.
+ * The visit methods in this class provide the basic functionality
+ * for navigating between the Builders that make up the FluoQuery.Builder.
+ *
+ */
+public abstract class QueryBuilderVisitorBase {
+
+    private FluoQuery.Builder fluoBuilder;
+    
+    public QueryBuilderVisitorBase(FluoQuery.Builder fluoBuilder) {
+        this.fluoBuilder = Preconditions.checkNotNull(fluoBuilder);
+    }
+    
+    public void visit() {
+        this.visit(fluoBuilder.getQueryBuilder());
+    }
+    
+    /**
+     * Visits the {@link FluoQuery.Builder} starting at the Metadata builder node with the given id
+     * @param nodeId - id of the node this visitor will start at
+     */
+    public void visit(String nodeId) {
+        visitNode(nodeId);
+    }
+    
+    public void visit(QueryMetadata.Builder queryBuilder) {
+        visitNode(queryBuilder.getChildNodeId());
+    }
+    
+    public void visit(ConstructQueryMetadata.Builder constructBuilder) {
+        visitNode(constructBuilder.getChildNodeId());
+    }
+    
+    public void visit(ProjectionMetadata.Builder projectionBuilder) {
+        visitNode(projectionBuilder.getChildNodeId());
+    }
+    
+    public void visit(PeriodicQueryMetadata.Builder periodicBuilder) {
+        visitNode(periodicBuilder.getChildNodeId());
+    }
+    
+    public void visit(FilterMetadata.Builder filterBuilder) {
+        visitNode(filterBuilder.getChildNodeId());
+    }
+    
+    public void visit(JoinMetadata.Builder joinBuilder) {
+        visitNode(joinBuilder.getLeftChildNodeId());
+        visitNode(joinBuilder.getRightChildNodeId());
+    }
+    
+    public void visit(AggregationMetadata.Builder aggregationBuilder) {
+        visitNode(aggregationBuilder.getChildNodeId());
+    }
+    
+    public void visit(StatementPatternMetadata.Builder statementPatternBuilder) {}
+    
+    public void visitNode(String nodeId) { // Resolves the node type from nodeId and dispatches to the matching visit(...) overload.
+        Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+        try {
+            switch(type.get()) { // If no type could be derived from the id, get() fails and is reported by the catch below.
+            case AGGREGATION:
+                visit(fluoBuilder.getAggregateBuilder(nodeId).get());
+                break;
+            case CONSTRUCT:
+                visit(fluoBuilder.getConstructQueryBuilder(nodeId).get());
+                break;
+            case FILTER:
+                visit(fluoBuilder.getFilterBuilder(nodeId).get());
+                break;
+            case JOIN:
+                visit(fluoBuilder.getJoinBuilder(nodeId).get());
+                break;
+            case PERIODIC_QUERY:
+                visit(fluoBuilder.getPeriodicQueryBuilder(nodeId).get());
+                break;
+            case PROJECTION:
+                visit(fluoBuilder.getProjectionBuilder(nodeId).get());
+                break;
+            case QUERY:
+                visit(fluoBuilder.getQueryBuilder(nodeId).get());
+                break;
+            case STATEMENT_PATTERN:
+                visit(fluoBuilder.getStatementPatternBuilder(nodeId).get());
+                break;
+            default:
+                throw new RuntimeException();
+            } 
+        } catch(Exception e) { // Any failure while resolving or visiting the node is surfaced as an invalid query.
+            throw new IllegalArgumentException("Invalid Fluo Query.", e); // Chain the cause so the original failure stays diagnosable.
+        }
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
index d017724..fe130fb 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
@@ -20,18 +20,24 @@ package org.apache.rya.indexing.pcj.fluo.app.query;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import edu.umd.cs.findbugs.annotations.Nullable;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-import net.jcip.annotations.Immutable;
+import java.util.Set;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.ExportStrategy;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 
 import com.google.common.base.Objects;
 
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import net.jcip.annotations.Immutable;
+
 /**
- * Metadata that is specific to a Projection.
+ * Metadata for a query registered with Fluo.  This metadata is for the topmost node
+ * in the {@link FluoQuery}, and it includes information about how to export results
+ * for the query.
  */
 @Immutable
 @DefaultAnnotation(NonNull.class)
@@ -39,6 +45,9 @@ public class QueryMetadata extends CommonNodeMetadata {
 
     private final String sparql;
     private final String childNodeId;
+    private final Set<ExportStrategy> exportStrategy;
+    private final QueryType queryType;
+    private final String exportId;
 
     /**
      * Constructs an instance of {@link QueryMetadata}.
@@ -47,15 +56,29 @@ public class QueryMetadata extends CommonNodeMetadata {
      * @param varOrder - The variable order of binding sets that are emitted by this node. (not null)
      * @param sparql - The SPARQL query whose results are being updated by the Fluo app. (not null)
      * @param childNodeId - The node whose results are projected to the query's SELECT variables. (not null)
+     * @param exportStrategy - Set of export strategies used for emitting results from Rya-Fluo app
      */
     public QueryMetadata(
             final String nodeId,
             final VariableOrder varOrder,
             final String sparql,
-            final String childNodeId) {
+            final String childNodeId,
+            final Set<ExportStrategy> exportStrategy,
+            final QueryType queryType) {
         super(nodeId, varOrder);
         this.sparql = checkNotNull(sparql);
         this.childNodeId = checkNotNull(childNodeId);
+        this.exportStrategy = checkNotNull(exportStrategy);
+        this.queryType = checkNotNull(queryType);
+        String[] idSplit = nodeId.split("_");
+        if(idSplit.length != 2) {
+            throw new IllegalArgumentException("Invalid Query Node Id");
+        }
+        this.exportId = idSplit[1];
+    }
+    
+    public String getExportId() {
+        return exportId;
     }
 
     /**
@@ -71,14 +94,30 @@ public class QueryMetadata extends CommonNodeMetadata {
     public String getChildNodeId() {
         return childNodeId;
     }
-
+    
+    /**
+     * @return strategies used for exporting results from Rya-Fluo Application
+     */
+    public Set<ExportStrategy> getExportStrategies() {
+        return exportStrategy;
+    }
+    
+    /**
+     * @return the {@link QueryType} of this query
+     */
+    public QueryType getQueryType() {
+        return queryType;
+    }
+    
     @Override
     public int hashCode() {
         return Objects.hashCode(
                 super.getNodeId(),
                 super.getVariableOrder(),
                 sparql,
-                childNodeId);
+                childNodeId,
+                exportStrategy,
+                queryType);
     }
 
     @Override
@@ -93,6 +132,8 @@ public class QueryMetadata extends CommonNodeMetadata {
                 return new EqualsBuilder()
                         .append(sparql, queryMetadata.sparql)
                         .append(childNodeId, queryMetadata.childNodeId)
+                        .append(exportStrategy, queryMetadata.exportStrategy)
+                        .append(queryType, queryMetadata.queryType)
                         .isEquals();
             }
             return false;
@@ -109,6 +150,8 @@ public class QueryMetadata extends CommonNodeMetadata {
                 .append("    Variable Order: " + super.getVariableOrder() + "\n")
                 .append("    Child Node ID: " + childNodeId + "\n")
                 .append("    SPARQL: " + sparql + "\n")
+                .append("    Query Type: " + queryType + "\n")
+                .append("    Export Strategies: " + exportStrategy + "\n")
                 .append("}")
                 .toString();
     }
@@ -127,12 +170,14 @@ public class QueryMetadata extends CommonNodeMetadata {
      * Builds instances of {@link QueryMetadata}.
      */
     @DefaultAnnotation(NonNull.class)
-    public static final class Builder {
+    public static final class Builder implements CommonNodeMetadata.Builder {
 
         private String nodeId;
         private VariableOrder varOrder;
         private String sparql;
         private String childNodeId;
+        private Set<ExportStrategy> exportStrategies;
+        private QueryType queryType;
 
         /**
          * Constructs an instance of {@link Builder}.
@@ -154,7 +199,7 @@ public class QueryMetadata extends CommonNodeMetadata {
          * @param varOrder - The variable order of binding sets that are emitted by this node.
          * @return This builder so that method invocations may be chained.
          */
-        public Builder setVariableOrder(@Nullable final VariableOrder varOrder) {
+        public Builder setVarOrder(@Nullable final VariableOrder varOrder) {
             this.varOrder = varOrder;
             return this;
         }
@@ -188,6 +233,37 @@ public class QueryMetadata extends CommonNodeMetadata {
             return this;
         }
         
+        /**
+         * Sets export strategies used for emitting results from Rya Fluo app
+         * @param export - Set of export strategies
+         * @return This builder so that method invocations may be chained
+         */
+        public Builder setExportStrategies(Set<ExportStrategy> export) {
+            this.exportStrategies = export;
+            return this;
+        }
+        
+        /**
+         * Set query type for the given query
+         * @param queryType - {@link QueryType} of the given query
+         * @return This builder so that method invocations may be chained
+         */
+        public Builder setQueryType(QueryType queryType) {
+            this.queryType = queryType;
+            return this;
+        }
+        
+        /**
+         * @return QueryType for the given query
+         */
+        public QueryType getQueryType() {
+            return queryType;
+        }
+        
+        
+        /**
+         * @return id of the child node of this node
+         */
         public String getChildNodeId() {
             return childNodeId;
         }
@@ -196,7 +272,7 @@ public class QueryMetadata extends CommonNodeMetadata {
          * @return An instance of {@link QueryMetadata} build using this builder's values.
          */
         public QueryMetadata build() {
-            return new QueryMetadata(nodeId, varOrder, sparql, childNodeId);
+            return new QueryMetadata(nodeId, varOrder, sparql, childNodeId, exportStrategies, queryType);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorBase.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorBase.java
new file mode 100644
index 0000000..ce9b02c
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorBase.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+public abstract class QueryMetadataVisitorBase {
+
+ private FluoQuery fluoQuery;
+    
+    public QueryMetadataVisitorBase(FluoQuery fluoQuery) {
+        this.fluoQuery = Preconditions.checkNotNull(fluoQuery);
+    }
+    
+    public void visit() {
+        visit(fluoQuery.getQueryMetadata());
+    }
+    
+    /**
+     * Visits the {@link FluoQuery} starting at the Metadata node with the given id
+     * @param nodeId - id of the node this visitor will start at
+     */
+    public void visit(String nodeId) {
+        visitNode(nodeId);
+    }
+    
+    public void visit(QueryMetadata queryMetadata) {
+        visitNode(queryMetadata.getChildNodeId());
+    }
+    
+    public void visit(ConstructQueryMetadata constructMetadata) {
+        visitNode(constructMetadata.getChildNodeId());
+    }
+    
+    public void visit(ProjectionMetadata projectionMetadata) {
+        visitNode(projectionMetadata.getChildNodeId());
+    }
+    
+    public void visit(PeriodicQueryMetadata periodicMetadata) {
+        visitNode(periodicMetadata.getChildNodeId());
+    }
+    
+    public void visit(FilterMetadata filterMetadata) {
+        visitNode(filterMetadata.getChildNodeId());
+    }
+    
+    public void visit(JoinMetadata joinMetadata) {
+        visitNode(joinMetadata.getLeftChildNodeId());
+        visitNode(joinMetadata.getRightChildNodeId());
+    }
+    
+    public void visit(AggregationMetadata aggregationMetadata) {
+        visitNode(aggregationMetadata.getChildNodeId());
+    }
+    
+    public void visit(StatementPatternMetadata statementPatternMetadata) {}
+    
+    public void visitNode(String nodeId) { // Resolves the node type from nodeId and dispatches to the matching visit(...) overload.
+        Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+        try {
+            switch(type.get()) { // If no type could be derived from the id, get() fails and is reported by the catch below.
+            case AGGREGATION:
+                visit(fluoQuery.getAggregationMetadata(nodeId).get());
+                break;
+            case CONSTRUCT:
+                visit(fluoQuery.getConstructQueryMetadata(nodeId).get());
+                break;
+            case FILTER:
+                visit(fluoQuery.getFilterMetadata(nodeId).get());
+                break;
+            case JOIN:
+                visit(fluoQuery.getJoinMetadata(nodeId).get());
+                break;
+            case PERIODIC_QUERY:
+                visit(fluoQuery.getPeriodicQueryMetadata(nodeId).get());
+                break;
+            case PROJECTION:
+                visit(fluoQuery.getProjectionMetadata(nodeId).get());
+                break;
+            case QUERY:
+                visit(fluoQuery.getQueryMetadata(nodeId).get());
+                break;
+            case STATEMENT_PATTERN:
+                visit(fluoQuery.getStatementPatternMetadata(nodeId).get());
+                break;
+            default:
+                throw new RuntimeException();
+            } 
+        } catch(Exception e) { // Any failure while resolving or visiting the node is surfaced as an invalid query.
+            throw new IllegalArgumentException("Invalid Fluo Query.", e); // Chain the cause so the original failure stays diagnosable.
+        }
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
index 8e348f2..6c03be1 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
@@ -25,10 +25,11 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.CO
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX;
-import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PROJECTION_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -42,16 +43,22 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
 import org.apache.rya.indexing.pcj.fluo.app.ConstructProjection;
 import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.ExportStrategy;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
 import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer;
 import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer.FilterParseException;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
 import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
+import org.apache.rya.indexing.pcj.fluo.app.util.VariableOrderUpdateVisitor.UpdateAction;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.openrdf.model.Value;
 import org.openrdf.model.impl.BNodeImpl;
+import org.openrdf.query.MalformedQueryException;
 import org.openrdf.query.algebra.AggregateOperator;
 import org.openrdf.query.algebra.BNodeGenerator;
 import org.openrdf.query.algebra.Extension;
@@ -75,6 +82,7 @@ import org.openrdf.query.algebra.ValueExpr;
 import org.openrdf.query.algebra.Var;
 import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
 import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -90,32 +98,86 @@ import net.jcip.annotations.Immutable;
  */
 public class SparqlFluoQueryBuilder {
 
+    private String sparql;
+    private TupleExpr te;
+    private String queryId;
+    private NodeIds nodeIds;
+    //Default behavior is to export to Kafka - subject to change when user can 
+    //specify their own export strategy
+    private Set<ExportStrategy> exportStrategies = new HashSet<>(Arrays.asList(ExportStrategy.Kafka));
+    
+    public SparqlFluoQueryBuilder setSparql(String sparql) {
+        this.sparql = Preconditions.checkNotNull(sparql);
+        return this;
+    }
+    
+    public SparqlFluoQueryBuilder setTupleExpr(TupleExpr te) {
+        this.te = Preconditions.checkNotNull(te);
+        return this;
+    }
+    
     /**
-     * Creates the {@link FluoQuery} metadata that is required by the Fluo
-     * application to process a SPARQL query.
-     *
-     * @param parsedQuery - The query metadata will be derived from. (not null)
-     * @param nodeIds - The NodeIds object is passed in so that other parts
-     *   of the application may look up which ID is associated with each
-     *   node of the query.
-     * @return A {@link FluoQuery} object loaded with metadata built from the
-     *   {@link ParsedQuery}.
+     * Sets the FluoQuery id as generated by {@link NodeType#generateNewFluoIdForType(NodeType)} or
+     * {@link NodeType#generateNewIdForType(NodeType, String)}, where NodeType is of type Query.
+     * @param queryId for the {@link FluoQuery}
+     * @return SparqlFluoQueryBuilder for chaining method calls
      */
-    public FluoQuery make(final ParsedQuery parsedQuery, final NodeIds nodeIds) {
-        checkNotNull(parsedQuery);
-
-        final String sparql = parsedQuery.getSourceString();
-        final FluoQuery.Builder fluoQueryBuilder = FluoQuery.builder();
-
-        final NewQueryVisitor visitor = new NewQueryVisitor(sparql, fluoQueryBuilder, nodeIds);
-        TupleExpr te = parsedQuery.getTupleExpr();
+    public SparqlFluoQueryBuilder setFluoQueryId(String queryId) {
+        this.queryId = Preconditions.checkNotNull(queryId);
+        return this;
+    }
+    
+    public SparqlFluoQueryBuilder setNodeIds(NodeIds nodeIds) {
+        this.nodeIds = Preconditions.checkNotNull(nodeIds);
+        return this;
+    }
+    
+    public SparqlFluoQueryBuilder setExportStrategies(Set<ExportStrategy> exportStrategies) {
+        this.exportStrategies = Preconditions.checkNotNull(exportStrategies); // fail fast like the other setters; build() rejects null anyway
+        return this;
+    }
+    
+    public FluoQuery build() {
+        Preconditions.checkNotNull(sparql);
+        Preconditions.checkNotNull(queryId);
+        Preconditions.checkNotNull(exportStrategies);
+      
+        if(nodeIds == null) {
+            nodeIds = new NodeIds();
+        }
+        
+        if(te == null) {
+            SPARQLParser parser = new SPARQLParser();
+            ParsedQuery pq;
+            try {
+                pq = parser.parseQuery(sparql, null);
+            } catch (MalformedQueryException e) {
+               throw new RuntimeException(e);
+            }
+            te = pq.getTupleExpr();
+        }
+        
         PeriodicQueryUtil.placePeriodicQueryNode(te);
+        String childNodeId = nodeIds.getOrMakeId(te);
+        
+        final FluoQuery.Builder fluoQueryBuilder = FluoQuery.builder();
+        QueryMetadata.Builder queryBuilder = QueryMetadata.builder(queryId);
+        //sets {@link QueryType} and VariableOrder
+        setVarOrderAndQueryType(queryBuilder, te);
+        queryBuilder.setSparql(sparql);
+        queryBuilder.setChildNodeId(childNodeId);
+        queryBuilder.setExportStrategies(exportStrategies);
+        fluoQueryBuilder.setQueryMetadata(queryBuilder);
+        
+        setChildMetadata(fluoQueryBuilder, childNodeId, queryBuilder.getVariableOrder(), queryId);
+        
+        final NewQueryVisitor visitor = new NewQueryVisitor(fluoQueryBuilder, nodeIds);
         te.visit( visitor );
-
+        
         final FluoQuery fluoQuery = fluoQueryBuilder.build();
         return fluoQuery;
     }
-
+    
     /**
      * A data structure that creates and keeps track of Node IDs for the nodes
      * of a {@link ParsedQuery}. This structure should only be used while creating
@@ -187,7 +249,7 @@ public class SparqlFluoQueryBuilder {
             } else if (node instanceof Join || node instanceof LeftJoin) {
                 prefix = JOIN_PREFIX;
             } else if (node instanceof Projection) {
-                prefix = QUERY_PREFIX;
+                prefix = PROJECTION_PREFIX;
             } else if(node instanceof Extension) {
                 prefix = AGGREGATION_PREFIX;
             }  else if (node instanceof Reduced) {
@@ -214,7 +276,6 @@ public class SparqlFluoQueryBuilder {
 
         private final NodeIds nodeIds;
         private final FluoQuery.Builder fluoQueryBuilder;
-        private final String sparql;
 
         /**
          * Constructs an instance of {@link NewQueryVisitor}.
@@ -227,8 +288,7 @@ public class SparqlFluoQueryBuilder {
          *   of the application may look up which ID is associated with each
          *   node of the query.
          */
-        public NewQueryVisitor(final String sparql, final FluoQuery.Builder fluoQueryBuilder, final NodeIds nodeIds) {
-            this.sparql = checkNotNull(sparql);
+        public NewQueryVisitor(final FluoQuery.Builder fluoQueryBuilder, final NodeIds nodeIds) {
             this.fluoQueryBuilder = checkNotNull(fluoQueryBuilder);
             this.nodeIds = checkNotNull(nodeIds);
         }
@@ -256,6 +316,7 @@ public class SparqlFluoQueryBuilder {
                 } else {
                     groupByVariableOrder = new VariableOrder();
                 }
+                
 
                 // The aggregations that need to be performed are the Group Elements.
                 final List<AggregationElement> aggregations = new ArrayList<>();
@@ -289,15 +350,21 @@ public class SparqlFluoQueryBuilder {
 
                 aggregationBuilder.setChildNodeId(childNodeId);
                 aggregationBuilder.setGroupByVariableOrder(groupByVariableOrder);
+                
+                Set<String> aggregationVars = getVarsToDelete(groupByVariableOrder.getVariableOrders(), aggregationBuilder.getVariableOrder().getVariableOrders());
+                FluoQueryUtils.updateVarOrders(fluoQueryBuilder, UpdateAction.DeleteVariable, Lists.newArrayList(aggregationVars), aggregationId);
+                
                 for(final AggregationElement aggregation : aggregations) {
                     aggregationBuilder.addAggregation(aggregation);
                 }
+                
+                
 
                 // Update the child node's metadata.
                 final Set<String> childVars = getVars(child);
                 final VariableOrder childVarOrder = new VariableOrder(childVars);
 
-                setChildMetadata(childNodeId, childVarOrder, aggregationId);
+                setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, aggregationId);
             }
 
             // Walk to the next node.
@@ -369,11 +436,11 @@ public class SparqlFluoQueryBuilder {
 
             // Create or update the left child's variable order and parent node id.
             final VariableOrder leftVarOrder = varOrders.getLeftVarOrder();
-            setChildMetadata(leftChildNodeId, leftVarOrder, joinNodeId);
+            setChildMetadata(fluoQueryBuilder, leftChildNodeId, leftVarOrder, joinNodeId);
 
             // Create or update the right child's variable order and parent node id.
             final VariableOrder rightVarOrder = varOrders.getRightVarOrder();
-            setChildMetadata(rightChildNodeId, rightVarOrder, joinNodeId);
+            setChildMetadata(fluoQueryBuilder, rightChildNodeId, rightVarOrder, joinNodeId);
         }
 
         @Override
@@ -407,7 +474,7 @@ public class SparqlFluoQueryBuilder {
             // Update the child node's metadata.
             final Set<String> childVars = getVars((TupleExpr)child);
             final VariableOrder childVarOrder = new VariableOrder(childVars);
-            setChildMetadata(childNodeId, childVarOrder, filterId);
+            setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, filterId);
 
             // Walk to the next node.
             super.meet(node);
@@ -442,12 +509,12 @@ public class SparqlFluoQueryBuilder {
                 // Update the child node's metadata.
                 final Set<String> childVars = getVars((TupleExpr) child);
                 final VariableOrder childVarOrder = new VariableOrder(childVars);
-                setChildMetadata(childNodeId, childVarOrder, periodicId);
+                setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, periodicId);
 
                 // update variable order of this node and all ancestors to
                 // include BIN_ID binding as
                 // first variable in the ordering
-                PeriodicQueryUtil.updateVarOrdersToIncludeBin(fluoQueryBuilder, periodicId);
+                FluoQueryUtils.updateVarOrders(fluoQueryBuilder, UpdateAction.AddVariable, Arrays.asList(IncrementalUpdateConstants.PERIODIC_BIN_ID), periodicId);
                 // Walk to the next node.
                 node.getArg().visit(this);
             } 
@@ -458,13 +525,12 @@ public class SparqlFluoQueryBuilder {
         public void meet(final Projection node) {
             // Create a builder for this node populated with the metadata.
             final String queryId = nodeIds.getOrMakeId(node);
-            final VariableOrder queryVarOrder = new VariableOrder(node.getBindingNames());
-
-            final QueryMetadata.Builder queryBuilder = QueryMetadata.builder(queryId);
-            fluoQueryBuilder.setQueryMetadata(queryBuilder);
 
-            queryBuilder.setSparql(sparql);
-            queryBuilder.setVariableOrder(queryVarOrder);
+            ProjectionMetadata.Builder projectionBuilder = fluoQueryBuilder.getProjectionBuilder(queryId).orNull();
+            if (projectionBuilder == null) {
+                projectionBuilder = ProjectionMetadata.builder(queryId);
+                fluoQueryBuilder.addProjectionBuilder(projectionBuilder);
+            }
 
             final QueryModelNode child = node.getArg();
             if(child == null) {
@@ -472,13 +538,14 @@ public class SparqlFluoQueryBuilder {
             }
 
             final String childNodeId = nodeIds.getOrMakeId(child);
-            queryBuilder.setChildNodeId(childNodeId);
+            projectionBuilder.setChildNodeId(childNodeId);
+            projectionBuilder.setProjectedVars(projectionBuilder.getVariableOrder());
 
             // Update the child node's metadata.
             final Set<String> childVars = getVars((TupleExpr)child);
             final VariableOrder childVarOrder = new VariableOrder(childVars);
 
-            setChildMetadata(childNodeId, childVarOrder, queryId);
+            setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, queryId);
 
             // Walk to the next node.
             super.meet(node);
@@ -489,10 +556,13 @@ public class SparqlFluoQueryBuilder {
             //create id, initialize ConstructQueryMetadata builder, register ConstructQueryMetadata 
             //builder with FluoQueryBuilder, and add metadata that we currently have
             final String constructId = nodeIds.getOrMakeId(node);
-            final ConstructQueryMetadata.Builder constructBuilder = ConstructQueryMetadata.builder();
-            constructBuilder.setNodeId(constructId);
-            fluoQueryBuilder.setConstructQueryMetadata(constructBuilder);
-            constructBuilder.setSparql(sparql);
+            
+            ConstructQueryMetadata.Builder constructBuilder = fluoQueryBuilder.getConstructQueryBuilder().orNull();
+            if(constructBuilder == null) {
+                constructBuilder = ConstructQueryMetadata.builder();
+                constructBuilder.setNodeId(constructId);
+                fluoQueryBuilder.setConstructQueryMetadata(constructBuilder);
+            }
             
             //get child node
             QueryModelNode child = node.getArg();
@@ -531,96 +601,12 @@ public class SparqlFluoQueryBuilder {
             // Update the child node's metadata.
             final Set<String> childVars = getVars((TupleExpr)child);
             final VariableOrder childVarOrder = new VariableOrder(childVars);
-            setChildMetadata(childNodeId, childVarOrder, constructId);
+            setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, constructId);
             
             //fast forward visitor to next node we care about
             child.visit(this);
         }
         
-
-        /**
-         * Update a query node's metadata to include it's binding set variable order
-         * and it's parent node id. This information is only known when handling
-         * the parent node.
-         *
-         * @param childNodeId - The node ID of the child node.
-         * @param childVarOrder - The variable order of the child node's binding sets.
-         * @param parentNodeId - The node ID that consumes the child's binding sets.
-         */
-        private void setChildMetadata(final String childNodeId, final VariableOrder childVarOrder, final String parentNodeId) {
-            checkNotNull(childNodeId);
-            checkNotNull(childVarOrder);
-            checkNotNull(parentNodeId);
-
-            final NodeType childType = NodeType.fromNodeId(childNodeId).get();
-            switch (childType) {
-            case STATEMENT_PATTERN:
-                StatementPatternMetadata.Builder spBuilder = fluoQueryBuilder.getStatementPatternBuilder(childNodeId).orNull();
-                if (spBuilder == null) {
-                    spBuilder = StatementPatternMetadata.builder(childNodeId);
-                    fluoQueryBuilder.addStatementPatternBuilder(spBuilder);
-                }
-
-                spBuilder.setVarOrder(childVarOrder);
-                spBuilder.setParentNodeId(parentNodeId);
-                break;
-
-            case JOIN:
-                JoinMetadata.Builder joinBuilder = fluoQueryBuilder.getJoinBuilder(childNodeId).orNull();
-                if (joinBuilder == null) {
-                    joinBuilder = JoinMetadata.builder(childNodeId);
-                    fluoQueryBuilder.addJoinMetadata(joinBuilder);
-                }
-
-                joinBuilder.setVariableOrder(childVarOrder);
-                joinBuilder.setParentNodeId(parentNodeId);
-                break;
-
-            case FILTER:
-                FilterMetadata.Builder filterBuilder = fluoQueryBuilder.getFilterBuilder(childNodeId).orNull();
-                if (filterBuilder == null) {
-                    filterBuilder = FilterMetadata.builder(childNodeId);
-                    fluoQueryBuilder.addFilterMetadata(filterBuilder);
-                }
-
-                filterBuilder.setVarOrder(childVarOrder);
-                filterBuilder.setParentNodeId(parentNodeId);
-                break;
-
-            case AGGREGATION:
-                AggregationMetadata.Builder aggregationBuilder = fluoQueryBuilder.getAggregateBuilder(childNodeId).orNull();
-                if (aggregationBuilder == null) {
-                    aggregationBuilder = AggregationMetadata.builder(childNodeId);
-                    fluoQueryBuilder.addAggregateMetadata(aggregationBuilder);
-                }
-
-                aggregationBuilder.setVariableOrder(childVarOrder);
-                aggregationBuilder.setParentNodeId(parentNodeId);
-                break;
-
-            case QUERY:
-                throw new IllegalArgumentException("A QUERY node cannot be the child of another node.");
-            
-            case CONSTRUCT:
-                throw new IllegalArgumentException("A CONSTRUCT node cannot be the child of another node.");
-            
-            case PERIODIC_QUERY:
-                PeriodicQueryMetadata.Builder periodicQueryBuilder = fluoQueryBuilder.getPeriodicQueryBuilder().orNull();
-                if (periodicQueryBuilder == null) {
-                    periodicQueryBuilder = PeriodicQueryMetadata.builder();
-                    periodicQueryBuilder.setNodeId(childNodeId);
-                    fluoQueryBuilder.addPeriodicQueryMetadata(periodicQueryBuilder);
-                }
-                periodicQueryBuilder.setVarOrder(childVarOrder);
-                periodicQueryBuilder.setParentNodeId(parentNodeId);
-                break;
-                
-            default:
-                throw new IllegalArgumentException("Unsupported NodeType: " + childType);
-
-            }
-        }
-        
         private ConstructGraph getConstructGraph(List<ProjectionElemList> projections, List<ExtensionElem> extensionElems) {
             Map<String, Value> valueMap = new HashMap<>();
             //create valueMap to associate source names with Values
@@ -654,6 +640,13 @@ public class SparqlFluoQueryBuilder {
             return new ConstructGraph(constructProj);
         }
         
+        private Set<String> getVarsToDelete(Collection<String> groupByVars, Collection<String> varOrderVars) {
+            Set<String> groupBySet = Sets.newHashSet(groupByVars);
+            Set<String> varOrderSet = Sets.newHashSet(varOrderVars);
+            
+            return Sets.difference(varOrderSet, groupBySet);
+        }
+        
         private void validateProjectionElemList(ProjectionElemList list) {
             List<ProjectionElem> elements = list.getElements();
             checkArgument(elements.size() == 3);
@@ -662,8 +655,6 @@ public class SparqlFluoQueryBuilder {
             checkArgument(elements.get(2).getTargetName().equals("object"));
         }
         
-        
-
         /**
          * Get the non-constant variables from a {@link TupleExpr}.
          *
@@ -764,4 +755,199 @@ public class SparqlFluoQueryBuilder {
             return shifted;
         }
     }
+    
+    private void setVarOrderAndQueryType(QueryMetadata.Builder builder, TupleExpr te) {
+        QueryMetadataLocator locator = new QueryMetadataLocator();
+        try {
+            te.visit(locator);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        
+        builder.setVarOrder(locator.getVarOrder());
+        builder.setQueryType(locator.getQueryType());
+    }
+    
+    public static class QueryMetadataLocator extends QueryModelVisitorBase<Exception> {
+        
+        private VariableOrder varOrder;
+        private QueryType queryType;
+        
+        public VariableOrder getVarOrder() {
+            return varOrder;
+        }
+        
+        public QueryType getQueryType() {
+            return queryType;
+        }
+        
+        public void meet(Projection node) throws Exception {
+            Set<String> bindingNames = node.getBindingNames();
+            if(varOrder == null) {
+                varOrder = new VariableOrder(bindingNames);
+            }
+            
+            if(queryType == null) {
+                queryType = QueryType.Projection;
+            }
+            super.meet(node);
+        }
+        
+        public void meet(Reduced node) throws Exception {
+            if(varOrder == null) {
+                varOrder = getConstructGraphVarOrder(node);
+            }
+            
+            if(queryType == null) {
+                queryType = QueryType.Construct;
+            }
+            super.meet(node);
+        }
+        
+        public void meetOther(final QueryModelNode node) throws Exception {
+            if (node instanceof PeriodicQueryNode) {
+                queryType = QueryType.Periodic;
+            } else {
+                super.meetOther(node);
+            }
+        }
+    }
+    
+    private static VariableOrder getConstructGraphVarOrder(Reduced node) {
+        // A construct query's Reduced node is expected to wrap either a
+        // Projection or a MultiProjection; anything else is rejected.
+        final QueryModelNode arg = node.getArg();
+        Preconditions.checkArgument(arg instanceof Projection || arg instanceof MultiProjection);
+        final UnaryTupleOperator projectionNode = (UnaryTupleOperator) arg;
+
+        // Gather every ProjectionElemList so their source names can be
+        // turned into a VariableOrder.
+        final List<ProjectionElemList> elemLists = new ArrayList<>();
+        if (projectionNode instanceof MultiProjection) {
+            elemLists.addAll(((MultiProjection) projectionNode).getProjections());
+        } else {
+            elemLists.add(((Projection) projectionNode).getProjectionElemList());
+        }
+        return getConstructGraphVarOrder(elemLists);
+    }
+    
+    private static VariableOrder getConstructGraphVarOrder(List<ProjectionElemList> projections) {
+        Set<String> varOrders = new HashSet<>();
+        
+        for(ProjectionElemList elems: projections) {
+            for(ProjectionElem elem: elems.getElements()) {
+                String name = elem.getSourceName();
+                if(!name.startsWith("-const-") && !name.startsWith("-anon-")) {
+                    varOrders.add(name);
+                }
+            }
+        }
+        
+        return new VariableOrder(varOrders);
+    }
+    
+    
+    /**
+     * Update a query node's metadata to include its binding set variable order
+     * and its parent node id. This information is only known when handling
+     * the parent node.
+     *
+     * @param fluoQueryBuilder - Builder whose metadata is updated
+     * @param childNodeId - The node ID of the child node.
+     * @param childVarOrder - The variable order of the child node's binding sets.
+     * @param parentNodeId - The node ID that consumes the child's binding sets.
+     */
+    private static void setChildMetadata(final FluoQuery.Builder fluoQueryBuilder, final String childNodeId, final VariableOrder childVarOrder, final String parentNodeId) {
+        checkNotNull(childNodeId);
+        checkNotNull(childVarOrder);
+        checkNotNull(parentNodeId);
+
+        final NodeType childType = NodeType.fromNodeId(childNodeId).get();
+        switch (childType) {
+        case STATEMENT_PATTERN:
+            StatementPatternMetadata.Builder spBuilder = fluoQueryBuilder.getStatementPatternBuilder(childNodeId).orNull();
+            if (spBuilder == null) {
+                spBuilder = StatementPatternMetadata.builder(childNodeId);
+                fluoQueryBuilder.addStatementPatternBuilder(spBuilder);
+            }
+
+            spBuilder.setVarOrder(childVarOrder);
+            spBuilder.setParentNodeId(parentNodeId);
+            break;
+
+        case JOIN:
+            JoinMetadata.Builder joinBuilder = fluoQueryBuilder.getJoinBuilder(childNodeId).orNull();
+            if (joinBuilder == null) {
+                joinBuilder = JoinMetadata.builder(childNodeId);
+                fluoQueryBuilder.addJoinMetadata(joinBuilder);
+            }
+
+            joinBuilder.setVarOrder(childVarOrder);
+            joinBuilder.setParentNodeId(parentNodeId);
+            break;
+
+        case FILTER:
+            FilterMetadata.Builder filterBuilder = fluoQueryBuilder.getFilterBuilder(childNodeId).orNull();
+            if (filterBuilder == null) {
+                filterBuilder = FilterMetadata.builder(childNodeId);
+                fluoQueryBuilder.addFilterMetadata(filterBuilder);
+            }
+
+            filterBuilder.setVarOrder(childVarOrder);
+            filterBuilder.setParentNodeId(parentNodeId);
+            break;
+
+        case AGGREGATION:
+            AggregationMetadata.Builder aggregationBuilder = fluoQueryBuilder.getAggregateBuilder(childNodeId).orNull();
+            if (aggregationBuilder == null) {
+                aggregationBuilder = AggregationMetadata.builder(childNodeId);
+                fluoQueryBuilder.addAggregateMetadata(aggregationBuilder);
+            }
+
+            aggregationBuilder.setVarOrder(childVarOrder);
+            aggregationBuilder.setParentNodeId(parentNodeId);
+            break;
+            
+        case PROJECTION:
+            ProjectionMetadata.Builder projectionBuilder = fluoQueryBuilder.getProjectionBuilder(childNodeId).orNull();
+            if(projectionBuilder == null) {
+                projectionBuilder = ProjectionMetadata.builder(childNodeId);
+                fluoQueryBuilder.addProjectionBuilder(projectionBuilder);
+            }
+            
+            projectionBuilder.setVarOrder(childVarOrder);
+            projectionBuilder.setParentNodeId(parentNodeId);
+            break;
+            
+        case QUERY:
+            throw new IllegalArgumentException("A QUERY node cannot be the child of another node.");
+        
+        case CONSTRUCT:
+            ConstructQueryMetadata.Builder constructBuilder = fluoQueryBuilder.getConstructQueryBuilder().orNull();
+            if(constructBuilder == null) {
+                constructBuilder = ConstructQueryMetadata.builder();
+                constructBuilder.setNodeId(childNodeId);
+                fluoQueryBuilder.setConstructQueryMetadata(constructBuilder);
+            }
+            
+            Preconditions.checkArgument(childNodeId.equals(constructBuilder.getNodeId()));
+            constructBuilder.setVarOrder(childVarOrder);
+            constructBuilder.setParentNodeId(parentNodeId);
+            break;
+        
+        case PERIODIC_QUERY:
+            PeriodicQueryMetadata.Builder periodicQueryBuilder = fluoQueryBuilder.getPeriodicQueryBuilder().orNull();
+            if (periodicQueryBuilder == null) {
+                periodicQueryBuilder = PeriodicQueryMetadata.builder();
+                periodicQueryBuilder.setNodeId(childNodeId);
+                fluoQueryBuilder.addPeriodicQueryMetadata(periodicQueryBuilder);
+            }
+            periodicQueryBuilder.setVarOrder(childVarOrder);
+            periodicQueryBuilder.setParentNodeId(parentNodeId);
+            break;
+            
+        default:
+            throw new IllegalArgumentException("Unsupported NodeType: " + childType);
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternMetadata.java
index 7de10d5..beead93 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternMetadata.java
@@ -127,7 +127,7 @@ public class StatementPatternMetadata extends CommonNodeMetadata {
      * Builds instances of {@link StatementPatternMetadata}.
      */
     @DefaultAnnotation(NonNull.class)
-    public static final class Builder {
+    public static final class Builder implements CommonNodeMetadata.Builder {
 
         private final String nodeId;
         private VariableOrder varOrder;
@@ -160,6 +160,11 @@ public class StatementPatternMetadata extends CommonNodeMetadata {
             this.varOrder = varOrder;
             return this;
         }
+        
+        @Override
+        public VariableOrder getVariableOrder() {
+            return varOrder;
+        }
 
         /**
          * Sets the statement pattern new statements are matched against.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java
new file mode 100644
index 0000000..303f9bb
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import java.util.List;
+
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.util.VariableOrderUpdateVisitor.UpdateAction;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Utility class for manipulating components of a {@link FluoQuery}.
+ *
+ */
+public class FluoQueryUtils {
+
+    /**
+     * Updates the {@link org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder}s of a given {@link FluoQuery.Builder}.
+     * @param builder - builder whose VariableOrders will be updated
+     * @param action - add or delete variables
+     * @param variables - variables to be added or deleted
+     * @param stopNodeId - node to stop at
+     * @return - FluoQuery.Builder with updated VariableOrders
+     */
+    public static FluoQuery.Builder updateVarOrders(FluoQuery.Builder builder, UpdateAction action, List<String> variables, String stopNodeId) {
+        VariableOrderUpdateVisitor visitor = new VariableOrderUpdateVisitor(builder, action, variables, stopNodeId);
+        visitor.visit();
+        return builder;
+    }
+    
+    /**
+     * Converts the fluo query id to a pcj id.
+     * @param fluoQueryId - query id of the form query_prefix + _ + UUID (not null)
+     * @return the pcj id, which consists of only the UUID portion of the fluo query id
+     * @throws IllegalArgumentException if the id does not start with query_prefix + _ or has nothing after it
+     */
+    public static String convertFluoQueryIdToPcjId(String fluoQueryId) {
+        Preconditions.checkNotNull(fluoQueryId);
+        // Match the prefix literally and only at the start; String.split would treat it as a regex and match it anywhere.
+        final String prefix = IncrementalUpdateConstants.QUERY_PREFIX + "_";
+        Preconditions.checkArgument(fluoQueryId.startsWith(prefix) && fluoQueryId.length() > prefix.length(), "Not a valid Fluo query id: %s", fluoQueryId);
+        return fluoQueryId.substring(prefix.length());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java
index fd24af2..406ba4c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java
@@ -20,7 +20,6 @@ package org.apache.rya.indexing.pcj.fluo.app.util;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -30,13 +29,8 @@ import org.apache.fluo.api.client.SnapshotBase;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
-import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryNode;
-import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.openrdf.model.Literal;
 import org.openrdf.model.URI;
 import org.openrdf.model.Value;
@@ -194,72 +188,6 @@ public class PeriodicQueryUtil {
     }
 
     /**
-     * Adds the variable "periodicBinId" to the beginning of all {@link VariableOrder}s for the 
-     * Metadata nodes that appear above the PeriodicQueryNode.  This ensures that the binId is
-     * written first in the Row so that bins can be easily scanned and deleted.
-     * @param builder
-     * @param nodeId
-     */
-    public static void updateVarOrdersToIncludeBin(FluoQuery.Builder builder, String nodeId) {
-        NodeType type = NodeType.fromNodeId(nodeId).orNull();
-        if (type == null) {
-            throw new IllegalArgumentException("NodeId must be associated with an existing MetadataBuilder.");
-        }
-        switch (type) {
-        case AGGREGATION:
-            AggregationMetadata.Builder aggBuilder = builder.getAggregateBuilder(nodeId).orNull();
-            if (aggBuilder != null) {
-                VariableOrder varOrder = aggBuilder.getVariableOrder();
-                VariableOrder groupOrder = aggBuilder.getGroupByVariableOrder();
-                // update varOrder with BIN_ID
-                List<String> orderList = new ArrayList<>(varOrder.getVariableOrders());
-                orderList.add(0, IncrementalUpdateConstants.PERIODIC_BIN_ID);
-                aggBuilder.setVariableOrder(new VariableOrder(orderList));
-                // update groupVarOrder with BIN_ID
-                List<String> groupOrderList = new ArrayList<>(groupOrder.getVariableOrders());
-                groupOrderList.add(0, IncrementalUpdateConstants.PERIODIC_BIN_ID);
-                aggBuilder.setGroupByVariableOrder(new VariableOrder(groupOrderList));
-                // recursive call to update the VariableOrders of all ancestors
-                // of this node
-                updateVarOrdersToIncludeBin(builder, aggBuilder.getParentNodeId());
-            } else {
-                throw new IllegalArgumentException("There is no AggregationMetadata.Builder for the indicated Id.");
-            }
-            break;
-        case PERIODIC_QUERY:
-            PeriodicQueryMetadata.Builder periodicBuilder = builder.getPeriodicQueryBuilder().orNull();
-            if (periodicBuilder != null && periodicBuilder.getNodeId().equals(nodeId)) {
-                VariableOrder varOrder = periodicBuilder.getVarOrder();
-                List<String> orderList = new ArrayList<>(varOrder.getVariableOrders());
-                orderList.add(0, IncrementalUpdateConstants.PERIODIC_BIN_ID);
-                periodicBuilder.setVarOrder(new VariableOrder(orderList));
-                // recursive call to update the VariableOrders of all ancestors
-                // of this node
-                updateVarOrdersToIncludeBin(builder, periodicBuilder.getParentNodeId());
-            } else {
-                throw new IllegalArgumentException(
-                        "PeriodicQueryMetadata.Builder id does not match the indicated id.  A query cannot have more than one PeriodicQueryMetadata Node.");
-            }
-            break;
-        case QUERY:
-            QueryMetadata.Builder queryBuilder = builder.getQueryBuilder().orNull();
-            if (queryBuilder != null && queryBuilder.getNodeId().equals(nodeId)) {
-                VariableOrder varOrder = queryBuilder.getVariableOrder();
-                List<String> orderList = new ArrayList<>(varOrder.getVariableOrders());
-                orderList.add(0, IncrementalUpdateConstants.PERIODIC_BIN_ID);
-                queryBuilder.setVariableOrder(new VariableOrder(orderList));
-            } else {
-                throw new IllegalArgumentException(
-                        "QueryMetadata.Builder id does not match the indicated id.  A query cannot have more than one QueryMetadata Node.");
-            }
-            break;
-        default:
-            throw new IllegalArgumentException(
-                    "Incorrectly positioned PeriodicQueryNode.  The PeriodicQueryNode can only be positioned below Projections, Extensions, and ConstructQueryNodes.");
-        }
-    }
-
-    /**
      * Collects all Metadata node Ids that are ancestors of the PeriodicQueryNode and contain the variable 
      * {@link IncrementalUpdateConstants#PERIODIC_BIN_ID}.
      * @param sx - Fluo Snapshot for scanning Fluo
@@ -277,6 +205,10 @@ public class PeriodicQueryUtil {
         case PERIODIC_QUERY:
             ids.add(nodeId);
             break;
+        case PROJECTION:
+            ids.add(nodeId);
+            getPeriodicQueryNodeAncestorIds(sx, sx.get( Bytes.of(nodeId), FluoQueryColumns.PROJECTION_CHILD_NODE_ID).toString(), ids);
+            break;
         case QUERY:
             ids.add(nodeId);
             getPeriodicQueryNodeAncestorIds(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.QUERY_CHILD_NODE_ID).toString(), ids);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/VariableOrderUpdateVisitor.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/VariableOrderUpdateVisitor.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/VariableOrderUpdateVisitor.java
new file mode 100644
index 0000000..f433849
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/VariableOrderUpdateVisitor.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryBuilderVisitorBase;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Visitor that walks a {@link FluoQuery.Builder} and applies the indicated {@link UpdateAction}
+ * to the {@link VariableOrder} of each node builder it visits, using a provided list of variables.
+ * For {@link UpdateAction#AddVariable} the variables are inserted at the front of each
+ * VariableOrder; for {@link UpdateAction#DeleteVariable} they are removed from each VariableOrder.
+ * The node whose id equals the stop node id is still updated, but the visit is not forwarded to
+ * {@link QueryBuilderVisitorBase} for that node.
+ */
+public class VariableOrderUpdateVisitor extends QueryBuilderVisitorBase {
+
+    /**
+     * Indicates whether variables are added to or deleted from the
+     * VariableOrders of the nodes in the FluoQuery.
+     */
+    public enum UpdateAction {
+        AddVariable, DeleteVariable
+    }
+
+    private final UpdateAction action;
+    private final List<String> variables;
+    private final String stopNodeId;
+
+    /**
+     * Creates a VariableOrderUpdateVisitor to update the variables in a given FluoQuery.Builder
+     * @param fluoBuilder - builder whose VariableOrder will be updated
+     * @param action - either add or delete (not null)
+     * @param variables - variables to be added or deleted (not null)
+     * @param stopNodeId - indicates the builder node to stop at (not null)
+     */
+    public VariableOrderUpdateVisitor(FluoQuery.Builder fluoBuilder, UpdateAction action, List<String> variables, String stopNodeId) {
+        super(fluoBuilder);
+        this.action = Preconditions.checkNotNull(action);
+        this.variables = Preconditions.checkNotNull(variables);
+        this.stopNodeId = Preconditions.checkNotNull(stopNodeId);
+    }
+
+    /**
+     * Updates the VariableOrder of the query node, then forwards the visit to the
+     * superclass unless this is the stop node.
+     */
+    @Override
+    public void visit(QueryMetadata.Builder builder) {
+        builder.setVarOrder(updateOrder(builder.getVariableOrder()));
+        if (!atStopNode(builder.getNodeId())) {
+            super.visit(builder);
+        }
+    }
+
+    /**
+     * Updates the VariableOrder of the projection node and, only when variables are being
+     * added, its projected variables as well; then forwards the visit to the superclass
+     * unless this is the stop node.
+     */
+    @Override
+    public void visit(ProjectionMetadata.Builder builder) {
+        builder.setVarOrder(updateOrder(builder.getVariableOrder()));
+        // Projected variables are extended on add but deliberately left alone on delete.
+        if (action == UpdateAction.AddVariable) {
+            builder.setProjectedVars(updateOrder(builder.getProjectionVars()));
+        }
+        if (!atStopNode(builder.getNodeId())) {
+            super.visit(builder);
+        }
+    }
+
+    /**
+     * Updates the VariableOrder of the construct query node, then forwards the visit to the
+     * superclass unless this is the stop node.
+     */
+    @Override
+    public void visit(ConstructQueryMetadata.Builder builder) {
+        builder.setVarOrder(updateOrder(builder.getVariableOrder()));
+        if (!atStopNode(builder.getNodeId())) {
+            super.visit(builder);
+        }
+    }
+
+    /**
+     * Updates the VariableOrder of the filter node, then forwards the visit to the
+     * superclass unless this is the stop node.
+     */
+    @Override
+    public void visit(FilterMetadata.Builder builder) {
+        builder.setVarOrder(updateOrder(builder.getVariableOrder()));
+        if (!atStopNode(builder.getNodeId())) {
+            super.visit(builder);
+        }
+    }
+
+    /**
+     * Updates the VariableOrder of the periodic query node, then forwards the visit to the
+     * superclass unless this is the stop node.
+     */
+    @Override
+    public void visit(PeriodicQueryMetadata.Builder builder) {
+        builder.setVarOrder(updateOrder(builder.getVariableOrder()));
+        if (!atStopNode(builder.getNodeId())) {
+            super.visit(builder);
+        }
+    }
+
+    /**
+     * Updates the VariableOrder of the join node, then forwards the visit to the
+     * superclass unless this is the stop node.
+     */
+    @Override
+    public void visit(JoinMetadata.Builder builder) {
+        builder.setVarOrder(updateOrder(builder.getVariableOrder()));
+        if (!atStopNode(builder.getNodeId())) {
+            super.visit(builder);
+        }
+    }
+
+    /**
+     * Updates both the VariableOrder and the group-by VariableOrder of the aggregation node,
+     * then forwards the visit to the superclass unless this is the stop node.
+     */
+    @Override
+    public void visit(AggregationMetadata.Builder builder) {
+        builder.setVarOrder(updateOrder(builder.getVariableOrder()));
+        builder.setGroupByVariableOrder(updateOrder(builder.getGroupByVariableOrder()));
+        if (!atStopNode(builder.getNodeId())) {
+            super.visit(builder);
+        }
+    }
+
+    /**
+     * Leaves the statement pattern node unmodified and only forwards the visit to the
+     * superclass unless this is the stop node.
+     */
+    @Override
+    public void visit(StatementPatternMetadata.Builder builder) {
+        if (!atStopNode(builder.getNodeId())) {
+            super.visit(builder);
+        }
+    }
+
+    /**
+     * @param nodeId - id of the node currently being visited
+     * @return true if the given id equals the stop node id of this visitor
+     */
+    boolean atStopNode(String nodeId) {
+        return nodeId.equals(stopNodeId);
+    }
+
+    /**
+     * Applies the configured action to the given VariableOrder.
+     * @param varOrder - order to update
+     * @return the VariableOrder that results from adding or deleting the variables
+     */
+    private VariableOrder updateOrder(VariableOrder varOrder) {
+        switch (action) {
+        case AddVariable:
+            return addBindingToOrder(varOrder);
+        case DeleteVariable:
+            return deleteBindingFromOrder(varOrder);
+        default:
+            return varOrder;
+        }
+    }
+
+    /**
+     * @param varOrder - order to extend
+     * @return a new VariableOrder with the variables inserted in front of the existing ones
+     */
+    private VariableOrder addBindingToOrder(VariableOrder varOrder) {
+        List<String> extended = new ArrayList<>(varOrder.getVariableOrders());
+        extended.addAll(0, variables);
+        return new VariableOrder(extended);
+    }
+
+    /**
+     * @param varOrder - order to reduce
+     * @return a new VariableOrder without the variables, except that
+     *         {@link IncrementalUpdateConstants#PERIODIC_BIN_ID} is always retained
+     */
+    private VariableOrder deleteBindingFromOrder(VariableOrder varOrder) {
+        List<String> remaining = new ArrayList<>(varOrder.getVariableOrders());
+        // Drop every listed variable; the periodic bin id survives even when it is listed.
+        remaining.removeIf(name -> variables.contains(name) && !name.equals(IncrementalUpdateConstants.PERIODIC_BIN_ID));
+        return new VariableOrder(remaining);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java
index c8ca6af..b40ba3f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
-import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
 import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil.PeriodicQueryNodeRelocator;
 import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil.PeriodicQueryNodeVisitor;
@@ -162,17 +162,17 @@ public class PeriodicQueryUtilTest {
                 + "?obs <uri:hasTime> ?time. " //n
                 + "?obs <uri:hasLattitude> ?lat }"; //n
          
-         SPARQLParser parser = new SPARQLParser();
-         ParsedQuery pq = parser.parseQuery(query, null);
          SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder();
-         FluoQuery fluoQuery = builder.make(pq, new NodeIds());
+         builder.setSparql(query);
+         builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY));
+         FluoQuery fluoQuery = builder.build();
          
          PeriodicQueryMetadata periodicMeta = fluoQuery.getPeriodicQueryMetadata().orNull();
          Assert.assertEquals(true, periodicMeta != null);
          VariableOrder periodicVars = periodicMeta.getVariableOrder();
          Assert.assertEquals(IncrementalUpdateConstants.PERIODIC_BIN_ID, periodicVars.getVariableOrders().get(0));
          
-         QueryMetadata queryMeta = fluoQuery.getQueryMetadata().get();
+         QueryMetadata queryMeta = fluoQuery.getQueryMetadata();
          VariableOrder queryVars = queryMeta.getVariableOrder();
          Assert.assertEquals(IncrementalUpdateConstants.PERIODIC_BIN_ID, queryVars.getVariableOrders().get(0));
          

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java
new file mode 100644
index 0000000..b432868
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class QueryBuilderVisitorTest {
+
+    /**
+     * Assembles a FluoQuery.Builder by hand (query, projection, join and two statement
+     * patterns) and verifies the order in which the visitor reports the node ids.
+     */
+    @Test
+    public void builderTest() {
+
+        // One freshly generated id per node.
+        final String queryId = NodeType.generateNewFluoIdForType(NodeType.QUERY);
+        final String projectionId = NodeType.generateNewFluoIdForType(NodeType.PROJECTION);
+        final String joinId = NodeType.generateNewFluoIdForType(NodeType.JOIN);
+        final String leftSp = NodeType.generateNewFluoIdForType(NodeType.STATEMENT_PATTERN);
+        final String rightSp = NodeType.generateNewFluoIdForType(NodeType.STATEMENT_PATTERN);
+
+        // Node builders, linked parent to child.
+        final QueryMetadata.Builder queryBuilder = QueryMetadata.builder(queryId);
+        queryBuilder.setChildNodeId(projectionId);
+
+        final ProjectionMetadata.Builder projectionBuilder = ProjectionMetadata.builder(projectionId);
+        projectionBuilder.setChildNodeId(joinId);
+
+        final JoinMetadata.Builder joinBuilder = JoinMetadata.builder(joinId);
+        joinBuilder.setLeftChildNodeId(leftSp);
+        joinBuilder.setRightChildNodeId(rightSp);
+
+        final StatementPatternMetadata.Builder left = StatementPatternMetadata.builder(leftSp);
+        final StatementPatternMetadata.Builder right = StatementPatternMetadata.builder(rightSp);
+
+        // Register every node builder with the query builder.
+        final FluoQuery.Builder fluoBuilder = FluoQuery.builder();
+        fluoBuilder.setQueryMetadata(queryBuilder);
+        fluoBuilder.addProjectionBuilder(projectionBuilder);
+        fluoBuilder.addJoinMetadata(joinBuilder);
+        fluoBuilder.addStatementPatternBuilder(left);
+        fluoBuilder.addStatementPatternBuilder(right);
+
+        final QueryBuilderPrinter printer = new QueryBuilderPrinter(fluoBuilder);
+        printer.visit();
+
+        final List<String> expected = Arrays.asList(queryId, projectionId, joinId, leftSp, rightSp);
+        Assert.assertEquals(expected, printer.getIds());
+    }
+
+
+    /**
+     * Visitor that prints the id of each node it visits and records the ids in visit order.
+     */
+    public static class QueryBuilderPrinter extends QueryBuilderVisitorBase {
+
+        private List<String> ids = new ArrayList<>();
+
+        public QueryBuilderPrinter(FluoQuery.Builder builder) {
+            super(builder);
+        }
+
+        /**
+         * @return the node ids recorded so far, in the order they were visited
+         */
+        public List<String> getIds() {
+            return ids;
+        }
+
+        public void visit(QueryMetadata.Builder queryBuilder) {
+            final String nodeId = queryBuilder.getNodeId();
+            System.out.println(nodeId);
+            ids.add(nodeId);
+            super.visit(queryBuilder);
+        }
+
+        public void visit(ProjectionMetadata.Builder projectionBuilder) {
+            final String nodeId = projectionBuilder.getNodeId();
+            System.out.println(nodeId);
+            ids.add(nodeId);
+            super.visit(projectionBuilder);
+        }
+
+        public void visit(JoinMetadata.Builder joinBuilder) {
+            final String nodeId = joinBuilder.getNodeId();
+            System.out.println(nodeId);
+            ids.add(nodeId);
+            super.visit(joinBuilder);
+        }
+
+        // Statement patterns are only recorded; the visit is not forwarded to the superclass.
+        public void visit(StatementPatternMetadata.Builder statementBuilder) {
+            final String nodeId = statementBuilder.getNodeId();
+            System.out.println(nodeId);
+            ids.add(nodeId);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java
new file mode 100644
index 0000000..5c89a75
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class QueryMetadataVisitorTest {
+
+    /**
+     * Builds a FluoQuery from a periodic aggregation query and verifies the order in which
+     * the visitor reports the node ids.
+     */
+    @Test
+    public void builderTest() {
+        final String sparql = "prefix function: <http://org.apache.rya/function#> "
+                + "prefix time: <http://www.w3.org/2006/time#> "
+                + "select ?id (count(?obs) as ?total) where {"
+                + "Filter(function:periodic(?time, 2, .5, time:hours)) "
+                + "?obs <uri:hasTime> ?time. "
+                + "?obs <uri:hasId> ?id } group by ?id";
+
+        final SparqlFluoQueryBuilder queryBuilder = new SparqlFluoQueryBuilder();
+        queryBuilder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY));
+        queryBuilder.setSparql(sparql);
+        final FluoQuery fluoQuery = queryBuilder.build();
+
+        // Follow the child links from the query node down to the two statement patterns.
+        final QueryMetadata queryMetadata = fluoQuery.getQueryMetadata();
+        final String queryId = queryMetadata.getNodeId();
+        final String projectionId = queryMetadata.getChildNodeId();
+        final String aggId = fluoQuery.getProjectionMetadata(projectionId).get().getChildNodeId();
+        final String periodicId = fluoQuery.getAggregationMetadata(aggId).get().getChildNodeId();
+        final String joinId = fluoQuery.getPeriodicQueryMetadata(periodicId).get().getChildNodeId();
+        final String leftSp = fluoQuery.getJoinMetadata(joinId).get().getLeftChildNodeId();
+        final String rightSp = fluoQuery.getJoinMetadata(joinId).get().getRightChildNodeId();
+
+        final QueryMetadataVisitor visitor = new QueryMetadataVisitor(fluoQuery);
+        visitor.visit();
+
+        final List<String> expected = Arrays.asList(queryId, projectionId, aggId, periodicId, joinId, leftSp, rightSp);
+        Assert.assertEquals(expected, visitor.getIds());
+    }
+
+
+    /**
+     * Visitor that records the id of every metadata node it visits, in visit order.
+     */
+    public static class QueryMetadataVisitor extends QueryMetadataVisitorBase {
+
+        private List<String> ids = new ArrayList<>();
+
+        public QueryMetadataVisitor(FluoQuery metadata) {
+            super(metadata);
+        }
+
+        /**
+         * @return the node ids recorded so far, in the order they were visited
+         */
+        public List<String> getIds() {
+            return ids;
+        }
+
+        public void visit(QueryMetadata metadata) {
+            ids.add(metadata.getNodeId());
+            super.visit(metadata);
+        }
+
+        public void visit(ProjectionMetadata metadata) {
+            ids.add(metadata.getNodeId());
+            super.visit(metadata);
+        }
+
+        public void visit(AggregationMetadata metadata) {
+            ids.add(metadata.getNodeId());
+            super.visit(metadata);
+        }
+
+        public void visit(PeriodicQueryMetadata metadata) {
+            ids.add(metadata.getNodeId());
+            super.visit(metadata);
+        }
+
+        public void visit(FilterMetadata metadata) {
+            ids.add(metadata.getNodeId());
+            super.visit(metadata);
+        }
+
+        public void visit(JoinMetadata metadata) {
+            ids.add(metadata.getNodeId());
+            super.visit(metadata);
+        }
+
+        // Statement patterns are only recorded; the visit is not forwarded to the superclass.
+        public void visit(StatementPatternMetadata metadata) {
+            ids.add(metadata.getNodeId());
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
index 854798d..3f335f4 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
@@ -41,7 +41,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.rya.accumulo.AccumuloRdfConfiguration;
 import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine;
 import org.apache.rya.api.persist.RyaDAOException;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
 import org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand;
 import org.apache.rya.indexing.pcj.fluo.client.util.ParsedQueryRequest;
 import org.apache.rya.indexing.pcj.storage.PcjException;
@@ -124,7 +124,7 @@ public class NewQueryCommand implements PcjAdminClientCommand {
         log.trace("SPARQL Query: " + request.getQuery());
         log.trace("Var Orders: " + request.getVarOrders());
         log.trace("Loading these values into the Fluo app.");
-        final CreatePcj createPcj = new CreatePcj();
+        final CreateFluoPcj createPcj = new CreateFluoPcj();
         try {
             // Create the PCJ in Rya.
             final String sparql = request.getQuery();