You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/06/22 18:05:16 UTC

[4/4] incubator-rya git commit: RYA-273-Construct Query Support. Closes #161.

RYA-273-Construct Query Support. Closes #161.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/60090ad5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/60090ad5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/60090ad5

Branch: refs/heads/master
Commit: 60090ad52de294d55e2bcea2a0629ee19bfb3827
Parents: 646d21b
Author: Caleb Meier <ca...@parsons.com>
Authored: Fri Apr 14 19:20:25 2017 -0700
Committer: Caleb Meier <ca...@parsons.com>
Committed: Thu Jun 22 11:03:02 2017 -0700

----------------------------------------------------------------------
 common/rya.api/pom.xml                          |   6 +-
 .../org/apache/rya/api/domain/RyaSubGraph.java  | 118 +++++++
 .../kryo/RyaStatementSerializer.java            | 162 +++++++++
 .../kryo/RyaSubGraphSerializer.java             |  84 +++++
 ...AbstractAccumuloRdfConfigurationBuilder.java |  26 +-
 .../apache/rya/sail/config/RyaSailFactory.java  |  40 +++
 .../rya/indexing/pcj/fluo/api/CreatePcj.java    |  71 +++-
 .../rya/indexing/pcj/fluo/api/DeletePcj.java    |  11 +-
 .../indexing/pcj/fluo/app/ConstructGraph.java   | 141 ++++++++
 .../pcj/fluo/app/ConstructGraphSerializer.java  |  52 +++
 .../pcj/fluo/app/ConstructProjection.java       | 266 ++++++++++++++
 .../fluo/app/ConstructQueryResultUpdater.java   |  91 +++++
 .../pcj/fluo/app/FluoStringConverter.java       |  51 ++-
 .../fluo/app/IncrementalUpdateConstants.java    |   1 +
 .../rya/indexing/pcj/fluo/app/NodeType.java     |  23 +-
 .../export/IncrementalBindingSetExporter.java   |  69 ++++
 .../IncrementalBindingSetExporterFactory.java   | 104 ++++++
 .../app/export/IncrementalResultExporter.java   |  69 ----
 .../IncrementalResultExporterFactory.java       | 104 ------
 .../export/IncrementalRyaSubGraphExporter.java  |  39 ++
 .../IncrementalRyaSubGraphExporterFactory.java  |  47 +++
 .../export/kafka/KafkaBindingSetExporter.java   |  87 +++++
 .../kafka/KafkaBindingSetExporterFactory.java   |  64 ++++
 .../app/export/kafka/KafkaResultExporter.java   |  87 -----
 .../kafka/KafkaResultExporterFactory.java       |  64 ----
 .../export/kafka/KafkaRyaSubGraphExporter.java  |  81 +++++
 .../kafka/KafkaRyaSubGraphExporterFactory.java  |  62 ++++
 .../app/export/kafka/RyaSubGraphKafkaSerDe.java | 100 ++++++
 .../app/export/rya/RyaBindingSetExporter.java   |  72 ++++
 .../rya/RyaBindingSetExporterFactory.java       |  77 ++++
 .../app/export/rya/RyaExportParameters.java     |  15 +
 .../fluo/app/export/rya/RyaResultExporter.java  |  72 ----
 .../export/rya/RyaResultExporterFactory.java    |  77 ----
 .../fluo/app/observers/BindingSetUpdater.java   |  12 +
 .../observers/ConstructQueryResultObserver.java | 198 +++++++++++
 .../fluo/app/observers/QueryResultObserver.java |  36 +-
 .../fluo/app/query/ConstructQueryMetadata.java  | 192 ++++++++++
 .../indexing/pcj/fluo/app/query/FluoQuery.java  | 106 +++++-
 .../pcj/fluo/app/query/FluoQueryColumns.java    |  33 ++
 .../fluo/app/query/FluoQueryMetadataDAO.java    | 181 +++++++---
 .../pcj/fluo/app/query/QueryMetadata.java       |   3 +-
 .../fluo/app/query/SparqlFluoQueryBuilder.java  | 210 ++++++++---
 .../pcj/fluo/app/ConstructGraphTest.java        | 145 ++++++++
 .../pcj/fluo/app/ConstructGraphTestUtils.java   | 126 +++++++
 .../pcj/fluo/app/ConstructProjectionTest.java   | 112 ++++++
 .../pcj/fluo/app/FluoStringConverterTest.java   |   7 -
 .../pcj/fluo/app/RyaSubGraphKafkaSerDeTest.java |  57 +++
 .../export/rya/KafkaExportParametersTest.java   |   4 +-
 .../fluo/client/util/QueryReportRenderer.java   |  28 +-
 .../rya.pcj.fluo/pcj.fluo.integration/pom.xml   | 180 +++++-----
 .../pcj/fluo/ConstructGraphTestUtils.java       | 126 +++++++
 .../indexing/pcj/fluo/KafkaExportITBase.java    | 143 +++++---
 .../rya/indexing/pcj/fluo/RyaExportITBase.java  |   9 +
 .../indexing/pcj/fluo/api/GetQueryReportIT.java |   2 +-
 .../fluo/app/query/FluoQueryMetadataDAOIT.java  |  94 ++++-
 .../pcj/fluo/integration/CreateDeleteIT.java    |   1 +
 .../pcj/fluo/integration/KafkaExportIT.java     |  36 +-
 .../integration/KafkaRyaSubGraphExportIT.java   | 352 +++++++++++++++++++
 .../pcj/functions/geo/FunctionAdapter.java      |   2 -
 59 files changed, 3986 insertions(+), 842 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/common/rya.api/pom.xml
----------------------------------------------------------------------
diff --git a/common/rya.api/pom.xml b/common/rya.api/pom.xml
index f73c006..94f191d 100644
--- a/common/rya.api/pom.xml
+++ b/common/rya.api/pom.xml
@@ -70,7 +70,11 @@ under the License.
             <groupId>com.github.stephenc.jcip</groupId>
             <artifactId>jcip-annotations</artifactId>
         </dependency>
