Posted to commits@rya.apache.org by ca...@apache.org on 2017/06/22 18:05:13 UTC

[1/4] incubator-rya git commit: RYA-273-Construct Query Support. Closes #161.

Repository: incubator-rya
Updated Branches:
  refs/heads/master 646d21b4e -> 60090ad52


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
index 3ed1844..452dd27 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
@@ -18,18 +18,22 @@
  */
 package org.apache.rya.indexing.pcj.fluo;
 
+import static java.util.Objects.requireNonNull;
 import static org.junit.Assert.assertEquals;
 
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 
 import org.I0Itec.zkclient.ZkClient;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.recipes.test.AccumuloExportITBase;
@@ -40,7 +44,9 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.AccumuloRyaDAO;
 import org.apache.rya.api.client.Install.InstallConfiguration;
 import org.apache.rya.api.client.RyaClient;
 import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
@@ -48,7 +54,9 @@ import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
 import org.apache.rya.indexing.accumulo.ConfigUtils;
 import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
 import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
 import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
@@ -60,8 +68,11 @@ import org.apache.rya.sail.config.RyaSailFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.repository.sail.SailRepositoryConnection;
 import org.openrdf.sail.Sail;
 
+
 import kafka.admin.AdminUtils;
 import kafka.admin.RackAwareMode;
 import kafka.server.KafkaConfig;
@@ -74,7 +85,8 @@ import kafka.utils.ZkUtils;
 import kafka.zk.EmbeddedZookeeper;
 
 /**
- * The base Integration Test class used for Fluo applications that export to a Kakfa topic.
+ * The base Integration Test class used for Fluo applications that export to a
+ * Kafka topic.
  */
 public class KafkaExportITBase extends AccumuloExportITBase {
 
@@ -88,8 +100,10 @@ public class KafkaExportITBase extends AccumuloExportITBase {
     private EmbeddedZookeeper zkServer;
     private ZkClient zkClient;
 
-    // The Rya instance statements are written to that will be fed into the Fluo app.
+    // The Rya instance that statements are written to before they are fed into
+    // the Fluo app.
     private RyaSailRepository ryaSailRepo = null;
+    private AccumuloRyaDAO dao = null;
 
     /**
      * Add info about the Kafka queue/topic to receive the export.
@@ -104,7 +118,8 @@ public class KafkaExportITBase extends AccumuloExportITBase {
         observers.add(new ObserverSpecification(FilterObserver.class.getName()));
         observers.add(new ObserverSpecification(AggregationObserver.class.getName()));
 
-        // Configure the export observer to export new PCJ results to the mini accumulo cluster.
+        // Configure the export observer to export new PCJ results to the
+        // Kafka topic.
         final HashMap<String, String> exportParams = new HashMap<>();
 
         final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams);
@@ -114,11 +129,29 @@ public class KafkaExportITBase extends AccumuloExportITBase {
         final Properties producerConfig = new Properties();
         producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
         producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
         kafkaParams.addAllProducerConfig(producerConfig);
 
         final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
         observers.add(exportObserverConfig);
+        
+        // Create the construct query observer and configure it to export
+        // construct query results to Kafka.
+        HashMap<String, String> constructParams = new HashMap<>();
+        final KafkaExportParameters kafkaConstructParams = new KafkaExportParameters(constructParams);
+        kafkaConstructParams.setExportToKafka(true);
+        
+        // Configure the Kafka Producer
+        final Properties constructProducerConfig = new Properties();
+        constructProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
+        constructProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        constructProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, RyaSubGraphKafkaSerDe.class.getName());
+        kafkaConstructParams.addAllProducerConfig(constructProducerConfig);
+
+        final ObserverSpecification constructExportObserverConfig = new ObserverSpecification(ConstructQueryResultObserver.class.getName(),
+                constructParams);
+        observers.add(constructExportObserverConfig);
 
         // Add the observers to the Fluo Configuration.
         super.getFluoConfiguration().addObservers(observers);
@@ -150,24 +183,24 @@ public class KafkaExportITBase extends AccumuloExportITBase {
     }
 
     @After
-    public void teardownRya() throws Exception {
+    public void teardownRya() {
         final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster();
         final String instanceName = cluster.getInstanceName();
         final String zookeepers = cluster.getZooKeepers();
 
         // Uninstall the instance of Rya.
         final RyaClient ryaClient = AccumuloRyaClientFactory.build(
-                new AccumuloConnectionDetails(
-                    ACCUMULO_USER,
-                    ACCUMULO_PASSWORD.toCharArray(),
-                    instanceName,
-                    zookeepers),
+                new AccumuloConnectionDetails(ACCUMULO_USER, ACCUMULO_PASSWORD.toCharArray(), instanceName, zookeepers),
                 super.getAccumuloConnector());
 
-        ryaClient.getUninstall().uninstall(RYA_INSTANCE_NAME);
-
-        // Shutdown the repo.
-        ryaSailRepo.shutDown();
+        try {
+            ryaClient.getUninstall().uninstall(RYA_INSTANCE_NAME);
+            // Shutdown the repo.
+            if(ryaSailRepo != null) {ryaSailRepo.shutDown();}
+            if(dao != null ) {dao.destroy();}
+        } catch (Exception e) {
+            System.out.println("Encountered the following Exception when shutting down Rya: " + e.getMessage());
+        }
     }
 
     private void installRyaInstance() throws Exception {
@@ -177,26 +210,18 @@ public class KafkaExportITBase extends AccumuloExportITBase {
 
         // Install the Rya instance to the mini accumulo cluster.
         final RyaClient ryaClient = AccumuloRyaClientFactory.build(
-                new AccumuloConnectionDetails(
-                    ACCUMULO_USER,
-                    ACCUMULO_PASSWORD.toCharArray(),
-                    instanceName,
-                    zookeepers),
+                new AccumuloConnectionDetails(ACCUMULO_USER, ACCUMULO_PASSWORD.toCharArray(), instanceName, zookeepers),
                 super.getAccumuloConnector());
 
-        ryaClient.getInstall().install(RYA_INSTANCE_NAME, InstallConfiguration.builder()
-                .setEnableTableHashPrefix(false)
-                .setEnableFreeTextIndex(false)
-                .setEnableEntityCentricIndex(false)
-                .setEnableGeoIndex(false)
-                .setEnableTemporalIndex(false)
-                .setEnablePcjIndex(true)
-                .setFluoPcjAppName( super.getFluoConfiguration().getApplicationName() )
-                .build());
+        ryaClient.getInstall().install(RYA_INSTANCE_NAME,
+                InstallConfiguration.builder().setEnableTableHashPrefix(false).setEnableFreeTextIndex(false)
+                        .setEnableEntityCentricIndex(false).setEnableGeoIndex(false).setEnableTemporalIndex(false).setEnablePcjIndex(true)
+                        .setFluoPcjAppName(super.getFluoConfiguration().getApplicationName()).build());
 
         // Connect to the Rya instance that was just installed.
         final AccumuloRdfConfiguration conf = makeConfig(instanceName, zookeepers);
         final Sail sail = RyaSailFactory.getInstance(conf);
+        dao = RyaSailFactory.getAccumuloDAOWithUpdatedConfig(conf);
         ryaSailRepo = new RyaSailRepository(sail);
     }
 
@@ -211,15 +236,12 @@ public class KafkaExportITBase extends AccumuloExportITBase {
         conf.setAccumuloZookeepers(super.getAccumuloConnector().getInstance().getZooKeepers());
         conf.setAuths("");
 
-
         // PCJ configuration information.
         conf.set(ConfigUtils.USE_PCJ, "true");
         conf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true");
         conf.set(ConfigUtils.FLUO_APP_NAME, super.getFluoConfiguration().getApplicationName());
-        conf.set(ConfigUtils.PCJ_STORAGE_TYPE,
-                PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString());
-        conf.set(ConfigUtils.PCJ_UPDATER_TYPE,
-                PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString());
+        conf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString());
+        conf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString());
 
         conf.setDisplayQueryPlan(true);
 
@@ -227,20 +249,29 @@ public class KafkaExportITBase extends AccumuloExportITBase {
     }
 
     /**
-     * @return A {@link RyaSailRepository} that is connected to the Rya instance that statements are loaded into.
+     * @return A {@link RyaSailRepository} that is connected to the Rya instance
+     *         that statements are loaded into.
      */
     protected RyaSailRepository getRyaSailRepository() throws Exception {
         return ryaSailRepo;
     }
 
     /**
+     * @return An {@link AccumuloRyaDAO} so that RyaStatements with distinct
+     *         visibilities can be added to the Rya instance.
+     */
+    protected AccumuloRyaDAO getRyaDAO() {
+        return dao;
+    }
+
+    /**
     * Closes the mini Kafka server and the mini Zookeeper.
      */
     @After
     public void teardownKafka() {
-        kafkaServer.shutdown();
-        zkClient.close();
-        zkServer.shutdown();
+        if(kafkaServer != null) {kafkaServer.shutdown();}
+        if(zkClient != null) {zkClient.close();}
+        if(zkServer != null) {zkServer.shutdown();}
     }
 
     /**
@@ -257,7 +288,7 @@ public class KafkaExportITBase extends AccumuloExportITBase {
         // setup producer
         final Properties producerProps = new Properties();
         producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
-        producerProps.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
+        producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
         producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
         final KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps);
 
@@ -266,7 +297,7 @@ public class KafkaExportITBase extends AccumuloExportITBase {
         consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
         consumerProps.setProperty("group.id", "group0");
         consumerProps.setProperty("client.id", "consumer0");
-        consumerProps.setProperty("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer");
+        consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
         consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
 
         // to make sure the consumer starts from the beginning of the topic
@@ -296,8 +327,10 @@ public class KafkaExportITBase extends AccumuloExportITBase {
         consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
         consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
         consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
-        consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
-        consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
+        consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.IntegerDeserializer");
+        consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
 
         // to make sure the consumer starts from the beginning of the topic
         consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -306,4 +339,32 @@ public class KafkaExportITBase extends AccumuloExportITBase {
         consumer.subscribe(Arrays.asList(TopicName));
         return consumer;
     }
+
+    protected String loadData(final String sparql, final Collection<Statement> statements) throws Exception {
+        requireNonNull(sparql);
+        requireNonNull(statements);
+
+        // Register the PCJ with Rya.
+        final Instance accInstance = super.getAccumuloConnector().getInstance();
+        final Connector accumuloConn = super.getAccumuloConnector();
+
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails(ACCUMULO_USER,
+                ACCUMULO_PASSWORD.toCharArray(), accInstance.getInstanceName(), accInstance.getZooKeepers()), accumuloConn);
+
+        final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql);
+
+        // Write the data to Rya.
+        final SailRepositoryConnection ryaConn = getRyaSailRepository().getConnection();
+        ryaConn.begin();
+        ryaConn.add(statements);
+        ryaConn.commit();
+        ryaConn.close();
+
+        // Wait for the Fluo application to finish computing the end result.
+        super.getMiniFluo().waitForObservers();
+
+        // The PCJ Id is the topic name the results will be written to.
+        return pcjId;
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
index 84b6343..4eab0f6 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
@@ -26,8 +26,10 @@ import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.log4j.BasicConfigurator;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
 import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;
 import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
@@ -68,6 +70,13 @@ public class RyaExportITBase extends FluoITBase {
 
         final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
         observers.add(exportObserverConfig);
+        
+        final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams);
+        kafkaParams.setExportToKafka(false);
+
+        final ObserverSpecification constructExportObserverConfig = new ObserverSpecification(ConstructQueryResultObserver.class.getName(),
+                exportParams);
+        observers.add(constructExportObserverConfig);
 
         // Add the observers to the Fluo Configuration.
         super.getFluoConfiguration().addObservers(observers);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
index d56c23a..d19646e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
@@ -106,7 +106,7 @@ public class GetQueryReportIT extends RyaExportITBase {
 
             final FluoQuery fluoQuery = report.getFluoQuery();
 
-            final String queryNodeId = fluoQuery.getQueryMetadata().getNodeId();
+            final String queryNodeId = fluoQuery.getQueryMetadata().get().getNodeId();
             expectedCounts.put(queryNodeId, BigInteger.valueOf(8));
 
             final String filterNodeId = fluoQuery.getFilterMetadata().iterator().next().getNodeId();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
index 082f46d..accabbf 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
@@ -20,22 +20,30 @@ package org.apache.rya.indexing.pcj.fluo.app.query;
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.List;
+
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.client.Transaction;
 import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
+import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery.QueryType;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
 import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.junit.Test;
 import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.helpers.StatementPatternCollector;
 import org.openrdf.query.parser.ParsedQuery;
 import org.openrdf.query.parser.sparql.SPARQLParser;
 import org.openrdf.repository.RepositoryException;
 
+import com.google.common.base.Optional;
+
 /**
  * Integration tests the methods of {@link FluoQueryMetadataDAO}.
  */
@@ -160,6 +168,42 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
             assertEquals(originalMetadata, storedMetdata);
         }
     }
+    
+    @Test
+    public void constructQueryMetadataTest() throws MalformedQueryException {
+        
+        String query = "select ?x ?y where {?x <uri:p1> ?y. ?y <uri:p2> <uri:o1> }";
+        SPARQLParser parser = new SPARQLParser();
+        ParsedQuery pq = parser.parseQuery(query, null);
+        List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr());
+        
+        final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+        // Create the object that will be serialized.
+        final ConstructQueryMetadata.Builder builder = ConstructQueryMetadata.builder();
+        builder.setNodeId("nodeId");
+        builder.setSparql(query);
+        builder.setChildNodeId("childNodeId");
+        builder.setConstructGraph(new ConstructGraph(patterns));
+        final ConstructQueryMetadata originalMetadata = builder.build();
+
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Write it to the Fluo table.
+            try(Transaction tx = fluoClient.newTransaction()) {
+                dao.write(tx, originalMetadata);
+                tx.commit();
+            }
+
+            // Read it from the Fluo table.
+            ConstructQueryMetadata storedMetdata = null;
+            try(Snapshot sx = fluoClient.newSnapshot()) {
+                storedMetdata = dao.readConstructQueryMetadata(sx, "nodeId");
+            }
+
+            // Ensure the deserialized object is the same as the serialized one.
+            assertEquals(originalMetadata, storedMetdata);
+        }
+    }
 
     @Test
     public void aggregationMetadataTest_withGroupByVarOrders() {
@@ -242,6 +286,9 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
         final ParsedQuery query = new SPARQLParser().parseQuery(sparql, null);
         final FluoQuery originalQuery = new SparqlFluoQueryBuilder().make(query, new NodeIds());
 
+        assertEquals(QueryType.Projection, originalQuery.getQueryType());
+        assertEquals(false, originalQuery.getConstructQueryMetadata().isPresent());
+        
         try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
             // Write it to the Fluo table.
             try(Transaction tx = fluoClient.newTransaction()) {
@@ -249,12 +296,51 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
                 tx.commit();
             }
 
-            // Read it from the Fluo table.
-            FluoQuery storedQuery = null;
-            try(Snapshot sx = fluoClient.newSnapshot()) {
-                storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().getNodeId());
+            // Read it from the Fluo table.
+            FluoQuery storedQuery = null;
+            try(Snapshot sx = fluoClient.newSnapshot()) {
+                storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().get().getNodeId());
+            }
+
+            // Ensure the deserialized object is the same as the serialized one.
+            assertEquals(originalQuery, storedQuery);
+        }
+    }
+    
+    @Test
+    public void fluoConstructQueryTest() throws MalformedQueryException {
+        final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+        // Create the object that will be serialized.
+        final String sparql =
+                "CONSTRUCT { ?customer <http://travelsTo> <http://England> .  ?customer <http://friendsWith> ?worker }" +
+                "WHERE { " +
+                  "FILTER(?customer = <http://Alice>) " +
+                  "FILTER(?city = <http://London>) " +
+                  "?customer <http://talksTo> ?worker. " +
+                  "?worker <http://livesIn> ?city. " +
+                  "?worker <http://worksAt> <http://Chipotle>. " +
+                "}";
+
+        final ParsedQuery query = new SPARQLParser().parseQuery(sparql, null);
+        final FluoQuery originalQuery = new SparqlFluoQueryBuilder().make(query, new NodeIds());
+        
+        assertEquals(QueryType.Construct, originalQuery.getQueryType());
+        assertEquals(false, originalQuery.getQueryMetadata().isPresent());
+
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Write it to the Fluo table.
+            try(Transaction tx = fluoClient.newTransaction()) {
+                dao.write(tx, originalQuery);
+                tx.commit();
             }
 
+            // Read it from the Fluo table.
+            FluoQuery storedQuery = null;
+            try(Snapshot sx = fluoClient.newSnapshot()) {
+                storedQuery = dao.readFluoQuery(sx, originalQuery.getConstructQueryMetadata().get().getNodeId());
+            }
+
             // Ensure the deserialized object is the same as the serialized one.
             assertEquals(originalQuery, storedQuery);
         }
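
For context, the assertions added above (QueryType.Projection vs. QueryType.Construct, and the Optional-returning
getQueryMetadata()/getConstructQueryMetadata() accessors) mean callers now have to pick the metadata that matches
the kind of query they built. A minimal sketch of that pattern, using only the accessors exercised by these tests
(the helper name is hypothetical, not part of this patch):

    // Illustrative helper only.
    private static String topLevelNodeId(final FluoQuery fluoQuery) {
        if (fluoQuery.getQueryType() == FluoQuery.QueryType.Construct) {
            // CONSTRUCT queries only populate ConstructQueryMetadata.
            return fluoQuery.getConstructQueryMetadata().get().getNodeId();
        }
        // Projection (SELECT) queries only populate QueryMetadata.
        return fluoQuery.getQueryMetadata().get().getNodeId();
    }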

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
index 349d391..414fa70 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
@@ -121,6 +121,7 @@ public class CreateDeleteIT extends RyaExportITBase {
             assertEquals(0, empty_rows.size());
         }
     }
+    
 
     private String loadData(final String sparql, final Collection<Statement> statements) throws Exception {
         requireNonNull(sparql);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
index 7fa28ab..219e079 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
@@ -29,14 +29,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.rya.api.client.RyaClient;
-import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
-import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
 import org.apache.rya.indexing.pcj.fluo.KafkaExportITBase;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
@@ -47,7 +42,6 @@ import org.openrdf.model.impl.ValueFactoryImpl;
 import org.openrdf.model.vocabulary.XMLSchema;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.impl.MapBindingSet;
-import org.openrdf.repository.sail.SailRepositoryConnection;
 
 import com.google.common.collect.Sets;
 
@@ -427,38 +421,10 @@ public class KafkaExportIT extends KafkaExportITBase {
 
         // Verify the end results of the query match the expected results.
         final Set<VisibilityBindingSet> results = readGroupedResults(pcjId, new VariableOrder("type", "location"));
+        System.out.println(results);
         assertEquals(expectedResults, results);
     }
 
-    private String loadData(final String sparql, final Collection<Statement> statements) throws Exception {
-        requireNonNull(sparql);
-        requireNonNull(statements);
-
-        // Register the PCJ with Rya.
-        final Instance accInstance = super.getAccumuloConnector().getInstance();
-        final Connector accumuloConn = super.getAccumuloConnector();
-
-        final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails(
-                ACCUMULO_USER,
-                ACCUMULO_PASSWORD.toCharArray(),
-                accInstance.getInstanceName(),
-                accInstance.getZooKeepers()), accumuloConn);
-
-        final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql);
-
-        // Write the data to Rya.
-        final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection();
-        ryaConn.begin();
-        ryaConn.add(statements);
-        ryaConn.commit();
-        ryaConn.close();
-
-        // Wait for the Fluo application to finish computing the end result.
-        super.getMiniFluo().waitForObservers();
-
-        // The PCJ Id is the topic name the results will be written to.
-        return pcjId;
-    }
 
     private Set<VisibilityBindingSet> readAllResults(final String pcjId) throws Exception {
         requireNonNull(pcjId);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
new file mode 100644
index 0000000..c8167c7
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
@@ -0,0 +1,352 @@
+package org.apache.rya.indexing.pcj.fluo.integration;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.config.ObserverSpecification;
+import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.accumulo.AccumuloRyaDAO;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.fluo.ConstructGraphTestUtils;
+import org.apache.rya.indexing.pcj.fluo.KafkaExportITBase;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
+import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+import com.google.common.collect.Sets;
+
+public class KafkaRyaSubGraphExportIT extends KafkaExportITBase {
+
+    private static final String BROKERHOST = "127.0.0.1";
+    private static final String BROKERPORT = "9092";
+
+    /**
+     * Add info about the Kafka queue/topic to receive the export.
+     *
+     * @see org.apache.rya.indexing.pcj.fluo.ITBase#setExportParameters(java.util.HashMap)
+     */
+    @Override
+    protected void preFluoInitHook() throws Exception {
+        // Setup the observers that will be used by the Fluo PCJ Application.
+        final List<ObserverSpecification> observers = new ArrayList<>();
+        observers.add(new ObserverSpecification(TripleObserver.class.getName()));
+        observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
+        observers.add(new ObserverSpecification(JoinObserver.class.getName()));
+        observers.add(new ObserverSpecification(FilterObserver.class.getName()));
+        observers.add(new ObserverSpecification(AggregationObserver.class.getName()));
+
+        // Configure the export observer to export construct query results to the
+        // Kafka topic.
+        final HashMap<String, String> exportParams = new HashMap<>();
+
+        final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams);
+        kafkaParams.setExportToKafka(true);
+
+        // Configure the Kafka Producer
+        final Properties producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, RyaSubGraphKafkaSerDe.class.getName());
+        kafkaParams.addAllProducerConfig(producerConfig);
+
+        final ObserverSpecification exportObserverConfig = new ObserverSpecification(ConstructQueryResultObserver.class.getName(),
+                exportParams);
+        observers.add(exportObserverConfig);
+
+        // Add the observers to the Fluo Configuration.
+        super.getFluoConfiguration().addObservers(observers);
+    }
+
+    @Test
+    public void basicConstructQuery() throws Exception {
+        // A construct query that infers travelsTo and friendsWith statements for each customer of a burgerShack worker.
+        final String sparql = "CONSTRUCT { ?customer <urn:travelsTo> ?city . ?customer <urn:friendsWith> ?worker }" + "WHERE { "
+                + "?customer <urn:talksTo> ?worker. " + "?worker <urn:livesIn> ?city. " + "?worker <urn:worksAt> <urn:burgerShack>. " + "}";
+
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:Joe"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")),
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:livesIn"), vf.createURI("urn:London")),
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:burgerShack")));
+
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadStatements(sparql, statements);
+
+        // Verify the end results of the query match the expected results.
+        final Set<RyaSubGraph> results = readAllResults(pcjId);
+        
+        final Set<RyaSubGraph> expectedResults = new HashSet<>();
+        RyaSubGraph subGraph = new RyaSubGraph(pcjId);
+        RyaStatement statement1 = new RyaStatement(new RyaURI("urn:Joe"), new RyaURI("urn:travelsTo"), new RyaURI("urn:London"));
+        RyaStatement statement2 = new RyaStatement(new RyaURI("urn:Joe"), new RyaURI("urn:friendsWith"), new RyaURI("urn:Bob"));
+        // If no visibility is indicated, Fluo sets the visibility to an empty
+        // byte array, while RyaStatement defaults it to null. Set the expected
+        // statements' visibility to an empty byte array so that RyaStatement's
+        // equals will return true.
+        statement1.setColumnVisibility(new byte[0]);
+        statement2.setColumnVisibility(new byte[0]);
+
+        Set<RyaStatement> stmnts = new HashSet<>(Arrays.asList(statement1, statement2));
+        subGraph.setStatements(stmnts);
+        expectedResults.add(subGraph);
+
+        ConstructGraphTestUtils.subGraphsEqualIgnoresTimestamp(expectedResults, results);
+    }
+
+    @Test
+    public void basicConstructQueryWithVis() throws Exception {
+        // The same construct query, this time over statements that carry column visibilities.
+        final String sparql = "CONSTRUCT { ?customer <urn:travelsTo> ?city . ?customer <urn:friendsWith> ?worker }" + "WHERE { "
+                + "?customer <urn:talksTo> ?worker. " + "?worker <urn:livesIn> ?city. " + "?worker <urn:worksAt> <urn:burgerShack>. " + "}";
+
+        // Create the Statements that will be loaded into Rya.
+        RyaStatement statement1 = new RyaStatement(new RyaURI("urn:Joe"), new RyaURI("urn:talksTo"), new RyaURI("urn:Bob"));
+        RyaStatement statement2 = new RyaStatement(new RyaURI("urn:Bob"), new RyaURI("urn:livesIn"), new RyaURI("urn:London"));
+        RyaStatement statement3 = new RyaStatement(new RyaURI("urn:Bob"), new RyaURI("urn:worksAt"), new RyaURI("urn:burgerShack"));
+        statement1.setColumnVisibility("U&W".getBytes("UTF-8"));
+        statement2.setColumnVisibility("V".getBytes("UTF-8"));
+        statement3.setColumnVisibility("W".getBytes("UTF-8"));
+        
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadRyaStatements(sparql, Arrays.asList(statement1, statement2, statement3));
+
+        // Verify the end results of the query match the expected results.
+        final Set<RyaSubGraph> results = readAllResults(pcjId);
+        // Create the expected results of the SPARQL query once the PCJ has been
+        // computed.
+        final Set<RyaSubGraph> expectedResults = new HashSet<>();
+        RyaSubGraph subGraph = new RyaSubGraph(pcjId);
+        RyaStatement statement4 = new RyaStatement(new RyaURI("urn:Joe"), new RyaURI("urn:travelsTo"), new RyaURI("urn:London"));
+        RyaStatement statement5 = new RyaStatement(new RyaURI("urn:Joe"), new RyaURI("urn:friendsWith"), new RyaURI("urn:Bob"));
+        // The visibility of each constructed statement is the conjunction of the
+        // visibilities on the source statements that produced it, so the expected
+        // statements are given that combined visibility so that RyaStatement's
+        // equals will return true.
+        statement4.setColumnVisibility("U&V&W".getBytes("UTF-8"));
+        statement5.setColumnVisibility("U&V&W".getBytes("UTF-8"));
+
+        Set<RyaStatement> stmnts = new HashSet<>(Arrays.asList(statement4, statement5));
+        subGraph.setStatements(stmnts);
+        expectedResults.add(subGraph);
+
+        ConstructGraphTestUtils.subGraphsEqualIgnoresTimestamp(expectedResults, results);
+    }
+
+    
+    @Test
+    public void constructQueryWithVisAndMultipleSubGraphs() throws Exception {
+        // The same construct query; the visibility-labeled source statements match two workers, producing two subgraphs.
+        final String sparql = "CONSTRUCT { ?customer <urn:travelsTo> ?city . ?customer <urn:friendsWith> ?worker }" + "WHERE { "
+                + "?customer <urn:talksTo> ?worker. " + "?worker <urn:livesIn> ?city. " + "?worker <urn:worksAt> <urn:burgerShack>. " + "}";
+
+        // Create the Statements that will be loaded into Rya.
+        RyaStatement statement1 = new RyaStatement(new RyaURI("urn:Joe"), new RyaURI("urn:talksTo"), new RyaURI("urn:Bob"));
+        RyaStatement statement2 = new RyaStatement(new RyaURI("urn:Bob"), new RyaURI("urn:livesIn"), new RyaURI("urn:London"));
+        RyaStatement statement3 = new RyaStatement(new RyaURI("urn:Bob"), new RyaURI("urn:worksAt"), new RyaURI("urn:burgerShack"));
+        RyaStatement statement4 = new RyaStatement(new RyaURI("urn:John"), new RyaURI("urn:talksTo"), new RyaURI("urn:Evan"));
+        RyaStatement statement5 = new RyaStatement(new RyaURI("urn:Evan"), new RyaURI("urn:livesIn"), new RyaURI("urn:SanFrancisco"));
+        RyaStatement statement6 = new RyaStatement(new RyaURI("urn:Evan"), new RyaURI("urn:worksAt"), new RyaURI("urn:burgerShack"));
+        statement1.setColumnVisibility("U&W".getBytes("UTF-8"));
+        statement2.setColumnVisibility("V".getBytes("UTF-8"));
+        statement3.setColumnVisibility("W".getBytes("UTF-8"));
+        statement4.setColumnVisibility("A&B".getBytes("UTF-8"));
+        statement5.setColumnVisibility("B".getBytes("UTF-8"));
+        statement6.setColumnVisibility("C".getBytes("UTF-8"));
+        
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadRyaStatements(sparql, Arrays.asList(statement1, statement2, statement3, statement4, statement5, statement6));
+
+        // Verify the end results of the query match the expected results.
+        final Set<RyaSubGraph> results = readAllResults(pcjId);
+        // Create the expected results of the SPARQL query once the PCJ has been
+        // computed.
+        RyaStatement statement7 = new RyaStatement(new RyaURI("urn:Joe"), new RyaURI("urn:travelsTo"), new RyaURI("urn:London"));
+        RyaStatement statement8 = new RyaStatement(new RyaURI("urn:Joe"), new RyaURI("urn:friendsWith"), new RyaURI("urn:Bob"));
+        RyaStatement statement9 = new RyaStatement(new RyaURI("urn:John"), new RyaURI("urn:travelsTo"), new RyaURI("urn:SanFrancisco"));
+        RyaStatement statement10 = new RyaStatement(new RyaURI("urn:John"), new RyaURI("urn:friendsWith"), new RyaURI("urn:Evan"));
+        statement7.setColumnVisibility("U&V&W".getBytes("UTF-8"));
+        statement8.setColumnVisibility("U&V&W".getBytes("UTF-8"));
+        statement9.setColumnVisibility("A&B&C".getBytes("UTF-8"));
+        statement10.setColumnVisibility("A&B&C".getBytes("UTF-8"));
+
+        final Set<RyaSubGraph> expectedResults = new HashSet<>();
+
+        RyaSubGraph subGraph1 = new RyaSubGraph(pcjId);
+        Set<RyaStatement> stmnts1 = new HashSet<>(Arrays.asList(statement7, statement8));
+        subGraph1.setStatements(stmnts1);
+        expectedResults.add(subGraph1);
+        
+        RyaSubGraph subGraph2 = new RyaSubGraph(pcjId);
+        Set<RyaStatement> stmnts2 = new HashSet<>(Arrays.asList(statement9, statement10));
+        subGraph2.setStatements(stmnts2);
+        expectedResults.add(subGraph2);
+
+        ConstructGraphTestUtils.subGraphsEqualIgnoresTimestamp(expectedResults, results);
+    }
+    
+    @Test
+    public void constructQueryWithBlankNodesAndMultipleSubGraphs() throws Exception {
+        // A construct query whose subject is a blank node; the source statements match two workers, producing two subgraphs.
+        final String sparql = "CONSTRUCT { _:b <urn:travelsTo> ?city . _:b <urn:friendsWith> ?worker }" + "WHERE { "
+                + "?customer <urn:talksTo> ?worker. " + "?worker <urn:livesIn> ?city. " + "?worker <urn:worksAt> <urn:burgerShack>. " + "}";
+
+        // Create the Statements that will be loaded into Rya.
+        RyaStatement statement1 = new RyaStatement(new RyaURI("urn:Joe"), new RyaURI("urn:talksTo"), new RyaURI("urn:Bob"));
+        RyaStatement statement2 = new RyaStatement(new RyaURI("urn:Bob"), new RyaURI("urn:livesIn"), new RyaURI("urn:London"));
+        RyaStatement statement3 = new RyaStatement(new RyaURI("urn:Bob"), new RyaURI("urn:worksAt"), new RyaURI("urn:burgerShack"));
+        RyaStatement statement4 = new RyaStatement(new RyaURI("urn:John"), new RyaURI("urn:talksTo"), new RyaURI("urn:Evan"));
+        RyaStatement statement5 = new RyaStatement(new RyaURI("urn:Evan"), new RyaURI("urn:livesIn"), new RyaURI("urn:SanFrancisco"));
+        RyaStatement statement6 = new RyaStatement(new RyaURI("urn:Evan"), new RyaURI("urn:worksAt"), new RyaURI("urn:burgerShack"));
+        statement1.setColumnVisibility("U&W".getBytes("UTF-8"));
+        statement2.setColumnVisibility("V".getBytes("UTF-8"));
+        statement3.setColumnVisibility("W".getBytes("UTF-8"));
+        statement4.setColumnVisibility("A&B".getBytes("UTF-8"));
+        statement5.setColumnVisibility("B".getBytes("UTF-8"));
+        statement6.setColumnVisibility("C".getBytes("UTF-8"));
+        
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadRyaStatements(sparql, Arrays.asList(statement1, statement2, statement3, statement4, statement5, statement6));
+
+        // Verify the end results of the query match the expected results.
+        final Set<RyaSubGraph> results = readAllResults(pcjId);
+        // Create the expected results of the SPARQL query once the PCJ has been
+        // computed.
+        RyaStatement statement7 = new RyaStatement(new RyaURI("urn:Joe"), new RyaURI("urn:travelsTo"), new RyaURI("urn:London"));
+        RyaStatement statement8 = new RyaStatement(new RyaURI("urn:Joe"), new RyaURI("urn:friendsWith"), new RyaURI("urn:Bob"));
+        RyaStatement statement9 = new RyaStatement(new RyaURI("urn:John"), new RyaURI("urn:travelsTo"), new RyaURI("urn:SanFrancisco"));
+        RyaStatement statement10 = new RyaStatement(new RyaURI("urn:John"), new RyaURI("urn:friendsWith"), new RyaURI("urn:Evan"));
+        statement7.setColumnVisibility("U&V&W".getBytes("UTF-8"));
+        statement8.setColumnVisibility("U&V&W".getBytes("UTF-8"));
+        statement9.setColumnVisibility("A&B&C".getBytes("UTF-8"));
+        statement10.setColumnVisibility("A&B&C".getBytes("UTF-8"));
+
+        final Set<RyaSubGraph> expectedResults = new HashSet<>();
+
+        RyaSubGraph subGraph1 = new RyaSubGraph(pcjId);
+        Set<RyaStatement> stmnts1 = new HashSet<>(Arrays.asList(statement7, statement8));
+        subGraph1.setStatements(stmnts1);
+        expectedResults.add(subGraph1);
+        
+        RyaSubGraph subGraph2 = new RyaSubGraph(pcjId);
+        Set<RyaStatement> stmnts2 = new HashSet<>(Arrays.asList(statement9, statement10));
+        subGraph2.setStatements(stmnts2);
+        expectedResults.add(subGraph2);
+
+        ConstructGraphTestUtils.subGraphsEqualIgnoresBlankNode(expectedResults, results);
+    }
+    
+    protected KafkaConsumer<String, RyaSubGraph> makeRyaSubGraphConsumer(final String TopicName) {
+        // setup consumer
+        final Properties consumerProps = new Properties();
+        consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
+        consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
+        consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
+        consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, RyaSubGraphKafkaSerDe.class.getName());
+
+        // to make sure the consumer starts from the beginning of the topic
+        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        final KafkaConsumer<String, RyaSubGraph> consumer = new KafkaConsumer<>(consumerProps);
+        consumer.subscribe(Arrays.asList(TopicName));
+        return consumer;
+    }
+
+    private Set<RyaSubGraph> readAllResults(final String pcjId) throws Exception {
+        requireNonNull(pcjId);
+
+        // Read all of the results from the Kafka topic.
+        final Set<RyaSubGraph> results = new HashSet<>();
+
+        try (final KafkaConsumer<String, RyaSubGraph> consumer = makeRyaSubGraphConsumer(pcjId)) {
+            final ConsumerRecords<String, RyaSubGraph> records = consumer.poll(5000);
+            final Iterator<ConsumerRecord<String, RyaSubGraph>> recordIterator = records.iterator();
+            while (recordIterator.hasNext()) {
+                results.add(recordIterator.next().value());
+            }
+        }
+
+        return results;
+    }
+    
+    protected String loadStatements(final String sparql, final Collection<Statement> statements) throws Exception {
+        return loadRyaStatements(sparql, statements.stream().map(x -> RdfToRyaConversions.convertStatement(x)).collect(Collectors.toSet()));
+    }
+    
+
+    protected String loadRyaStatements(final String sparql, final Collection<RyaStatement> statements) throws Exception {
+        requireNonNull(sparql);
+        requireNonNull(statements);
+        FluoClient client = null;
+
+        try {
+            CreatePcj createPcj = new CreatePcj();
+            client = new FluoClientImpl(super.getFluoConfiguration());
+            FluoQuery fluoQuery = createPcj.createFluoPcj(client, sparql);
+
+            AccumuloRyaDAO dao = getRyaDAO();
+            dao.add(statements.iterator());
+
+            // Wait for the Fluo application to finish computing the end result.
+            super.getMiniFluo().waitForObservers();
+
+            // FluoITHelper.printFluoTable(client);
+            return fluoQuery.getConstructQueryMetadata().get().getNodeId();
+        } finally {
+            if (client != null) {
+                client.close();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/FunctionAdapter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/FunctionAdapter.java b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/FunctionAdapter.java
index cf9dfef..2fbe334 100644
--- a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/FunctionAdapter.java
+++ b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/FunctionAdapter.java
@@ -44,7 +44,6 @@ class FunctionAdapter implements Function {
 
     @Override
     public Value evaluate(ValueFactory valueFactory, Value... args) throws ValueExprEvaluationException {
-        System.out.println("Evaluate: Valuefactory=" + valueFactory);
         // need a Adapter for org.eclipse.rdf4j.model.ValueFactory
         org.eclipse.rdf4j.model.ValueFactory rdf4jValueFactory = org.eclipse.rdf4j.model.impl.SimpleValueFactory.getInstance();
         // org.eclipse.rdf4j.model.ValueFactory rdf4jValueFactory = new ValueFactoryAdapter(valueFactory);
@@ -61,7 +60,6 @@ class FunctionAdapter implements Function {
             org.eclipse.rdf4j.model.Literal vLiteral = (org.eclipse.rdf4j.model.Literal) v;
             org.openrdf.model.URI vType = valueFactory.createURI(vLiteral.getDatatype().stringValue());
             org.openrdf.model.Literal theReturnValue = valueFactory.createLiteral(vLiteral.getLabel(), vType);
-            System.out.println("Function RETURNS:" + theReturnValue + " class:" + theReturnValue.getClass() + " rdf4j=" + v + " class:" + v.getClass());
             return theReturnValue;
         }
         //


[3/4] incubator-rya git commit: RYA-273-Construct Query Support. Closes #161.

Posted by ca...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java
new file mode 100644
index 0000000..797502c
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java
@@ -0,0 +1,39 @@
+package org.apache.rya.indexing.pcj.fluo.app.export;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
+
+/**
+ * Incrementally exports {@link RyaSubGraph}s that are generated by SPARQL Construct Queries
+ * in the Rya Fluo application to an external system, such as a Kafka topic.
+ *
+ */
+public interface IncrementalRyaSubGraphExporter extends AutoCloseable {
+
+    /**
+     * Exports a RyaSubGraph that is the result of a SPARQL Construct Query.
+     *
+     * @param constructID - The Fluo ID of the construct query that created the RyaSubGraph
+     * @param subgraph - The RyaSubGraph to export (non-null)
+     * @throws ResultExportException The result could not be exported.
+     */
+    public void export(String constructID, RyaSubGraph subgraph) throws ResultExportException;
+    
+}
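
As a point of reference (not part of this patch), here is a minimal sketch of what an implementation of the interface above looks like. The LoggingSubGraphExporter name and its log message are hypothetical and only illustrate the export(constructID, subgraph) contract.

    import org.apache.log4j.Logger;
    import org.apache.rya.api.domain.RyaSubGraph;
    import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
    import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;

    /**
     * Hypothetical exporter that only logs each subgraph; it exists to show the
     * shape of an IncrementalRyaSubGraphExporter implementation.
     */
    public class LoggingSubGraphExporter implements IncrementalRyaSubGraphExporter {

        private static final Logger log = Logger.getLogger(LoggingSubGraphExporter.class);

        @Override
        public void export(final String constructID, final RyaSubGraph subgraph) throws ResultExportException {
            // A real exporter would write the subgraph to an external system here.
            log.info("Construct query " + constructID + " produced subgraph " + subgraph.getId());
        }

        @Override
        public void close() {
            // Nothing to release in this sketch.
        }
    }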

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java
new file mode 100644
index 0000000..ecbec09
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java
@@ -0,0 +1,47 @@
+package org.apache.rya.indexing.pcj.fluo.app.export;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.fluo.api.observer.Observer.Context;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.ConfigurationException;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException;
+
+import com.google.common.base.Optional;
+
+/**
+ * Builds instances of {@link IncrementalRyaSubGraphExporter} using the provided
+ * configurations.
+ */
+public interface IncrementalRyaSubGraphExporterFactory {
+
+    /**
+     * Builds an instance of {@link IncrementalRyaSubGraphExporter} using the
+     * configurations that are provided.
+     *
+     * @param context - Contains the host application's configuration values
+     *   and any parameters that were provided at initialization. (not null)
+     * @return An exporter if configurations were found in the context; otherwise absent.
+     * @throws IncrementalExporterFactoryException A non-configuration related
+     *   problem has occurred and the exporter could not be created as a result.
+     * @throws ConfigurationException Thrown if configuration values were
+     *   provided, but an instance of the exporter could not be initialized
+     *   using them. This could be because they were improperly formatted,
+     *   a required field was missing, or some other configuration based problem.
+     */
+    public Optional<IncrementalRyaSubGraphExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
new file mode 100644
index 0000000..152d156
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+/**
+ * Incrementally exports SPARQL query results to Kafka topics.
+ */
+public class KafkaBindingSetExporter implements IncrementalBindingSetExporter {
+    private static final Logger log = Logger.getLogger(KafkaBindingSetExporter.class);
+
+    private final KafkaProducer<String, VisibilityBindingSet> producer;
+
+    /**
+     * Constructs an instance given a Kafka producer.
+     *
+     * @param producer
+     *            for sending result set alerts to a broker. (not null)
+     *            Can be created and configured by {@link KafkaBindingSetExporterFactory}
+     */
+    public KafkaBindingSetExporter(KafkaProducer<String, VisibilityBindingSet> producer) {
+        super();
+        checkNotNull(producer, "Producer is required.");
+        this.producer = producer;
+    }
+
+    /**
+     * Sends the result to the Kafka topic whose name is the PCJ ID associated with the query.
+     */
+    @Override
+    public void export(final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) throws ResultExportException {
+        checkNotNull(fluoTx);
+        checkNotNull(queryId);
+        checkNotNull(result);
+        try {
+            final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
+            final String msg = "out to kafka topic: queryId=" + queryId + " pcjId=" + pcjId + " result=" + result;
+            log.trace(msg);
+
+            // Send the result to the topic whose name matches the PCJ ID.
+            final ProducerRecord<String, VisibilityBindingSet> rec = new ProducerRecord<>(pcjId, result);
+            final Future<RecordMetadata> future = producer.send(rec);
+
+            // Don't let the export return until the result has been written to the topic. Otherwise we may lose results.
+            future.get();
+
+            log.debug("producer.send(rec) completed");
+
+        } catch (final Throwable e) {
+            throw new ResultExportException("A result could not be exported to Kafka.", e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        producer.close(5, TimeUnit.SECONDS);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java
new file mode 100644
index 0000000..5507037
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+
+import org.apache.fluo.api.observer.Observer.Context;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+import com.google.common.base.Optional;
+
+/**
+ * Creates instances of {@link KafkaBindingSetExporter}.
+ * <p/>
+ * Configure a Kafka producer by adding the required key/value properties described here:
+ * http://kafka.apache.org/documentation.html#producerconfigs
+ * <p/>
+ * Here is a simple example:
+ * <pre>
+ *     Properties producerConfig = new Properties();
+ *     producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ *     producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ *     producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ * </pre>
+ * 
+ * @see ProducerConfig
+ */
+public class KafkaBindingSetExporterFactory implements IncrementalBindingSetExporterFactory {
+    private static final Logger log = Logger.getLogger(KafkaBindingSetExporterFactory.class);
+    @Override
+    public Optional<IncrementalBindingSetExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException {
+        final KafkaExportParameters exportParams = new KafkaExportParameters(context.getObserverConfiguration().toMap());
+        log.debug("KafkaBindingSetExporterFactory.build(): params.isExportToKafka()=" + exportParams.isExportToKafka());
+        if (exportParams.isExportToKafka()) {
+            // Setup Kafka connection
+            KafkaProducer<String, VisibilityBindingSet> producer = new KafkaProducer<String, VisibilityBindingSet>(exportParams.listAllConfig());
+            // Create the exporter
+            final IncrementalBindingSetExporter exporter = new KafkaBindingSetExporter(producer);
+            return Optional.of(exporter);
+        } else {
+            return Optional.absent();
+        }
+    }
+
+}
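
As a usage sketch (not part of this patch), the producer described in the Javadoc above can be built by hand and handed to the new exporter directly. The broker address is assumed, and the value serializer class name is a placeholder; a real deployment substitutes whatever serializer it uses for VisibilityBindingSet values.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporter;
    import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;

    public class KafkaBindingSetExporterSketch {
        public static void main(final String[] args) throws Exception {
            // Producer configuration mirroring the Javadoc example above; the broker address is assumed.
            final Properties producerConfig = new Properties();
            producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            // Placeholder class name: substitute whichever serializer the deployment
            // uses for VisibilityBindingSet values.
            producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "com.example.VisibilityBindingSetSerializer");

            final KafkaProducer<String, VisibilityBindingSet> producer = new KafkaProducer<>(producerConfig);
            final KafkaBindingSetExporter exporter = new KafkaBindingSetExporter(producer);
            try {
                // The query result observers call exporter.export(fluoTx, queryId, result)
                // for each new VisibilityBindingSet.
            } finally {
                exporter.close();
            }
        }
    }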

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java
deleted file mode 100644
index 72ec947..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-
-/**
- * Incrementally exports SPARQL query results to Kafka topics.
- */
-public class KafkaResultExporter implements IncrementalResultExporter {
-    private static final Logger log = Logger.getLogger(KafkaResultExporter.class);
-
-    private final KafkaProducer<String, VisibilityBindingSet> producer;
-
-    /**
-     * Constructs an instance given a Kafka producer.
-     *
-     * @param producer
-     *            for sending result set alerts to a broker. (not null)
-     *            Can be created and configured by {@link KafkaResultExporterFactory}
-     */
-    public KafkaResultExporter(final KafkaProducer<String, VisibilityBindingSet> producer) {
-        super();
-        checkNotNull(producer, "Producer is required.");
-        this.producer = producer;
-    }
-
-    /**
-     * Send the results to the topic using the queryID as the topicname
-     */
-    @Override
-    public void export(final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) throws ResultExportException {
-        checkNotNull(fluoTx);
-        checkNotNull(queryId);
-        checkNotNull(result);
-        try {
-            final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
-            final String msg = "out to kafta topic: queryId=" + queryId + " pcjId=" + pcjId + " result=" + result;
-            log.trace(msg);
-
-            // Send the result to the topic whose name matches the PCJ ID.
-            final ProducerRecord<String, VisibilityBindingSet> rec = new ProducerRecord<>(pcjId, result);
-            final Future<RecordMetadata> future = producer.send(rec);
-
-            // Don't let the export return until the result has been written to the topic. Otherwise we may lose results.
-            future.get();
-
-            log.debug("producer.send(rec) completed");
-
-        } catch (final Throwable e) {
-            throw new ResultExportException("A result could not be exported to Kafka.", e);
-        }
-    }
-
-    @Override
-    public void close() throws Exception {
-        producer.close(5, TimeUnit.SECONDS);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java
deleted file mode 100644
index 995e9d9..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
-
-import org.apache.fluo.api.observer.Observer.Context;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-
-import com.google.common.base.Optional;
-
-/**
- * Creates instances of {@link KafkaResultExporter}.
- * <p/>
- * Configure a Kafka producer by adding several required Key/values as described here:
- * http://kafka.apache.org/documentation.html#producerconfigs
- * <p/>
- * Here is a simple example:
- * <pre>
- *     Properties producerConfig = new Properties();
- *     producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- *     producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- *     producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- * </pre>
- * 
- * @see ProducerConfig
- */
-public class KafkaResultExporterFactory implements IncrementalResultExporterFactory {
-    private static final Logger log = Logger.getLogger(KafkaResultExporterFactory.class);
-    @Override
-    public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException {
-        final KafkaExportParameters exportParams = new KafkaExportParameters(context.getObserverConfiguration().toMap());
-        log.debug("KafkaResultExporterFactory.build(): params.isExportToKafka()=" + exportParams.isExportToKafka());
-        if (exportParams.isExportToKafka()) {
-            // Setup Kafka connection
-            KafkaProducer<String, VisibilityBindingSet> producer = new KafkaProducer<String, VisibilityBindingSet>(exportParams.listAllConfig());
-            // Create the exporter
-            final IncrementalResultExporter exporter = new KafkaResultExporter(producer);
-            return Optional.of(exporter);
-        } else {
-            return Optional.absent();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
new file mode 100644
index 0000000..a15743f
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
@@ -0,0 +1,81 @@
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Exports {@link RyaSubGraph}s from the Rya Fluo application to Kafka.
+ *
+ */
+public class KafkaRyaSubGraphExporter implements IncrementalRyaSubGraphExporter {
+
+    private final KafkaProducer<String, RyaSubGraph> producer;
+    private static final Logger log = Logger.getLogger(KafkaRyaSubGraphExporter.class);
+
+    public KafkaRyaSubGraphExporter(KafkaProducer<String, RyaSubGraph> producer) {
+        checkNotNull(producer);
+        this.producer = producer;
+    }
+    
+    /**
+     * Exports the RyaSubGraph to the Kafka topic whose name matches {@link RyaSubGraph#getId()}
+     * @param constructID - row ID of the construct query result that is exported; used for logging purposes
+     * @param subGraph - the RyaSubGraph that is exported to Kafka
+     */
+    @Override
+    public void export(String constructID, RyaSubGraph subGraph) throws ResultExportException {
+        checkNotNull(constructID);
+        checkNotNull(subGraph);
+        try {
+            // Send the result to the topic whose name matches the PCJ ID.
+            final ProducerRecord<String, RyaSubGraph> rec = new ProducerRecord<>(subGraph.getId(), subGraph);
+            final Future<RecordMetadata> future = producer.send(rec);
+
+            // Don't let the export return until the result has been written to the topic. Otherwise we may lose results.
+            future.get();
+
+            log.debug("Producer successfully sent record with id: " + constructID + " and statements: " + subGraph.getStatements());
+
+        } catch (final Throwable e) {
+            throw new ResultExportException("A result could not be exported to Kafka.", e);
+        }
+    }
+
+    /**
+     * Closes exporter.
+     */
+    @Override
+    public void close() throws Exception {
+        producer.close(5, TimeUnit.SECONDS);
+    }
+
+}
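
A wiring sketch (not part of this patch): the exporter above can be backed by a KafkaProducer whose values are written with the RyaSubGraphKafkaSerDe added later in this commit. The broker address is assumed, and export(...) is normally called by the ConstructQueryResultObserver rather than by hand.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.rya.api.domain.RyaSubGraph;
    import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaRyaSubGraphExporter;
    import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;

    public class KafkaRyaSubGraphExporterSketch {

        /** Builds an exporter backed by a producer that writes RyaSubGraph values. */
        public static KafkaRyaSubGraphExporter createExporter(final String bootstrapServers) {
            final Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

            // The record key type is String (left unset by the exporter); values are
            // serialized with the SerDe introduced in this commit.
            final KafkaProducer<String, RyaSubGraph> producer =
                    new KafkaProducer<>(props, new StringSerializer(), new RyaSubGraphKafkaSerDe());
            return new KafkaRyaSubGraphExporter(producer);
        }

        public static void main(final String[] args) throws Exception {
            try (KafkaRyaSubGraphExporter exporter = createExporter("localhost:9092")) {
                // exporter.export(constructNodeId, subGraph) is called once per construct query result.
            }
        }
    }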

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java
new file mode 100644
index 0000000..2c1e4c0
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java
@@ -0,0 +1,62 @@
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.fluo.api.observer.Observer.Context;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.ConfigurationException;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporterFactory;
+
+import com.google.common.base.Optional;
+
+/**
+ * Factory for creating {@link KafkaRyaSubGraphExporter}s that are used for
+ * exporting {@link RyaSubGraph}s from the Rya Fluo application to Kafka.
+ *
+ */
+public class KafkaRyaSubGraphExporterFactory implements IncrementalRyaSubGraphExporterFactory {
+
+    private static final Logger log = Logger.getLogger(KafkaRyaSubGraphExporterFactory.class);
+    
+    /**
+     * Builds a {@link KafkaRyaSubGraphExporter}.
+     * @param context - {@link Context} object used to pass configuration parameters
+     * @return an Optional containing an {@link IncrementalRyaSubGraphExporter} if one can be constructed; otherwise absent
+     * @throws IncrementalExporterFactoryException
+     * @throws ConfigurationException
+     */
+    @Override
+    public Optional<IncrementalRyaSubGraphExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException {
+        final KafkaExportParameters exportParams = new KafkaExportParameters(context.getObserverConfiguration().toMap());
+        log.debug("KafkaRyaSubGraphExporterFactory.build(): params.isExportToKafka()=" + exportParams.isExportToKafka());
+        if (exportParams.isExportToKafka()) {
+            // Setup Kafka connection
+            KafkaProducer<String, RyaSubGraph> producer = new KafkaProducer<String, RyaSubGraph>(exportParams.listAllConfig());
+            // Create the exporter
+            final IncrementalRyaSubGraphExporter exporter = new KafkaRyaSubGraphExporter(producer);
+            return Optional.of(exporter);
+        } else {
+            return Optional.absent();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/RyaSubGraphKafkaSerDe.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/RyaSubGraphKafkaSerDe.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/RyaSubGraphKafkaSerDe.java
new file mode 100644
index 0000000..ed20e8a
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/RyaSubGraphKafkaSerDe.java
@@ -0,0 +1,100 @@
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.io.ByteArrayOutputStream;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.api.domain.serialization.kryo.RyaSubGraphSerializer;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kafka {@link Serializer} and {@link Deserializer} for {@link RyaSubGraph}s.
+ *
+ */
+public class RyaSubGraphKafkaSerDe implements Serializer<RyaSubGraph>, Deserializer<RyaSubGraph> {
+
+    private final Kryo kryo;
+    
+    public RyaSubGraphKafkaSerDe() {
+        kryo = new Kryo();
+        kryo.register(RyaSubGraph.class,new RyaSubGraphSerializer());
+    }
+    
+  /**
+   * Deserializes from bytes to a RyaSubGraph
+   * @param bundleBytes - byte representation of RyaSubGraph
+   * @return - Deserialized RyaSubGraph
+   */
+    public RyaSubGraph fromBytes(byte[] bundleBytes) {
+        return kryo.readObject(new Input(bundleBytes), RyaSubGraph.class);
+    }
+
+    /**
+     * Serializes RyaSubGraph to bytes
+     * @param bundle - RyaSubGraph to be serialized
+     * @return - serialized bytes from RyaSubGraph
+     */
+    public byte[] toBytes(RyaSubGraph bundle) {
+        Output output = new Output(new ByteArrayOutputStream());
+        kryo.writeObject(output, bundle, new RyaSubGraphSerializer());
+        return output.getBuffer();
+    }
+
+    /**
+     * Deserializes RyaSubGraph
+     * @param topic - topic that data is associated with (no effect) 
+     * @param bundleBytes - bytes to be deserialized
+     * @return - deserialized RyaSubGraph
+     */
+    @Override
+    public RyaSubGraph deserialize(String topic, byte[] bundleBytes) {
+        return fromBytes(bundleBytes);
+    }
+
+    /**
+     * Serializes RyaSubGraph
+     * @param subgraph - subgraph to be serialized
+     * @param topic - topic that data is associated with
+     * @return - serialized bytes from subgraph
+     */
+    @Override
+    public byte[] serialize(String topic, RyaSubGraph subgraph) {
+        return toBytes(subgraph);
+    }
+
+    /**
+     * Closes serializer (no effect)
+     */
+    @Override
+    public void close() {
+    }
+    
+    /**
+     * Configures serializer (no effect)
+     */
+    @Override
+    public void configure(Map<String, ?> arg0, boolean arg1) {
+    }
+}
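
A round-trip sketch (not part of this patch) showing how the SerDe above behaves when used as both the Serializer and the Deserializer; the helper class and method names are arbitrary, and the subgraph instance is assumed to come from elsewhere.

    import org.apache.rya.api.domain.RyaSubGraph;
    import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;

    public class RyaSubGraphSerDeSketch {

        /** Serializes a subgraph to bytes and reads it back, as a Kafka client would. */
        public static RyaSubGraph roundTrip(final RyaSubGraph subgraph) {
            final RyaSubGraphKafkaSerDe serde = new RyaSubGraphKafkaSerDe();

            // serialize(topic, value) delegates to toBytes; the topic argument has no effect.
            final byte[] bytes = serde.serialize("any-topic", subgraph);

            // deserialize(topic, bytes) delegates to fromBytes; the topic argument has no effect.
            return serde.deserialize("any-topic", bytes);
        }
    }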

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
new file mode 100644
index 0000000..84d3ce6
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collections;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+/**
+ * Incrementally exports SPARQL query results to Accumulo PCJ tables as they are defined by Rya.
+ */
+public class RyaBindingSetExporter implements IncrementalBindingSetExporter {
+
+    private final PrecomputedJoinStorage pcjStorage;
+
+    /**
+     * Constructs an instance of {@link RyaBindingSetExporter}.
+     *
+     * @param pcjStorage - The PCJ storage the new results will be exported to. (not null)
+     */
+    public RyaBindingSetExporter(final PrecomputedJoinStorage pcjStorage) {
+        this.pcjStorage = checkNotNull(pcjStorage);
+    }
+
+    @Override
+    public void export(
+            final TransactionBase fluoTx,
+            final String queryId,
+            final VisibilityBindingSet result) throws ResultExportException {
+        requireNonNull(fluoTx);
+        requireNonNull(queryId);
+        requireNonNull(result);
+
+        // Look up the ID the PCJ represents within the PCJ Storage.
+        final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
+
+        try {
+            pcjStorage.addResults(pcjId, Collections.singleton(result));
+        } catch (final PCJStorageException e) {
+            throw new ResultExportException("A result could not be exported to Rya.", e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        pcjStorage.close();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
new file mode 100644
index 0000000..86d593f
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+
+import com.google.common.base.Optional;
+
+import org.apache.fluo.api.observer.Observer.Context;
+
+/**
+ * Creates instances of {@link RyaBindingSetExporter}.
+ */
+public class RyaBindingSetExporterFactory implements IncrementalBindingSetExporterFactory {
+
+    @Override
+    public Optional<IncrementalBindingSetExporter> build(final Context context) throws IncrementalExporterFactoryException, ConfigurationException {
+        checkNotNull(context);
+
+        // Wrap the context's parameters for parsing.
+        final RyaExportParameters params = new RyaExportParameters( context.getObserverConfiguration().toMap() );
+
+        if(params.isExportToRya()) {
+            // Setup Zookeeper connection info.
+            final String accumuloInstance = params.getAccumuloInstanceName().get();
+            final String zookeeperServers =  params.getZookeeperServers().get().replaceAll(";", ",");
+            final Instance inst = new ZooKeeperInstance(accumuloInstance, zookeeperServers);
+
+            try {
+                // Setup Accumulo connection info.
+                final String exporterUsername = params.getExporterUsername().get();
+                final String exporterPassword = params.getExporterPassword().get();
+                final Connector accumuloConn = inst.getConnector(exporterUsername, new PasswordToken(exporterPassword));
+
+                // Setup Rya PCJ Storage.
+                final String ryaInstanceName = params.getRyaInstanceName().get();
+                final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, ryaInstanceName);
+
+                // Make the exporter.
+                final IncrementalBindingSetExporter exporter = new RyaBindingSetExporter(pcjStorage);
+                return Optional.of(exporter);
+
+            } catch (final AccumuloException | AccumuloSecurityException e) {
+                throw new IncrementalExporterFactoryException("Could not initialize the Accumulo connector using the provided configuration.", e);
+            }
+        } else {
+            return Optional.absent();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
index cba6a43..a1ba5b8 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
@@ -45,6 +45,7 @@ public class RyaExportParameters extends ParametersBase {
     public static final String CONF_EXPORTER_PASSWORD = "pcj.fluo.export.rya.exporterPassword";
 
     public static final String CONF_RYA_INSTANCE_NAME = "pcj.fluo.export.rya.ryaInstanceName";
+    public static final String CONF_FLUO_APP_NAME = "pcj.fluo.export.rya.fluo.application.name";
 
     /**
      * Constructs an instance of {@link RyaExportParameters}.
@@ -147,4 +148,18 @@ public class RyaExportParameters extends ParametersBase {
     public Optional<String> getExporterPassword() {
         return Optional.fromNullable( params.get(CONF_EXPORTER_PASSWORD) );
     }
+    
+    /**
+     * @param fluoApplicationName - The name of the Rya Fluo application
+     */
+    public void setFluoApplicationName(@Nullable final String fluoApplicationName) {
+        params.put(CONF_FLUO_APP_NAME, fluoApplicationName);
+    }
+    
+    /**
+     * @return The name of the Rya Fluo application
+     */
+    public Optional<String> getFluoApplicationName() {
+        return Optional.fromNullable(params.get(CONF_FLUO_APP_NAME));
+    }
 }
\ No newline at end of file
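
A small sketch (not part of this patch) exercising the new Fluo application name parameter; it assumes RyaExportParameters is constructed from the observer configuration map, as it is in RyaBindingSetExporterFactory, and the application name value is hypothetical.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;

    import com.google.common.base.Optional;

    public class FluoApplicationNameSketch {
        public static void main(final String[] args) {
            // Backing map that would normally come from context.getObserverConfiguration().toMap().
            final Map<String, String> config = new HashMap<>();

            final RyaExportParameters params = new RyaExportParameters(config);
            params.setFluoApplicationName("rya_pcj_updater");   // hypothetical application name

            final Optional<String> appName = params.getFluoApplicationName();
            System.out.println("Fluo application name: " + appName.or("<not set>"));
        }
    }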

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
deleted file mode 100644
index b8b3c45..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export.rya;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.util.Objects.requireNonNull;
-
-import java.util.Collections;
-
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-
-/**
- * Incrementally exports SPARQL query results to Accumulo PCJ tables as they are defined by Rya.
- */
-public class RyaResultExporter implements IncrementalResultExporter {
-
-    private final PrecomputedJoinStorage pcjStorage;
-
-    /**
-     * Constructs an instance of {@link RyaResultExporter}.
-     *
-     * @param pcjStorage - The PCJ storage the new results will be exported to. (not null)
-     */
-    public RyaResultExporter(final PrecomputedJoinStorage pcjStorage) {
-        this.pcjStorage = checkNotNull(pcjStorage);
-    }
-
-    @Override
-    public void export(
-            final TransactionBase fluoTx,
-            final String queryId,
-            final VisibilityBindingSet result) throws ResultExportException {
-        requireNonNull(fluoTx);
-        requireNonNull(queryId);
-        requireNonNull(result);
-
-        // Look up the ID the PCJ represents within the PCJ Storage.
-        final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
-
-        try {
-            pcjStorage.addResults(pcjId, Collections.singleton(result));
-        } catch (final PCJStorageException e) {
-            throw new ResultExportException("A result could not be exported to Rya.", e);
-        }
-    }
-
-    @Override
-    public void close() throws Exception {
-        pcjStorage.close();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java
deleted file mode 100644
index c695272..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export.rya;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
-
-import com.google.common.base.Optional;
-
-import org.apache.fluo.api.observer.Observer.Context;
-
-/**
- * Creates instances of {@link RyaResultExporter}.
- */
-public class RyaResultExporterFactory implements IncrementalResultExporterFactory {
-
-    @Override
-    public Optional<IncrementalResultExporter> build(final Context context) throws IncrementalExporterFactoryException, ConfigurationException {
-        checkNotNull(context);
-
-        // Wrap the context's parameters for parsing.
-        final RyaExportParameters params = new RyaExportParameters( context.getObserverConfiguration().toMap() );
-
-        if(params.isExportToRya()) {
-            // Setup Zookeeper connection info.
-            final String accumuloInstance = params.getAccumuloInstanceName().get();
-            final String zookeeperServers =  params.getZookeeperServers().get().replaceAll(";", ",");
-            final Instance inst = new ZooKeeperInstance(accumuloInstance, zookeeperServers);
-
-            try {
-                // Setup Accumulo connection info.
-                final String exporterUsername = params.getExporterUsername().get();
-                final String exporterPassword = params.getExporterPassword().get();
-                final Connector accumuloConn = inst.getConnector(exporterUsername, new PasswordToken(exporterPassword));
-
-                // Setup Rya PCJ Storage.
-                final String ryaInstanceName = params.getRyaInstanceName().get();
-                final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, ryaInstanceName);
-
-                // Make the exporter.
-                final IncrementalResultExporter exporter = new RyaResultExporter(pcjStorage);
-                return Optional.of(exporter);
-
-            } catch (final AccumuloException | AccumuloSecurityException e) {
-                throw new IncrementalExporterFactoryException("Could not initialize the Accumulo connector using the provided configuration.", e);
-            }
-        } else {
-            return Optional.absent();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
index dc4b3b4..ac131e3 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
@@ -26,11 +26,13 @@ import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.observer.AbstractObserver;
 import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater;
+import org.apache.rya.indexing.pcj.fluo.app.ConstructQueryResultUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.QueryResultUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
@@ -57,6 +59,7 @@ public abstract class BindingSetUpdater extends AbstractObserver {
     private final FilterResultUpdater filterUpdater = new FilterResultUpdater();
     private final QueryResultUpdater queryUpdater = new QueryResultUpdater();
     private final AggregationResultUpdater aggregationUpdater = new AggregationResultUpdater();
+    private final ConstructQueryResultUpdater constructUpdater = new ConstructQueryResultUpdater();
 
     @Override
     public abstract ObservedColumn getObservedColumn();
@@ -102,6 +105,15 @@ public abstract class BindingSetUpdater extends AbstractObserver {
                 }
                 break;
 
+            case CONSTRUCT: 
+                final ConstructQueryMetadata constructQuery = queryDao.readConstructQueryMetadata(tx, parentNodeId);
+                try{
+                    constructUpdater.updateConstructQueryResults(tx, observedBindingSet, constructQuery);
+                } catch (final Exception e) {
+                    throw new RuntimeException("Could not process a Construct Query node.", e);
+                }
+                break;
+                
             case FILTER:
                 final FilterMetadata parentFilter = queryDao.readFilterMetadata(tx, parentNodeId);
                 try {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
new file mode 100644
index 0000000..f0fef07
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
@@ -0,0 +1,198 @@
+package org.apache.rya.indexing.pcj.fluo.app.observers;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.utils.VisibilitySimplifier;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.api.resolver.triple.TripleRow;
+import org.apache.rya.api.resolver.triple.TripleRowResolverException;
+import org.apache.rya.api.resolver.triple.impl.WholeRowTripleResolver;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaRyaSubGraphExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Monitors the Column {@link FluoQueryColumns#CONSTRUCT_STATEMENTS} for new
+ * Construct Query {@link RyaStatement}s and exports the results using the
+ * {@link IncrementalRyaSubGraphExporter}s that are registered with this
+ * Observer.
+ *
+ */
+public class ConstructQueryResultObserver extends AbstractObserver {
+
+    private static final WholeRowTripleResolver TRIPLE_RESOLVER = new WholeRowTripleResolver();
+    private static final Logger log = Logger.getLogger(ConstructQueryResultObserver.class);
+    private static final RyaSubGraphKafkaSerDe serializer = new RyaSubGraphKafkaSerDe();
+
+    /**
+     * We expect to see the same expressions a lot, so we cache the simplified
+     * forms.
+     */
+    private final Map<String, String> simplifiedVisibilities = new HashMap<>();
+
+    /**
+     * Builders for each type of result exporter we support.
+     */
+    private static final ImmutableSet<IncrementalRyaSubGraphExporterFactory> factories = ImmutableSet
+            .<IncrementalRyaSubGraphExporterFactory> builder().add(new KafkaRyaSubGraphExporterFactory()).build();
+
+    /**
+     * The exporters that are configured.
+     */
+    private ImmutableSet<IncrementalRyaSubGraphExporter> exporters = null;
+
+    /**
+     * Before running, determine which exporters are configured and set them up.
+     */
+    @Override
+    public void init(final Context context) {
+        final ImmutableSet.Builder<IncrementalRyaSubGraphExporter> exportersBuilder = ImmutableSet.builder();
+
+        for (final IncrementalRyaSubGraphExporterFactory builder : factories) {
+            try {
+                log.debug("ConstructQueryResultObserver.init(): for each exportersBuilder=" + builder);
+
+                final Optional<IncrementalRyaSubGraphExporter> exporter = builder.build(context);
+                if (exporter.isPresent()) {
+                    exportersBuilder.add(exporter.get());
+                }
+            } catch (final IncrementalExporterFactoryException e) {
+                log.error("Could not initialize a result exporter.", e);
+            }
+        }
+
+        exporters = exportersBuilder.build();
+    }
+
+    @Override
+    public ObservedColumn getObservedColumn() {
+        return new ObservedColumn(FluoQueryColumns.CONSTRUCT_STATEMENTS, NotificationType.STRONG);
+    }
+
+    @Override
+    public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
+        Bytes bytes = tx.get(row, col);
+        RyaSubGraph subgraph = serializer.fromBytes(bytes.toArray());
+        Set<RyaStatement> statements = subgraph.getStatements();
+        if (statements.size() > 0) {
+            byte[] visibility = statements.iterator().next().getColumnVisibility();
+            visibility = simplifyVisibilities(visibility);
+            for(RyaStatement statement: statements) {
+                statement.setColumnVisibility(visibility);
+            }
+            subgraph.setStatements(statements);
+
+            for (IncrementalRyaSubGraphExporter exporter : exporters) {
+                exporter.export(row.toString(), subgraph);
+            }
+        }
+        //add generated triples back into Fluo for chaining queries together
+        insertTriples(tx, subgraph.getStatements());
+    }
+    
+    @Override
+    public void close() {
+        if(exporters != null) {
+            for(final IncrementalRyaSubGraphExporter exporter : exporters) {
+                try {
+                    exporter.close();
+                } catch(final Exception e) {
+                    log.warn("Problem encountered while closing one of the exporters.", e);
+                }
+            }
+        }
+    }
+
+    private byte[] simplifyVisibilities(byte[] visibilityBytes) throws UnsupportedEncodingException {
+        // Simplify the result's visibilities and cache new simplified
+        // visibilities
+        String visibility = new String(visibilityBytes, "UTF-8");
+        if (!simplifiedVisibilities.containsKey(visibility)) {
+            String simplified = VisibilitySimplifier.simplify(visibility);
+            simplifiedVisibilities.put(visibility, simplified);
+        }
+        return simplifiedVisibilities.get(visibility).getBytes("UTF-8");
+    }
+    
+    private void insertTriples(TransactionBase tx, final Collection<RyaStatement> triples) {
+
+        for (final RyaStatement triple : triples) {
+            Optional<byte[]> visibility = Optional.fromNullable(triple.getColumnVisibility());
+            try {
+                tx.set(Bytes.of(spoFormat(triple)), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(new byte[0])));
+            } catch (final TripleRowResolverException e) {
+                log.error("Could not convert a Triple into the SPO format: " + triple);
+            }
+        }
+    }
+    
+
+    /**
+     * Converts a triple into a byte[] holding the Rya SPO representation of it.
+     *
+     * @param triple - The triple to convert. (not null)
+     * @return The Rya SPO representation of the triple.
+     * @throws TripleRowResolverException The triple could not be converted.
+     */
+    public static byte[] spoFormat(final RyaStatement triple) throws TripleRowResolverException {
+        checkNotNull(triple);
+        final Map<TABLE_LAYOUT, TripleRow> serialized = TRIPLE_RESOLVER.serialize(triple);
+        final TripleRow spoRow = serialized.get(TABLE_LAYOUT.SPO);
+        return spoRow.getRow();
+    }
+
+}
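
For reference, a minimal sketch of the new public spoFormat(RyaStatement) helper used in isolation. The subject/predicate/object values below are made up for illustration, and it assumes the three-argument RyaStatement(subject, predicate, object) constructor from org.apache.rya.api.domain:

    import org.apache.rya.api.domain.RyaStatement;
    import org.apache.rya.api.domain.RyaURI;
    import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver;

    public class SpoFormatExample {
        public static void main(final String[] args) throws Exception {
            // Hypothetical triple; any RyaStatement produced by a construct query is handled the same way.
            final RyaStatement triple = new RyaStatement(
                    new RyaURI("urn:Alice"),
                    new RyaURI("urn:talksTo"),
                    new RyaURI("urn:Bob"));

            // Serialize into the SPO table layout row, exactly as insertTriples() does
            // before writing the bytes to FluoQueryColumns.TRIPLES.
            final byte[] spoRow = ConstructQueryResultObserver.spoFormat(triple);
            System.out.println("SPO row length: " + spoRow.length);
        }
    }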

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index 28c92af..b675ba7 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -30,12 +30,12 @@ import org.apache.fluo.api.observer.AbstractObserver;
 import org.apache.log4j.Logger;
 import org.apache.rya.accumulo.utils.VisibilitySimplifier;
 import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter.ResultExportException;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory.IncrementalExporterFactoryException;
-import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaResultExporterFactory;
-import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaResultExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaBindingSetExporterFactory;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 
@@ -58,16 +58,16 @@ public class QueryResultObserver extends AbstractObserver {
     /**
      * Builders for each type of result exporter we support.
      */
-    private static final ImmutableSet<IncrementalResultExporterFactory> factories =
-            ImmutableSet.<IncrementalResultExporterFactory>builder()
-                .add(new RyaResultExporterFactory())
-                .add(new KafkaResultExporterFactory())
+    private static final ImmutableSet<IncrementalBindingSetExporterFactory> factories =
+            ImmutableSet.<IncrementalBindingSetExporterFactory>builder()
+                .add(new RyaBindingSetExporterFactory())
+                .add(new KafkaBindingSetExporterFactory())
                 .build();
 
     /**
      * The exporters that are configured.
      */
-    private ImmutableSet<IncrementalResultExporter> exporters = null;
+    private ImmutableSet<IncrementalBindingSetExporter> exporters = null;
 
     @Override
     public ObservedColumn getObservedColumn() {
@@ -79,13 +79,13 @@ public class QueryResultObserver extends AbstractObserver {
      */
     @Override
     public void init(final Context context) {
-        final ImmutableSet.Builder<IncrementalResultExporter> exportersBuilder = ImmutableSet.builder();
-
-        for(final IncrementalResultExporterFactory builder : factories) {
-        	log.debug("QueryResultObserver.init(): for each exportersBuilder=" + builder);
+        final ImmutableSet.Builder<IncrementalBindingSetExporter> exportersBuilder = ImmutableSet.builder();
 
+        for(final IncrementalBindingSetExporterFactory builder : factories) {
             try {
-                final Optional<IncrementalResultExporter> exporter = builder.build(context);
+                log.debug("QueryResultObserver.init(): for each exportersBuilder=" + builder);
+
+                final Optional<IncrementalBindingSetExporter> exporter = builder.build(context);
                 if(exporter.isPresent()) {
                     exportersBuilder.add(exporter.get());
                 }
@@ -117,7 +117,7 @@ public class QueryResultObserver extends AbstractObserver {
         result.setVisibility( simplifiedVisibilities.get(visibility) );
 
         // Export the result using each of the provided exporters.
-        for(final IncrementalResultExporter exporter : exporters) {
+        for(final IncrementalBindingSetExporter exporter : exporters) {
             try {
                 exporter.export(tx, queryId, result);
             } catch (final ResultExportException e) {
@@ -129,7 +129,7 @@ public class QueryResultObserver extends AbstractObserver {
     @Override
     public void close() {
         if(exporters != null) {
-            for(final IncrementalResultExporter exporter : exporters) {
+            for(final IncrementalBindingSetExporter exporter : exporters) {
                 try {
                     exporter.close();
                 } catch(final Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java
new file mode 100644
index 0000000..e836c5d
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java
@@ -0,0 +1,192 @@
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.openrdf.query.BindingSet;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Metadata object used to store metadata for Construct Query Nodes found in
+ * SPARQL queries.
+ *
+ */
+public class ConstructQueryMetadata extends CommonNodeMetadata {
+
+    private String childNodeId;
+    private ConstructGraph graph;
+    private String sparql;
+
+    /**
+     * Creates ConstructQueryMetadata object from the provided metadata arguments.
+     * @param nodeId - id for the ConstructQueryNode
+     * @param childNodeId - id for the child of the ConstructQueryNode
+     * @param graph - {@link ConstructGraph} used to project {@link BindingSet}s onto sets of statements representing the construct graph
+     * @param sparql - SPARQL query containing construct graph
+     */
+    public ConstructQueryMetadata(String nodeId, String childNodeId, ConstructGraph graph, String sparql) {
+        super(nodeId, new VariableOrder("subject", "predicate", "object"));
+        Preconditions.checkNotNull(childNodeId);
+        Preconditions.checkNotNull(graph);
+        Preconditions.checkNotNull(sparql);
+        this.childNodeId = childNodeId;
+        this.graph = graph;
+        this.sparql = sparql;
+    }
+
+    /**
+     * @return sparql query string representing this construct query
+     */
+    public String getSparql() {
+        return sparql;
+    }
+
+    /**
+     * @return The node whose results are projected onto the given
+     *         {@link ConstructGraph}.
+     */
+    public String getChildNodeId() {
+        return childNodeId;
+    }
+
+    /**
+     * @return The {@link ConstructGraph} used to project {@link BindingSet}s onto RDF
+     *         statements for this Construct Query
+     */
+    public ConstructGraph getConstructGraph() {
+        return graph;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(super.getNodeId(), super.getVariableOrder(), childNodeId, graph, sparql);
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o instanceof ConstructQueryMetadata) {
+            ConstructQueryMetadata queryMetadata = (ConstructQueryMetadata) o;
+            if (super.equals(queryMetadata)) {
+                return new EqualsBuilder().append(childNodeId, queryMetadata.childNodeId).append(graph, queryMetadata.graph)
+                        .append(sparql, queryMetadata.sparql).isEquals();
+            }
+            return false;
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder().append("Construct Query Metadata {\n").append("    Node ID: " + super.getNodeId() + "\n")
+                .append("    SPARQL QUERY: " + sparql + "\n").append("    Variable Order: " + super.getVariableOrder() + "\n")
+                .append("    Child Node ID: " + childNodeId + "\n").append("    Construct Graph: " + graph.getProjections() + "\n")
+                .append("}").toString();
+    }
+
+    /**
+     * Creates a new {@link Builder} for this class.
+     *
+     * @return A new {@link Builder} for this class.
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builds instances of {@link ConstructQueryMetadata}.
+     */
+    @DefaultAnnotation(NonNull.class)
+    public static final class Builder {
+
+
+        private String nodeId;
+        private ConstructGraph graph;
+        private String childNodeId;
+        private String sparql;
+
+        /**
+         * Set the node Id that identifies this Construct Query Node
+         * 
+         * @param nodeId
+         *            id for this node
+         * @return This builder so that method invocations may be chained.
+         */
+        public Builder setNodeId(String nodeId) {
+            this.nodeId = nodeId;
+            return this;
+        }
+        
+        /**
+         * Set the SPARQL String representing this construct query.
+         *
+         * @param sparql - SPARQL string representing this construct query
+         * @return This builder so that method invocations may be chained.
+         */
+        public Builder setSparql(String sparql) {
+            this.sparql = sparql;
+            return this;
+        }
+
+        /**
+         * Set the ConstructGraph used to project {@link BindingSet}s onto RDF
+         * statements for this Construct Query
+         *
+         * @param graph
+         *            - ConstructGraph used to project {@link BindingSet}s onto RDF
+         *            statements
+         * @return This builder so that method invocations may be chained.
+         */
+        public Builder setConstructGraph(ConstructGraph graph) {
+            this.graph = graph;
+            return this;
+        }
+
+        /**
+         * Set the node whose results are projected onto the given
+         * {@link ConstructGraph}.
+         *
+         * @param childNodeId
+         *            - The node whose results are projected onto the given
+         *            {@link ConstructGraph}.
+         * @return This builder so that method invocations may be chained.
+         */
+        public Builder setChildNodeId(String childNodeId) {
+            this.childNodeId = childNodeId;
+            return this;
+        }
+
+        /**
+         * @return An instance of {@link ConstructQueryMetadata} built using
+         *         this builder's values.
+         */
+        public ConstructQueryMetadata build() {
+            return new ConstructQueryMetadata(nodeId, childNodeId, graph, sparql);
+        }
+    }
+
+}
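
A minimal usage sketch of the builder above. The node ids are placeholders, and sparqlString/graphString are assumed to already hold a CONSTRUCT query and its serialized graph (ConstructGraphSerializer is the same helper the metadata DAO below uses):

    // Hypothetical ids and strings, purely for illustration.
    final ConstructGraph graph = ConstructGraphSerializer.toConstructGraph(graphString);
    final ConstructQueryMetadata metadata = ConstructQueryMetadata.builder()
            .setNodeId("CONSTRUCT_" + UUID.randomUUID())
            .setChildNodeId("SP_" + UUID.randomUUID())
            .setSparql(sparqlString)
            .setConstructGraph(graph)
            .build();
    // metadata.getVariableOrder() is always "subject", "predicate", "object".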

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
index 3230a5d..a701052 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang3.builder.EqualsBuilder;
 
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -43,11 +44,14 @@ import net.jcip.annotations.Immutable;
 @DefaultAnnotation(NonNull.class)
 public class FluoQuery {
 
-    private final QueryMetadata queryMetadata;
+    private final Optional<QueryMetadata> queryMetadata;
+    private final Optional<ConstructQueryMetadata> constructMetadata;
     private final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata;
     private final ImmutableMap<String, FilterMetadata> filterMetadata;
     private final ImmutableMap<String, JoinMetadata> joinMetadata;
     private final ImmutableMap<String, AggregationMetadata> aggregationMetadata;
+    private final QueryType type;
+    public static enum QueryType {Projection, Construct};
 
     /**
      * Constructs an instance of {@link FluoQuery}. Private because applications
@@ -67,21 +71,65 @@ public class FluoQuery {
             final QueryMetadata queryMetadata,
             final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata,
             final ImmutableMap<String, FilterMetadata> filterMetadata,
+            final ImmutableMap<String, JoinMetadata> joinMetadata,
+            final ImmutableMap<String, AggregationMetadata> aggregationMetadata) {
+        this.queryMetadata = Optional.of(requireNonNull(queryMetadata));
+        this.constructMetadata = Optional.absent();
+        this.statementPatternMetadata = requireNonNull(statementPatternMetadata);
+        this.filterMetadata = requireNonNull(filterMetadata);
+        this.joinMetadata = requireNonNull(joinMetadata);
+        this.aggregationMetadata = requireNonNull(aggregationMetadata);
+        this.type = QueryType.Projection;
+    }
+    
+    
+    /**
+     * Constructs an instance of {@link FluoQuery}. Private because applications
+     * must use {@link Builder} instead.
+     *
+     * @param constructMetadata - The root node of a query that is updated in Fluo. (not null)
+     * @param statementPatternMetadata - A map from Node ID to Statement Pattern metadata as
+     *   it is represented within the Fluo app. (not null)
+     * @param filterMetadata A map from Node ID to Filter metadata as it is represented
+     *   within the Fluo app. (not null)
+     * @param joinMetadata A map from Node ID to Join metadata as it is represented
+     *   within the Fluo app. (not null)
+     * @param aggregationMetadata - A map from Node ID to Aggregation metadata as it is
+     *   represented within the Fluo app. (not null)
+     */
+    private FluoQuery(
+            final ConstructQueryMetadata constructMetadata,
+            final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata,
+            final ImmutableMap<String, FilterMetadata> filterMetadata,
             final ImmutableMap<String, JoinMetadata> joinMetadata,
             final ImmutableMap<String, AggregationMetadata> aggregationMetadata) {
-        this.queryMetadata = requireNonNull(queryMetadata);
+        this.constructMetadata = Optional.of(requireNonNull(constructMetadata));
+        this.queryMetadata = Optional.absent();
         this.statementPatternMetadata = requireNonNull(statementPatternMetadata);
         this.filterMetadata = requireNonNull(filterMetadata);
         this.joinMetadata = requireNonNull(joinMetadata);
-        this.aggregationMetadata = requireNonNull(aggregationMetadata);
+        this.aggregationMetadata = requireNonNull(aggregationMetadata);
+        this.type = QueryType.Construct;
+    }
+    
+    /**
+     * Returns the {@link QueryType} of this query
+     * @return the QueryType of this query (either Construct or Projection)
+     */
+    public QueryType getQueryType() {
+        return type;
     }
 
     /**
      * @return Metadata about the root node of a query that is updated within the Fluo app.
      */
-    public QueryMetadata getQueryMetadata() {
+    public Optional<QueryMetadata> getQueryMetadata() {
         return queryMetadata;
     }
+    
+    /**
+     * @return Metadata about the root Construct Query node that is updated within the Fluo app, if this is a Construct query.
+     */
+    public Optional<ConstructQueryMetadata> getConstructQueryMetadata() {
+        return constructMetadata;
+    }
 
     /**
      * Get a Statement Pattern node's metadata.
@@ -175,6 +223,7 @@ public class FluoQuery {
             final FluoQuery fluoQuery = (FluoQuery)o;
             return new EqualsBuilder()
                     .append(queryMetadata, fluoQuery.queryMetadata)
+                    .append(constructMetadata,  fluoQuery.constructMetadata)
                     .append(statementPatternMetadata, fluoQuery.statementPatternMetadata)
                     .append(filterMetadata, fluoQuery.filterMetadata)
                     .append(joinMetadata, fluoQuery.joinMetadata)
@@ -189,8 +238,13 @@ public class FluoQuery {
     public String toString() {
         final StringBuilder builder = new StringBuilder();
 
-        if(queryMetadata != null) {
-            builder.append( queryMetadata.toString() );
+        if(queryMetadata.isPresent()) {
+            builder.append( queryMetadata.get().toString() );
+            builder.append("\n");
+        }
+        
+        if(constructMetadata.isPresent()) {
+            builder.append( constructMetadata.get().toString() );
             builder.append("\n");
         }
 
@@ -231,6 +285,7 @@ public class FluoQuery {
     public static final class Builder {
 
         private QueryMetadata.Builder queryBuilder = null;
+        private ConstructQueryMetadata.Builder constructBuilder = null;
         private final Map<String, StatementPatternMetadata.Builder> spBuilders = new HashMap<>();
         private final Map<String, FilterMetadata.Builder> filterBuilders = new HashMap<>();
         private final Map<String, JoinMetadata.Builder> joinBuilders = new HashMap<>();
@@ -239,11 +294,11 @@ public class FluoQuery {
         /**
          * Sets the {@link QueryMetadata.Builder} that is used by this builder.
          *
-         * @param queryMetadata - The builder representing the query's results.
+         * @param queryBuilder - The builder representing the query's results.
          * @return This builder so that method invocation may be chained.
          */
-        public Builder setQueryMetadata(@Nullable final QueryMetadata.Builder queryMetadata) {
-            this.queryBuilder = queryMetadata;
+        public Builder setQueryMetadata(@Nullable final QueryMetadata.Builder queryBuilder) {
+            this.queryBuilder = queryBuilder;
             return this;
         }
 
@@ -253,6 +308,26 @@ public class FluoQuery {
         public Optional<QueryMetadata.Builder> getQueryBuilder() {
             return Optional.fromNullable( queryBuilder );
         }
+        
+        /**
+         * Sets the {@link ConstructQueryMetadata.Builder} that is used by this builder.
+         *
+         * @param constructBuilder
+         *            - The builder representing the query's results.
+         * @return This builder so that method invocation may be chained.
+         */
+        public Builder setConstructQueryMetadata(@Nullable final ConstructQueryMetadata.Builder constructBuilder) {
+            this.constructBuilder = constructBuilder;
+            return this;
+        }
+
+        /**
+         * @return The Construct Query metadata builder if one has been set.
+         */
+        public Optional<ConstructQueryMetadata.Builder> getConstructQueryBuilder() {
+            return Optional.fromNullable( constructBuilder );
+        }
+        
 
         /**
          * Adds a new {@link StatementPatternMetadata.Builder} to this builder.
@@ -345,12 +420,14 @@ public class FluoQuery {
             requireNonNull(nodeId);
             return Optional.fromNullable( joinBuilders.get(nodeId) );
         }
+        
 
         /**
          * @return Creates a {@link FluoQuery} using the values that have been supplied to this builder.
          */
         public FluoQuery build() {
-            final QueryMetadata queryMetadata = queryBuilder.build();
+            Preconditions.checkArgument(
+                    (queryBuilder != null && constructBuilder == null) || (queryBuilder == null && constructBuilder != null));
 
             final ImmutableMap.Builder<String, StatementPatternMetadata> spMetadata = ImmutableMap.builder();
             for(final Entry<String, StatementPatternMetadata.Builder> entry : spBuilders.entrySet()) {
@@ -372,7 +449,14 @@ public class FluoQuery {
                 aggregateMetadata.put(entry.getKey(), entry.getValue().build());
             }
 
-            return new FluoQuery(queryMetadata, spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
+            if(queryBuilder != null) {
+                return new FluoQuery(queryBuilder.build(), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
+            }
+            //constructBuilder non-null in this case, but no need to check
+            else {
+                return new FluoQuery(constructBuilder.build(), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
+            }
+            
         }
     }
 }
\ No newline at end of file
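
A rough sketch of how the modified builder is intended to be used: exactly one of the two root builders may be set before calling build(). It assumes the existing FluoQuery.builder() factory and fully populated queryBuilder/constructBuilder metadata builders:

    // SELECT (projection) query: the root is QueryMetadata.
    final FluoQuery selectQuery = FluoQuery.builder()
            .setQueryMetadata(queryBuilder)
            .build();
    // selectQuery.getQueryType() == QueryType.Projection

    // CONSTRUCT query: the root is ConstructQueryMetadata.
    final FluoQuery constructQuery = FluoQuery.builder()
            .setConstructQueryMetadata(constructBuilder)
            .build();
    // constructQuery.getQueryType() == QueryType.Construct and
    // constructQuery.getQueryMetadata() is Optional.absent().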


[2/4] incubator-rya git commit: RYA-273-Construct Query Support. Closes #161.

Posted by ca...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
index 77d6a49..3396114 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
@@ -46,6 +46,18 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  *   </table>
  * </p>
  * <p>
+ *   <b>Construct Query Metadata</b>
+ *   <table border="1" style="width:100%">
+ *     <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr>
+ *     <tr> <td>Node ID</td> <td>constructMetadata:nodeId</td> <td>The Node ID of the Query.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>constructMetadata:sparql</td> <td>The original SPARQL query that is being computed by this query.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>constructMetadata:variableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>constructMetadata:graph</td> <td>The construct graph used to project BindingSets to statements.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>constructMetadata:childNodeId</td> <td>The Node ID of the child who feeds this node.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>constructMetadata:statements</td> <td>The RDF statements produced by this construct query node.</td> </tr>
+ *   </table>
+ * </p>
+ * <p>
  *   <b>Filter Metadata</b>
  *   <table border="1" style="width:100%">
  *     <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr>
@@ -104,6 +116,7 @@ public class FluoQueryColumns {
     public static final String JOIN_METADATA_CF = "joinMetadata";
     public static final String STATEMENT_PATTERN_METADATA_CF = "statementPatternMetadata";
     public static final String AGGREGATION_METADATA_CF = "aggregationMetadata";
+    public static final String CONSTRUCT_METADATA_CF = "constructMetadata";
 
     /**
      * New triples that have been added to Rya are written as a row in this
@@ -151,6 +164,14 @@ public class FluoQueryColumns {
     public static final Column QUERY_CHILD_NODE_ID = new Column(QUERY_METADATA_CF, "childNodeId");
     public static final Column QUERY_BINDING_SET = new Column(QUERY_METADATA_CF, "bindingSet");
 
+    // Construct Query Metadata columns.
+    public static final Column CONSTRUCT_NODE_ID = new Column(CONSTRUCT_METADATA_CF, "nodeId");
+    public static final Column CONSTRUCT_VARIABLE_ORDER = new Column(CONSTRUCT_METADATA_CF, "variableOrder");
+    public static final Column CONSTRUCT_GRAPH = new Column(CONSTRUCT_METADATA_CF, "graph");
+    public static final Column CONSTRUCT_CHILD_NODE_ID = new Column(CONSTRUCT_METADATA_CF, "childNodeId");
+    public static final Column CONSTRUCT_STATEMENTS = new Column(CONSTRUCT_METADATA_CF, "statements");
+    public static final Column CONSTRUCT_SPARQL = new Column(CONSTRUCT_METADATA_CF, "sparql");
+
     // Filter Metadata columns.
     public static final Column FILTER_NODE_ID = new Column(FILTER_METADATA_CF, "nodeId");
     public static final Column FILTER_VARIABLE_ORDER = new Column(FILTER_METADATA_CF, "veriableOrder");
@@ -201,6 +222,18 @@ public class FluoQueryColumns {
                         QUERY_CHILD_NODE_ID)),
 
         /**
+         * The columns a {@link ConstructQueryMetadata} object's fields are stored within.
+         */
+        CONSTRUCT_COLUMNS(
+                Arrays.asList(CONSTRUCT_NODE_ID,
+                        CONSTRUCT_VARIABLE_ORDER,
+                        CONSTRUCT_GRAPH,
+                        CONSTRUCT_CHILD_NODE_ID,
+                        CONSTRUCT_SPARQL,
+                        CONSTRUCT_STATEMENTS)),
+
+        
+        /**
          * The columns a {@link FilterMetadata} object's fields are stored within.
          */
         FILTER_COLUMNS(
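
The new constructMetadata columns are read and written the same way as the other metadata column families. For example, a rough sketch of fetching the original SPARQL for a known construct node id from a Fluo snapshot (snapshot is assumed to be an org.apache.fluo.api.client.SnapshotBase and nodeId a construct node id):

    // Fetch the SPARQL string stored for this construct query node.
    final String sparql = snapshot.gets(nodeId, FluoQueryColumns.CONSTRUCT_SPARQL);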

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index dfc3333..5e9d654 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -31,7 +31,8 @@ import org.apache.fluo.api.client.SnapshotBase;
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
-import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
+import org.apache.rya.indexing.pcj.fluo.app.ConstructGraphSerializer;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
@@ -39,6 +40,7 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -103,6 +105,59 @@ public class FluoQueryMetadataDAO {
     }
 
     /**
+     * Write an instance of {@link ConstructQueryMetadata} to the Fluo table.
+     *
+     * @param tx - The transaction that will be used to commit the metadata. (not null)
+     * @param metadata - The Construct Query node metadata that will be written to the table. (not null)
+     */
+    public void write(final TransactionBase tx, final ConstructQueryMetadata metadata) {
+        requireNonNull(tx);
+        requireNonNull(metadata);
+
+        final String rowId = metadata.getNodeId();
+        tx.set(rowId, FluoQueryColumns.CONSTRUCT_NODE_ID, rowId);
+        tx.set(rowId, FluoQueryColumns.CONSTRUCT_VARIABLE_ORDER, metadata.getVariableOrder().toString());
+        tx.set(rowId, FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID, metadata.getChildNodeId() );
+        tx.set(rowId, FluoQueryColumns.CONSTRUCT_SPARQL, metadata.getSparql());
+        tx.set(rowId, FluoQueryColumns.CONSTRUCT_GRAPH, ConstructGraphSerializer.toConstructString(metadata.getConstructGraph()));
+    }
+
+    /**
+     * Read an instance of {@link ConstructQueryMetadata} from the Fluo table.
+     *
+     * @param sx - The snapshot that will be used to read the metadata . (not null)
+     * @param nodeId - The nodeId of the Construct Query node that will be read. (not null)
+     * @return The {@link ConstructQueryMetadata} that was read from table.
+     */
+    public ConstructQueryMetadata readConstructQueryMetadata(final SnapshotBase sx, final String nodeId) {
+        return readConstructQueryMetadataBuilder(sx, nodeId).build();
+    }
+
+    private ConstructQueryMetadata.Builder readConstructQueryMetadataBuilder(final SnapshotBase sx, final String nodeId) {
+        requireNonNull(sx);
+        requireNonNull(nodeId);
+
+        // Fetch the values from the Fluo table.
+        final String rowId = nodeId;
+        final Map<Column, String> values = sx.gets(rowId, 
+                FluoQueryColumns.CONSTRUCT_GRAPH,
+                FluoQueryColumns.CONSTRUCT_SPARQL,
+                FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID);
+
+        final String graphString = values.get(FluoQueryColumns.CONSTRUCT_GRAPH);
+        final ConstructGraph graph = ConstructGraphSerializer.toConstructGraph(graphString);
+        final String childNodeId = values.get(FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID);
+        final String sparql = values.get(FluoQueryColumns.CONSTRUCT_SPARQL);
+
+        return ConstructQueryMetadata.builder()
+                .setNodeId(nodeId)
+                .setConstructGraph(graph)
+                .setSparql(sparql)
+                .setChildNodeId(childNodeId);
+    }
+    
+    
+    /**
      * Write an instance of {@link FilterMetadata} to the Fluo table.
      *
      * @param tx - The transaction that will be used to commit the metadata. (not null)
@@ -376,13 +431,25 @@ public class FluoQueryMetadataDAO {
         requireNonNull(tx);
         requireNonNull(query);
 
-        // Store the Query ID so that it may be looked up from the original SPARQL string.
-        final String sparql = query.getQueryMetadata().getSparql();
-        final String queryId = query.getQueryMetadata().getNodeId();
-        tx.set(Bytes.of(sparql), FluoQueryColumns.QUERY_ID, Bytes.of(queryId));
-
         // Write the rest of the metadata objects.
-        write(tx, query.getQueryMetadata());
+        switch(query.getQueryType()) {
+        case Construct:
+            ConstructQueryMetadata constructMetadata = query.getConstructQueryMetadata().get();
+            // Store the Query ID so that it may be looked up from the original SPARQL string.
+            final String constructSparql = constructMetadata.getSparql();
+            final String constructQueryId = constructMetadata.getNodeId();
+            tx.set(Bytes.of(constructSparql), FluoQueryColumns.QUERY_ID, Bytes.of(constructQueryId));
+            write(tx, constructMetadata);
+            break;
+        case Projection:
+            QueryMetadata queryMetadata = query.getQueryMetadata().get();
+            // Store the Query ID so that it may be looked up from the original SPARQL string.
+            final String sparql = queryMetadata.getSparql();
+            final String queryId = queryMetadata.getNodeId();
+            tx.set(Bytes.of(sparql), FluoQueryColumns.QUERY_ID, Bytes.of(queryId));
+            write(tx, queryMetadata);
+            break;
+        }
 
         for(final FilterMetadata filter : query.getFilterMetadata()) {
             write(tx, filter);
@@ -423,50 +490,62 @@ public class FluoQueryMetadataDAO {
         requireNonNull(childNodeId);
 
         final NodeType childType = NodeType.fromNodeId(childNodeId).get();
-        switch(childType) {
-            case QUERY:
-                // Add this node's metadata.
-                final QueryMetadata.Builder queryBuilder = readQueryMetadataBuilder(sx, childNodeId);
-                builder.setQueryMetadata(queryBuilder);
-
-                // Add it's child's metadata.
-                addChildMetadata(sx, builder, queryBuilder.build().getChildNodeId());
-                break;
-
-            case JOIN:
-                // Add this node's metadata.
-                final JoinMetadata.Builder joinBuilder = readJoinMetadataBuilder(sx, childNodeId);
-                builder.addJoinMetadata(joinBuilder);
-
-                // Add it's children's metadata.
-                final JoinMetadata joinMetadata = joinBuilder.build();
-                addChildMetadata(sx, builder, joinMetadata.getLeftChildNodeId());
-                addChildMetadata(sx, builder, joinMetadata.getRightChildNodeId());
-                break;
-
-            case FILTER:
-                // Add this node's metadata.
-                final FilterMetadata.Builder filterBuilder = readFilterMetadataBuilder(sx, childNodeId);
-                builder.addFilterMetadata(filterBuilder);
-
-                // Add it's child's metadata.
-                addChildMetadata(sx, builder, filterBuilder.build().getChildNodeId());
-                break;
-
-            case STATEMENT_PATTERN:
-                // Add this node's metadata.
-                final StatementPatternMetadata.Builder spBuilder = readStatementPatternMetadataBuilder(sx, childNodeId);
-                builder.addStatementPatternBuilder(spBuilder);
-                break;
-
-            case AGGREGATION:
-                // Add this node's metadata.
-                final AggregationMetadata.Builder aggregationBuilder = readAggregationMetadataBuilder(sx, childNodeId);
-                builder.addAggregateMetadata(aggregationBuilder);
-
-                // Add it's child's metadata.
-                addChildMetadata(sx, builder, aggregationBuilder.build().getChildNodeId());
-                break;
+        switch (childType) {
+        case QUERY:
+            // Add this node's metadata.
+            final QueryMetadata.Builder queryBuilder = readQueryMetadataBuilder(sx, childNodeId);
+            Preconditions.checkArgument(!builder.getQueryBuilder().isPresent());
+            builder.setQueryMetadata(queryBuilder);
+
+            // Add its child's metadata.
+            addChildMetadata(sx, builder, queryBuilder.build().getChildNodeId());
+            break;
+
+        case CONSTRUCT:
+            final ConstructQueryMetadata.Builder constructBuilder = readConstructQueryMetadataBuilder(sx, childNodeId);
+            Preconditions.checkArgument(!builder.getConstructQueryBuilder().isPresent());
+            builder.setConstructQueryMetadata(constructBuilder);
+            
+            // Add its child's metadata.
+            addChildMetadata(sx, builder, constructBuilder.build().getChildNodeId());
+            break;
+
+        case AGGREGATION:
+            // Add this node's metadata.
+            final AggregationMetadata.Builder aggregationBuilder = readAggregationMetadataBuilder(sx, childNodeId);
+            builder.addAggregateMetadata(aggregationBuilder);
+            
+            // Add its child's metadata.
+            addChildMetadata(sx, builder, aggregationBuilder.build().getChildNodeId());
+            break;
+            
+        case JOIN:
+            // Add this node's metadata.
+            final JoinMetadata.Builder joinBuilder = readJoinMetadataBuilder(sx, childNodeId);
+            builder.addJoinMetadata(joinBuilder);
+
+            // Add its children's metadata.
+            final JoinMetadata joinMetadata = joinBuilder.build();
+            addChildMetadata(sx, builder, joinMetadata.getLeftChildNodeId());
+            addChildMetadata(sx, builder, joinMetadata.getRightChildNodeId());
+            break;
+
+        case FILTER:
+            // Add this node's metadata.
+            final FilterMetadata.Builder filterBuilder = readFilterMetadataBuilder(sx, childNodeId);
+            builder.addFilterMetadata(filterBuilder);
+
+            // Add its child's metadata.
+            addChildMetadata(sx, builder, filterBuilder.build().getChildNodeId());
+            break;
+
+        case STATEMENT_PATTERN:
+            // Add this node's metadata.
+            final StatementPatternMetadata.Builder spBuilder = readStatementPatternMetadataBuilder(sx, childNodeId);
+            builder.addStatementPatternBuilder(spBuilder);
+            break;
+        default:
+            break;
         }
     }
 }
\ No newline at end of file
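
A rough usage sketch of the new DAO methods, assuming fluoClient is an org.apache.fluo.api.client.FluoClient and metadata is a populated ConstructQueryMetadata (exception handling around commit() is omitted):

    final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();

    // Persist the construct query node's metadata.
    try (Transaction tx = fluoClient.newTransaction()) {
        dao.write(tx, metadata);
        tx.commit();
    }

    // Later, read it back by node id.
    try (Snapshot sx = fluoClient.newSnapshot()) {
        final ConstructQueryMetadata stored = dao.readConstructQueryMetadata(sx, metadata.getNodeId());
    }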

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
index 064cfe8..23ac286 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
@@ -129,7 +129,7 @@ public class QueryMetadata extends CommonNodeMetadata {
     @DefaultAnnotation(NonNull.class)
     public static final class Builder {
 
-        private final String nodeId;
+        private String nodeId;
         private VariableOrder varOrder;
         private String sparql;
         private String childNodeId;
@@ -143,6 +143,7 @@ public class QueryMetadata extends CommonNodeMetadata {
             this.nodeId = checkNotNull(nodeId);
         }
 
+        
         /**
          * Set the variable order of binding sets that are emitted by this node.
          *

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
index 562470a..631ce60 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
@@ -19,7 +19,9 @@
 package org.apache.rya.indexing.pcj.fluo.app.query;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.AGGREGATION_PREFIX;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.CONSTRUCT_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX;
@@ -28,6 +30,7 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -35,6 +38,8 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
+import org.apache.rya.indexing.pcj.fluo.app.ConstructProjection;
 import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
@@ -42,21 +47,33 @@ import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.Aggregatio
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.openrdf.model.Value;
+import org.openrdf.model.impl.BNodeImpl;
 import org.openrdf.query.algebra.AggregateOperator;
+import org.openrdf.query.algebra.BNodeGenerator;
 import org.openrdf.query.algebra.Extension;
+import org.openrdf.query.algebra.ExtensionElem;
 import org.openrdf.query.algebra.Filter;
 import org.openrdf.query.algebra.Group;
 import org.openrdf.query.algebra.GroupElem;
 import org.openrdf.query.algebra.Join;
 import org.openrdf.query.algebra.LeftJoin;
+import org.openrdf.query.algebra.MultiProjection;
 import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.ProjectionElem;
+import org.openrdf.query.algebra.ProjectionElemList;
 import org.openrdf.query.algebra.QueryModelNode;
+import org.openrdf.query.algebra.Reduced;
 import org.openrdf.query.algebra.StatementPattern;
 import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.UnaryTupleOperator;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.ValueExpr;
 import org.openrdf.query.algebra.Var;
 import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
 import org.openrdf.query.parser.ParsedQuery;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -158,16 +175,18 @@ public class SparqlFluoQueryBuilder {
 
             // Create the prefix of the id. This makes it a little bit more human readable.
             String prefix;
-            if(node instanceof StatementPattern) {
+            if (node instanceof StatementPattern) {
                 prefix = SP_PREFIX;
-            } else if(node instanceof Filter) {
+            } else if (node instanceof Filter) {
                 prefix = FILTER_PREFIX;
-            } else if(node instanceof Join || node instanceof LeftJoin) {
+            } else if (node instanceof Join || node instanceof LeftJoin) {
                 prefix = JOIN_PREFIX;
-            } else if(node instanceof Projection) {
+            } else if (node instanceof Projection) {
                 prefix = QUERY_PREFIX;
             } else if(node instanceof Extension) {
                 prefix = AGGREGATION_PREFIX;
+            }  else if (node instanceof Reduced) {
+                prefix = CONSTRUCT_PREFIX;
             } else {
                 throw new IllegalArgumentException("Node must be of type {StatementPattern, Join, Filter, Extension, Projection} but was " + node.getClass());
             }
@@ -402,7 +421,7 @@ public class SparqlFluoQueryBuilder {
 
             final QueryModelNode child = node.getArg();
             if(child == null) {
-                throw new IllegalArgumentException("Filter arg connot be null.");
+                throw new IllegalArgumentException("Projection arg connot be null.");
             }
 
             final String childNodeId = nodeIds.getOrMakeId(child);
@@ -417,6 +436,60 @@ public class SparqlFluoQueryBuilder {
             // Walk to the next node.
             super.meet(node);
         }
+        
+        
+        public void meet(Reduced node) {
+            //create id, initialize ConstructQueryMetadata builder, register ConstructQueryMetadata 
+            //builder with FluoQueryBuilder, and add metadata that we currently have
+            final String constructId = nodeIds.getOrMakeId(node);
+            final ConstructQueryMetadata.Builder constructBuilder = ConstructQueryMetadata.builder();
+            constructBuilder.setNodeId(constructId);
+            fluoQueryBuilder.setConstructQueryMetadata(constructBuilder);
+            constructBuilder.setSparql(sparql);
+            
+            //get child node
+            QueryModelNode child = node.getArg();
+            Preconditions.checkArgument(child instanceof Projection || child instanceof MultiProjection);
+            UnaryTupleOperator unary = (UnaryTupleOperator) child;
+            
+            //get ProjectionElemList to build ConstructGraph
+            final List<ProjectionElemList> projections = new ArrayList<>();
+            if(unary instanceof Projection) {
+                projections.add(((Projection) unary).getProjectionElemList());
+            } else {
+                projections.addAll(((MultiProjection)unary).getProjections());
+            }
+            
+            //get ExtensionElems to build ConstructGraph
+            QueryModelNode grandChild = unary.getArg();
+            Preconditions.checkArgument(grandChild instanceof Extension);
+            Extension extension = (Extension) grandChild;
+            final List<ExtensionElem> extensionElems = extension.getElements();
+            final ConstructGraph graph = getConstructGraph(projections, extensionElems);
+            constructBuilder.setConstructGraph(graph);
+            
+            //set child to the next node we care about in Fluo
+            //if Extension's arg is a Group node, then it is an Aggregation, so set child to Extension
+            //otherwise set child to Extension's child (only care about Extensions if they are Aggregations)
+            if(extension.getArg() instanceof Group) {
+                child = extension;
+            } else {
+                child = extension.getArg();
+            }
+            
+            //Set the child node in the ConstructQueryMetadataBuilder
+            String childNodeId = nodeIds.getOrMakeId(child);
+            constructBuilder.setChildNodeId(childNodeId);
+            
+            // Update the child node's metadata.
+            final Set<String> childVars = getVars((TupleExpr)child);
+            final VariableOrder childVarOrder = new VariableOrder(childVars);
+            setChildMetadata(childNodeId, childVarOrder, constructId);
+            
+            //fast forward visitor to next node we care about
+            child.visit(this);
+        }
+        
 
         /**
          * Update a query node's metadata to include it's binding set variable order
@@ -433,57 +506,102 @@ public class SparqlFluoQueryBuilder {
             checkNotNull(parentNodeId);
 
             final NodeType childType = NodeType.fromNodeId(childNodeId).get();
-            switch(childType) {
-                case STATEMENT_PATTERN:
-                    StatementPatternMetadata.Builder spBuilder = fluoQueryBuilder.getStatementPatternBuilder(childNodeId).orNull();
-                    if(spBuilder == null) {
-                        spBuilder = StatementPatternMetadata.builder(childNodeId);
-                        fluoQueryBuilder.addStatementPatternBuilder(spBuilder);
-                    }
+            switch (childType) {
+            case STATEMENT_PATTERN:
+                StatementPatternMetadata.Builder spBuilder = fluoQueryBuilder.getStatementPatternBuilder(childNodeId).orNull();
+                if (spBuilder == null) {
+                    spBuilder = StatementPatternMetadata.builder(childNodeId);
+                    fluoQueryBuilder.addStatementPatternBuilder(spBuilder);
+                }
 
-                    spBuilder.setVarOrder(childVarOrder);
-                    spBuilder.setParentNodeId(parentNodeId);
-                    break;
+                spBuilder.setVarOrder(childVarOrder);
+                spBuilder.setParentNodeId(parentNodeId);
+                break;
 
-                case JOIN:
-                    JoinMetadata.Builder joinBuilder = fluoQueryBuilder.getJoinBuilder(childNodeId).orNull();
-                    if(joinBuilder == null) {
-                        joinBuilder = JoinMetadata.builder(childNodeId);
-                        fluoQueryBuilder.addJoinMetadata(joinBuilder);
-                    }
+            case JOIN:
+                JoinMetadata.Builder joinBuilder = fluoQueryBuilder.getJoinBuilder(childNodeId).orNull();
+                if (joinBuilder == null) {
+                    joinBuilder = JoinMetadata.builder(childNodeId);
+                    fluoQueryBuilder.addJoinMetadata(joinBuilder);
+                }
 
-                    joinBuilder.setVariableOrder(childVarOrder);
-                    joinBuilder.setParentNodeId(parentNodeId);
-                    break;
+                joinBuilder.setVariableOrder(childVarOrder);
+                joinBuilder.setParentNodeId(parentNodeId);
+                break;
 
-                case FILTER:
-                    FilterMetadata.Builder filterBuilder = fluoQueryBuilder.getFilterBuilder(childNodeId).orNull();
-                    if(filterBuilder == null) {
-                        filterBuilder = FilterMetadata.builder(childNodeId);
-                        fluoQueryBuilder.addFilterMetadata(filterBuilder);
-                    }
+            case FILTER:
+                FilterMetadata.Builder filterBuilder = fluoQueryBuilder.getFilterBuilder(childNodeId).orNull();
+                if (filterBuilder == null) {
+                    filterBuilder = FilterMetadata.builder(childNodeId);
+                    fluoQueryBuilder.addFilterMetadata(filterBuilder);
+                }
 
-                    filterBuilder.setVarOrder(childVarOrder);
-                    filterBuilder.setParentNodeId(parentNodeId);
-                    break;
+                filterBuilder.setVarOrder(childVarOrder);
+                filterBuilder.setParentNodeId(parentNodeId);
+                break;
 
-                case AGGREGATION:
-                    AggregationMetadata.Builder aggregationBuilder = fluoQueryBuilder.getAggregateBuilder(childNodeId).orNull();
-                    if(aggregationBuilder == null) {
-                        aggregationBuilder = AggregationMetadata.builder(childNodeId);
-                        fluoQueryBuilder.addAggregateMetadata(aggregationBuilder);
-                    }
+            case AGGREGATION:
+                AggregationMetadata.Builder aggregationBuilder = fluoQueryBuilder.getAggregateBuilder(childNodeId).orNull();
+                if (aggregationBuilder == null) {
+                    aggregationBuilder = AggregationMetadata.builder(childNodeId);
+                    fluoQueryBuilder.addAggregateMetadata(aggregationBuilder);
+                }
 
-                    aggregationBuilder.setVariableOrder(childVarOrder);
-                    aggregationBuilder.setParentNodeId(parentNodeId);
-                    break;
+                aggregationBuilder.setVariableOrder(childVarOrder);
+                aggregationBuilder.setParentNodeId(parentNodeId);
+                break;
 
-                case QUERY:
-                    throw new IllegalArgumentException("QUERY nodes do not have children.");
-                default:
-                    throw new IllegalArgumentException("Unsupported NodeType: " + childType);
+            case QUERY:
+                throw new IllegalArgumentException("A QUERY node cannot be the child of another node.");
+            case CONSTRUCT:
+                throw new IllegalArgumentException("A CONSTRUCT node cannot be the child of another node.");
+            default:
+                throw new IllegalArgumentException("Unsupported NodeType: " + childType);
+            }
+        }
+        
+        private ConstructGraph getConstructGraph(List<ProjectionElemList> projections, List<ExtensionElem> extensionElems) {
+            Map<String, Value> valueMap = new HashMap<>();
+            //create valueMap to associate source names with Values
+            for(ExtensionElem elem: extensionElems) {
+                String name = elem.getName();
+                ValueExpr expr = elem.getExpr();
+                if(expr instanceof ValueConstant) {
+                    Value value = ((ValueConstant) expr).getValue();
+                    valueMap.put(name, value);
+                } else if(expr instanceof BNodeGenerator) {
+                    valueMap.put(name, new BNodeImpl(UUID.randomUUID().toString()));
+                }
+            }
+            
+            Set<ConstructProjection> constructProj = new HashSet<>();
+            //build ConstructProjection for each ProjectionElemList
+            for(ProjectionElemList list: projections) {
+                validateProjectionElemList(list);
+                List<Var> vars = new ArrayList<>();
+                for(ProjectionElem elem: list.getElements()) {
+                    String sourceName = elem.getSourceName();
+                    Var var = new Var(sourceName);
+                    if(valueMap.containsKey(sourceName)) {
+                        var.setValue(valueMap.get(sourceName));
+                    }
+                    vars.add(var);
+                }
+                constructProj.add(new ConstructProjection(vars.get(0), vars.get(1), vars.get(2)));
             }
+            
+            return new ConstructGraph(constructProj);
+        }
+        
+        private void validateProjectionElemList(ProjectionElemList list) {
+            List<ProjectionElem> elements = list.getElements();
+            checkArgument(elements.size() == 3);
+            checkArgument(elements.get(0).getTargetName().equals("subject"));
+            checkArgument(elements.get(1).getTargetName().equals("predicate"));
+            checkArgument(elements.get(2).getTargetName().equals("object"));
         }
+        
+        
 
         /**
          * Get the non-constant variables from a {@link TupleExpr}.

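For orientation, the new getConstructGraph and validateProjectionElemList methods above assume that each CONSTRUCT template triple arrives from the SPARQL parser as a ProjectionElemList whose three elements target "subject", "predicate", and "object", with constant terms supplied through ExtensionElems. A minimal sketch of that shape, using illustrative source names only (not code from this patch):

    // Roughly the algebra the builder expects for the template triple "?x <uri:worksAt> ?y";
    // the source names here are assumptions for illustration.
    ProjectionElemList list = new ProjectionElemList(
            new ProjectionElem("x", "subject"),
            new ProjectionElem("_const_worksAt", "predicate"),
            new ProjectionElem("y", "object"));
    // A constant term such as <uri:worksAt> would additionally be carried by an ExtensionElem
    // holding a ValueConstant, which getConstructGraph folds into its valueMap.
    ExtensionElem constElem = new ExtensionElem(
            new ValueConstant(new ValueFactoryImpl().createURI("uri:worksAt")), "_const_worksAt");
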
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTest.java
new file mode 100644
index 0000000..94c8571
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTest.java
@@ -0,0 +1,145 @@
+package org.apache.rya.indexing.pcj.fluo.app;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.algebra.helpers.StatementPatternCollector;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.collect.Sets;
+
+public class ConstructGraphTest {
+
+    private ValueFactory vf = new ValueFactoryImpl();
+    
+    @Test
+    public void testConstructGraph() throws MalformedQueryException, UnsupportedEncodingException {
+        String query = "select ?x where { ?x <uri:talksTo> <uri:Bob>. ?y <uri:worksAt> ?z }";
+
+        SPARQLParser parser = new SPARQLParser();
+        ParsedQuery pq = parser.parseQuery(query, null);
+        List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr());
+        ConstructGraph graph = new ConstructGraph(patterns);
+
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("x", vf.createURI("uri:Joe"));
+        bs.addBinding("y", vf.createURI("uri:Bob"));
+        bs.addBinding("z", vf.createURI("uri:BurgerShack"));
+        VisibilityBindingSet vBs = new VisibilityBindingSet(bs,"FOUO");
+        Set<RyaStatement> statements = graph.createGraphFromBindingSet(vBs);
+        
+        RyaStatement statement1 = new RyaStatement(new RyaURI("uri:Joe"), new RyaURI("uri:talksTo"), new RyaURI("uri:Bob"));
+        RyaStatement statement2 = new RyaStatement(new RyaURI("uri:Bob"), new RyaURI("uri:worksAt"), new RyaURI("uri:BurgerShack"));
+        Set<RyaStatement> expected = Sets.newHashSet(Arrays.asList(statement1, statement2));
+        expected.forEach(x-> x.setColumnVisibility("FOUO".getBytes()));
+        ConstructGraphTestUtils.ryaStatementSetsEqualIgnoresTimestamp(expected, statements);
+    }
+    
+    @Test
+    public void testConstructGraphBNode() throws MalformedQueryException {
+        String query = "select ?x where { _:b <uri:talksTo> ?x. _:b <uri:worksAt> ?z }";
+
+        SPARQLParser parser = new SPARQLParser();
+        ParsedQuery pq = parser.parseQuery(query, null);
+        List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr());
+        ConstructGraph graph = new ConstructGraph(patterns);
+
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("x", vf.createURI("uri:Joe"));
+        bs.addBinding("z", vf.createURI("uri:BurgerShack"));
+        VisibilityBindingSet vBs = new VisibilityBindingSet(bs, "FOUO");
+        Set<RyaStatement> statements = graph.createGraphFromBindingSet(vBs);
+        Set<RyaStatement> statements2 = graph.createGraphFromBindingSet(vBs);
+        
+        RyaURI subject = null;
+        for(RyaStatement statement: statements) {
+            RyaURI subjURI = statement.getSubject();
+            if(subject == null) {
+                subject = subjURI;
+            } else {
+                assertEquals(subjURI, subject);
+            }
+        }
+        RyaURI subject2 = null;
+        for(RyaStatement statement: statements2) {
+            RyaURI subjURI = statement.getSubject();
+            if(subject2 == null) {
+                subject2 = subjURI;
+            } else {
+                assertEquals(subjURI, subject2);
+            }
+        }
+        
+        assertTrue(!subject.equals(subject2));
+
+        ConstructGraphTestUtils.ryaStatementsEqualIgnoresBlankNode(statements, statements2);
+    }
+    
+    
+    @Test
+    public void testConstructGraphSerializer() throws MalformedQueryException {
+        
+        String query = "select ?x where { ?x <uri:talksTo> <uri:Bob>. ?y <uri:worksAt> ?z }";
+
+        SPARQLParser parser = new SPARQLParser();
+        ParsedQuery pq = parser.parseQuery(query, null);
+        List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr());
+        ConstructGraph graph = new ConstructGraph(patterns);
+        
+        String constructString = ConstructGraphSerializer.toConstructString(graph);
+        ConstructGraph deserialized = ConstructGraphSerializer.toConstructGraph(constructString);
+        
+        assertEquals(graph, deserialized);
+        
+    }
+    
+    @Test
+    public void testConstructGraphSerializerBlankNode() throws MalformedQueryException {
+        
+        String query = "select ?x where { _:b <uri:talksTo> ?x. _:b <uri:worksAt> ?y }";
+
+        SPARQLParser parser = new SPARQLParser();
+        ParsedQuery pq = parser.parseQuery(query, null);
+        List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr());
+        ConstructGraph graph = new ConstructGraph(patterns);
+        
+        String constructString = ConstructGraphSerializer.toConstructString(graph);
+        ConstructGraph deserialized = ConstructGraphSerializer.toConstructGraph(constructString);
+        
+        assertEquals(graph, deserialized);
+        
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTestUtils.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTestUtils.java
new file mode 100644
index 0000000..a12b6de
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTestUtils.java
@@ -0,0 +1,126 @@
+package org.apache.rya.indexing.pcj.fluo.app;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
+import org.junit.Assert;
+import org.openrdf.model.Statement;
+
+import com.google.common.base.Objects;
+
+public class ConstructGraphTestUtils {
+
+    public static void ryaStatementSetsEqualIgnoresTimestamp(Set<RyaStatement> statements1, Set<RyaStatement> statements2) {
+        Assert.assertEquals(new VisibilityStatementSet(statements1), new VisibilityStatementSet(statements2));
+    }
+
+    public static void subGraphsEqualIgnoresTimestamp(Set<RyaSubGraph> subgraph1, Set<RyaSubGraph> subgraph2) {
+        Set<VisibilityStatementSet> set1 = new HashSet<>();
+        Set<VisibilityStatementSet> set2 = new HashSet<>();
+        subgraph1.forEach(x->set1.add(new VisibilityStatementSet(x.getStatements())));
+        subgraph2.forEach(x->set2.add(new VisibilityStatementSet(x.getStatements())));
+        Assert.assertEquals(set1, set2);
+    }
+    
+    public static void subGraphsEqualIgnoresBlankNode(Set<RyaSubGraph> subgraph1, Set<RyaSubGraph> subgraph2) {
+        Map<Integer, RyaSubGraph> subGraphMap = new HashMap<>();
+        subgraph1.forEach(x->subGraphMap.put(getKey(x), x));
+        subgraph2.forEach(x->ryaStatementsEqualIgnoresBlankNode(x.getStatements(), subGraphMap.get(getKey(x)).getStatements()));
+    }
+    
+    private static int getKey(RyaSubGraph subgraph) {
+        int key = 0;
+        for(RyaStatement statement: subgraph.getStatements()) {
+            key += statement.getObject().hashCode();
+        }
+        return key;
+    }
+    
+    public static void ryaStatementsEqualIgnoresBlankNode(Set<RyaStatement> statements1, Set<RyaStatement> statements2) {
+        Map<String, RyaURI> bNodeMap = new HashMap<>();
+        statements1.forEach(x-> bNodeMap.put(x.getPredicate().getData(), x.getSubject()));
+        statements2.forEach(x -> x.setSubject(bNodeMap.get(x.getPredicate().getData())));
+        ryaStatementSetsEqualIgnoresTimestamp(statements1, statements2);
+    }
+    
+    
+    /**
+     *  Class used for comparing Sets of RyaStatements while ignoring timestamps.
+     *  It is assumed that all RyaStatements in the Set used to construct this class
+     *  have the same visibility.
+     */
+    public static class VisibilityStatementSet {
+        
+        private Set<Statement> statements;
+        private String visibility;
+        
+        public VisibilityStatementSet(Set<RyaStatement> statements) {
+            this.statements = new HashSet<>();
+            statements.forEach(x -> {
+                this.statements.add(RyaToRdfConversions.convertStatement(x));
+                if (visibility == null) {
+                    if (x.getColumnVisibility() != null) {
+                        visibility = new String(x.getColumnVisibility());
+                    } else {
+                        this.visibility = "";
+                    }
+                }
+            });
+        }
+        
+        public VisibilityStatementSet(RyaSubGraph subgraph) {
+            this(subgraph.getStatements());
+        }
+        
+        @Override
+        public boolean equals(Object o) {
+            if(this == o) {
+                return true;
+            }
+            
+            if(o instanceof VisibilityStatementSet) {
+                VisibilityStatementSet that = (VisibilityStatementSet) o;
+                return Objects.equal(this.visibility, that.visibility) && Objects.equal(this.statements, that.statements);
+            }
+            
+            return false;
+        }
+        
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(visibility, statements);
+        }
+        
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            return builder.append("Visiblity Statement Set \n").append("   Statements: " + statements + "\n")
+                    .append("   Visibilities: " + visibility + " \n").toString();
+        }
+        
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjectionTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjectionTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjectionTest.java
new file mode 100644
index 0000000..080031e
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjectionTest.java
@@ -0,0 +1,112 @@
+package org.apache.rya.indexing.pcj.fluo.app;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import static org.junit.Assert.assertEquals;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.junit.Test;
+import org.openrdf.model.BNode;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.algebra.helpers.StatementPatternCollector;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+public class ConstructProjectionTest {
+
+    private static final ValueFactory vf = new ValueFactoryImpl();
+    
+    @Test
+    public void testConstructProjectionProjectSubj() throws MalformedQueryException, UnsupportedEncodingException {
+        String query = "select ?x where { ?x <uri:talksTo> <uri:Bob> }";
+        
+        SPARQLParser parser = new SPARQLParser();
+        ParsedQuery pq = parser.parseQuery(query, null);
+        List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr());
+        ConstructProjection projection = new ConstructProjection(patterns.get(0));
+        
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("x", vf.createURI("uri:Joe"));
+        VisibilityBindingSet vBs = new VisibilityBindingSet(bs, "FOUO");
+        RyaStatement statement = projection.projectBindingSet(vBs, new HashMap<>());
+        
+        RyaStatement expected = new RyaStatement(new RyaURI("uri:Joe"), new RyaURI("uri:talksTo"), new RyaURI("uri:Bob"));
+        expected.setColumnVisibility("FOUO".getBytes("UTF-8"));
+        expected.setTimestamp(statement.getTimestamp());
+        
+        assertEquals(expected, statement);
+    }
+    
+    @Test
+    public void testConstructProjectionProjPred() throws MalformedQueryException {
+        String query = "select ?p where { <uri:Joe> ?p <uri:Bob> }";
+        
+        SPARQLParser parser = new SPARQLParser();
+        ParsedQuery pq = parser.parseQuery(query, null);
+        List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr());
+        ConstructProjection projection = new ConstructProjection(patterns.get(0));
+        
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("p", vf.createURI("uri:worksWith"));
+        VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
+        RyaStatement statement = projection.projectBindingSet(vBs, new HashMap<>());
+        
+        RyaStatement expected = new RyaStatement(new RyaURI("uri:Joe"), new RyaURI("uri:worksWith"), new RyaURI("uri:Bob"));
+        expected.setTimestamp(statement.getTimestamp());
+        expected.setColumnVisibility(new byte[0]);
+        
+        assertEquals(expected, statement);
+    }
+    
+    @Test
+    public void testConstructProjectionBNodes() throws MalformedQueryException {
+        String query = "select ?o where { _:b <uri:talksTo> ?o }";
+        
+        SPARQLParser parser = new SPARQLParser();
+        ParsedQuery pq = parser.parseQuery(query, null);
+        List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr());
+        ConstructProjection projection = new ConstructProjection(patterns.get(0));
+        
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("o", vf.createURI("uri:Bob"));
+        VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
+        BNode bNode = vf.createBNode();
+        Map<String, BNode> bNodeMap = new HashMap<>();
+        bNodeMap.put("-anon-1", bNode);
+        RyaStatement statement = projection.projectBindingSet(vBs,bNodeMap);
+        
+        RyaStatement expected = new RyaStatement(RdfToRyaConversions.convertResource(bNode), new RyaURI("uri:talksTo"), new RyaURI("uri:Bob"));
+        expected.setTimestamp(statement.getTimestamp());
+        expected.setColumnVisibility(new byte[0]);
+        
+        assertEquals(expected, statement);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java
index 4ad5189..60e1bc1 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java
@@ -38,10 +38,8 @@ public class FluoStringConverterTest {
         // Setup a StatementPattern that represents "?x <http://worksAt> <http://Chipotle>."
         final Var subject = new Var("x");
         final Var predicate = new Var("-const-http://worksAt", new URIImpl("http://worksAt"));
-        predicate.setAnonymous(true);
         predicate.setConstant(true);
         final Var object = new Var("-const-http://Chipotle", new URIImpl("http://Chipotle"));
-        object.setAnonymous(true);
         object.setConstant(true);
         final StatementPattern pattern = new StatementPattern(subject, predicate, object);
 
@@ -69,10 +67,8 @@ public class FluoStringConverterTest {
         // Ensure it converted to the expected result.
         final Var subject = new Var("x");
         final Var predicate = new Var("-const-http://worksAt", new URIImpl("http://worksAt"));
-        predicate.setAnonymous(true);
         predicate.setConstant(true);
         final Var object = new Var("-const-http://Chipotle", new URIImpl("http://Chipotle"));
-        object.setAnonymous(true);
         object.setConstant(true);
         final StatementPattern expected = new StatementPattern(subject, predicate, object);
 
@@ -89,7 +85,6 @@ public class FluoStringConverterTest {
 
         // Ensure it converted to the expected result.
         final Var expected = new Var("-const-http://Chipotle", new URIImpl("http://Chipotle"));
-        expected.setAnonymous(true);
         expected.setConstant(true);
 
         assertEquals(expected, var);
@@ -105,7 +100,6 @@ public class FluoStringConverterTest {
 
         // Ensure it converted to the expected result.
         final Var expected = new Var("-const-5", new LiteralImpl("5", XMLSchema.INTEGER));
-        expected.setAnonymous(true);
         expected.setConstant(true);
 
         assertEquals(expected, result);
@@ -121,7 +115,6 @@ public class FluoStringConverterTest {
 
         // Ensure it converted to the expected result.
         final Var expected = new Var("-const-Chipotle", new LiteralImpl("Chipotle", XMLSchema.STRING));
-        expected.setAnonymous(true);
         expected.setConstant(true);
 
         assertEquals(expected, result);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/RyaSubGraphKafkaSerDeTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/RyaSubGraphKafkaSerDeTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/RyaSubGraphKafkaSerDeTest.java
new file mode 100644
index 0000000..8b9feaf
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/RyaSubGraphKafkaSerDeTest.java
@@ -0,0 +1,57 @@
+package org.apache.rya.indexing.pcj.fluo.app;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import static org.junit.Assert.assertEquals;
+
+import java.util.UUID;
+
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
+import org.junit.Test;
+import org.openrdf.model.vocabulary.XMLSchema;
+
+public class RyaSubGraphKafkaSerDeTest {
+
+    private static final RyaSubGraphKafkaSerDe serializer = new RyaSubGraphKafkaSerDe();
+    
+    @Test
+    public void serializationTestWithURI() {
+        RyaSubGraph bundle = new RyaSubGraph(UUID.randomUUID().toString());
+        bundle.addStatement(new RyaStatement(new RyaURI("uri:123"), new RyaURI("uri:234"), new RyaURI("uri:345")));
+        bundle.addStatement(new RyaStatement(new RyaURI("uri:345"), new RyaURI("uri:567"), new RyaURI("uri:789")));
+        byte[] bundleBytes = serializer.toBytes(bundle);
+        RyaSubGraph deserializedBundle = serializer.fromBytes(bundleBytes);
+        assertEquals(bundle, deserializedBundle);
+    }
+    
+    
+    @Test
+    public void serializationTestWithLiteral() {
+        RyaSubGraph bundle = new RyaSubGraph(UUID.randomUUID().toString());
+        bundle.addStatement(new RyaStatement(new RyaURI("uri:123"), new RyaURI("uri:234"), new RyaType(XMLSchema.INTEGER, "345")));
+        bundle.addStatement(new RyaStatement(new RyaURI("uri:345"), new RyaURI("uri:567"), new RyaType(XMLSchema.INTEGER, "789")));
+        byte[] bundleBytes = serializer.toBytes(bundle);
+        RyaSubGraph deserializedBundle = serializer.fromBytes(bundleBytes);
+        assertEquals(bundle, deserializedBundle);
+    }
+    
+}

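The tests above only exercise the byte round trip. As a rough sketch of how those bytes could be published to Kafka (the topic name and producer wiring below are assumptions for illustration, not necessarily how the exporter in this patch is implemented):

    // Illustrative only: send a serialized RyaSubGraph to a hypothetical topic.
    Properties props = new Properties();
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    RyaSubGraph bundle = new RyaSubGraph(UUID.randomUUID().toString());   // as in the tests above
    bundle.addStatement(new RyaStatement(new RyaURI("uri:123"), new RyaURI("uri:234"), new RyaURI("uri:345")));

    try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
        byte[] payload = new RyaSubGraphKafkaSerDe().toBytes(bundle);
        producer.send(new ProducerRecord<>("subgraph-results", bundle.getId(), payload));
    }
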
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
index 74193cf..b9c10d4 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
@@ -28,7 +28,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
-import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaResultExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterFactory;
 import org.junit.Test;
 
 /**
@@ -93,7 +93,7 @@ public class KafkaExportParametersTest {
 
     @Test
     public void testKafkaResultExporterFactory() {
-        KafkaResultExporterFactory factory = new KafkaResultExporterFactory();
+        KafkaBindingSetExporterFactory factory = new KafkaBindingSetExporterFactory();
         assertNotNull(factory);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
index e1c386d..99ccc58 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
@@ -25,6 +25,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport;
+import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
@@ -55,13 +56,26 @@ public class QueryReportRenderer {
 
         final FluoQuery metadata = queryReport.getFluoQuery();
 
-        final QueryMetadata queryMetadata = metadata.getQueryMetadata();
-        builder.appendItem( new ReportItem("QUERY NODE") );
-        builder.appendItem( new ReportItem("Node ID", queryMetadata.getNodeId()) );
-        builder.appendItem( new ReportItem("Variable Order", queryMetadata.getVariableOrder().toString()) );
-        builder.appendItem( new ReportItem("SPARQL", prettyFormatSparql( queryMetadata.getSparql()) ) );
-        builder.appendItem( new ReportItem("Child Node ID", queryMetadata.getChildNodeId()) );
-        builder.appendItem( new ReportItem("Count", "" + queryReport.getCount(queryMetadata.getNodeId())) );
+        switch (metadata.getQueryType()) {
+        case Projection:
+            final QueryMetadata queryMetadata = metadata.getQueryMetadata().get();
+            builder.appendItem(new ReportItem("QUERY NODE"));
+            builder.appendItem(new ReportItem("Node ID", queryMetadata.getNodeId()));
+            builder.appendItem(new ReportItem("Variable Order", queryMetadata.getVariableOrder().toString()));
+            builder.appendItem(new ReportItem("SPARQL", prettyFormatSparql(queryMetadata.getSparql())));
+            builder.appendItem(new ReportItem("Child Node ID", queryMetadata.getChildNodeId()));
+            builder.appendItem(new ReportItem("Count", "" + queryReport.getCount(queryMetadata.getNodeId())));
+            break;
+        case Construct:
+            final ConstructQueryMetadata constructMetadata = metadata.getConstructQueryMetadata().get();
+            builder.appendItem(new ReportItem("CONSTRUCT QUERY NODE"));
+            builder.appendItem(new ReportItem("Node ID", constructMetadata.getNodeId()));
+            builder.appendItem(new ReportItem("Variable Order", constructMetadata.getVariableOrder().toString()));
+            builder.appendItem(new ReportItem("SPARQL", prettyFormatSparql(constructMetadata.getSparql())));
+            builder.appendItem(new ReportItem("Child Node ID", constructMetadata.getChildNodeId()));
+            builder.appendItem(new ReportItem("Construct Graph", constructMetadata.getConstructGraph().toString()));
+            builder.appendItem(new ReportItem("Count", "" + queryReport.getCount(constructMetadata.getNodeId())));
+        }
 
         for(final FilterMetadata filterMetadata : metadata.getFilterMetadata()) {
             builder.appendItem( new ReportItem("") );

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
index 9263362..85edb11 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
@@ -1,100 +1,98 @@
 <?xml version="1.0" encoding="utf-8"?>
 <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
-    license agreements. See the NOTICE file distributed with this work for additional 
-    information regarding copyright ownership. The ASF licenses this file to 
-    you under the Apache License, Version 2.0 (the "License"); you may not use 
-    this file except in compliance with the License. You may obtain a copy of 
-    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
-    by applicable law or agreed to in writing, software distributed under the 
-    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
-    OF ANY KIND, either express or implied. See the License for the specific 
-    language governing permissions and limitations under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	license agreements. See the NOTICE file distributed with this work for additional 
+	information regarding copyright ownership. The ASF licenses this file to 
+	you under the Apache License, Version 2.0 (the "License"); you may not use 
+	this file except in compliance with the License. You may obtain a copy of 
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+	by applicable law or agreed to in writing, software distributed under the 
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+	OF ANY KIND, either express or implied. See the License for the specific 
+	language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
-    <parent>
-        <groupId>org.apache.rya</groupId>
-        <artifactId>rya.pcj.fluo.parent</artifactId>
-        <version>3.2.11-incubating-SNAPSHOT</version>
-    </parent>
+	<parent>
+		<groupId>org.apache.rya</groupId>
+		<artifactId>rya.pcj.fluo.parent</artifactId>
+		<version>3.2.11-incubating-SNAPSHOT</version>
+	</parent>
 
-    <modelVersion>4.0.0</modelVersion>
-    <artifactId>rya.pcj.fluo.integration</artifactId>
+	<modelVersion>4.0.0</modelVersion>
+	<artifactId>rya.pcj.fluo.integration</artifactId>
 
-    <name>Apache Rya PCJ Fluo Integration Tests</name>
-    <description>Integration tests for the Rya Fluo application.</description>
+	<name>Apache Rya PCJ Fluo Integration Tests</name>
+	<description>Integration tests for the Rya Fluo application.</description>
 
-    <dependencies>
-        <!-- Rya Runtime Dependencies. -->
-        <dependency>
-            <groupId>org.apache.rya</groupId>
-            <artifactId>rya.api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rya</groupId>
-            <artifactId>rya.pcj.fluo.api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rya</groupId>
-            <artifactId>rya.pcj.fluo.client</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rya</groupId>
-            <artifactId>rya.indexing</artifactId>
-        </dependency>
-         <dependency>
-            <groupId>org.apache.fluo</groupId>
-            <artifactId>fluo-api</artifactId>
-        </dependency>
+	<dependencies>
+		<!-- Rya Runtime Dependencies. -->
+		<dependency>
+			<groupId>org.apache.rya</groupId>
+			<artifactId>rya.api</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.rya</groupId>
+			<artifactId>rya.pcj.fluo.api</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.rya</groupId>
+			<artifactId>rya.pcj.fluo.client</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.rya</groupId>
+			<artifactId>rya.indexing</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.fluo</groupId>
+			<artifactId>fluo-api</artifactId>
+		</dependency>
 
-        <!-- Testing dependencies. -->
-        <dependency>
-            <groupId>org.apache.fluo</groupId>
-            <artifactId>fluo-mini</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-         <dependency>
-            <groupId>org.apache.fluo</groupId>
-            <artifactId>fluo-api</artifactId>
-        </dependency>
+		<!-- Testing dependencies. -->
+		<dependency>
+			<groupId>org.apache.fluo</groupId>
+			<artifactId>fluo-mini</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<scope>test</scope>
+		</dependency>
 
-        <dependency>
-          <groupId>org.apache.kafka</groupId>
-          <artifactId>kafka-clients</artifactId>
-          <version>0.10.1.0</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.11</artifactId>
-            <version>0.10.1.0</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>slf4j-log4j12</artifactId>
-                    <groupId>org.slf4j</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <!-- Testing dependencies. -->
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.11</artifactId>
-            <version>0.10.1.0</version>
-            <classifier>test</classifier>
-            <exclusions>
-                <exclusion>
-                    <artifactId>slf4j-log4j12</artifactId>
-                    <groupId>org.slf4j</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-             <groupId>org.apache.fluo</groupId>
-            <artifactId>fluo-recipes-test</artifactId>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka-clients</artifactId>
+			<version>0.10.1.0</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_2.11</artifactId>
+			<version>0.10.1.0</version>
+			<exclusions>
+				<exclusion>
+					<artifactId>slf4j-log4j12</artifactId>
+					<groupId>org.slf4j</groupId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<!-- Testing dependencies. -->
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_2.11</artifactId>
+			<version>0.10.1.0</version>
+			<classifier>test</classifier>
+			<exclusions>
+				<exclusion>
+					<artifactId>slf4j-log4j12</artifactId>
+					<groupId>org.slf4j</groupId>
+				</exclusion>
+			</exclusions>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.fluo</groupId>
+			<artifactId>fluo-recipes-test</artifactId>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java
new file mode 100644
index 0000000..124569b
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java
@@ -0,0 +1,126 @@
+package org.apache.rya.indexing.pcj.fluo;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
+import org.junit.Assert;
+import org.openrdf.model.Statement;
+
+import com.google.common.base.Objects;
+
+public class ConstructGraphTestUtils {
+
+    public static void ryaStatementSetsEqualIgnoresTimestamp(Set<RyaStatement> statements1, Set<RyaStatement> statements2) {
+        Assert.assertEquals(new VisibilityStatementSet(statements1), new VisibilityStatementSet(statements2));
+    }
+
+    public static void subGraphsEqualIgnoresTimestamp(Set<RyaSubGraph> subgraph1, Set<RyaSubGraph> subgraph2) {
+        Set<VisibilityStatementSet> set1 = new HashSet<>();
+        Set<VisibilityStatementSet> set2 = new HashSet<>();
+        subgraph1.forEach(x->set1.add(new VisibilityStatementSet(x.getStatements())));
+        subgraph2.forEach(x->set2.add(new VisibilityStatementSet(x.getStatements())));
+        Assert.assertEquals(set1, set2);
+    }
+    
+    public static void subGraphsEqualIgnoresBlankNode(Set<RyaSubGraph> subgraph1, Set<RyaSubGraph> subgraph2) {
+        Map<Integer, RyaSubGraph> subGraphMap = new HashMap<>();
+        subgraph1.forEach(x->subGraphMap.put(getKey(x), x));
+        subgraph2.forEach(x->ryaStatementsEqualIgnoresBlankNode(x.getStatements(), subGraphMap.get(getKey(x)).getStatements()));
+    }
+    
+    private static int getKey(RyaSubGraph subgraph) {
+        int key = 0;
+        for(RyaStatement statement: subgraph.getStatements()) {
+            key += statement.getObject().hashCode();
+        }
+        return key;
+    }
+    
+    public static void ryaStatementsEqualIgnoresBlankNode(Set<RyaStatement> statements1, Set<RyaStatement> statements2) {
+        Map<String, RyaURI> bNodeMap = new HashMap<>();
+        statements1.forEach(x-> bNodeMap.put(x.getPredicate().getData(), x.getSubject()));
+        statements2.forEach(x -> x.setSubject(bNodeMap.get(x.getPredicate().getData())));
+        ryaStatementSetsEqualIgnoresTimestamp(statements1, statements2);
+    }
+    
+    
+    /**
+     *  Class used for comparing Sets of RyaStatements while ignoring timestamps.
+     *  It is assumed that all RyaStatements in the Set used to construct this class
+     *  have the same visibility.
+     */
+    public static class VisibilityStatementSet {
+        
+        private Set<Statement> statements;
+        private String visibility;
+        
+        public VisibilityStatementSet(Set<RyaStatement> statements) {
+            this.statements = new HashSet<>();
+            statements.forEach(x -> {
+                this.statements.add(RyaToRdfConversions.convertStatement(x));
+                if (visibility == null) {
+                    if (x.getColumnVisibility() != null) {
+                        visibility = new String(x.getColumnVisibility());
+                    } else {
+                        this.visibility = "";
+                    }
+                }
+            });
+        }
+        
+        public VisibilityStatementSet(RyaSubGraph subgraph) {
+            this(subgraph.getStatements());
+        }
+        
+        @Override
+        public boolean equals(Object o) {
+            if(this == o) {
+                return true;
+            }
+            
+            if(o instanceof VisibilityStatementSet) {
+                VisibilityStatementSet that = (VisibilityStatementSet) o;
+                return Objects.equal(this.visibility, that.visibility) && Objects.equal(this.statements, that.statements);
+            }
+            
+            return false;
+        }
+        
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(visibility, statements);
+        }
+        
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            return builder.append("Visiblity Statement Set \n").append("   Statements: " + statements + "\n")
+                    .append("   Visibilities: " + visibility + " \n").toString();
+        }
+        
+    }
+    
+}


[4/4] incubator-rya git commit: RYA-273-Construct Query Support. Closes #161.

Posted by ca...@apache.org.
RYA-273-Construct Query Support. Closes #161.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/60090ad5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/60090ad5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/60090ad5

Branch: refs/heads/master
Commit: 60090ad52de294d55e2bcea2a0629ee19bfb3827
Parents: 646d21b
Author: Caleb Meier <ca...@parsons.com>
Authored: Fri Apr 14 19:20:25 2017 -0700
Committer: Caleb Meier <ca...@parsons.com>
Committed: Thu Jun 22 11:03:02 2017 -0700

----------------------------------------------------------------------
 common/rya.api/pom.xml                          |   6 +-
 .../org/apache/rya/api/domain/RyaSubGraph.java  | 118 +++++++
 .../kryo/RyaStatementSerializer.java            | 162 +++++++++
 .../kryo/RyaSubGraphSerializer.java             |  84 +++++
 ...AbstractAccumuloRdfConfigurationBuilder.java |  26 +-
 .../apache/rya/sail/config/RyaSailFactory.java  |  40 +++
 .../rya/indexing/pcj/fluo/api/CreatePcj.java    |  71 +++-
 .../rya/indexing/pcj/fluo/api/DeletePcj.java    |  11 +-
 .../indexing/pcj/fluo/app/ConstructGraph.java   | 141 ++++++++
 .../pcj/fluo/app/ConstructGraphSerializer.java  |  52 +++
 .../pcj/fluo/app/ConstructProjection.java       | 266 ++++++++++++++
 .../fluo/app/ConstructQueryResultUpdater.java   |  91 +++++
 .../pcj/fluo/app/FluoStringConverter.java       |  51 ++-
 .../fluo/app/IncrementalUpdateConstants.java    |   1 +
 .../rya/indexing/pcj/fluo/app/NodeType.java     |  23 +-
 .../export/IncrementalBindingSetExporter.java   |  69 ++++
 .../IncrementalBindingSetExporterFactory.java   | 104 ++++++
 .../app/export/IncrementalResultExporter.java   |  69 ----
 .../IncrementalResultExporterFactory.java       | 104 ------
 .../export/IncrementalRyaSubGraphExporter.java  |  39 ++
 .../IncrementalRyaSubGraphExporterFactory.java  |  47 +++
 .../export/kafka/KafkaBindingSetExporter.java   |  87 +++++
 .../kafka/KafkaBindingSetExporterFactory.java   |  64 ++++
 .../app/export/kafka/KafkaResultExporter.java   |  87 -----
 .../kafka/KafkaResultExporterFactory.java       |  64 ----
 .../export/kafka/KafkaRyaSubGraphExporter.java  |  81 +++++
 .../kafka/KafkaRyaSubGraphExporterFactory.java  |  62 ++++
 .../app/export/kafka/RyaSubGraphKafkaSerDe.java | 100 ++++++
 .../app/export/rya/RyaBindingSetExporter.java   |  72 ++++
 .../rya/RyaBindingSetExporterFactory.java       |  77 ++++
 .../app/export/rya/RyaExportParameters.java     |  15 +
 .../fluo/app/export/rya/RyaResultExporter.java  |  72 ----
 .../export/rya/RyaResultExporterFactory.java    |  77 ----
 .../fluo/app/observers/BindingSetUpdater.java   |  12 +
 .../observers/ConstructQueryResultObserver.java | 198 +++++++++++
 .../fluo/app/observers/QueryResultObserver.java |  36 +-
 .../fluo/app/query/ConstructQueryMetadata.java  | 192 ++++++++++
 .../indexing/pcj/fluo/app/query/FluoQuery.java  | 106 +++++-
 .../pcj/fluo/app/query/FluoQueryColumns.java    |  33 ++
 .../fluo/app/query/FluoQueryMetadataDAO.java    | 181 +++++++---
 .../pcj/fluo/app/query/QueryMetadata.java       |   3 +-
 .../fluo/app/query/SparqlFluoQueryBuilder.java  | 210 ++++++++---
 .../pcj/fluo/app/ConstructGraphTest.java        | 145 ++++++++
 .../pcj/fluo/app/ConstructGraphTestUtils.java   | 126 +++++++
 .../pcj/fluo/app/ConstructProjectionTest.java   | 112 ++++++
 .../pcj/fluo/app/FluoStringConverterTest.java   |   7 -
 .../pcj/fluo/app/RyaSubGraphKafkaSerDeTest.java |  57 +++
 .../export/rya/KafkaExportParametersTest.java   |   4 +-
 .../fluo/client/util/QueryReportRenderer.java   |  28 +-
 .../rya.pcj.fluo/pcj.fluo.integration/pom.xml   | 180 +++++-----
 .../pcj/fluo/ConstructGraphTestUtils.java       | 126 +++++++
 .../indexing/pcj/fluo/KafkaExportITBase.java    | 143 +++++---
 .../rya/indexing/pcj/fluo/RyaExportITBase.java  |   9 +
 .../indexing/pcj/fluo/api/GetQueryReportIT.java |   2 +-
 .../fluo/app/query/FluoQueryMetadataDAOIT.java  |  94 ++++-
 .../pcj/fluo/integration/CreateDeleteIT.java    |   1 +
 .../pcj/fluo/integration/KafkaExportIT.java     |  36 +-
 .../integration/KafkaRyaSubGraphExportIT.java   | 352 +++++++++++++++++++
 .../pcj/functions/geo/FunctionAdapter.java      |   2 -
 59 files changed, 3986 insertions(+), 842 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/common/rya.api/pom.xml
----------------------------------------------------------------------
diff --git a/common/rya.api/pom.xml b/common/rya.api/pom.xml
index f73c006..94f191d 100644
--- a/common/rya.api/pom.xml
+++ b/common/rya.api/pom.xml
@@ -70,7 +70,11 @@ under the License.
             <groupId>com.github.stephenc.jcip</groupId>
             <artifactId>jcip-annotations</artifactId>
         </dependency>
-        
+        <dependency>
+			<groupId>com.esotericsoftware.kryo</groupId>
+			<artifactId>kryo</artifactId>
+			<version>2.24.0</version>
+		</dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaSubGraph.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaSubGraph.java b/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaSubGraph.java
new file mode 100644
index 0000000..f08eba4
--- /dev/null
+++ b/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaSubGraph.java
@@ -0,0 +1,118 @@
+package org.apache.rya.api.domain;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.base.Objects;
+
+/**
+ * This class packages together a collection of {@link RyaStatement}s to form a subgraph
+ */
+public class RyaSubGraph {
+
+    private String id;
+    private Set<RyaStatement> statements;
+    
+    /**
+     * Creates empty subgraph with given id
+     * @param id - id of the created subgraph
+     */
+    public RyaSubGraph(String id) {
+        this.id = id;
+        this.statements = new HashSet<>();
+    }
+    
+    /**
+     * Creates subgraph with specified id and statements
+     * @param id - id of the created subgraph
+     * @param statements - statements that make up subgraph
+     */
+    public RyaSubGraph(String id, Set<RyaStatement> statements) {
+        this.id = id;
+        this.statements = statements;
+    }
+
+    /**
+     * @return id of this subgraph
+     */
+    public String getId() {
+        return id;
+    }
+    
+    /**
+     * @return RyaStatements representing this subgraph
+     */
+    public Set<RyaStatement> getStatements() {
+        return statements;
+    }
+    
+    /**
+     * Sets id of subgraph
+     * @param id - id of subgraph
+     */
+    public void setId(String id) {
+        this.id = id;
+    }
+    
+    /**
+     * Sets subgraph statements to specified statements
+     * @param statements - statements that will be set to subgraph statements
+     */
+    public void setStatements(Set<RyaStatement> statements) {
+        this.statements = statements;
+    }
+    
+
+    /**
+     * Adds statement to this subgraph
+     * @param statement - RyaStatement to be added to subgraph
+     */
+    public void addStatement(RyaStatement statement){
+        statements.add(statement);
+    }
+    
+    @Override
+    public boolean equals(Object other) {
+        
+        if(this == other) {
+            return true;
+        }
+        
+        if(other instanceof RyaSubGraph) {
+            RyaSubGraph bundle = (RyaSubGraph) other;
+            return Objects.equal(this.id, bundle.id) && Objects.equal(this.statements, bundle.statements);
+        }
+        
+        return false;
+    }
+    
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(this.id, this.statements);
+    }
+    
+    
+    @Override
+    public String toString() {
+        return new StringBuilder().append("Rya Subgraph {\n").append("   Rya Subgraph ID: " + id + "\n")
+                .append("   Rya Statements: " + statements + "\n").append("}").toString();
+    }
+    
+}
\ No newline at end of file
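
The new RyaSubGraph type simply pairs an id with a set of RyaStatements. A minimal usage
sketch (the URIs and the random UUID id are illustrative placeholders, not part of this patch):

    import java.util.UUID;
    import org.apache.rya.api.domain.RyaStatement;
    import org.apache.rya.api.domain.RyaSubGraph;
    import org.apache.rya.api.domain.RyaURI;

    public class RyaSubGraphExample {
        public static void main(String[] args) {
            // Build a subgraph with a random id and two statements.
            RyaSubGraph subgraph = new RyaSubGraph(UUID.randomUUID().toString());
            subgraph.addStatement(new RyaStatement(
                    new RyaURI("urn:Alice"), new RyaURI("urn:worksAt"), new RyaURI("urn:Acme")));
            subgraph.addStatement(new RyaStatement(
                    new RyaURI("urn:Alice"), new RyaURI("urn:talksTo"), new RyaURI("urn:Bob")));
            // Prints the subgraph id followed by its statements.
            System.out.println(subgraph);
        }
    }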

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/common/rya.api/src/main/java/org/apache/rya/api/domain/serialization/kryo/RyaStatementSerializer.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/domain/serialization/kryo/RyaStatementSerializer.java b/common/rya.api/src/main/java/org/apache/rya/api/domain/serialization/kryo/RyaStatementSerializer.java
new file mode 100644
index 0000000..6c0efd2
--- /dev/null
+++ b/common/rya.api/src/main/java/org/apache/rya/api/domain/serialization/kryo/RyaStatementSerializer.java
@@ -0,0 +1,162 @@
+package org.apache.rya.api.domain.serialization.kryo;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.domain.RyaURI;
+
+/**
+ * Kryo Serializer for {@link RyaStatement}s
+ *
+ */
+public class RyaStatementSerializer extends Serializer<RyaStatement> {
+    
+    /**
+     * Uses Kryo to write a RyaStatement to {@link Output}
+     * @param kryo - Kryo instance used for serialization
+     * @param output - output stream that the statement is written to
+     * @param object - statement written to the output stream
+     */
+    public static void writeToKryo(Kryo kryo, Output output, RyaStatement object) {
+        Preconditions.checkNotNull(kryo);
+        Preconditions.checkNotNull(output);
+        Preconditions.checkNotNull(object);
+        output.writeString(object.getSubject().getData());
+        output.writeString(object.getPredicate().getData());
+        output.writeString(object.getObject().getDataType().toString());
+        output.writeString(object.getObject().getData());
+        boolean hasContext = object.getContext() != null;
+        output.writeBoolean(hasContext);
+        if(hasContext){
+            output.writeString(object.getContext().getData());
+        }
+        boolean shouldWrite = object.getColumnVisibility() != null;
+        output.writeBoolean(shouldWrite);
+        if(shouldWrite){
+            output.writeInt(object.getColumnVisibility().length);
+            output.writeBytes(object.getColumnVisibility());
+        }
+        shouldWrite = object.getQualifer() != null;
+        output.writeBoolean(shouldWrite);
+        if(shouldWrite){
+            output.writeString(object.getQualifer());
+        }
+        shouldWrite = object.getTimestamp() != null;
+        output.writeBoolean(shouldWrite);
+        if(shouldWrite){
+            output.writeLong(object.getTimestamp());
+        }
+        shouldWrite = object.getValue() != null;
+        output.writeBoolean(shouldWrite);
+        if(shouldWrite){
+            output.writeBytes(object.getValue());
+        }
+    }   
+
+    /**
+     * Uses Kryo to write a RyaStatement to {@link Output}
+     * @param kryo - Kryo instance used for serialization
+     * @param output - output stream that the statement is written to
+     * @param object - statement written to the output stream
+     */
+    @Override
+    public void write(Kryo kryo, Output output, RyaStatement object) {
+        writeToKryo(kryo, output, object);
+    }
+    
+    /**
+     * Uses Kryo to read a RyaStatement from {@link Input}
+     * @param kryo - reads statement from input
+     * @param input - Input stream that statement is read from
+     * @param type - Type read from input stream
+     * @return - statement read from input stream
+     */
+    public static RyaStatement readFromKryo(Kryo kryo, Input input, Class<RyaStatement> type){
+        return read(input);
+    }
+
+    /**
+     * Reads RyaStatement from {@link Input} stream
+     * @param input - input stream that statement is read from
+     * @return - statement read from input stream
+     */
+    public static RyaStatement read(Input input){
+        Preconditions.checkNotNull(input);
+        String subject = input.readString();
+        String predicate = input.readString();
+        String objectType = input.readString();
+        String objectValue = input.readString();
+        RyaType value;
+        if (objectType.equals(XMLSchema.ANYURI.toString())){
+            value = new RyaURI(objectValue);
+        }
+        else {
+            value = new RyaType(new URIImpl(objectType), objectValue);
+        }
+        RyaStatement statement = new RyaStatement(new RyaURI(subject), new RyaURI(predicate), value);
+        int length = 0;
+        boolean hasNextValue = input.readBoolean();
+        if (hasNextValue){
+            statement.setContext(new RyaURI(input.readString()));
+        }
+        hasNextValue = input.readBoolean();
+        if (hasNextValue){
+            length = input.readInt();
+            statement.setColumnVisibility(input.readBytes(length));
+        }
+        hasNextValue = input.readBoolean();
+        if (hasNextValue){
+            statement.setQualifer(input.readString());
+        }
+        hasNextValue = input.readBoolean();
+        if (hasNextValue){
+            statement.setTimestamp(input.readLong());
+        }
+        hasNextValue = input.readBoolean();
+        if (hasNextValue){
+            length = input.readInt();
+            statement.setValue(input.readBytes(length));
+        }
+
+        return statement;
+    }
+
+    /**
+     * Uses Kryo to read a RyaStatement from {@link Input}
+     * @param kryo - reads statement from input
+     * @param input - Input stream that statement is read from
+     * @param type - Type read from input stream
+     * @return - statement read from input stream
+     */
+    @Override
+    public RyaStatement read(Kryo kryo, Input input, Class<RyaStatement> type) {
+        return readFromKryo(kryo, input, type);
+    }
+
+}
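
A hedged round-trip sketch for the new statement serializer, using a plain Kryo instance
(the registration shown here is only for illustration; the Fluo/Kafka code paths register
the serializer elsewhere):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;

    import org.apache.rya.api.domain.RyaStatement;
    import org.apache.rya.api.domain.RyaURI;
    import org.apache.rya.api.domain.serialization.kryo.RyaStatementSerializer;

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    public class RyaStatementSerializerExample {
        public static void main(String[] args) {
            Kryo kryo = new Kryo();
            kryo.register(RyaStatement.class, new RyaStatementSerializer());

            RyaStatement original = new RyaStatement(
                    new RyaURI("urn:Alice"), new RyaURI("urn:talksTo"), new RyaURI("urn:Bob"));

            // Serialize the statement to a byte array.
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            Output output = new Output(baos);
            kryo.writeObject(output, original);
            output.close();

            // Deserialize it back into a RyaStatement.
            Input input = new Input(new ByteArrayInputStream(baos.toByteArray()));
            RyaStatement copy = kryo.readObject(input, RyaStatement.class);
            input.close();
        }
    }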

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/common/rya.api/src/main/java/org/apache/rya/api/domain/serialization/kryo/RyaSubGraphSerializer.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/domain/serialization/kryo/RyaSubGraphSerializer.java b/common/rya.api/src/main/java/org/apache/rya/api/domain/serialization/kryo/RyaSubGraphSerializer.java
new file mode 100644
index 0000000..dbb6c3b
--- /dev/null
+++ b/common/rya.api/src/main/java/org/apache/rya/api/domain/serialization/kryo/RyaSubGraphSerializer.java
@@ -0,0 +1,84 @@
+package org.apache.rya.api.domain.serialization.kryo;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.api.domain.RyaStatement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo based Serializer/Deserializer for {@link RyaSubGraph}.
+ *
+ */
+public class RyaSubGraphSerializer extends Serializer<RyaSubGraph>{
+    static final Logger log = LoggerFactory.getLogger(RyaSubGraphSerializer.class);
+
+    /**
+     * Use Kryo to write RyaSubGraph to {@link Output} stream
+     * @param kryo - used to write subgraph to output stream
+     * @param output - stream that subgraph is written to
+     * @param object - subgraph written to output stream
+     */
+    @Override
+    public void write(Kryo kryo, Output output, RyaSubGraph object) {
+        output.writeString(object.getId());
+        output.writeInt(object.getStatements().size());
+        for (RyaStatement statement : object.getStatements()){
+            RyaStatementSerializer.writeToKryo(kryo, output, statement);
+        }
+    }
+    
+    /**
+     * Reads RyaSubGraph from {@link Input} stream
+     * @param input - stream that subgraph is read from
+     * @return subgraph read from input stream
+     */
+    public static RyaSubGraph read(Input input){
+        RyaSubGraph bundle = new RyaSubGraph(input.readString());
+        int numStatements = input.readInt();
+        for (int i=0; i < numStatements; i++){
+            bundle.addStatement(RyaStatementSerializer.read(input));
+        }       
+        return bundle;
+    }
+
+    /**
+     * Uses Kryo to read RyaSubGraph from {@link Input} stream
+     * @param kryo - used to read subgraph from input stream
+     * @param input - stream that subgraph is read from
+     * @param type - class of object to be read from input stream (RyaSubgraph)
+     * @return subgraph read from input stream
+     */
+    @Override
+    public RyaSubGraph read(Kryo kryo, Input input, Class<RyaSubGraph> type) {
+        RyaSubGraph bundle = new RyaSubGraph(input.readString());
+        int numStatements = input.readInt();
+        
+        for (int i=0; i < numStatements; i++){
+            bundle.addStatement(RyaStatementSerializer.readFromKryo(kryo, input, RyaStatement.class));
+        }       
+        return bundle;
+    }
+
+}
\ No newline at end of file
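
A similar sketch for the subgraph serializer; it also shows the static read(Input) helper,
which does not need a Kryo instance (the ids and URIs are placeholders):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;

    import org.apache.rya.api.domain.RyaStatement;
    import org.apache.rya.api.domain.RyaSubGraph;
    import org.apache.rya.api.domain.RyaURI;
    import org.apache.rya.api.domain.serialization.kryo.RyaSubGraphSerializer;

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    public class RyaSubGraphSerializerExample {
        public static void main(String[] args) {
            RyaSubGraph subgraph = new RyaSubGraph("example-subgraph-id");
            subgraph.addStatement(new RyaStatement(
                    new RyaURI("urn:Alice"), new RyaURI("urn:talksTo"), new RyaURI("urn:Bob")));

            // Write the subgraph: id, statement count, then each statement.
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            Output output = new Output(baos);
            new RyaSubGraphSerializer().write(new Kryo(), output, subgraph);
            output.close();

            // Read it back with the static helper.
            RyaSubGraph copy = RyaSubGraphSerializer.read(
                    new Input(new ByteArrayInputStream(baos.toByteArray())));
        }
    }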

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AbstractAccumuloRdfConfigurationBuilder.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AbstractAccumuloRdfConfigurationBuilder.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AbstractAccumuloRdfConfigurationBuilder.java
index e342db2..d1422f6 100644
--- a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AbstractAccumuloRdfConfigurationBuilder.java
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AbstractAccumuloRdfConfigurationBuilder.java
@@ -44,19 +44,19 @@ public abstract class AbstractAccumuloRdfConfigurationBuilder<B extends Abstract
     private boolean useComposite = false;
     private boolean useSelectivity = false;
 
-    protected static final String ACCUMULO_USER = "accumulo.user";
-    protected static final String ACCUMULO_PASSWORD = "accumulo.password";
-    protected static final String ACCUMULO_INSTANCE = "accumulo.instance";
-    protected static final String ACCUMULO_AUTHS = "accumulo.auths";
-    protected static final String ACCUMULO_VISIBILITIES = "accumulo.visibilities";
-    protected static final String ACCUMULO_ZOOKEEPERS = "accumulo.zookeepers";
-    protected static final String ACCUMULO_RYA_PREFIX = "accumulo.rya.prefix";
-    protected static final String USE_INFERENCE = "use.inference";
-    protected static final String USE_DISPLAY_QUERY_PLAN = "use.display.plan";
-    protected static final String USE_MOCK_ACCUMULO = "use.mock";
-    protected static final String USE_PREFIX_HASHING = "use.prefix.hashing";
-    protected static final String USE_COUNT_STATS = "use.count.stats";
-    protected static final String USE_JOIN_SELECTIVITY = "use.join.selectivity";
+    public static final String ACCUMULO_USER = "accumulo.user";
+    public static final String ACCUMULO_PASSWORD = "accumulo.password";
+    public static final String ACCUMULO_INSTANCE = "accumulo.instance";
+    public static final String ACCUMULO_AUTHS = "accumulo.auths";
+    public static final String ACCUMULO_VISIBILITIES = "accumulo.visibilities";
+    public static final String ACCUMULO_ZOOKEEPERS = "accumulo.zookeepers";
+    public static final String ACCUMULO_RYA_PREFIX = "accumulo.rya.prefix";
+    public static final String USE_INFERENCE = "use.inference";
+    public static final String USE_DISPLAY_QUERY_PLAN = "use.display.plan";
+    public static final String USE_MOCK_ACCUMULO = "use.mock";
+    public static final String USE_PREFIX_HASHING = "use.prefix.hashing";
+    public static final String USE_COUNT_STATS = "use.count.stats";
+    public static final String USE_JOIN_SELECTIVITY = "use.join.selectivity";
 
     /**
      * Sets Accumulo user. This is a required parameter to connect to an
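
Since the property keys above are now public, code outside the builder hierarchy can reference
them when assembling configuration properties. A small sketch (the values are placeholders):

    import java.util.Properties;

    import org.apache.rya.accumulo.AbstractAccumuloRdfConfigurationBuilder;

    public class BuilderPropertiesExample {
        public static void main(String[] args) {
            // These keys were previously protected and only usable from subclasses of the builder.
            Properties props = new Properties();
            props.setProperty(AbstractAccumuloRdfConfigurationBuilder.ACCUMULO_USER, "root");
            props.setProperty(AbstractAccumuloRdfConfigurationBuilder.ACCUMULO_PASSWORD, "secret");
            props.setProperty(AbstractAccumuloRdfConfigurationBuilder.ACCUMULO_INSTANCE, "dev_instance");
            props.setProperty(AbstractAccumuloRdfConfigurationBuilder.ACCUMULO_ZOOKEEPERS, "localhost:2181");
            props.setProperty(AbstractAccumuloRdfConfigurationBuilder.USE_MOCK_ACCUMULO, "true");
        }
    }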

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java b/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java
index bdb33ce..e156f86 100644
--- a/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java
+++ b/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java
@@ -131,6 +131,19 @@ public class RyaSailFactory {
         return dao;
     }
 
+    /**
+     * Creates AccumuloRyaDAO without updating the AccumuloRdfConfiguration.  This method does not force
+     * the user's configuration to be consistent with the Rya Instance configuration.  As a result, new index
+     * tables might be created when using this method.  This method does not require the {@link AccumuloRyaInstanceDetailsRepository}
+     * to exist.  This is for internal use, backwards compatibility and testing purposes only.  It is recommended that
+     * {@link RyaSailFactory#getAccumuloDAOWithUpdatedConfig(AccumuloRdfConfiguration)} be used for new installations of Rya.
+     * 
+     * @param config - user configuration
+     * @return - AccumuloRyaDAO with Indexers configured according to user's specification
+     * @throws AccumuloException
+     * @throws AccumuloSecurityException
+     * @throws RyaDAOException
+     */
     public static AccumuloRyaDAO getAccumuloDAO(final AccumuloRdfConfiguration config) throws AccumuloException, AccumuloSecurityException, RyaDAOException {
         final Connector connector = ConfigUtils.getConnector(config);
         final AccumuloRyaDAO dao = new AccumuloRyaDAO();
@@ -142,6 +155,33 @@ public class RyaSailFactory {
         dao.init();
         return dao;
     }
+    
+    /**
+     * Creates an AccumuloRyaDAO after updating the AccumuloRdfConfiguration so that it is consistent
+     * with the configuration of the RyaInstance that the user is trying to connect to.  This ensures
+     * that user configuration aligns with Rya instance configuration and prevents the creation of 
+     * new index tables based on a user's query configuration.  This method requires the {@link AccumuloRyaInstanceDetailsRepository}
+     * to exist.
+     * 
+     * @param config - user's query configuration
+     * @return - AccumuloRyaDAO with an updated configuration that is consistent with the Rya instance configuration
+     * @throws AccumuloException
+     * @throws AccumuloSecurityException
+     * @throws RyaDAOException
+     */
+    public static AccumuloRyaDAO getAccumuloDAOWithUpdatedConfig(final AccumuloRdfConfiguration config) throws AccumuloException, AccumuloSecurityException, RyaDAOException {
+        
+        String ryaInstance = config.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
+        Objects.requireNonNull(ryaInstance, "RyaInstance or table prefix is missing from configuration: " + RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
+        String user = config.get(AccumuloRdfConfiguration.CLOUDBASE_USER);
+        String pswd = config.get(AccumuloRdfConfiguration.CLOUDBASE_PASSWORD);
+        Objects.requireNonNull(user, "Accumulo user name is missing from configuration: " + AccumuloRdfConfiguration.CLOUDBASE_USER);
+        Objects.requireNonNull(pswd, "Accumulo user password is missing from configuration: " + AccumuloRdfConfiguration.CLOUDBASE_PASSWORD);
+        config.setTableLayoutStrategy( new TablePrefixLayoutStrategy(ryaInstance) );
+        updateAccumuloConfig(config, user, pswd, ryaInstance);
+        
+        return getAccumuloDAO(config);
+    }
 
     public static void updateAccumuloConfig(final AccumuloRdfConfiguration config, final String user, final String pswd, final String ryaInstance) throws AccumuloException, AccumuloSecurityException {
         try {
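
A hedged sketch of the new getAccumuloDAOWithUpdatedConfig entry point. The table prefix and
credentials are placeholders, the target Rya instance (and its details repository) is assumed to
already exist, and the Accumulo connection settings needed by ConfigUtils.getConnector(config)
must also be present in the configuration:

    import org.apache.rya.accumulo.AccumuloRdfConfiguration;
    import org.apache.rya.accumulo.AccumuloRyaDAO;
    import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
    import org.apache.rya.sail.config.RyaSailFactory;

    public class UpdatedConfigDaoExample {
        public static AccumuloRyaDAO connect() throws Exception {
            AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
            conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, "rya_");
            conf.set(AccumuloRdfConfiguration.CLOUDBASE_USER, "root");
            conf.set(AccumuloRdfConfiguration.CLOUDBASE_PASSWORD, "secret");
            // ... plus the Accumulo instance/zookeeper settings required by the deployment.

            // Aligns the user's configuration with the existing Rya instance, then builds the DAO.
            return RyaSailFactory.getAccumuloDAOWithUpdatedConfig(conf);
        }
    }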

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
index 1de0813..a17f02f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
@@ -113,9 +113,49 @@ public class CreatePcj {
         checkArgument(spInsertBatchSize > 0, "The SP insert batch size '" + spInsertBatchSize + "' must be greater than 0.");
         this.spInsertBatchSize = spInsertBatchSize;
     }
+    
+    
+    /**
+     * Tells the Fluo PCJ Updater application to maintain a new PCJ.  This method does not
+     * require a pcjId and does not require a PCJ table to have already been created via {@link PrecomputedJoinStorage}.
+     * This method only adds the metadata to the Fluo table to incrementally generate query results.  Since there
+     * is no PCJ table, the incremental results must be exported to some external queuing service such as Kafka.
+     * This method currently supports only SPARQL CONSTRUCT queries, which export their results to Kafka by default.
+     *
+     * @param sparql - SPARQL query whose results will be updated in the Fluo table
+     * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
+     * @return The metadata that was written to the Fluo application for the PCJ.
+     * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
+     * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
+     * @throws RuntimeException If SPARQL query is not a CONSTRUCT query.
+     */
+    public FluoQuery createFluoPcj(final FluoClient fluo, String sparql) throws MalformedQueryException, PcjException {
+        requireNonNull(sparql);
+        requireNonNull(fluo);
+
+        // Keeps track of the IDs that are assigned to each of the query's nodes in Fluo.
+        // We use these IDs later when scanning Rya for historic Statement Pattern matches
+        // as well as setting up automatic exports.
+        final NodeIds nodeIds = new NodeIds();
+        final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, null);
+        final FluoQuery fluoQuery = new SparqlFluoQueryBuilder().make(parsedQuery, nodeIds);
+        checkArgument(fluoQuery.getConstructQueryMetadata().isPresent(), "Sparql query: " + sparql + " must begin with a construct.");
+
+        try (Transaction tx = fluo.newTransaction()) {
+            // Write the query's structure to Fluo.
+            new FluoQueryMetadataDAO().write(tx, fluoQuery);
+            tx.commit();
+        }
+
+        return fluoQuery;
+    }
+
+    
 
     /**
-     * Tells the Fluo PCJ Updater application to maintain a new PCJ.
+     * Tells the Fluo PCJ Updater application to maintain a new PCJ.  This method requires that a
+     * PCJ table already exist for the query corresponding to the pcjId.  Results will be exported
+     * to this table.
      *
      * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
      * @param pcjStorage - Provides access to the PCJ index. (not null)
@@ -146,12 +186,14 @@ public class CreatePcj {
         try (Transaction tx = fluo.newTransaction()) {
             // Write the query's structure to Fluo.
             new FluoQueryMetadataDAO().write(tx, fluoQuery);
-
-            // The results of the query are eventually exported to an instance of Rya, so store the Rya ID for the PCJ.
-            final String queryId = fluoQuery.getQueryMetadata().getNodeId();
-            tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId);
-            tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId);
-
+            
+            if (fluoQuery.getQueryMetadata().isPresent()) {
+                // If the query is not a construct query, 
+                // the results of the query are eventually exported to an instance of Rya, so store the Rya ID for the PCJ.
+                final String queryId = fluoQuery.getQueryMetadata().get().getNodeId();
+                tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId);
+                tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId);
+            } 
             // Flush the changes to Fluo.
             tx.commit();
         }
@@ -165,7 +207,9 @@ public class CreatePcj {
      * This call scans Rya for Statement Pattern matches and inserts them into
      * the Fluo application. The Fluo application will then maintain the intermediate
      * results as new triples are inserted and export any new query results to the
-     * {@code pcjId} within the provided {@code pcjStorage}.
+     * {@code pcjId} within the provided {@code pcjStorage}.  This method requires that a
+     * PCJ table already exist for the query corresponding to the pcjId.  Results will be exported
+     * to this table.
      *
      * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
      * @param pcjStorage - Provides access to the PCJ index. (not null)
@@ -227,9 +271,14 @@ public class CreatePcj {
         } catch (final IOException e) {
             log.warn("Ignoring IOException thrown while closing the AccumuloRyaQueryEngine used by CreatePCJ.", e);
         }
-
-        // return queryId to the caller for later monitoring from the export.
-        return fluoQuery.getQueryMetadata().getNodeId();
+        
+        //return queryId to the caller for later monitoring from the export
+        if(fluoQuery.getConstructQueryMetadata().isPresent()) {
+            return fluoQuery.getConstructQueryMetadata().get().getNodeId();
+        } 
+        
+        return fluoQuery.getQueryMetadata().get().getNodeId();
+        
     }
 
     private static void writeBatch(final FluoClient fluo, final Set<RyaStatement> batch) {
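
A hedged sketch of registering a CONSTRUCT query with the new createFluoPcj overload. The
FluoClient is assumed to already be connected to the Fluo application, the query itself is a
placeholder, and the no-argument CreatePcj constructor is assumed:

    import org.apache.fluo.api.client.FluoClient;
    import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
    import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;

    public class RegisterConstructQueryExample {
        public static FluoQuery register(FluoClient fluoClient) throws Exception {
            // CONSTRUCT queries have no PCJ table; their incremental results are exported to Kafka.
            String sparql =
                  "CONSTRUCT { ?person <urn:worksWith> ?other } "
                + "WHERE { ?person <urn:worksAt> ?org . ?other <urn:worksAt> ?org . "
                + "FILTER(?person != ?other) }";

            // Writes only the query's metadata to the Fluo table; no pcjId or PCJ table is required.
            return new CreatePcj().createFluoPcj(fluoClient, sparql);
        }
    }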

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
index c11f9fb..87eb9cc 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
@@ -34,6 +34,7 @@ import org.apache.fluo.api.data.RowColumnValue;
 import org.apache.fluo.api.data.Span;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
@@ -139,6 +140,12 @@ public class DeletePcj {
                 nodeIds.add(queryChild);
                 getChildNodeIds(tx, queryChild, nodeIds);
                 break;
+            case CONSTRUCT:
+                final ConstructQueryMetadata constructMeta = dao.readConstructQueryMetadata(tx, nodeId);
+                final String constructChild = constructMeta.getChildNodeId();
+                nodeIds.add(constructChild);
+                getChildNodeIds(tx, constructChild, nodeIds);
+                break;
             case JOIN:
                 final JoinMetadata joinMeta = dao.readJoinMetadata(tx, nodeId);
                 final String lchild = joinMeta.getLeftChildNodeId();
@@ -229,7 +236,7 @@ public class DeletePcj {
 
 
     /**
-     * Deletes all BindingSets associated with the specified nodeId.
+     * Deletes all results (BindingSets or Statements) associated with the specified nodeId.
      *
      * @param nodeId - nodeId whose {@link BindingSet}s will be deleted. (not null)
      * @param client - Used to delete the data. (not null)
@@ -240,7 +247,7 @@ public class DeletePcj {
 
         final NodeType type = NodeType.fromNodeId(nodeId).get();
         Transaction tx = client.newTransaction();
-        while(deleteDataBatch(tx, getIterator(tx, nodeId, type.getBsColumn()), type.getBsColumn())) {
+        while (deleteDataBatch(tx, getIterator(tx, nodeId, type.getResultColumn()), type.getResultColumn())) {
             tx = client.newTransaction();
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraph.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraph.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraph.java
new file mode 100644
index 0000000..6c6f833
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraph.java
@@ -0,0 +1,141 @@
+package org.apache.rya.indexing.pcj.fluo.app;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.model.BNode;
+import org.openrdf.model.Value;
+import org.openrdf.model.impl.BNodeImpl;
+import org.openrdf.query.algebra.StatementPattern;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Creates a construct query graph (represented as a Set of
+ * {@link RyaStatement}s) from a given BindingSet and the underlying
+ * {@link ConstructProjection}s.
+ *
+ */
+public class ConstructGraph {
+
+    private Set<ConstructProjection> projections;
+    private Set<String> bNodeNames;
+    
+    /**
+     * Creates a ConstructGraph from the specified collection of {@link ConstructProjection}s.
+     * @param projections - ConstructProjections used to create a ConstructGraph
+     */
+    public ConstructGraph(Set<ConstructProjection> projections) {
+        Preconditions.checkNotNull(projections);
+        Preconditions.checkArgument(projections.size() > 0);
+        this.projections = projections;
+        this.bNodeNames = getBNodeNames(projections);
+    }
+    
+    /**
+     * Creates a ConstructGraph from the given Collection of {@link StatementPattern}s.
+     * @param patterns - StatementPatterns used to create a ConstructGraph
+     */
+    public ConstructGraph(Collection<StatementPattern> patterns) {
+        Preconditions.checkNotNull(patterns);
+        Preconditions.checkArgument(patterns.size() > 0);
+        Set<ConstructProjection> projections = new HashSet<>();
+        for(StatementPattern pattern: patterns) {
+            projections.add(new ConstructProjection(pattern));
+        }
+        this.projections = projections;
+        this.bNodeNames = getBNodeNames(projections);
+    }
+    
+    private Set<String> getBNodeNames(Set<ConstructProjection> projections) {
+        Set<String> bNodeNames = new HashSet<>();
+        for (ConstructProjection projection : projections) {
+            Optional<Value> optVal = projection.getSubjValue();
+            if (optVal.isPresent() && optVal.get() instanceof BNode) {
+                bNodeNames.add(projection.getSubjectSourceName());
+            }
+        }
+        return bNodeNames;
+    }
+    
+    private Map<String, BNode> getBNodeMap() {
+        Map<String, BNode> bNodeMap = new HashMap<>();
+        for(String name: bNodeNames) {
+            bNodeMap.put(name, new BNodeImpl(UUID.randomUUID().toString()));
+        }
+        return bNodeMap;
+    }
+    
+    /**
+     * @return - the {@link ConstructProjection}s used to build the construct graph
+     * returned by {@link ConstructGraph#createGraphFromBindingSet(VisibilityBindingSet)}.
+     */
+    public Set<ConstructProjection> getProjections() {
+        return projections;
+    }
+    
+    /**
+     * Creates a construct query graph represented as a Set of {@link RyaStatement}s 
+     * @param bs - VisibilityBindingSet used to build the statements
+     * @return - Set of RyaStatements that represent a construct query graph.  
+     */
+    public Set<RyaStatement> createGraphFromBindingSet(VisibilityBindingSet bs) {
+        Set<RyaStatement> bSets = new HashSet<>();
+        long ts = System.currentTimeMillis();
+        Map<String, BNode> bNodes = getBNodeMap();
+        for(ConstructProjection projection: projections) {
+            RyaStatement statement = projection.projectBindingSet(bs, bNodes);
+            //ensure that all RyaStatements in graph have the same timestamp
+            statement.setTimestamp(ts);
+            bSets.add(statement);
+        }
+        return bSets;
+    }
+    
+    @Override
+    public boolean equals(Object o) {
+        if(this == o) {
+            return true;
+        }
+        
+        if(o instanceof ConstructGraph) {
+            ConstructGraph graph = (ConstructGraph) o;
+            return this.projections.equals(graph.projections);
+        }
+        return false;
+    }
+    
+    @Override
+    public int hashCode() {
+        int hash = 17;
+        for(ConstructProjection projection: projections) {
+            hash += projection.hashCode();
+        }
+        
+        return hash;
+    }
+}
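
A hedged sketch of how a ConstructGraph expands a query result into statements. The statement
pattern, binding names, and visibility are illustrative, and the two-argument
VisibilityBindingSet constructor taking a visibility string is assumed:

    import java.util.Arrays;
    import java.util.Set;

    import org.apache.rya.api.domain.RyaStatement;
    import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
    import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    import org.openrdf.model.impl.URIImpl;
    import org.openrdf.query.algebra.StatementPattern;
    import org.openrdf.query.algebra.Var;
    import org.openrdf.query.impl.MapBindingSet;

    public class ConstructGraphExample {
        public static void main(String[] args) {
            // CONSTRUCT template: ?x <urn:knows> ?y.  The constant predicate Var follows the
            // "-const-" naming convention used by the Fluo application.
            StatementPattern pattern = new StatementPattern(
                    new Var("x"),
                    new Var("-const-urn:knows", new URIImpl("urn:knows")),
                    new Var("y"));
            ConstructGraph graph = new ConstructGraph(Arrays.asList(pattern));

            // A query result that binds ?x and ?y, wrapped with a column visibility.
            MapBindingSet bs = new MapBindingSet();
            bs.addBinding("x", new URIImpl("urn:Alice"));
            bs.addBinding("y", new URIImpl("urn:Bob"));
            VisibilityBindingSet vBs = new VisibilityBindingSet(bs, "U");

            // Produces one RyaStatement, <urn:Alice> <urn:knows> <urn:Bob>, with a shared timestamp.
            Set<RyaStatement> statements = graph.createGraphFromBindingSet(vBs);
        }
    }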

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphSerializer.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphSerializer.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphSerializer.java
new file mode 100644
index 0000000..82a6c6c
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphSerializer.java
@@ -0,0 +1,52 @@
+package org.apache.rya.indexing.pcj.fluo.app;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.base.Joiner;
+
+/**
+ * Converts {@link ConstructGraph}s to and from Strings for
+ * storage and retrieval from Fluo. 
+ *
+ */
+public class ConstructGraphSerializer {
+
+    public static final String SP_DELIM = "\u0002";
+    
+    public static ConstructGraph toConstructGraph(String graphString) {
+        Set<ConstructProjection> projections = new HashSet<>();
+        String[] spStrings = graphString.split(SP_DELIM);
+        for(String sp: spStrings) {
+           projections.add(new ConstructProjection(FluoStringConverter.toStatementPattern(sp))); 
+        }
+        return new ConstructGraph(projections);
+    }
+    
+    public static String toConstructString(ConstructGraph graph) {
+        Set<ConstructProjection> projections = graph.getProjections();
+        Set<String> spStrings = new HashSet<>();
+        for(ConstructProjection projection: projections) {
+            spStrings.add(FluoStringConverter.toStatementPatternString(projection.getStatementPatternRepresentation()));
+        }
+        return Joiner.on(SP_DELIM).join(spStrings);
+    }
+    
+}
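
A small round-trip sketch for the serializer (the pattern mirrors the ConstructGraph example
above and is illustrative only):

    import java.util.Arrays;

    import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
    import org.apache.rya.indexing.pcj.fluo.app.ConstructGraphSerializer;
    import org.openrdf.model.impl.URIImpl;
    import org.openrdf.query.algebra.StatementPattern;
    import org.openrdf.query.algebra.Var;

    public class ConstructGraphSerializerExample {
        public static void main(String[] args) {
            ConstructGraph graph = new ConstructGraph(Arrays.asList(new StatementPattern(
                    new Var("x"),
                    new Var("-const-urn:knows", new URIImpl("urn:knows")),
                    new Var("y"))));

            // Each statement pattern is converted to a string and joined with the \u0002 delimiter.
            String serialized = ConstructGraphSerializer.toConstructString(graph);
            ConstructGraph deserialized = ConstructGraphSerializer.toConstructGraph(serialized);
            // graph.equals(deserialized) holds because equality is defined over the projections.
        }
    }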

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjection.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjection.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjection.java
new file mode 100644
index 0000000..6c1aa01
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjection.java
@@ -0,0 +1,266 @@
+package org.apache.rya.indexing.pcj.fluo.app;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.model.BNode;
+import org.openrdf.model.Resource;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.impl.BNodeImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.Var;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+/**
+ * This class projects a VisibilityBindingSet onto a RyaStatement. The Binding
+ * {@link Value}s that get projected onto subject, predicate and object are
+ * indicated by the names {@link ConstructProjection#getSubjectSourceName()},
+ * {@link ConstructProjection#getPredicateSourceName()} and
+ * {@link ConstructProjection#getObjectSourceName()} and must satisfy standard
+ * RDF constraints for RDF subjects, predicates and objects. The purpose of
+ * projecting {@link BindingSet}s in this way is to provide functionality for
+ * SPARQL Construct queries which create RDF statements from query results.
+ *
+ */
+public class ConstructProjection {
+
+    private static final Logger log = Logger.getLogger(ConstructProjection.class);
+    private String subjName;
+    private String predName;
+    private String objName;
+    private Optional<Value> subjValue;
+    private Optional<Value> predValue;
+    private Optional<Value> objValue;
+    private Var subjVar;
+    private Var predVar;
+    private Var objVar;
+
+    public ConstructProjection(Var subjectVar, Var predicateVar, Var objectVar) {
+        Preconditions.checkNotNull(subjectVar);
+        Preconditions.checkNotNull(predicateVar);
+        Preconditions.checkNotNull(objectVar);
+        subjName = subjectVar.getName();
+        predName = predicateVar.getName();
+        objName = objectVar.getName();
+        Preconditions.checkNotNull(subjName);
+        Preconditions.checkNotNull(predName);
+        Preconditions.checkNotNull(objName);
+        this.subjVar = subjectVar;
+        this.predVar = predicateVar;
+        this.objVar = objectVar;
+        if((subjVar.isAnonymous() || subjName.startsWith("-anon-")) && subjectVar.getValue() == null) {
+            subjValue = Optional.of(new BNodeImpl(""));
+        } else {
+            subjValue = Optional.ofNullable(subjectVar.getValue());
+        }
+        predValue = Optional.ofNullable(predicateVar.getValue());
+        objValue = Optional.ofNullable(objectVar.getValue());
+    }
+
+    public ConstructProjection(StatementPattern pattern) {
+        this(pattern.getSubjectVar(), pattern.getPredicateVar(), pattern.getObjectVar());
+    }
+
+    /**
+     * Returns the name of the Var whose Value is projected onto the RyaStatement
+     * subject. If the underlying subject {@link Var} is not constant (as indicated
+     * by {@link Var#isConstant()}), this is the name of the Binding whose Value
+     * gets projected. If the Var is constant, the constant Value is assigned to
+     * the subject.
+     * 
+     * @return name of the Binding (or constant Var) whose Value gets projected
+     *         onto the subject
+     */
+    public String getSubjectSourceName() {
+        return subjName;
+    }
+
+    /**
+     * Returns the name of the Var whose Value is projected onto the RyaStatement
+     * predicate. If the underlying predicate {@link Var} is not constant (as indicated
+     * by {@link Var#isConstant()}), this is the name of the Binding whose Value
+     * gets projected. If the Var is constant, the constant Value is assigned to
+     * the predicate.
+     * 
+     * @return name of the Binding (or constant Var) whose Value gets projected
+     *         onto the predicate
+     */
+    public String getPredicateSourceName() {
+        return predName;
+    }
+
+    /**
+     * Returns the name of the Var whose Value is projected onto the RyaStatement
+     * object. If the underlying object {@link Var} is not constant (as indicated
+     * by {@link Var#isConstant()}), this is the name of the Binding whose Value
+     * gets projected. If the Var is constant, the constant Value is assigned to
+     * the object.
+     * 
+     * @return name of the Binding (or constant Var) whose Value gets projected
+     *         onto the object
+     */
+    public String getObjectSourceName() {
+        return objName;
+    }
+
+    /**
+     * @return Value set for RyaStatement subject (if present)
+     */
+    public Optional<Value> getSubjValue() {
+        return subjValue;
+    }
+
+    /**
+     * @return Value set for RyaStatement predicate (if present)
+     */
+    public Optional<Value> getPredValue() {
+        return predValue;
+    }
+
+    /**
+     * @return Value set for RyaStatement object (if present)
+     */
+    public Optional<Value> getObjValue() {
+        return objValue;
+    }
+    
+
+    /**
+     * @return StatementPattern representation of this ConstructProjection,
+     *         built from the underlying subject, predicate, and object
+     *         {@link Var}s that were used to create this projection
+     */
+    public StatementPattern getStatementPatternRepresentation() {
+        return new StatementPattern(subjVar, predVar, objVar);
+    }
+
+    /**
+     * Projects a given BindingSet onto a RyaStatement. The subject, predicate,
+     * and object are taken from the input VisibilityBindingSet when the
+     * corresponding subject, predicate, or object Var is non-constant, and from
+     * the Var's own Value when it is constant.
+     * 
+     * @param vBs - VisibilityBindingSet that gets projected onto an RDF Statement
+     * @param bNodes - Map used to pass {@link BNode}s for given variable names into
+     *          multiple {@link ConstructProjection}s.  This allows a ConstructGraph to create
+     *          RyaStatements with the same BNode for a given variable name across multiple ConstructProjections.
+     * @return - RyaStatement whose subject, predicate, and object are determined by
+     *         {@link ConstructProjection#getSubjectSourceName()},
+     *         {@link ConstructProjection#getPredicateSourceName()},
+     *         {@link ConstructProjection#getObjectSourceName()}.
+     */
+    public RyaStatement projectBindingSet(VisibilityBindingSet vBs, Map<String, BNode> bNodes) {
+     
+        Preconditions.checkNotNull(vBs);
+        Preconditions.checkNotNull(bNodes);
+        
+        Value subj = getValue(subjName, subjValue, vBs, bNodes);
+        Value pred = getValue(predName, predValue, vBs, bNodes);
+        Value obj = getValue(objName, objValue, vBs, bNodes);
+        
+        Preconditions.checkNotNull(subj);
+        Preconditions.checkNotNull(pred);
+        Preconditions.checkNotNull(obj);
+        Preconditions.checkArgument(subj instanceof Resource);
+        Preconditions.checkArgument(pred instanceof URI);
+
+        RyaURI subjType = RdfToRyaConversions.convertResource((Resource) subj);
+        RyaURI predType = RdfToRyaConversions.convertURI((URI) pred);
+        RyaType objectType = RdfToRyaConversions.convertValue(obj);
+
+        RyaStatement statement = new RyaStatement(subjType, predType, objectType);
+        try {
+            statement.setColumnVisibility(vBs.getVisibility().getBytes("UTF-8"));
+        } catch (UnsupportedEncodingException e) {
+            log.trace("Unable to encode column visibility.  RyaStatement being created without column visibility.");
+        }
+        return statement;
+    }
+    
+    private Value getValue(String name, Optional<Value> optValue, VisibilityBindingSet bs, Map<String, BNode> bNodes) {
+        Value returnValue = null;
+        if (optValue.isPresent()) {
+            Value tempValue = optValue.get();
+            if(tempValue instanceof BNode) {
+                Preconditions.checkArgument(bNodes.containsKey(name));
+                returnValue = bNodes.get(name);
+            } else {
+                returnValue = tempValue;
+            }
+        } else {
+            returnValue = bs.getValue(name);
+        }
+        return returnValue;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o instanceof ConstructProjection) {
+            ConstructProjection projection = (ConstructProjection) o;
+            return new EqualsBuilder().append(this.subjName, projection.subjName).append(this.predName, projection.predName)
+                    .append(this.objName, projection.objName).append(this.subjValue, projection.subjValue)
+                    .append(this.predValue, projection.predValue).append(this.objValue, projection.objValue).isEquals();
+        }
+        return false;
+
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(this.subjName, this.predName, this.objName, this.subjValue, this.predValue, this.objValue);
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder().append("Construct Projection {\n").append("   Subject Name: " + subjName + "\n")
+                .append("   Subject Value: " + subjValue + "\n").append("   Predicate Name: " + predName + "\n")
+                .append("   Predicate Value: " + predValue + "\n").append("   Object Name: " + objName + "\n")
+                .append("   Object Value: " + objValue + "\n").append("}").toString();
+    }
+
+}
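
A focused sketch of projectBindingSet on a single projection. The subject comes from the
binding set while the predicate and object are constants from the template; the empty map is
sufficient because the pattern contains no blank nodes (all names and values are illustrative):

    import java.util.HashMap;

    import org.apache.rya.api.domain.RyaStatement;
    import org.apache.rya.indexing.pcj.fluo.app.ConstructProjection;
    import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    import org.openrdf.model.impl.LiteralImpl;
    import org.openrdf.model.impl.URIImpl;
    import org.openrdf.model.vocabulary.XMLSchema;
    import org.openrdf.query.algebra.StatementPattern;
    import org.openrdf.query.algebra.Var;
    import org.openrdf.query.impl.MapBindingSet;

    public class ConstructProjectionExample {
        public static void main(String[] args) {
            ConstructProjection projection = new ConstructProjection(new StatementPattern(
                    new Var("person"),
                    new Var("-const-urn:hasAge", new URIImpl("urn:hasAge")),
                    new Var("-const-21", new LiteralImpl("21", XMLSchema.INTEGER))));

            MapBindingSet bs = new MapBindingSet();
            bs.addBinding("person", new URIImpl("urn:Alice"));

            // Produces <urn:Alice> <urn:hasAge> "21"^^xsd:integer with the binding set's visibility.
            RyaStatement statement = projection.projectBindingSet(
                    new VisibilityBindingSet(bs, "U"), new HashMap<>());
        }
    }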

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java
new file mode 100644
index 0000000..d8d60b5
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java
@@ -0,0 +1,91 @@
+package org.apache.rya.indexing.pcj.fluo.app;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Set;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaSchema;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
+import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+/**
+ * This class creates results for the ConstructQuery.  This class applies the {@link ConstructGraph}
+ * associated with the Construct Query to generate a collection of {@link RyaStatement}s.  These statements
+ * are then used to form a {@link RyaSubGraph} that is serialized and stored as a value in the Column 
+ * {@link FluoQueryColumns#CONSTRUCT_STATEMENTS}.
+ *
+ */
+public class ConstructQueryResultUpdater {
+
+    private static final Logger log = Logger.getLogger(ConstructQueryResultUpdater.class);
+    private static final RyaSubGraphKafkaSerDe serializer = new RyaSubGraphKafkaSerDe();
+    
+    /**
+     * Updates the Construct Query results by applying the {@link ConstructGraph} to
+     * create a {@link RyaSubGraph} and then writing the subgraph to {@link FluoQueryColumns#CONSTRUCT_STATEMENTS}.
+     * @param tx - transaction used to write the subgraph
+     * @param bs - BindingSet that the ConstructGraph expands into a subgraph
+     * @param metadata - metadata that the ConstructGraph is extracted from
+     */
+    public void updateConstructQueryResults(TransactionBase tx, VisibilityBindingSet bs, ConstructQueryMetadata metadata) {
+        
+        String nodeId = metadata.getNodeId();
+        Column column = FluoQueryColumns.CONSTRUCT_STATEMENTS;
+        ConstructGraph graph = metadata.getConstructGraph();
+        
+        try {
+            Set<RyaStatement> statements = graph.createGraphFromBindingSet(bs);
+            RyaSubGraph subgraph = new RyaSubGraph(metadata.getNodeId(), statements);
+            String resultId = nodeId + "_" + getSubGraphId(subgraph);
+            tx.set(Bytes.of(resultId), column, Bytes.of(serializer.toBytes(subgraph)));
+        } catch (Exception e) {
+            log.trace("Unable to serialize RyaSubGraph generated by ConstructGraph: " + graph + " from BindingSet: " + bs);
+        }
+    }
+    
+    /**
+     * Generates a simple hash used as an id for the subgraph.  The id is generated as a hash
+     * rather than a UUID so that the same subgraph result is not stored under multiple ids.
+     * @param subgraph - subgraph that an id is needed for
+     * @return - hash of subgraph used as an id
+     */
+    private int getSubGraphId(RyaSubGraph subgraph) {
+        int id = 17;
+        id = 31*id + subgraph.getId().hashCode();
+        for(RyaStatement statement: subgraph.getStatements()) {
+            int statementId = 7;
+            if(!statement.getSubject().getData().startsWith(RyaSchema.BNODE_NAMESPACE)) {
+                statementId = 17*statementId + statement.getSubject().hashCode();
+            }
+            statementId = 17*statementId + statement.getPredicate().hashCode();
+            statementId = 17*statementId + statement.getObject().hashCode();
+            id += statementId;
+        }
+        return Math.abs(id);
+    }
+    
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
index 5221c21..05a8d1c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
@@ -23,17 +23,25 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DE
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.TYPE_DELIM;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.URI_TYPE;
 
+import java.util.UUID;
+
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
 
+import org.openrdf.model.BNode;
 import org.openrdf.model.Literal;
 import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.impl.BNodeImpl;
 import org.openrdf.model.impl.LiteralImpl;
 import org.openrdf.model.impl.URIImpl;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.algebra.StatementPattern;
 import org.openrdf.query.algebra.Var;
 
+import com.google.common.base.Preconditions;
+
+import org.apache.rya.api.domain.RyaSchema;
 import org.apache.rya.api.domain.RyaType;
 import org.apache.rya.api.resolver.RdfToRyaConversions;
 
@@ -85,25 +93,33 @@ public class FluoStringConverter {
      */
     public static Var toVar(final String varString) {
         checkNotNull(varString);
-
-        if(varString.startsWith("-const-")) {
-            // The variable is a constant value.
-            final String[] varParts = varString.split(TYPE_DELIM);
-            final String name = varParts[0];
-            final String valueString = name.substring("-const-".length());
-
+        final String[] varParts = varString.split(TYPE_DELIM);
+        final String name = varParts[0];
+        
+        // The variable is a constant value.
+        if(varParts.length > 1) {
             final String dataTypeString = varParts[1];
             if(dataTypeString.equals(URI_TYPE)) {
                 // Handle a URI object.
+                Preconditions.checkArgument(varParts.length == 2);
+                final String valueString = name.substring("-const-".length());
                 final Var var = new Var(name, new URIImpl(valueString));
-                var.setAnonymous(true);
+                var.setConstant(true);
+                return var;
+            } else if(dataTypeString.equals(RyaSchema.BNODE_NAMESPACE)) { 
+                // Handle a BNode object
+                Preconditions.checkArgument(varParts.length == 3);
+                Var var = new Var(name);
+                var.setValue(new BNodeImpl(varParts[2]));
                 return var;
             } else {
-                // Literal value.
+                // Handle a Literal Value.
+                Preconditions.checkArgument(varParts.length == 2);
+                final String valueString = name.substring("-const-".length());
                 final URI dataType = new URIImpl(dataTypeString);
                 final Literal value = new LiteralImpl(valueString, dataType);
                 final Var var = new Var(name, value);
-                var.setAnonymous(true);
+                var.setConstant(true);
                 return var;
             }
         } else {
@@ -126,19 +142,24 @@ public class FluoStringConverter {
 
         final Var subjVar = sp.getSubjectVar();
         String subj = subjVar.getName();
-        if(subjVar.isConstant()) {
-            subj = subj + TYPE_DELIM + URI_TYPE;
-        }
+        if(subjVar.getValue() != null) {
+            Value subjValue = subjVar.getValue();
+            if (subjValue instanceof BNode ) {
+                subj = subj + TYPE_DELIM + RyaSchema.BNODE_NAMESPACE + TYPE_DELIM + ((BNode) subjValue).getID(); 
+            } else {
+                subj = subj + TYPE_DELIM + URI_TYPE;
+            }
+        } 
 
         final Var predVar = sp.getPredicateVar();
         String pred = predVar.getName();
-        if(predVar.isConstant()) {
+        if(predVar.getValue() != null) {
             pred = pred + TYPE_DELIM + URI_TYPE;
         }
 
         final Var objVar = sp.getObjectVar();
         String obj = objVar.getName();
-        if (objVar.isConstant()) {
+        if (objVar.getValue() != null) {
             final RyaType rt = RdfToRyaConversions.convertValue(objVar.getValue());
             obj =  obj + TYPE_DELIM + rt.getDataType().stringValue();
         }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
index be4df71..f9d14b5 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
@@ -33,6 +33,7 @@ public class IncrementalUpdateConstants {
     public static final String FILTER_PREFIX = "FILTER";
     public static final String AGGREGATION_PREFIX = "AGGREGATION";
     public static final String QUERY_PREFIX = "QUERY";
+    public static final String CONSTRUCT_PREFIX = "CONSTRUCT";
 
     public static final String URI_TYPE = "http://www.w3.org/2001/XMLSchema#anyURI";
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
index 5365e30..b829b7e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
@@ -20,6 +20,7 @@ package org.apache.rya.indexing.pcj.fluo.app;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.AGGREGATION_PREFIX;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.CONSTRUCT_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX;
@@ -30,7 +31,6 @@ import java.util.List;
 import org.apache.fluo.api.data.Column;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.QueryNodeMetadataColumns;
-import org.openrdf.query.BindingSet;
 
 import com.google.common.base.Optional;
 
@@ -42,23 +42,24 @@ public enum NodeType {
     JOIN(QueryNodeMetadataColumns.JOIN_COLUMNS, FluoQueryColumns.JOIN_BINDING_SET),
     STATEMENT_PATTERN(QueryNodeMetadataColumns.STATEMENTPATTERN_COLUMNS, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET),
     QUERY(QueryNodeMetadataColumns.QUERY_COLUMNS, FluoQueryColumns.QUERY_BINDING_SET),
-    AGGREGATION(QueryNodeMetadataColumns.AGGREGATION_COLUMNS, FluoQueryColumns.AGGREGATION_BINDING_SET);
+    AGGREGATION(QueryNodeMetadataColumns.AGGREGATION_COLUMNS, FluoQueryColumns.AGGREGATION_BINDING_SET),
+    CONSTRUCT(QueryNodeMetadataColumns.CONSTRUCT_COLUMNS, FluoQueryColumns.CONSTRUCT_STATEMENTS);
 
     //Metadata Columns associated with given NodeType
     private QueryNodeMetadataColumns metadataColumns;
 
-    //Column where BindingSet results are stored for given NodeType
-    private Column bindingSetColumn;
+    //Column where results are stored for given NodeType
+    private Column resultColumn;
 
     /**
      * Constructs an instance of {@link NodeType}.
      *
      * @param metadataColumns - Metadata {@link Column}s associated with this {@link NodeType}. (not null)
-     * @param bindingSetColumn - The {@link Column} used to store this {@link NodeType|'s {@link BindingSet}s. (not null)
+     * @param resultColumn - The {@link Column} used to store this {@link NodeType}'s results. (not null)
      */
-    private NodeType(final QueryNodeMetadataColumns metadataColumns, final Column bindingSetColumn) {
+    private NodeType(QueryNodeMetadataColumns metadataColumns, Column resultColumn) {
     	this.metadataColumns = requireNonNull(metadataColumns);
-    	this.bindingSetColumn = requireNonNull(bindingSetColumn);
+    	this.resultColumn = requireNonNull(resultColumn);
     }
 
     /**
@@ -70,10 +71,10 @@ public enum NodeType {
 
 
     /**
-     * @return The {@link Column} used to store this {@link NodeType|'s {@link BindingSet}s.
+     * @return The {@link Column} used to store this {@link NodeType}'s query results.
      */
-    public Column getBsColumn() {
-    	return bindingSetColumn;
+    public Column getResultColumn() {
+    	return resultColumn;
     }
 
     /**
@@ -98,6 +99,8 @@ public enum NodeType {
             type = QUERY;
         } else if(nodeId.startsWith(AGGREGATION_PREFIX)) {
             type = AGGREGATION;
+        } else if(nodeId.startsWith(CONSTRUCT_PREFIX)) {
+            type = CONSTRUCT;
         }
 
         return Optional.fromNullable(type);
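
As a quick check of the new branch, any node ID that begins with the CONSTRUCT prefix now resolves to the CONSTRUCT node type, whose result column is the new CONSTRUCT_STATEMENTS column. A small sketch, assuming the prefix-matching lookup in this hunk is the enum's fromNodeId(...) method (the method signature is outside the hunk context) and using a made-up node ID suffix:

    import com.google.common.base.Optional;

    import org.apache.rya.indexing.pcj.fluo.app.NodeType;

    public class NodeTypeSketch {
        public static void main(final String[] args) {
            // Only the prefix matters for the lookup; the suffix here is invented.
            final Optional<NodeType> type = NodeType.fromNodeId("CONSTRUCT_abc123");

            System.out.println(type.get());                   // CONSTRUCT
            // The result column for CONSTRUCT nodes is the new FluoQueryColumns.CONSTRUCT_STATEMENTS.
            System.out.println(type.get().getResultColumn());
        }
    }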

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java
new file mode 100644
index 0000000..c2f4cb4
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Exports a single Binding Set that is a new result for a SPARQL query to some
+ * other location.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface IncrementalBindingSetExporter extends AutoCloseable {
+
+    /**
+     * Export a Binding Set that is a result of a SPARQL query that does not include a Group By clause.
+     *
+     * @param tx - The Fluo transaction this export is a part of. (not null)
+     * @param queryId - The Fluo ID of the SPARQL query the binding set is a result of. (not null)
+     * @param result - The Binding Set as it was represented within the Fluo application. (not null)
+     * @throws ResultExportException The result could not be exported.
+     */
+    public void export(TransactionBase tx, String queryId, VisibilityBindingSet result) throws ResultExportException;
+
+    /**
+     * A result could not be exported.
+     */
+    public static class ResultExportException extends Exception {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * Constructs an instance of {@link ResultExportException}.
+         *
+         * @param message - Explains why the exception was thrown.
+         */
+        public ResultExportException(final String message) {
+            super(message);
+        }
+
+        /**
+         * Constructs an instance of {@link ResultExportException}.
+         *
+         * @param message - Explains why the exception was thrown.
+         * @param cause - The exception that caused this one to be thrown.
+         */
+        public ResultExportException(final String message, final Throwable cause) {
+            super(message, cause);
+        }
+    }
+}
\ No newline at end of file
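
The renamed interface above keeps the same single-method contract as the old IncrementalResultExporter: an implementer receives the Fluo transaction, the query's node ID, and the new VisibilityBindingSet, and may throw ResultExportException on failure. A purely illustrative implementer (not one of the exporters shipped with this change) that just prints each result:

    import org.apache.fluo.api.client.TransactionBase;
    import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
    import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;

    /**
     * Illustrative exporter that writes each new query result to stdout.
     */
    public class LoggingBindingSetExporter implements IncrementalBindingSetExporter {

        @Override
        public void export(final TransactionBase tx, final String queryId, final VisibilityBindingSet result)
                throws ResultExportException {
            // A real exporter would route the result somewhere durable (Rya PCJ tables,
            // Kafka, ...); this sketch only logs it.
            System.out.println(queryId + " -> " + result);
        }

        @Override
        public void close() throws Exception {
            // Nothing to release in this sketch.
        }
    }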

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporterFactory.java
new file mode 100644
index 0000000..1bf492a
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporterFactory.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+import com.google.common.base.Optional;
+
+import org.apache.fluo.api.observer.Observer.Context;
+
+/**
+ * Builds instances of {@link IncrementalBindingSetExporter} using the provided
+ * configurations.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface IncrementalBindingSetExporterFactory {
+
+    /**
+     * Builds an instance of {@link IncrementalBindingSetExporter} using the
+     * configurations that are provided.
+     *
+     * @param context - Contains the host application's configuration values
+     *   and any parameters that were provided at initialization. (not null)
+     * @return An exporter if configurations were found in the context; otherwise absent.
+     * @throws IncrementalExporterFactoryException A non-configuration related
+     *   problem has occurred and the exporter could not be created as a result.
+     * @throws ConfigurationException Thrown if configuration values were
+     *   provided, but an instance of the exporter could not be initialized
+     *   using them. This could be because they were improperly formatted,
+     *   a required field was missing, or some other configuration based problem.
+     */
+    public Optional<IncrementalBindingSetExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException;
+
+    /**
+     * Indicates an {@link IncrementalBindingSetExporter} could not be created by an
+     * {@link IncrementalBindingSetExporterFactory}.
+     */
+    public static class IncrementalExporterFactoryException extends Exception {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * Constructs an instance of {@link IncrementalExporterFactoryException}.
+         *
+         * @param message - Explains why this exception is being thrown.
+         */
+        public IncrementalExporterFactoryException(final String message) {
+            super(message);
+        }
+
+        /**
+         * Constructs an instance of {@link IncrementalExporterFactoryException}.
+         *
+         * @param message - Explains why this exception is being thrown.
+         * @param cause - The exception that caused this one to be thrown.
+         */
+        public IncrementalExporterFactoryException(final String message, final Throwable cause) {
+            super(message, cause);
+        }
+    }
+
+    /**
+     * The configuration could not be interpreted because required fields were
+     * missing or a value wasn't properly formatted.
+     */
+    public static class ConfigurationException extends IncrementalExporterFactoryException {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * Constructs an instance of {@link ConfigurationException}.
+         *
+         * @param message - Explains why this exception is being thrown.
+         */
+        public ConfigurationException(final String message) {
+            super(message);
+        }
+
+        /**
+         * Constructs an instance of {@link ConfigurationException}.
+         *
+         * @param message - Explains why this exception is being thrown.
+         * @param cause - The exception that caused this one to be thrown.
+         */
+        public ConfigurationException(final String message, final Throwable cause) {
+            super(message, cause);
+        }
+    }
+}
\ No newline at end of file
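
A factory decides from the observer Context whether its exporter is configured, returning Optional.absent() when it is not. A bare-bones sketch that builds the logging exporter from the previous snippet; the configuration check is reduced to a hypothetical helper, since the real factories in this patch read exporter-specific parameters out of the Context:

    import com.google.common.base.Optional;

    import org.apache.fluo.api.observer.Observer.Context;
    import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
    import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory;

    /**
     * Illustrative factory: builds the logging exporter sketched earlier when a
     * (hypothetical) flag is set, otherwise reports that no exporter is configured.
     */
    public class LoggingExporterFactory implements IncrementalBindingSetExporterFactory {

        @Override
        public Optional<IncrementalBindingSetExporter> build(final Context context)
                throws IncrementalExporterFactoryException, ConfigurationException {
            // Hypothetical lookup; a real factory inspects the Context's configuration values.
            if (!isLoggingExportEnabled(context)) {
                return Optional.absent();
            }
            return Optional.of(new LoggingBindingSetExporter());
        }

        // Placeholder for whatever configuration check a real factory performs.
        private boolean isLoggingExportEnabled(final Context context) {
            return true;
        }
    }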

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
deleted file mode 100644
index 02dced7..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export;
-
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
-/**
- * Exports a single Binding Set that is a new result for a SPARQL query to some
- * other location.
- */
-@DefaultAnnotation(NonNull.class)
-public interface IncrementalResultExporter extends AutoCloseable {
-
-    /**
-     * Export a Binding Set that is a result of a SPARQL query that does not include a Group By clause.
-     *
-     * @param tx - The Fluo transaction this export is a part of. (not null)
-     * @param queryId - The Fluo ID of the SPARQL query the binding set is a result of. (not null)
-     * @param bindingSetString - The Binding Set as it was represented within the Fluo application. (not null)
-     * @throws ResultExportException The result could not be exported.
-     */
-    public void export(TransactionBase tx, String queryId, VisibilityBindingSet result) throws ResultExportException;
-
-    /**
-     * A result could not be exported.
-     */
-    public static class ResultExportException extends Exception {
-        private static final long serialVersionUID = 1L;
-
-        /**
-         * Constructs an instance of {@link ResultExportException}.
-         *
-         * @param message - Explains why the exception was thrown.
-         */
-        public ResultExportException(final String message) {
-            super(message);
-        }
-
-        /**
-         * Constructs an instance of {@link ResultExportException}.
-         *
-         * @param message - Explains why the exception was thrown.
-         * @param cause - The exception that caused this one to be thrown.
-         */
-        public ResultExportException(final String message, final Throwable cause) {
-            super(message, cause);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java
deleted file mode 100644
index f9fe2bd..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
-import com.google.common.base.Optional;
-
-import org.apache.fluo.api.observer.Observer.Context;
-
-/**
- * Builds instances of {@link IncrementalResultExporter} using the provided
- * configurations.
- */
-@DefaultAnnotation(NonNull.class)
-public interface IncrementalResultExporterFactory {
-
-    /**
-     * Builds an instance of {@link IncrementalResultExporter} using the
-     * configurations that are provided.
-     *
-     * @param context - Contains the host application's configuration values
-     *   and any parameters that were provided at initialization. (not null)
-     * @return An exporter if configurations were found in the context; otherwise absent.
-     * @throws IncrementalExporterFactoryException A non-configuration related
-     *   problem has occurred and the exporter could not be created as a result.
-     * @throws ConfigurationException Thrown if configuration values were
-     *   provided, but an instance of the exporter could not be initialized
-     *   using them. This could be because they were improperly formatted,
-     *   a required field was missing, or some other configuration based problem.
-     */
-    public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException;
-
-    /**
-     * Indicates a {@link IncrementalResultExporter} could not be created by a
-     * {@link IncrementalResultExporterFactory}.
-     */
-    public static class IncrementalExporterFactoryException extends Exception {
-        private static final long serialVersionUID = 1L;
-
-        /**
-         * Constructs an instance of {@link }.
-         *
-         * @param message - Explains why this exception is being thrown.
-         */
-        public IncrementalExporterFactoryException(final String message) {
-            super(message);
-        }
-
-        /**
-         * Constructs an instance of {@link }.
-         *
-         * @param message - Explains why this exception is being thrown.
-         * @param cause - The exception that caused this one to be thrown.
-         */
-        public IncrementalExporterFactoryException(final String message, final Throwable t) {
-            super(message, t);
-        }
-    }
-
-    /**
-     * The configuration could not be interpreted because required fields were
-     * missing or a value wasn't properly formatted.
-     */
-    public static class ConfigurationException extends IncrementalExporterFactoryException {
-        private static final long serialVersionUID = 1L;
-
-        /**
-         * Constructs an instance of {@link ConfigurationException}.
-         *
-         * @param message - Explains why this exception is being thrown.
-         */
-        public ConfigurationException(final String message) {
-            super(message);
-        }
-
-        /**
-         * Constructs an instance of {@link ConfigurationException}.
-         *
-         * @param message - Explains why this exception is being thrown.
-         * @param cause - The exception that caused this one to be thrown.
-         */
-        public ConfigurationException(final String message, final Throwable cause) {
-            super(message, cause);
-        }
-    }
-}
\ No newline at end of file