-        
+        <dependency>
+			<groupId>com.esotericsoftware.kryo</groupId>
+			<artifactId>kryo</artifactId>
+			<version>2.24.0</version>
+		</dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaSubGraph.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaSubGraph.java b/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaSubGraph.java
new file mode 100644
index 0000000..f08eba4
--- /dev/null
+++ b/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaSubGraph.java
@@ -0,0 +1,118 @@
+package org.apache.rya.api.domain;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.base.Objects;
+
+/**
+ * This class packages together a collection of {@link RyaStatement}s to form a subgraph
+ */
+public class RyaSubGraph {
+
+    private String id;
+    private Set<RyaStatement> statements;
+    
+    /**
+     * Creates empty subgraph with given id
+     * @param id - id of the created subgraph
+     */
+    public RyaSubGraph(String id) {
+        this.id = id;
+        this.statements = new HashSet<>();
+    }
+    
+    /**
+     * Creates sugraph with specified id and statements
+     * @param id - id of the created subgraph
+     * @param statements - statements that make up subgraph
+     */
+    public RyaSubGraph(String id, Set<RyaStatement> statements) {
+        this.id = id;
+        this.statements = statements;
+    }
+
+    /**
+     * @return id of this subgraph
+     */
+    public String getId() {
+        return id;
+    }
+    
+    /**
+     * @return RyaStatements representing this subgraph
+     */
+    public Set<RyaStatement> getStatements() {
+        return statements;
+    }
+    
+    /**
+     * Sets id of subgraph
+     * @param id - id of subgraph
+     */
+    public void setId(String id) {
+        this.id = id;
+    }
+    
+    /**
+     * Sets subgraph statements to specified statements
+     * @param statements - statements that will be set to subgraph statements
+     */
+    public void setStatements(Set<RyaStatement> statements) {
+        this.statements = statements;
+    }
+    
+
+    /**
+     * Adds statement to this subgraph
+     * @param statement - RyaStatement to be added to subgraph
+     */
+    public void addStatement(RyaStatement statement){
+        statements.add(statement);
+    }
+    
+    @Override
+    public boolean equals(Object other) {
+        
+        if(this == other) {
+            return true;
+        }
+        
+        if(other instanceof RyaSubGraph) {
+            RyaSubGraph bundle = (RyaSubGraph) other;
+            return Objects.equal(this.id, ((RyaSubGraph) other).id) && Objects.equal(this.statements,bundle.statements);
+        }
+        
+        return false;
+    }
+    
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(this.id, this.statements);
+    }
+    
+    
+    @Override
+    public String toString() {
+        return new StringBuilder().append("Rya Subgraph {\n").append("   Rya Subgraph ID: " + id + "\n")
+                .append("   Rya Statements: " + statements + "\n").toString();
+    }
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/common/rya.api/src/main/java/org/apache/rya/api/domain/serialization/kryo/RyaStatementSerializer.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/domain/serialization/kryo/RyaStatementSerializer.java b/common/rya.api/src/main/java/org/apache/rya/api/domain/serialization/kryo/RyaStatementSerializer.java
new file mode 100644
index 0000000..6c0efd2
--- /dev/null
+++ b/common/rya.api/src/main/java/org/apache/rya/api/domain/serialization/kryo/RyaStatementSerializer.java
@@ -0,0 +1,162 @@
+package org.apache.rya.api.domain.serialization.kryo;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.domain.RyaURI;
+
+/**
+ * Kryo Serializer for {@link RyaStatement}s
+ *
+ */
+public class RyaStatementSerializer extends Serializer<RyaStatement> {
+    
+    /**
+     * Uses Kryo to write RyaStatement to {@lin Output}
+     * @param kryo - writes statement to output
+     * @param output - output stream that statement is written to
+     * @param object - statement written to output
+     */
+    public static void writeToKryo(Kryo kryo, Output output, RyaStatement object) {
+        Preconditions.checkNotNull(kryo);
+        Preconditions.checkNotNull(output);
+        Preconditions.checkNotNull(object);
+        output.writeString(object.getSubject().getData());
+        output.writeString(object.getPredicate().getData());
+        output.writeString(object.getObject().getDataType().toString());
+        output.writeString(object.getObject().getData());
+        boolean hasContext = object.getContext() != null;
+        output.writeBoolean(hasContext);
+        if(hasContext){
+            output.writeString(object.getContext().getData());
+        }
+        boolean shouldWrite = object.getColumnVisibility() != null;
+        output.writeBoolean(shouldWrite);
+        if(shouldWrite){
+            output.writeInt(object.getColumnVisibility().length);
+            output.writeBytes(object.getColumnVisibility());
+        }
+        shouldWrite = object.getQualifer() != null;
+        output.writeBoolean(shouldWrite);
+        if(shouldWrite){
+            output.writeString(object.getQualifer());
+        }
+        shouldWrite = object.getTimestamp() != null;
+        output.writeBoolean(shouldWrite);
+        if(shouldWrite){
+            output.writeLong(object.getTimestamp());
+        }
+        shouldWrite = object.getValue() != null;
+        output.writeBoolean(shouldWrite);
+        if(shouldWrite){
+            output.writeBytes(object.getValue());
+        }
+    }   
+
+    /**
+     * Uses Kryo to write RyaStatement to {@lin Output}
+     * @param kryo - writes statement to output
+     * @param output - output stream that statement is written to
+     * @param object - statement written to output
+     */
+    @Override
+    public void write(Kryo kryo, Output output, RyaStatement object) {
+        writeToKryo(kryo, output, object);
+    }
+    
+    /**
+     * Uses Kryo to read a RyaStatement from {@link Input}
+     * @param kryo - reads statement from input
+     * @param input - Input stream that statement is read from
+     * @param type - Type read from input stream
+     * @return - statement read from input stream
+     */
+    public static RyaStatement readFromKryo(Kryo kryo, Input input, Class<RyaStatement> type){
+        return read(input);
+    }
+
+    /**
+     * Reads RyaStatement from {@link Input} stream
+     * @param input - input stream that statement is read from
+     * @return - statement read from input stream
+     */
+    public static RyaStatement read(Input input){
+        Preconditions.checkNotNull(input);
+        String subject = input.readString();
+        String predicate = input.readString();
+        String objectType = input.readString();
+        String objectValue = input.readString();
+        RyaType value;
+        if (objectType.equals(XMLSchema.ANYURI.toString())){
+            value = new RyaURI(objectValue);
+        }
+        else {
+            value = new RyaType(new URIImpl(objectType), objectValue);
+        }
+        RyaStatement statement = new RyaStatement(new RyaURI(subject), new RyaURI(predicate), value);
+        int length = 0;
+        boolean hasNextValue = input.readBoolean();
+        if (hasNextValue){
+            statement.setContext(new RyaURI(input.readString()));
+        }
+        hasNextValue = input.readBoolean();
+        if (hasNextValue){
+            length = input.readInt();
+            statement.setColumnVisibility(input.readBytes(length));
+        }
+        hasNextValue = input.readBoolean();
+        if (hasNextValue){
+            statement.setQualifer(input.readString());
+        }
+        hasNextValue = input.readBoolean();
+        if (hasNextValue){
+            statement.setTimestamp(input.readLong());
+        }
+        hasNextValue = input.readBoolean();
+        if (hasNextValue){
+            length = input.readInt();
+            statement.setValue(input.readBytes(length));
+        }
+
+        return statement;
+    }
+
+    /**
+     * Uses Kryo to read a RyaStatement from {@link Input}
+     * @param kryo - reads statement from input
+     * @param input - Input stream that statement is read from
+     * @param type - Type read from input stream
+     * @return - statement read from input stream
+     */
+    @Override
+    public RyaStatement read(Kryo kryo, Input input, Class<RyaStatement> type) {
+        return readFromKryo(kryo, input, type);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/common/rya.api/src/main/java/org/apache/rya/api/domain/serialization/kryo/RyaSubGraphSerializer.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/domain/serialization/kryo/RyaSubGraphSerializer.java b/common/rya.api/src/main/java/org/apache/rya/api/domain/serialization/kryo/RyaSubGraphSerializer.java
new file mode 100644
index 0000000..dbb6c3b
--- /dev/null
+++ b/common/rya.api/src/main/java/org/apache/rya/api/domain/serialization/kryo/RyaSubGraphSerializer.java
@@ -0,0 +1,84 @@
+package org.apache.rya.api.domain.serialization.kryo;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.api.domain.RyaStatement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo based Serializer/Deserializer for {@link RyaSubGraph}.
+ *
+ */
+public class RyaSubGraphSerializer extends Serializer<RyaSubGraph>{
+    static final Logger log = LoggerFactory.getLogger(RyaSubGraphSerializer.class);
+
+    /**
+     * Use Kryo to write RyaSubGraph to {@link Output} stream
+     * @param kryo - used to write subgraph to output stream
+     * @param output - stream that subgraph is written to
+     * @param object - subgraph written to output stream
+     */
+    @Override
+    public void write(Kryo kryo, Output output, RyaSubGraph object) {
+        output.writeString(object.getId());
+        output.writeInt(object.getStatements().size());
+        for (RyaStatement statement : object.getStatements()){
+            RyaStatementSerializer.writeToKryo(kryo, output, statement);
+        }
+    }
+    
+    /**
+     * Reads RyaSubGraph from {@link Input} stream
+     * @param input - stream that subgraph is read from
+     * @return subgraph read from input stream
+     */
+    public static RyaSubGraph read(Input input){
+        RyaSubGraph bundle = new RyaSubGraph(input.readString());
+        int numStatements = input.readInt();
+        for (int i=0; i < numStatements; i++){
+            bundle.addStatement(RyaStatementSerializer.read(input));
+        }       
+        return bundle;
+    }
+
+    /**
+     * Uses Kryo to read RyaSubGraph from {@link Input} stream
+     * @param kryo - used to read subgraph from input stream
+     * @param input - stream that subgraph is read from
+     * @param type - class of object to be read from input stream (RyaSubgraph)
+     * @return subgraph read from input stream
+     */
+    @Override
+    public RyaSubGraph read(Kryo kryo, Input input, Class<RyaSubGraph> type) {
+        RyaSubGraph bundle = new RyaSubGraph(input.readString());
+        int numStatements = input.readInt();
+        
+        for (int i=0; i < numStatements; i++){
+            bundle.addStatement(RyaStatementSerializer.readFromKryo(kryo, input, RyaStatement.class));
+        }       
+        return bundle;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AbstractAccumuloRdfConfigurationBuilder.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AbstractAccumuloRdfConfigurationBuilder.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AbstractAccumuloRdfConfigurationBuilder.java
index e342db2..d1422f6 100644
--- a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AbstractAccumuloRdfConfigurationBuilder.java
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AbstractAccumuloRdfConfigurationBuilder.java
@@ -44,19 +44,19 @@ public abstract class AbstractAccumuloRdfConfigurationBuilder<B extends Abstract
     private boolean useComposite = false;
     private boolean useSelectivity = false;
 
-    protected static final String ACCUMULO_USER = "accumulo.user";
-    protected static final String ACCUMULO_PASSWORD = "accumulo.password";
-    protected static final String ACCUMULO_INSTANCE = "accumulo.instance";
-    protected static final String ACCUMULO_AUTHS = "accumulo.auths";
-    protected static final String ACCUMULO_VISIBILITIES = "accumulo.visibilities";
-    protected static final String ACCUMULO_ZOOKEEPERS = "accumulo.zookeepers";
-    protected static final String ACCUMULO_RYA_PREFIX = "accumulo.rya.prefix";
-    protected static final String USE_INFERENCE = "use.inference";
-    protected static final String USE_DISPLAY_QUERY_PLAN = "use.display.plan";
-    protected static final String USE_MOCK_ACCUMULO = "use.mock";
-    protected static final String USE_PREFIX_HASHING = "use.prefix.hashing";
-    protected static final String USE_COUNT_STATS = "use.count.stats";
-    protected static final String USE_JOIN_SELECTIVITY = "use.join.selectivity";
+    public static final String ACCUMULO_USER = "accumulo.user";
+    public static final String ACCUMULO_PASSWORD = "accumulo.password";
+    public static final String ACCUMULO_INSTANCE = "accumulo.instance";
+    public static final String ACCUMULO_AUTHS = "accumulo.auths";
+    public static final String ACCUMULO_VISIBILITIES = "accumulo.visibilities";
+    public static final String ACCUMULO_ZOOKEEPERS = "accumulo.zookeepers";
+    public static final String ACCUMULO_RYA_PREFIX = "accumulo.rya.prefix";
+    public static final String USE_INFERENCE = "use.inference";
+    public static final String USE_DISPLAY_QUERY_PLAN = "use.display.plan";
+    public static final String USE_MOCK_ACCUMULO = "use.mock";
+    public static final String USE_PREFIX_HASHING = "use.prefix.hashing";
+    public static final String USE_COUNT_STATS = "use.count.stats";
+    public static final String USE_JOIN_SELECTIVITY = "use.join.selectivity";
 
     /**
      * Sets Accumulo user. This is a required parameter to connect to an

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java b/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java
index bdb33ce..e156f86 100644
--- a/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java
+++ b/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java
@@ -131,6 +131,19 @@ public class RyaSailFactory {
         return dao;
     }
 
+    /**
+     * Creates AccumuloRyaDAO without updating the AccumuloRdfConfiguration.  This method does not force
+     * the user's configuration to be consistent with the Rya Instance configuration.  As a result, new index
+     * tables might be created when using this method.  This method does not require the {@link AccumuloRyaInstanceDetailsRepository}
+     * to exist.  This is for internal use, backwards compatibility and testing purposes only.  It is recommended that
+     * {@link RyaSailFactory#getAccumuloDAOWithUpdatedConfig(AccumuloRdfConfiguration)} be used for new installations of Rya.
+     * 
+     * @param config - user configuration
+     * @return - AccumuloRyaDAO with Indexers configured according to user's specification
+     * @throws AccumuloException
+     * @throws AccumuloSecurityException
+     * @throws RyaDAOException
+     */
     public static AccumuloRyaDAO getAccumuloDAO(final AccumuloRdfConfiguration config) throws AccumuloException, AccumuloSecurityException, RyaDAOException {
         final Connector connector = ConfigUtils.getConnector(config);
         final AccumuloRyaDAO dao = new AccumuloRyaDAO();
@@ -142,6 +155,33 @@ public class RyaSailFactory {
         dao.init();
         return dao;
     }
+    
+    /**
+     * Creates an AccumuloRyaDAO after updating the AccumuloRdfConfiguration so that it is consistent
+     * with the configuration of the RyaInstance that the user is trying to connect to.  This ensures
+     * that user configuration aligns with Rya instance configuration and prevents the creation of 
+     * new index tables based on a user's query configuration.  This method requires the {@link AccumuloRyaInstanceDetailsRepository}
+     * to exist.
+     * 
+     * @param config - user's query configuration
+     * @return - AccumuloRyaDAO with an updated configuration that is consistent with the Rya instance configuration
+     * @throws AccumuloException
+     * @throws AccumuloSecurityException
+     * @throws RyaDAOException
+     */
+    public static AccumuloRyaDAO getAccumuloDAOWithUpdatedConfig(final AccumuloRdfConfiguration config) throws AccumuloException, AccumuloSecurityException, RyaDAOException {
+        
+        String ryaInstance = config.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
+        Objects.requireNonNull(ryaInstance, "RyaInstance or table prefix is missing from configuration."+RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
+        String user = config.get(AccumuloRdfConfiguration.CLOUDBASE_USER);
+        String pswd = config.get(AccumuloRdfConfiguration.CLOUDBASE_PASSWORD);
+        Objects.requireNonNull(user, "Accumulo user name is missing from configuration."+AccumuloRdfConfiguration.CLOUDBASE_USER);
+        Objects.requireNonNull(pswd, "Accumulo user password is missing from configuration."+AccumuloRdfConfiguration.CLOUDBASE_PASSWORD);
+        config.setTableLayoutStrategy( new TablePrefixLayoutStrategy(ryaInstance) );
+        updateAccumuloConfig(config, user, pswd, ryaInstance);
+        
+        return getAccumuloDAO(config);
+    }
 
     public static void updateAccumuloConfig(final AccumuloRdfConfiguration config, final String user, final String pswd, final String ryaInstance) throws AccumuloException, AccumuloSecurityException {
         try {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
index 1de0813..a17f02f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
@@ -113,9 +113,49 @@ public class CreatePcj {
         checkArgument(spInsertBatchSize > 0, "The SP insert batch size '" + spInsertBatchSize + "' must be greater than 0.");
         this.spInsertBatchSize = spInsertBatchSize;
     }
+    
+    
+    /**
+     * Tells the Fluo PCJ Updater application to maintain a new PCJ.  This method does not
+     * require a pcjId and does not require a PCJ table to have already been created via {@link PrecomputedJoinStorage}.
+     * This method only adds the metadata to the Fluo table to incrementally generate query results.  Since there
+     * is no PCJ table, the incremental results must be exported to some external queuing service such as Kafka.
+     * This method currently only supports SPARQL COSNTRUCT queries, as they only export to Kafka by default. 
+     *
+     * @param sparql - SPARQL query whose results will be updated in the Fluo table
+     * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
+     * @return The metadata that was written to the Fluo application for the PCJ.
+     * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
+     * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
+     * @throws RuntimeException If SPARQL query is not a CONSTRUCT query.
+     */
+    public FluoQuery createFluoPcj(final FluoClient fluo, String sparql) throws MalformedQueryException, PcjException {
+        requireNonNull(sparql);
+        requireNonNull(fluo);
+
+        // Keeps track of the IDs that are assigned to each of the query's nodes in Fluo.
+        // We use these IDs later when scanning Rya for historic Statement Pattern matches
+        // as well as setting up automatic exports.
+        final NodeIds nodeIds = new NodeIds();
+        final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, null);
+        final FluoQuery fluoQuery = new SparqlFluoQueryBuilder().make(parsedQuery, nodeIds);
+        checkArgument(fluoQuery.getConstructQueryMetadata().isPresent(), "Sparql query: " + sparql + " must begin with a construct.");
+
+        try (Transaction tx = fluo.newTransaction()) {
+            // Write the query's structure to Fluo.
+            new FluoQueryMetadataDAO().write(tx, fluoQuery);
+            tx.commit();
+        }
+
+        return fluoQuery;
+    }
+
+    
 
     /**
-     * Tells the Fluo PCJ Updater application to maintain a new PCJ.
+     * Tells the Fluo PCJ Updater application to maintain a new PCJ.  This method requires that a
+     * PCJ table already exist for the query corresponding to the pcjId.  Results will be exported
+     * to this table.
      *
      * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
      * @param pcjStorage - Provides access to the PCJ index. (not null)
@@ -146,12 +186,14 @@ public class CreatePcj {
         try (Transaction tx = fluo.newTransaction()) {
             // Write the query's structure to Fluo.
             new FluoQueryMetadataDAO().write(tx, fluoQuery);
-
-            // The results of the query are eventually exported to an instance of Rya, so store the Rya ID for the PCJ.
-            final String queryId = fluoQuery.getQueryMetadata().getNodeId();
-            tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId);
-            tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId);
-
+            
+            if (fluoQuery.getQueryMetadata().isPresent()) {
+                // If the query is not a construct query, 
+                // the results of the query are eventually exported to an instance of Rya, so store the Rya ID for the PCJ.
+                final String queryId = fluoQuery.getQueryMetadata().get().getNodeId();
+                tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId);
+                tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId);
+            } 
             // Flush the changes to Fluo.
             tx.commit();
         }
@@ -165,7 +207,9 @@ public class CreatePcj {
      * This call scans Rya for Statement Pattern matches and inserts them into
      * the Fluo application. The Fluo application will then maintain the intermediate
      * results as new triples are inserted and export any new query results to the
-     * {@code pcjId} within the provided {@code pcjStorage}.
+     * {@code pcjId} within the provided {@code pcjStorage}.  This method requires that a
+     * PCJ table already exist for the query corresponding to the pcjId.  Results will be exported
+     * to this table.
      *
      * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
      * @param pcjStorage - Provides access to the PCJ index. (not null)
@@ -227,9 +271,14 @@ public class CreatePcj {
         } catch (final IOException e) {
             log.warn("Ignoring IOException thrown while closing the AccumuloRyaQueryEngine used by CreatePCJ.", e);
         }
-
-        // return queryId to the caller for later monitoring from the export.
-        return fluoQuery.getQueryMetadata().getNodeId();
+        
+        //return queryId to the caller for later monitoring from the export
+        if(fluoQuery.getConstructQueryMetadata().isPresent()) {
+            return fluoQuery.getConstructQueryMetadata().get().getNodeId();
+        } 
+        
+        return fluoQuery.getQueryMetadata().get().getNodeId();
+        
     }
 
     private static void writeBatch(final FluoClient fluo, final Set<RyaStatement> batch) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
index c11f9fb..87eb9cc 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
@@ -34,6 +34,7 @@ import org.apache.fluo.api.data.RowColumnValue;
 import org.apache.fluo.api.data.Span;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
@@ -139,6 +140,12 @@ public class DeletePcj {
                 nodeIds.add(queryChild);
                 getChildNodeIds(tx, queryChild, nodeIds);
                 break;
+            case CONSTRUCT:
+                final ConstructQueryMetadata constructMeta = dao.readConstructQueryMetadata(tx, nodeId);
+                final String constructChild = constructMeta.getChildNodeId();
+                nodeIds.add(constructChild);
+                getChildNodeIds(tx, constructChild, nodeIds);
+                break;
             case JOIN:
                 final JoinMetadata joinMeta = dao.readJoinMetadata(tx, nodeId);
                 final String lchild = joinMeta.getLeftChildNodeId();
@@ -229,7 +236,7 @@ public class DeletePcj {
 
 
     /**
-     * Deletes all BindingSets associated with the specified nodeId.
+     * Deletes all results (BindingSets or Statements) associated with the specified nodeId.
      *
      * @param nodeId - nodeId whose {@link BindingSet}s will be deleted. (not null)
      * @param client - Used to delete the data. (not null)
@@ -240,7 +247,7 @@ public class DeletePcj {
 
         final NodeType type = NodeType.fromNodeId(nodeId).get();
         Transaction tx = client.newTransaction();
-        while(deleteDataBatch(tx, getIterator(tx, nodeId, type.getBsColumn()), type.getBsColumn())) {
+        while (deleteDataBatch(tx, getIterator(tx, nodeId, type.getResultColumn()), type.getResultColumn())) {
             tx = client.newTransaction();
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraph.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraph.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraph.java
new file mode 100644
index 0000000..6c6f833
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraph.java
@@ -0,0 +1,141 @@
+package org.apache.rya.indexing.pcj.fluo.app;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.model.BNode;
+import org.openrdf.model.Value;
+import org.openrdf.model.impl.BNodeImpl;
+import org.openrdf.query.algebra.StatementPattern;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Creates a construct query graph (represented as a Set of
+ * {@link RyaStatement}s with Binding names subject, predicate, object) from a
+ * given BindingSet and the underlying {@link ConstructProjection}s.
+ *
+ */
+public class ConstructGraph {
+
+    private Set<ConstructProjection> projections;
+    private Set<String> bNodeNames;
+    
+    /**
+     * Creates a ConstructGraph from the specified collection of {@link ConstructProjection}s.
+     * @param projections - ConstructProjections used to create a ConstructGraph
+     */
+    public ConstructGraph(Set<ConstructProjection> projections) {
+        Preconditions.checkNotNull(projections);
+        Preconditions.checkArgument(projections.size() > 0);
+        this.projections = projections;
+        this.bNodeNames = getBNodeNames(projections);
+    }
+    
+    /**
+     * Creates a ConstructGraph from the given Collection of {@link StatementPattern}s.
+     * @param patterns - StatementPatterns used to create a ConstructGraph
+     */
+    public ConstructGraph(Collection<StatementPattern> patterns) {
+        Preconditions.checkNotNull(patterns);
+        Preconditions.checkArgument(patterns.size() > 0);
+        Set<ConstructProjection> projections = new HashSet<>();
+        for(StatementPattern pattern: patterns) {
+            projections.add(new ConstructProjection(pattern));
+        }
+        this.projections = projections;
+        this.bNodeNames = getBNodeNames(projections);
+    }
+    
+    private Set<String> getBNodeNames(Set<ConstructProjection> projections) {
+        Set<String> bNodeNames = new HashSet<>();
+        for (ConstructProjection projection : projections) {
+            Optional<Value> optVal = projection.getSubjValue();
+            if (optVal.isPresent() && optVal.get() instanceof BNode) {
+                bNodeNames.add(projection.getSubjectSourceName());
+            }
+        }
+        return bNodeNames;
+    }
+    
+    private Map<String, BNode> getBNodeMap() {
+        Map<String, BNode> bNodeMap = new HashMap<>();
+        for(String name: bNodeNames) {
+            bNodeMap.put(name, new BNodeImpl(UUID.randomUUID().toString()));
+        }
+        return bNodeMap;
+    }
+    
+    /**
+     * @return - the {@link ConstructProjection}s used to build the construct graph
+     * returned by {@link ConstructGraph#createGraphFromBindingSet(VisibilityBindingSet)}.
+     */
+    public Set<ConstructProjection> getProjections() {
+        return projections;
+    }
+    
+    /**
+     * Creates a construct query graph represented as a Set of {@link RyaStatement}s 
+     * @param bs - VisiblityBindingSet used to build statement BindingSets
+     * @return - Set of RyaStatements that represent a construct query graph.  
+     */
+    public Set<RyaStatement> createGraphFromBindingSet(VisibilityBindingSet bs) {
+        Set<RyaStatement> bSets = new HashSet<>();
+        long ts = System.currentTimeMillis();
+        Map<String, BNode> bNodes = getBNodeMap();
+        for(ConstructProjection projection: projections) {
+            RyaStatement statement = projection.projectBindingSet(bs, bNodes);
+            //ensure that all RyaStatements in graph have the same timestamp
+            statement.setTimestamp(ts);
+            bSets.add(statement);
+        }
+        return bSets;
+    }
+    
+    @Override
+    public boolean equals(Object o) {
+        if(this == o) {
+            return true;
+        }
+        
+        if(o instanceof ConstructGraph) {
+            ConstructGraph graph = (ConstructGraph) o;
+            return this.projections.equals(graph.projections);
+        }
+        return false;
+    }
+    
+    @Override
+    public int hashCode() {
+        int hash = 17;
+        for(ConstructProjection projection: projections) {
+            hash += projection.hashCode();
+        }
+        
+        return hash;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphSerializer.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphSerializer.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphSerializer.java
new file mode 100644
index 0000000..82a6c6c
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphSerializer.java
@@ -0,0 +1,52 @@
+package org.apache.rya.indexing.pcj.fluo.app;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.base.Joiner;
+
+/**
+ * Converts {@link ConstructGraph}s to and from Strings for
+ * storage and retrieval from Fluo. 
+ *
+ */
+public class ConstructGraphSerializer {
+
+    public static final String SP_DELIM = "\u0002";
+    
+    public static ConstructGraph toConstructGraph(String graphString) {
+        Set<ConstructProjection> projections = new HashSet<>();
+        String[] spStrings = graphString.split(SP_DELIM);
+        for(String sp: spStrings) {
+           projections.add(new ConstructProjection(FluoStringConverter.toStatementPattern(sp))); 
+        }
+        return new ConstructGraph(projections);
+    }
+    
+    public static String toConstructString(ConstructGraph graph) {
+        Set<ConstructProjection> projections = graph.getProjections();
+        Set<String> spStrings = new HashSet<>();
+        for(ConstructProjection projection: projections) {
+            spStrings.add(FluoStringConverter.toStatementPatternString(projection.getStatementPatternRepresentation()));
+        }
+        return Joiner.on(SP_DELIM).join(spStrings);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjection.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjection.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjection.java
new file mode 100644
index 0000000..6c1aa01
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjection.java
@@ -0,0 +1,266 @@
+package org.apache.rya.indexing.pcj.fluo.app;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.model.BNode;
+import org.openrdf.model.Resource;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.impl.BNodeImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.Var;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+/**
+ * This class projects a VisibilityBindingSet onto a RyaStatement. The Binding
+ * {@link Value}s that get projected onto subject, predicate and object are
+ * indicated by the names {@link ConstructProjection#getSubjectSourceVar()},
+ * {@link ConstructProjection#getPredicateSourceVar()} and
+ * {@link ConstructProjection#getObjectSourceVar()} and must satisfy standard
+ * RDF constraints for RDF subjects, predicates and objects. The purpose of
+ * projecting {@link BindingSet}s in this way is to provide functionality for
+ * SPARQL Construct queries which create RDF statements from query results.
+ *
+ */
+public class ConstructProjection {
+
+    private static final Logger log = Logger.getLogger(ConstructProjection.class);
+    private String subjName;
+    private String predName;
+    private String objName;
+    private Optional<Value> subjValue;
+    private Optional<Value> predValue;
+    private Optional<Value> objValue;
+    private Var subjVar;
+    private Var predVar;
+    private Var objVar;
+
+    public ConstructProjection(Var subjectVar, Var predicateVar, Var objectVar) {
+        Preconditions.checkNotNull(subjectVar);
+        Preconditions.checkNotNull(predicateVar);
+        Preconditions.checkNotNull(objectVar);
+        subjName = subjectVar.getName();
+        predName = predicateVar.getName();
+        objName = objectVar.getName();
+        Preconditions.checkNotNull(subjName);
+        Preconditions.checkNotNull(predName);
+        Preconditions.checkNotNull(objName);
+        this.subjVar = subjectVar;
+        this.predVar = predicateVar;
+        this.objVar = objectVar;
+        if((subjVar.isAnonymous() || subjName.startsWith("-anon-")) && subjectVar.getValue() == null) {
+            subjValue = Optional.of(new BNodeImpl(""));
+        } else {
+            subjValue = Optional.ofNullable(subjectVar.getValue());
+        }
+        predValue = Optional.ofNullable(predicateVar.getValue());
+        objValue = Optional.ofNullable(objectVar.getValue());
+    }
+
+    public ConstructProjection(StatementPattern pattern) {
+        this(pattern.getSubjectVar(), pattern.getPredicateVar(), pattern.getObjectVar());
+    }
+
+    /**
+     * Returns a Var with info about the Value projected onto the RyaStatement
+     * subject. If the org.openrdf.query.algebra.Var returned by this method is
+     * not constant (as indicated by {@link Var#isConstant()}, then
+     * {@link Var#getName()} is the Binding name that gets projected. If the Var
+     * is constant, then {@link Var#getValue()} is assigned to the subject
+     * 
+     * @return {@link org.openrdf.query.algebra.Var} containing info about
+     *         Binding that gets projected onto the subject
+     */
+    public String getSubjectSourceName() {
+        return subjName;
+    }
+
+    /**
+     * Returns a Var with info about the Value projected onto the RyaStatement
+     * predicate. If the org.openrdf.query.algebra.Var returned by this method
+     * is not constant (as indicated by {@link Var#isConstant()}, then
+     * {@link Var#getName()} is the Binding name that gets projected. If the Var
+     * is constant, then {@link Var#getValue()} is assigned to the predicate
+     * 
+     * @return {@link org.openrdf.query.algebra.Var} containing info about
+     *         Binding that gets projected onto the predicate
+     */
+    public String getPredicateSourceName() {
+        return predName;
+    }
+
+    /**
+     * Returns a Var with info about the Value projected onto the RyaStatement
+     * object. If the org.openrdf.query.algebra.Var returned by this method is
+     * not constant (as indicated by {@link Var#isConstant()}, then
+     * {@link Var#getName()} is the Binding name that gets projected. If the Var
+     * is constant, then {@link Var#getValue()} is assigned to the object
+     * 
+     * @return {@link org.openrdf.query.algebra.Var} containing info about
+     *         Binding that gets projected onto the object
+     */
+    public String getObjectSourceName() {
+        return objName;
+    }
+
+    /**
+     * @return Value set for RyaStatement subject (if present)
+     */
+    public Optional<Value> getSubjValue() {
+        return subjValue;
+    }
+
+    /**
+     * @return Value set for RyaStatement predicate (if present)
+     */
+    public Optional<Value> getPredValue() {
+        return predValue;
+    }
+
+    /**
+     * @return Value set for RyaStatement object (if present)
+     */
+    public Optional<Value> getObjValue() {
+        return objValue;
+    }
+    
+
+    /**
+     * @return SubjectPattern representation of this ConstructProjection
+     *         containing the {@link ConstructProjection#subjectSourceVar},
+     *         {@link ConstructProjection#predicateSourceVar},
+     *         {@link ConstructProjection#objectSourceVar}
+     */
+    public StatementPattern getStatementPatternRepresentation() {
+        return new StatementPattern(subjVar, predVar, objVar);
+    }
+
+    /**
+     * Projects a given BindingSet onto a RyaStatement. The subject, predicate,
+     * and object are extracted from the input VisibilityBindingSet (if the
+     * subjectSourceVar, predicateSourceVar, objectSourceVar is resp.
+     * non-constant) and from the Var Value itself (if subjectSourceVar,
+     * predicateSource, objectSourceVar is resp. constant).
+     * 
+     * 
+     * @param vBs
+     *            - Visibility BindingSet that gets projected onto an RDF
+     *            Statement BindingSet with Binding names subject, predicate and
+     *            object
+     * @param   bNodeMap - Optional Map used to pass {@link BNode}s for given variable names into
+     *          multiple {@link ConstructProjection}s.  This allows a ConstructGraph to create
+     *          RyaStatements with the same BNode for a given variable name across multiple ConstructProjections.
+     * @return - RyaStatement whose values are determined by
+     *         {@link ConstructProjection#getSubjectSourceVar()},
+     *         {@link ConstructProjection#getPredicateSourceVar()},
+     *         {@link ConstructProjection#getObjectSourceVar()}.
+     * 
+     */
+    public RyaStatement projectBindingSet(VisibilityBindingSet vBs, Map<String, BNode> bNodes) {
+     
+        Preconditions.checkNotNull(vBs);
+        Preconditions.checkNotNull(bNodes);
+        
+        Value subj = getValue(subjName, subjValue, vBs, bNodes);
+        Value pred = getValue(predName, predValue, vBs, bNodes);
+        Value obj = getValue(objName, objValue, vBs, bNodes);
+        
+        Preconditions.checkNotNull(subj);
+        Preconditions.checkNotNull(pred);
+        Preconditions.checkNotNull(obj);
+        Preconditions.checkArgument(subj instanceof Resource);
+        Preconditions.checkArgument(pred instanceof URI);
+
+        RyaURI subjType = RdfToRyaConversions.convertResource((Resource) subj);
+        RyaURI predType = RdfToRyaConversions.convertURI((URI) pred);
+        RyaType objectType = RdfToRyaConversions.convertValue(obj);
+
+        RyaStatement statement = new RyaStatement(subjType, predType, objectType);
+        try {
+            statement.setColumnVisibility(vBs.getVisibility().getBytes("UTF-8"));
+        } catch (UnsupportedEncodingException e) {
+            log.trace("Unable to decode column visibility.  RyaStatement being created without column visibility.");
+        }
+        return statement;
+    }
+    
+    private Value getValue(String name, Optional<Value> optValue, VisibilityBindingSet bs, Map<String, BNode> bNodes) {
+        Value returnValue = null;
+        if (optValue.isPresent()) {
+            Value tempValue = optValue.get();
+            if(tempValue instanceof BNode) {
+                Preconditions.checkArgument(bNodes.containsKey(name));
+                returnValue = bNodes.get(name);
+            } else {
+                returnValue = tempValue;
+            }
+        } else {
+            returnValue = bs.getValue(name);
+        }
+        return returnValue;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o instanceof ConstructProjection) {
+            ConstructProjection projection = (ConstructProjection) o;
+            return new EqualsBuilder().append(this.subjName, projection.subjName).append(this.predName, projection.predName)
+                    .append(this.objName, projection.objName).append(this.subjValue, projection.subjValue)
+                    .append(this.predValue, projection.predValue).append(this.objValue, projection.objValue).isEquals();
+        }
+        return false;
+
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(this.subjName, this.predName, this.objName, this.subjValue, this.predValue, this.objValue);
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder().append("Construct Projection {\n").append("   Subject Name: " + subjName + "\n")
+                .append("   Subject Value: " + subjValue + "\n").append("   Predicate Name: " + predName + "\n")
+                .append("   Predicate Value: " + predValue + "\n").append("   Object Name: " + objName + "\n")
+                .append("   Object Value: " + objValue + "\n").append("}").toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java
new file mode 100644
index 0000000..d8d60b5
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java
@@ -0,0 +1,91 @@
+package org.apache.rya.indexing.pcj.fluo.app;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Set;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaSchema;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
+import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+/**
+ * This class creates results for the ConstructQuery.  This class applies the {@link ConstructGraph}
+ * associated with the Construct Query to generate a collection of {@link RyaStatement}s.  These statements
+ * are then used to form a {@link RyaSubGraph} that is serialized and stored as a value in the Column 
+ * {@link FluoQueryColumns#CONSTRUCT_STATEMENTS}.
+ *
+ */
+public class ConstructQueryResultUpdater {
+
+    private static final Logger log = Logger.getLogger(ConstructQueryResultUpdater.class);
+    private static final RyaSubGraphKafkaSerDe serializer = new RyaSubGraphKafkaSerDe();
+    
+    /**
+     * Updates the Construct Query results by applying the {@link ConnstructGraph} to
+     * create a {@link RyaSubGraph} and then writing the subgraph to {@link FluoQueryColumns#CONSTRUCT_STATEMENTS}.
+     * @param tx - transaction used to write the subgraph
+     * @param bs - BindingSet that the ConstructProjection expands into a subgraph
+     * @param metadata - metadata that the ConstructProjection is extracted from
+     */
+    public void updateConstructQueryResults(TransactionBase tx, VisibilityBindingSet bs, ConstructQueryMetadata metadata) {
+        
+        String nodeId = metadata.getNodeId();
+        Column column = FluoQueryColumns.CONSTRUCT_STATEMENTS;
+        ConstructGraph graph = metadata.getConstructGraph();
+        
+        try {
+            Set<RyaStatement> statements = graph.createGraphFromBindingSet(bs);
+            RyaSubGraph subgraph = new RyaSubGraph(metadata.getNodeId(), statements);
+            String resultId = nodeId + "_" + getSubGraphId(subgraph);
+            tx.set(Bytes.of(resultId), column, Bytes.of(serializer.toBytes(subgraph)));
+        } catch (Exception e) {
+            log.trace("Unable to serialize RyaStatement generated by ConstructGraph: " + graph + " from BindingSet: " + bs );
+        }
+    }
+    
+    /**
+     * Generates a simple hash used as an id for the subgraph.  Id generated as hash as opposed
+     * to UUID to avoid the same subgraph result being stored under multiple UUID.  
+     * @param subgraph - subgraph that an id is need for
+     * @return - hash of subgraph used as an id
+     */
+    private int getSubGraphId(RyaSubGraph subgraph) {
+        int id = 17;
+        id = 31*id + subgraph.getId().hashCode();
+        for(RyaStatement statement: subgraph.getStatements()) {
+            int statementId = 7;
+            if(!statement.getSubject().getData().startsWith(RyaSchema.BNODE_NAMESPACE)) {
+                statementId = 17*statementId + statement.getSubject().hashCode();
+            }
+            statementId = 17*statementId + statement.getPredicate().hashCode();
+            statementId = 17*statementId + statement.getObject().hashCode();
+            id += statementId;
+        }
+        return Math.abs(id);
+    }
+    
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
index 5221c21..05a8d1c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
@@ -23,17 +23,25 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DE
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.TYPE_DELIM;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.URI_TYPE;
 
+import java.util.UUID;
+
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
 
+import org.openrdf.model.BNode;
 import org.openrdf.model.Literal;
 import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.impl.BNodeImpl;
 import org.openrdf.model.impl.LiteralImpl;
 import org.openrdf.model.impl.URIImpl;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.algebra.StatementPattern;
 import org.openrdf.query.algebra.Var;
 
+import com.google.common.base.Preconditions;
+
+import org.apache.rya.api.domain.RyaSchema;
 import org.apache.rya.api.domain.RyaType;
 import org.apache.rya.api.resolver.RdfToRyaConversions;
 
@@ -85,25 +93,33 @@ public class FluoStringConverter {
      */
     public static Var toVar(final String varString) {
         checkNotNull(varString);
-
-        if(varString.startsWith("-const-")) {
-            // The variable is a constant value.
-            final String[] varParts = varString.split(TYPE_DELIM);
-            final String name = varParts[0];
-            final String valueString = name.substring("-const-".length());
-
+        final String[] varParts = varString.split(TYPE_DELIM);
+        final String name = varParts[0];
+        
+        // The variable is a constant value.
+        if(varParts.length > 1) {
             final String dataTypeString = varParts[1];
             if(dataTypeString.equals(URI_TYPE)) {
                 // Handle a URI object.
+                Preconditions.checkArgument(varParts.length == 2);
+                final String valueString = name.substring("-const-".length());
                 final Var var = new Var(name, new URIImpl(valueString));
-                var.setAnonymous(true);
+                var.setConstant(true);
+                return var;
+            } else if(dataTypeString.equals(RyaSchema.BNODE_NAMESPACE)) { 
+                // Handle a BNode object
+                Preconditions.checkArgument(varParts.length == 3);
+                Var var = new Var(name);
+                var.setValue(new BNodeImpl(varParts[2]));
                 return var;
             } else {
-                // Literal value.
+                // Handle a Literal Value.
+                Preconditions.checkArgument(varParts.length == 2);
+                final String valueString = name.substring("-const-".length());
                 final URI dataType = new URIImpl(dataTypeString);
                 final Literal value = new LiteralImpl(valueString, dataType);
                 final Var var = new Var(name, value);
-                var.setAnonymous(true);
+                var.setConstant(true);
                 return var;
             }
         } else {
@@ -126,19 +142,24 @@ public class FluoStringConverter {
 
         final Var subjVar = sp.getSubjectVar();
         String subj = subjVar.getName();
-        if(subjVar.isConstant()) {
-            subj = subj + TYPE_DELIM + URI_TYPE;
-        }
+        if(subjVar.getValue() != null) {
+            Value subjValue = subjVar.getValue();
+            if (subjValue instanceof BNode ) {
+                subj = subj + TYPE_DELIM + RyaSchema.BNODE_NAMESPACE + TYPE_DELIM + ((BNode) subjValue).getID(); 
+            } else {
+                subj = subj + TYPE_DELIM + URI_TYPE;
+            }
+        } 
 
         final Var predVar = sp.getPredicateVar();
         String pred = predVar.getName();
-        if(predVar.isConstant()) {
+        if(predVar.getValue() != null) {
             pred = pred + TYPE_DELIM + URI_TYPE;
         }
 
         final Var objVar = sp.getObjectVar();
         String obj = objVar.getName();
-        if (objVar.isConstant()) {
+        if (objVar.getValue() != null) {
             final RyaType rt = RdfToRyaConversions.convertValue(objVar.getValue());
             obj =  obj + TYPE_DELIM + rt.getDataType().stringValue();
         }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
index be4df71..f9d14b5 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
@@ -33,6 +33,7 @@ public class IncrementalUpdateConstants {
     public static final String FILTER_PREFIX = "FILTER";
     public static final String AGGREGATION_PREFIX = "AGGREGATION";
     public static final String QUERY_PREFIX = "QUERY";
+    public static final String CONSTRUCT_PREFIX = "CONSTRUCT";
 
     public static final String URI_TYPE = "http://www.w3.org/2001/XMLSchema#anyURI";
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
index 5365e30..b829b7e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
@@ -20,6 +20,7 @@ package org.apache.rya.indexing.pcj.fluo.app;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.AGGREGATION_PREFIX;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.CONSTRUCT_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX;
@@ -30,7 +31,6 @@ import java.util.List;
 import org.apache.fluo.api.data.Column;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.QueryNodeMetadataColumns;
-import org.openrdf.query.BindingSet;
 
 import com.google.common.base.Optional;
 
@@ -42,23 +42,24 @@ public enum NodeType {
     JOIN(QueryNodeMetadataColumns.JOIN_COLUMNS, FluoQueryColumns.JOIN_BINDING_SET),
     STATEMENT_PATTERN(QueryNodeMetadataColumns.STATEMENTPATTERN_COLUMNS, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET),
     QUERY(QueryNodeMetadataColumns.QUERY_COLUMNS, FluoQueryColumns.QUERY_BINDING_SET),
-    AGGREGATION(QueryNodeMetadataColumns.AGGREGATION_COLUMNS, FluoQueryColumns.AGGREGATION_BINDING_SET);
+    AGGREGATION(QueryNodeMetadataColumns.AGGREGATION_COLUMNS, FluoQueryColumns.AGGREGATION_BINDING_SET),
+    CONSTRUCT(QueryNodeMetadataColumns.CONSTRUCT_COLUMNS, FluoQueryColumns.CONSTRUCT_STATEMENTS);
 
     //Metadata Columns associated with given NodeType
     private QueryNodeMetadataColumns metadataColumns;
 
-    //Column where BindingSet results are stored for given NodeType
-    private Column bindingSetColumn;
+    //Column where results are stored for given NodeType
+    private Column resultColumn;
 
     /**
      * Constructs an instance of {@link NodeType}.
      *
      * @param metadataColumns - Metadata {@link Column}s associated with this {@link NodeType}. (not null)
-     * @param bindingSetColumn - The {@link Column} used to store this {@link NodeType|'s {@link BindingSet}s. (not null)
+     * @param resultColumn - The {@link Column} used to store this {@link NodeType}'s results. (not null)
      */
-    private NodeType(final QueryNodeMetadataColumns metadataColumns, final Column bindingSetColumn) {
+    private NodeType(QueryNodeMetadataColumns metadataColumns, Column resultColumn) {
     	this.metadataColumns = requireNonNull(metadataColumns);
-    	this.bindingSetColumn = requireNonNull(bindingSetColumn);
+    	this.resultColumn = requireNonNull(resultColumn);
     }
 
     /**
@@ -70,10 +71,10 @@ public enum NodeType {
 
 
     /**
-     * @return The {@link Column} used to store this {@link NodeType|'s {@link BindingSet}s.
+     * @return The {@link Column} used to store this {@link NodeType}'s query results.
      */
-    public Column getBsColumn() {
-    	return bindingSetColumn;
+    public Column getResultColumn() {
+    	return resultColumn;
     }
 
     /**
@@ -98,6 +99,8 @@ public enum NodeType {
             type = QUERY;
         } else if(nodeId.startsWith(AGGREGATION_PREFIX)) {
             type = AGGREGATION;
+        } else if(nodeId.startsWith(CONSTRUCT_PREFIX)) {
+            type = CONSTRUCT;
         }
 
         return Optional.fromNullable(type);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java
new file mode 100644
index 0000000..c2f4cb4
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Exports a single Binding Set that is a new result for a SPARQL query to some
+ * other location.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface IncrementalBindingSetExporter extends AutoCloseable {
+
+    /**
+     * Export a Binding Set that is a result of a SPARQL query that does not include a Group By clause.
+     *
+     * @param tx - The Fluo transaction this export is a part of. (not null)
+     * @param queryId - The Fluo ID of the SPARQL query the binding set is a result of. (not null)
+     * @param bindingSetString - The Binding Set as it was represented within the Fluo application. (not null)
+     * @throws ResultExportException The result could not be exported.
+     */
+    public void export(TransactionBase tx, String queryId, VisibilityBindingSet result) throws ResultExportException;
+
+    /**
+     * A result could not be exported.
+     */
+    public static class ResultExportException extends Exception {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * Constructs an instance of {@link ResultExportException}.
+         *
+         * @param message - Explains why the exception was thrown.
+         */
+        public ResultExportException(final String message) {
+            super(message);
+        }
+
+        /**
+         * Constructs an instance of {@link ResultExportException}.
+         *
+         * @param message - Explains why the exception was thrown.
+         * @param cause - The exception that caused this one to be thrown.
+         */
+        public ResultExportException(final String message, final Throwable cause) {
+            super(message, cause);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporterFactory.java
new file mode 100644
index 0000000..1bf492a
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporterFactory.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+import com.google.common.base.Optional;
+
+import org.apache.fluo.api.observer.Observer.Context;
+
+/**
+ * Builds instances of {@link IncrementalBindingSetExporter} using the provided
+ * configurations.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface IncrementalBindingSetExporterFactory {
+
+    /**
+     * Builds an instance of {@link IncrementalBindingSetExporter} using the
+     * configurations that are provided.
+     *
+     * @param context - Contains the host application's configuration values
+     *   and any parameters that were provided at initialization. (not null)
+     * @return An exporter if configurations were found in the context; otherwise absent.
+     * @throws IncrementalExporterFactoryException A non-configuration related
+     *   problem has occurred and the exporter could not be created as a result.
+     * @throws ConfigurationException Thrown if configuration values were
+     *   provided, but an instance of the exporter could not be initialized
+     *   using them. This could be because they were improperly formatted,
+     *   a required field was missing, or some other configuration based problem.
+     */
+    public Optional<IncrementalBindingSetExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException;
+
+    /**
+     * Indicates a {@link IncrementalBindingSetExporter} could not be created by a
+     * {@link IncrementalBindingSetExporterFactory}.
+     */
+    public static class IncrementalExporterFactoryException extends Exception {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * Constructs an instance of {@link }.
+         *
+         * @param message - Explains why this exception is being thrown.
+         */
+        public IncrementalExporterFactoryException(final String message) {
+            super(message);
+        }
+
+        /**
+         * Constructs an instance of {@link }.
+         *
+         * @param message - Explains why this exception is being thrown.
+         * @param cause - The exception that caused this one to be thrown.
+         */
+        public IncrementalExporterFactoryException(final String message, final Throwable t) {
+            super(message, t);
+        }
+    }
+
+    /**
+     * The configuration could not be interpreted because required fields were
+     * missing or a value wasn't properly formatted.
+     */
+    public static class ConfigurationException extends IncrementalExporterFactoryException {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * Constructs an instance of {@link ConfigurationException}.
+         *
+         * @param message - Explains why this exception is being thrown.
+         */
+        public ConfigurationException(final String message) {
+            super(message);
+        }
+
+        /**
+         * Constructs an instance of {@link ConfigurationException}.
+         *
+         * @param message - Explains why this exception is being thrown.
+         * @param cause - The exception that caused this one to be thrown.
+         */
+        public ConfigurationException(final String message, final Throwable cause) {
+            super(message, cause);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
deleted file mode 100644
index 02dced7..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export;
-
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
-/**
- * Exports a single Binding Set that is a new result for a SPARQL query to some
- * other location.
- */
-@DefaultAnnotation(NonNull.class)
-public interface IncrementalResultExporter extends AutoCloseable {
-
-    /**
-     * Export a Binding Set that is a result of a SPARQL query that does not include a Group By clause.
-     *
-     * @param tx - The Fluo transaction this export is a part of. (not null)
-     * @param queryId - The Fluo ID of the SPARQL query the binding set is a result of. (not null)
-     * @param bindingSetString - The Binding Set as it was represented within the Fluo application. (not null)
-     * @throws ResultExportException The result could not be exported.
-     */
-    public void export(TransactionBase tx, String queryId, VisibilityBindingSet result) throws ResultExportException;
-
-    /**
-     * A result could not be exported.
-     */
-    public static class ResultExportException extends Exception {
-        private static final long serialVersionUID = 1L;
-
-        /**
-         * Constructs an instance of {@link ResultExportException}.
-         *
-         * @param message - Explains why the exception was thrown.
-         */
-        public ResultExportException(final String message) {
-            super(message);
-        }
-
-        /**
-         * Constructs an instance of {@link ResultExportException}.
-         *
-         * @param message - Explains why the exception was thrown.
-         * @param cause - The exception that caused this one to be thrown.
-         */
-        public ResultExportException(final String message, final Throwable cause) {
-            super(message, cause);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java
deleted file mode 100644
index f9fe2bd..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
-import com.google.common.base.Optional;
-
-import org.apache.fluo.api.observer.Observer.Context;
-
-/**
- * Builds instances of {@link IncrementalResultExporter} using the provided
- * configurations.
- */
-@DefaultAnnotation(NonNull.class)
-public interface IncrementalResultExporterFactory {
-
-    /**
-     * Builds an instance of {@link IncrementalResultExporter} using the
-     * configurations that are provided.
-     *
-     * @param context - Contains the host application's configuration values
-     *   and any parameters that were provided at initialization. (not null)
-     * @return An exporter if configurations were found in the context; otherwise absent.
-     * @throws IncrementalExporterFactoryException A non-configuration related
-     *   problem has occurred and the exporter could not be created as a result.
-     * @throws ConfigurationException Thrown if configuration values were
-     *   provided, but an instance of the exporter could not be initialized
-     *   using them. This could be because they were improperly formatted,
-     *   a required field was missing, or some other configuration based problem.
-     */
-    public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException;
-
-    /**
-     * Indicates a {@link IncrementalResultExporter} could not be created by a
-     * {@link IncrementalResultExporterFactory}.
-     */
-    public static class IncrementalExporterFactoryException extends Exception {
-        private static final long serialVersionUID = 1L;
-
-        /**
-         * Constructs an instance of {@link }.
-         *
-         * @param message - Explains why this exception is being thrown.
-         */
-        public IncrementalExporterFactoryException(final String message) {
-            super(message);
-        }
-
-        /**
-         * Constructs an instance of {@link }.
-         *
-         * @param message - Explains why this exception is being thrown.
-         * @param cause - The exception that caused this one to be thrown.
-         */
-        public IncrementalExporterFactoryException(final String message, final Throwable t) {
-            super(message, t);
-        }
-    }
-
-    /**
-     * The configuration could not be interpreted because required fields were
-     * missing or a value wasn't properly formatted.
-     */
-    public static class ConfigurationException extends IncrementalExporterFactoryException {
-        private static final long serialVersionUID = 1L;
-
-        /**
-         * Constructs an instance of {@link ConfigurationException}.
-         *
-         * @param message - Explains why this exception is being thrown.
-         */
-        public ConfigurationException(final String message) {
-            super(message);
-        }
-
-        /**
-         * Constructs an instance of {@link ConfigurationException}.
-         *
-         * @param message - Explains why this exception is being thrown.
-         * @param cause - The exception that caused this one to be thrown.
-         */
-        public ConfigurationException(final String message, final Throwable cause) {
-            super(message, cause);
-        }
-    }
-}
\ No newline at end of file