You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/06/22 18:05:15 UTC

[3/4] incubator-rya git commit: RYA-273-Construct Query Support. Closes #161.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java
new file mode 100644
index 0000000..797502c
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java
@@ -0,0 +1,39 @@
+package org.apache.rya.indexing.pcj.fluo.app.export;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
+
+/**
+ * Incrementally exports {@link RyaSubGraph}s that are generated by SPARQL Construct Queries
+ * from the Rya-Fluo application to an external destination, such as the core Rya tables or Kafka.
+ *
+ */
+public interface IncrementalRyaSubGraphExporter extends AutoCloseable {
+
+    /**
+     * Exports a RyaSubGraph that is the result of a SPARQL Construct Query.
+     *
+     * @param constructID - The Fluo Id of the construct query that created the RyaSubGraph
+     * @param subgraph - The RyaSubGraph to export (non-null)
+     * @throws ResultExportException The result could not be exported.
+     */
+    public void export(String constructID, RyaSubGraph subgraph) throws ResultExportException;
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java
new file mode 100644
index 0000000..ecbec09
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java
@@ -0,0 +1,47 @@
+package org.apache.rya.indexing.pcj.fluo.app.export;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.fluo.api.observer.Observer.Context;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.ConfigurationException;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException;
+
+import com.google.common.base.Optional;
+
+/**
+ * Builds instances of {@link IncrementalRyaSubGraphExporter} using the provided
+ * configurations.
+ */
+public interface IncrementalRyaSubGraphExporterFactory {
+
+    /**
+     * Builds an instance of {@link IncrementalRyaSubGraphExporter} using the
+     * configurations that are provided.
+     *
+     * @param context - Contains the host application's configuration values
+     *   and any parameters that were provided at initialization. (not null)
+     * @return An exporter if configurations were found in the context; otherwise absent.
+     * @throws IncrementalExporterFactoryException A non-configuration related
+     *   problem has occurred and the exporter could not be created as a result.
+     * @throws ConfigurationException Thrown if configuration values were
+     *   provided, but an instance of the exporter could not be initialized
+     *   using them. This could be because they were improperly formatted,
+     *   a required field was missing, or some other configuration based problem.
+     */
+    public Optional<IncrementalRyaSubGraphExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
new file mode 100644
index 0000000..152d156
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+/**
+ * Incrementally exports SPARQL query results to Kafka topics.
+ */
+public class KafkaBindingSetExporter implements IncrementalBindingSetExporter {
+    private static final Logger log = Logger.getLogger(KafkaBindingSetExporter.class);
+
+    private final KafkaProducer<String, VisibilityBindingSet> producer;
+
+    /**
+     * Constructs an instance given a Kafka producer.
+     *
+     * @param producer
+     *            for sending result set alerts to a broker. (not null)
+     *            Can be created and configured by {@link KafkaBindingSetExporterFactory}
+     */
+    public KafkaBindingSetExporter(KafkaProducer<String, VisibilityBindingSet> producer) {
+        super();
+        checkNotNull(producer, "Producer is required.");
+        this.producer = producer;
+    }
+
+    /**
+     * Sends the result to the Kafka topic named after the PCJ ID that is looked up from the query ID.
+     */
+    @Override
+    public void export(final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) throws ResultExportException {
+        checkNotNull(fluoTx);
+        checkNotNull(queryId);
+        checkNotNull(result);
+        try {
+            final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
+            final String msg = "out to kafta topic: queryId=" + queryId + " pcjId=" + pcjId + " result=" + result;
+            log.trace(msg);
+
+            // Send the result to the topic whose name matches the PCJ ID.
+            final ProducerRecord<String, VisibilityBindingSet> rec = new ProducerRecord<>(pcjId, result);
+            final Future<RecordMetadata> future = producer.send(rec);
+
+            // Don't let the export return until the result has been written to the topic. Otherwise we may lose results.
+            future.get();
+
+            log.debug("producer.send(rec) completed");
+
+        } catch (final Throwable e) {
+            throw new ResultExportException("A result could not be exported to Kafka.", e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        producer.close(5, TimeUnit.SECONDS);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java
new file mode 100644
index 0000000..5507037
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+
+import org.apache.fluo.api.observer.Observer.Context;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+import com.google.common.base.Optional;
+
+/**
+ * Creates instances of {@link KafkaBindingSetExporter}.
+ * <p/>
+ * Configure a Kafka producer by adding several required Key/values as described here:
+ * http://kafka.apache.org/documentation.html#producerconfigs
+ * <p/>
+ * Here is a simple example:
+ * <pre>
+ *     Properties producerConfig = new Properties();
+ *     producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ *     producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ *     producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ * </pre>
+ * 
+ * @see ProducerConfig
+ */
+public class KafkaBindingSetExporterFactory implements IncrementalBindingSetExporterFactory {
+    private static final Logger log = Logger.getLogger(KafkaBindingSetExporterFactory.class);
+    @Override
+    public Optional<IncrementalBindingSetExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException {
+        final KafkaExportParameters exportParams = new KafkaExportParameters(context.getObserverConfiguration().toMap());
+        log.debug("KafkaResultExporterFactory.build(): params.isExportToKafka()=" + exportParams.isExportToKafka());
+        if (exportParams.isExportToKafka()) {
+            // Setup Kafka connection
+            KafkaProducer<String, VisibilityBindingSet> producer = new KafkaProducer<String, VisibilityBindingSet>(exportParams.listAllConfig());
+            // Create the exporter
+            final IncrementalBindingSetExporter exporter = new KafkaBindingSetExporter(producer);
+            return Optional.of(exporter);
+        } else {
+            return Optional.absent();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java
deleted file mode 100644
index 72ec947..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-
-/**
- * Incrementally exports SPARQL query results to Kafka topics.
- */
-public class KafkaResultExporter implements IncrementalResultExporter {
-    private static final Logger log = Logger.getLogger(KafkaResultExporter.class);
-
-    private final KafkaProducer<String, VisibilityBindingSet> producer;
-
-    /**
-     * Constructs an instance given a Kafka producer.
-     *
-     * @param producer
-     *            for sending result set alerts to a broker. (not null)
-     *            Can be created and configured by {@link KafkaResultExporterFactory}
-     */
-    public KafkaResultExporter(final KafkaProducer<String, VisibilityBindingSet> producer) {
-        super();
-        checkNotNull(producer, "Producer is required.");
-        this.producer = producer;
-    }
-
-    /**
-     * Send the results to the topic using the queryID as the topicname
-     */
-    @Override
-    public void export(final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) throws ResultExportException {
-        checkNotNull(fluoTx);
-        checkNotNull(queryId);
-        checkNotNull(result);
-        try {
-            final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
-            final String msg = "out to kafta topic: queryId=" + queryId + " pcjId=" + pcjId + " result=" + result;
-            log.trace(msg);
-
-            // Send the result to the topic whose name matches the PCJ ID.
-            final ProducerRecord<String, VisibilityBindingSet> rec = new ProducerRecord<>(pcjId, result);
-            final Future<RecordMetadata> future = producer.send(rec);
-
-            // Don't let the export return until the result has been written to the topic. Otherwise we may lose results.
-            future.get();
-
-            log.debug("producer.send(rec) completed");
-
-        } catch (final Throwable e) {
-            throw new ResultExportException("A result could not be exported to Kafka.", e);
-        }
-    }
-
-    @Override
-    public void close() throws Exception {
-        producer.close(5, TimeUnit.SECONDS);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java
deleted file mode 100644
index 995e9d9..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
-
-import org.apache.fluo.api.observer.Observer.Context;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-
-import com.google.common.base.Optional;
-
-/**
- * Creates instances of {@link KafkaResultExporter}.
- * <p/>
- * Configure a Kafka producer by adding several required Key/values as described here:
- * http://kafka.apache.org/documentation.html#producerconfigs
- * <p/>
- * Here is a simple example:
- * <pre>
- *     Properties producerConfig = new Properties();
- *     producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- *     producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- *     producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- * </pre>
- * 
- * @see ProducerConfig
- */
-public class KafkaResultExporterFactory implements IncrementalResultExporterFactory {
-    private static final Logger log = Logger.getLogger(KafkaResultExporterFactory.class);
-    @Override
-    public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException {
-        final KafkaExportParameters exportParams = new KafkaExportParameters(context.getObserverConfiguration().toMap());
-        log.debug("KafkaResultExporterFactory.build(): params.isExportToKafka()=" + exportParams.isExportToKafka());
-        if (exportParams.isExportToKafka()) {
-            // Setup Kafka connection
-            KafkaProducer<String, VisibilityBindingSet> producer = new KafkaProducer<String, VisibilityBindingSet>(exportParams.listAllConfig());
-            // Create the exporter
-            final IncrementalResultExporter exporter = new KafkaResultExporter(producer);
-            return Optional.of(exporter);
-        } else {
-            return Optional.absent();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
new file mode 100644
index 0000000..a15743f
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
@@ -0,0 +1,81 @@
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Exports {@link RyaSubGraph}s to Kafka from Rya Fluo Application 
+ *
+ */
+public class KafkaRyaSubGraphExporter implements IncrementalRyaSubGraphExporter {
+
+    private final KafkaProducer<String, RyaSubGraph> producer;
+    private static final Logger log = Logger.getLogger(KafkaRyaSubGraphExporter.class);
+
+    public KafkaRyaSubGraphExporter(KafkaProducer<String, RyaSubGraph> producer) {
+        checkNotNull(producer);
+        this.producer = producer;
+    }
+    
+    /**
+     * Exports the RyaSubGraph to the Kafka topic whose name is the value returned by {@link RyaSubGraph#getId()}.
+     * @param subGraph - RyaSubGraph exported to Kafka
+     * @param constructID - rowID of result that is exported. Used for logging purposes.
+     */
+    @Override
+    public void export(String constructID, RyaSubGraph subGraph) throws ResultExportException {
+        checkNotNull(constructID);
+        checkNotNull(subGraph);
+        try {
+            // Send the subgraph to the topic whose name matches the subgraph's ID.
+            final ProducerRecord<String, RyaSubGraph> rec = new ProducerRecord<>(subGraph.getId(), subGraph);
+            final Future<RecordMetadata> future = producer.send(rec);
+
+            // Don't let the export return until the result has been written to the topic. Otherwise we may lose results.
+            future.get();
+
+            log.debug("Producer successfully sent record with id: " + constructID + " and statements: " + subGraph.getStatements());
+
+        } catch (final Throwable e) {
+            throw new ResultExportException("A result could not be exported to Kafka.", e);
+        }
+    }
+
+    /**
+     * Closes exporter.
+     */
+    @Override
+    public void close() throws Exception {
+        producer.close(5, TimeUnit.SECONDS);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java
new file mode 100644
index 0000000..2c1e4c0
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java
@@ -0,0 +1,62 @@
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.fluo.api.observer.Observer.Context;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.ConfigurationException;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporterFactory;
+
+import com.google.common.base.Optional;
+
+/**
+ * Factory for creating {@link KafkaRyaSubGraphExporter}s that are used for
+ * exporting {@link RyaSubGraph}s from the Rya Fluo application to Kafka.
+ *
+ */
+public class KafkaRyaSubGraphExporterFactory implements IncrementalRyaSubGraphExporterFactory {
+
+    private static final Logger log = Logger.getLogger(KafkaRyaSubGraphExporterFactory.class);
+    
+    /**
+     * Builds a {@link KafkaRyaSubGraphExporter}.
+     * @param context - {@link Context} object used to pass configuration parameters
+     * @return an Optional containing an IncrementalRyaSubGraphExporter if Kafka export is enabled; otherwise absent
+     * @throws IncrementalExporterFactoryException
+     * @throws ConfigurationException
+     */
+    @Override
+    public Optional<IncrementalRyaSubGraphExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException {
+        final KafkaExportParameters exportParams = new KafkaExportParameters(context.getObserverConfiguration().toMap());
+        log.debug("KafkaRyaSubGraphExporterFactory.build(): params.isExportToKafka()=" + exportParams.isExportToKafka());
+        if (exportParams.isExportToKafka()) {
+            // Setup Kafka connection
+            KafkaProducer<String, RyaSubGraph> producer = new KafkaProducer<String, RyaSubGraph>(exportParams.listAllConfig());
+            // Create the exporter
+            final IncrementalRyaSubGraphExporter exporter = new KafkaRyaSubGraphExporter(producer);
+            return Optional.of(exporter);
+        } else {
+            return Optional.absent();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/RyaSubGraphKafkaSerDe.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/RyaSubGraphKafkaSerDe.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/RyaSubGraphKafkaSerDe.java
new file mode 100644
index 0000000..ed20e8a
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/RyaSubGraphKafkaSerDe.java
@@ -0,0 +1,100 @@
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.io.ByteArrayOutputStream;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.api.domain.serialization.kryo.RyaSubGraphSerializer;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kafka {@link Serializer} and {@link Deserializer} for {@link RyaSubGraph}s, backed by Kryo.
+ * <p>
+ * NOTE(review): each instance holds a single {@link Kryo} object, and Kryo instances are
+ * documented as not thread safe, so an instance of this class should not be used from
+ * several threads at once without external synchronization.
+ */
+public class RyaSubGraphKafkaSerDe implements Serializer<RyaSubGraph>, Deserializer<RyaSubGraph> {
+
+    private final Kryo kryo;
+
+    /**
+     * Creates a SerDe whose {@link Kryo} instance has a {@link RyaSubGraphSerializer}
+     * registered for {@link RyaSubGraph}.
+     */
+    public RyaSubGraphKafkaSerDe() {
+        kryo = new Kryo();
+        kryo.register(RyaSubGraph.class, new RyaSubGraphSerializer());
+    }
+
+    /**
+     * Deserializes from bytes to a RyaSubGraph
+     * @param bundleBytes - byte representation of RyaSubGraph
+     * @return - Deserialized RyaSubGraph
+     */
+    public RyaSubGraph fromBytes(byte[] bundleBytes) {
+        return kryo.readObject(new Input(bundleBytes), RyaSubGraph.class);
+    }
+
+    /**
+     * Serializes RyaSubGraph to bytes
+     * @param bundle - RyaSubGraph to be serialized
+     * @return - exactly the bytes Kryo wrote for the RyaSubGraph
+     */
+    public byte[] toBytes(RyaSubGraph bundle) {
+        final ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+        final Output output = new Output(byteStream);
+        kryo.writeObject(output, bundle, new RyaSubGraphSerializer());
+        // Output.getBuffer() only exposes the fixed-size internal buffer: it is padded past the
+        // written position and no longer holds anything already flushed to the stream. Flush
+        // whatever is still buffered and take the complete result from the stream instead.
+        output.flush();
+        return byteStream.toByteArray();
+    }
+
+    /**
+     * Deserializes RyaSubGraph
+     * @param topic - topic that data is associated with (no effect)
+     * @param bundleBytes - bytes to be deserialized, may be null
+     * @return - deserialized RyaSubGraph, or null when bundleBytes is null
+     */
+    @Override
+    public RyaSubGraph deserialize(String topic, byte[] bundleBytes) {
+        // Kafka may hand a null payload to a deserializer; pass it through rather than failing.
+        if (bundleBytes == null) {
+            return null;
+        }
+        return fromBytes(bundleBytes);
+    }
+
+    /**
+     * Serializes RyaSubGraph
+     * @param topic - topic that data is associated with (no effect)
+     * @param subgraph - subgraph to be serialized, may be null
+     * @return - serialized bytes from subgraph, or null when subgraph is null
+     */
+    @Override
+    public byte[] serialize(String topic, RyaSubGraph subgraph) {
+        // A null record value is serialized as a null payload, as Kafka's own serializers do.
+        if (subgraph == null) {
+            return null;
+        }
+        return toBytes(subgraph);
+    }
+
+    /**
+     * Closes serializer (no effect)
+     */
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Configures serializer (no effect)
+     */
+    @Override
+    public void configure(Map<String, ?> arg0, boolean arg1) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
new file mode 100644
index 0000000..84d3ce6
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collections;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+/**
+ * An {@link IncrementalBindingSetExporter} that adds each new SPARQL query result
+ * to the {@link PrecomputedJoinStorage} it was constructed with.
+ */
+public class RyaBindingSetExporter implements IncrementalBindingSetExporter {
+
+    private final PrecomputedJoinStorage pcjStorage;
+
+    /**
+     * Creates a new {@link RyaBindingSetExporter}.
+     *
+     * @param pcjStorage - Where exported results are written. (not null)
+     */
+    public RyaBindingSetExporter(final PrecomputedJoinStorage pcjStorage) {
+        checkNotNull(pcjStorage);
+        this.pcjStorage = pcjStorage;
+    }
+
+    /**
+     * Reads the PCJ ID stored for {@code queryId} in the {@link FluoQueryColumns#RYA_PCJ_ID}
+     * column and adds {@code result} to that PCJ in the storage.
+     *
+     * @throws ResultExportException The storage rejected the result.
+     */
+    @Override
+    public void export(final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result)
+            throws ResultExportException {
+        requireNonNull(fluoTx);
+        requireNonNull(queryId);
+        requireNonNull(result);
+
+        // The Fluo query ID maps to the ID the PCJ is known by inside the PCJ storage.
+        final String storedPcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
+
+        try {
+            pcjStorage.addResults(storedPcjId, Collections.singleton(result));
+        } catch (final PCJStorageException storageFailure) {
+            throw new ResultExportException("A result could not be exported to Rya.", storageFailure);
+        }
+    }
+
+    /**
+     * Closes the underlying PCJ storage.
+     */
+    @Override
+    public void close() throws Exception {
+        pcjStorage.close();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
new file mode 100644
index 0000000..86d593f
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+
+import com.google.common.base.Optional;
+
+import org.apache.fluo.api.observer.Observer.Context;
+
+/**
+ * Factory that builds {@link RyaBindingSetExporter}s from an observer's configuration.
+ */
+public class RyaBindingSetExporterFactory implements IncrementalBindingSetExporterFactory {
+
+    /**
+     * Builds a {@link RyaBindingSetExporter} backed by an {@link AccumuloPcjStorage} when the
+     * configuration enables exporting to Rya.
+     *
+     * @param context - Supplies the observer configuration to read. (not null)
+     * @return The exporter, or absent when exporting to Rya is not enabled.
+     * @throws IncrementalExporterFactoryException The Accumulo connector could not be created.
+     */
+    @Override
+    public Optional<IncrementalBindingSetExporter> build(final Context context) throws IncrementalExporterFactoryException, ConfigurationException {
+        checkNotNull(context);
+
+        // Parse the observer configuration through the Rya export parameter wrapper.
+        final RyaExportParameters ryaParams = new RyaExportParameters( context.getObserverConfiguration().toMap() );
+
+        if(!ryaParams.isExportToRya()) {
+            return Optional.absent();
+        }
+
+        // Zookeeper connection info. Any ';' in the configured server list becomes ','
+        // before the list is handed to ZooKeeperInstance.
+        final String instanceName = ryaParams.getAccumuloInstanceName().get();
+        final String zookeepers = ryaParams.getZookeeperServers().get().replace(';', ',');
+        final Instance accumulo = new ZooKeeperInstance(instanceName, zookeepers);
+
+        try {
+            // Accumulo connection using the exporter's credentials.
+            final String username = ryaParams.getExporterUsername().get();
+            final String password = ryaParams.getExporterPassword().get();
+            final PasswordToken token = new PasswordToken(password);
+            final Connector connector = accumulo.getConnector(username, token);
+
+            // PCJ storage for the configured Rya instance.
+            final String ryaInstance = ryaParams.getRyaInstanceName().get();
+            final PrecomputedJoinStorage storage = new AccumuloPcjStorage(connector, ryaInstance);
+
+            // Return the exporter through its interface type.
+            final IncrementalBindingSetExporter bindingSetExporter = new RyaBindingSetExporter(storage);
+            return Optional.of(bindingSetExporter);
+
+        } catch (final AccumuloException | AccumuloSecurityException connectFailure) {
+            throw new IncrementalExporterFactoryException("Could not initialize the Accumulo connector using the provided configuration.", connectFailure);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
index cba6a43..a1ba5b8 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
@@ -45,6 +45,7 @@ public class RyaExportParameters extends ParametersBase {
     public static final String CONF_EXPORTER_PASSWORD = "pcj.fluo.export.rya.exporterPassword";
 
     public static final String CONF_RYA_INSTANCE_NAME = "pcj.fluo.export.rya.ryaInstanceName";
+    public static final String CONF_FLUO_APP_NAME = "pcj.fluo.export.rya.fluo.application.name";
 
     /**
      * Constructs an instance of {@link RyaExportParameters}.
@@ -147,4 +148,18 @@ public class RyaExportParameters extends ParametersBase {
     public Optional<String> getExporterPassword() {
         return Optional.fromNullable( params.get(CONF_EXPORTER_PASSWORD) );
     }
+    
+    /**
+     * Stores the Rya Fluo application name in the parameters under
+     * {@link #CONF_FLUO_APP_NAME}.
+     *
+     * @param fluoApplicationName - The name of the Rya Fluo application
+     */
+    public void setFluoApplicationName(@Nullable final String fluoApplicationName) {
+        params.put(CONF_FLUO_APP_NAME, fluoApplicationName);
+    }
+    
+    /**
+     * Looks up the value stored in the parameters under {@link #CONF_FLUO_APP_NAME}.
+     *
+     * @return The name of the Rya Fluo application, or absent when no value is stored
+     */
+    public Optional<String> getFluoApplicationName() {
+        final String fluoApplicationName = params.get(CONF_FLUO_APP_NAME);
+        return Optional.fromNullable(fluoApplicationName);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
deleted file mode 100644
index b8b3c45..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export.rya;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.util.Objects.requireNonNull;
-
-import java.util.Collections;
-
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-
-/**
- * Incrementally exports SPARQL query results to Accumulo PCJ tables as they are defined by Rya.
- */
-public class RyaResultExporter implements IncrementalResultExporter {
-
-    private final PrecomputedJoinStorage pcjStorage;
-
-    /**
-     * Constructs an instance of {@link RyaResultExporter}.
-     *
-     * @param pcjStorage - The PCJ storage the new results will be exported to. (not null)
-     */
-    public RyaResultExporter(final PrecomputedJoinStorage pcjStorage) {
-        this.pcjStorage = checkNotNull(pcjStorage);
-    }
-
-    @Override
-    public void export(
-            final TransactionBase fluoTx,
-            final String queryId,
-            final VisibilityBindingSet result) throws ResultExportException {
-        requireNonNull(fluoTx);
-        requireNonNull(queryId);
-        requireNonNull(result);
-
-        // Look up the ID the PCJ represents within the PCJ Storage.
-        final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
-
-        try {
-            pcjStorage.addResults(pcjId, Collections.singleton(result));
-        } catch (final PCJStorageException e) {
-            throw new ResultExportException("A result could not be exported to Rya.", e);
-        }
-    }
-
-    @Override
-    public void close() throws Exception {
-        pcjStorage.close();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java
deleted file mode 100644
index c695272..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export.rya;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
-
-import com.google.common.base.Optional;
-
-import org.apache.fluo.api.observer.Observer.Context;
-
-/**
- * Creates instances of {@link RyaResultExporter}.
- */
-public class RyaResultExporterFactory implements IncrementalResultExporterFactory {
-
-    @Override
-    public Optional<IncrementalResultExporter> build(final Context context) throws IncrementalExporterFactoryException, ConfigurationException {
-        checkNotNull(context);
-
-        // Wrap the context's parameters for parsing.
-        final RyaExportParameters params = new RyaExportParameters( context.getObserverConfiguration().toMap() );
-
-        if(params.isExportToRya()) {
-            // Setup Zookeeper connection info.
-            final String accumuloInstance = params.getAccumuloInstanceName().get();
-            final String zookeeperServers =  params.getZookeeperServers().get().replaceAll(";", ",");
-            final Instance inst = new ZooKeeperInstance(accumuloInstance, zookeeperServers);
-
-            try {
-                // Setup Accumulo connection info.
-                final String exporterUsername = params.getExporterUsername().get();
-                final String exporterPassword = params.getExporterPassword().get();
-                final Connector accumuloConn = inst.getConnector(exporterUsername, new PasswordToken(exporterPassword));
-
-                // Setup Rya PCJ Storage.
-                final String ryaInstanceName = params.getRyaInstanceName().get();
-                final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, ryaInstanceName);
-
-                // Make the exporter.
-                final IncrementalResultExporter exporter = new RyaResultExporter(pcjStorage);
-                return Optional.of(exporter);
-
-            } catch (final AccumuloException | AccumuloSecurityException e) {
-                throw new IncrementalExporterFactoryException("Could not initialize the Accumulo connector using the provided configuration.", e);
-            }
-        } else {
-            return Optional.absent();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
index dc4b3b4..ac131e3 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
@@ -26,11 +26,13 @@ import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.observer.AbstractObserver;
 import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater;
+import org.apache.rya.indexing.pcj.fluo.app.ConstructQueryResultUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.QueryResultUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
@@ -57,6 +59,7 @@ public abstract class BindingSetUpdater extends AbstractObserver {
     private final FilterResultUpdater filterUpdater = new FilterResultUpdater();
     private final QueryResultUpdater queryUpdater = new QueryResultUpdater();
     private final AggregationResultUpdater aggregationUpdater = new AggregationResultUpdater();
+    private final ConstructQueryResultUpdater constructUpdater = new ConstructQueryResultUpdater();
 
     @Override
     public abstract ObservedColumn getObservedColumn();
@@ -102,6 +105,15 @@ public abstract class BindingSetUpdater extends AbstractObserver {
                 }
                 break;
 
+            case CONSTRUCT: 
+                final ConstructQueryMetadata constructQuery = queryDao.readConstructQueryMetadata(tx, parentNodeId);
+                try{
+                    constructUpdater.updateConstructQueryResults(tx, observedBindingSet, constructQuery);
+                } catch (final Exception e) {
+                    throw new RuntimeException("Could not process a Query node.", e);
+                }
+                break;
+                
             case FILTER:
                 final FilterMetadata parentFilter = queryDao.readFilterMetadata(tx, parentNodeId);
                 try {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
new file mode 100644
index 0000000..f0fef07
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
@@ -0,0 +1,198 @@
+package org.apache.rya.indexing.pcj.fluo.app.observers;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.io.UnsupportedEncodingException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.utils.VisibilitySimplifier;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.api.resolver.triple.TripleRow;
+import org.apache.rya.api.resolver.triple.TripleRowResolverException;
+import org.apache.rya.api.resolver.triple.impl.WholeRowTripleResolver;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaRyaSubGraphExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Monitors the Column {@link FluoQueryColumns#CONSTRUCT_STATEMENTS} for new
+ * Construct Query {@link RyaStatement}s and exports the results using the
+ * {@link IncrementalRyaSubGraphExporter}s that are registered with this
+ * Observer. The statements are also written back into Fluo under
+ * {@link FluoQueryColumns#TRIPLES} so that queries can be chained together.
+ */
+public class ConstructQueryResultObserver extends AbstractObserver {
+
+    private static final WholeRowTripleResolver TRIPLE_RESOLVER = new WholeRowTripleResolver();
+    private static final Logger log = Logger.getLogger(ConstructQueryResultObserver.class);
+    private static final RyaSubGraphKafkaSerDe serializer = new RyaSubGraphKafkaSerDe();
+
+    /**
+     * We expect to see the same expressions a lot, so we cache the simplified
+     * forms.
+     */
+    private final Map<String, String> simplifiedVisibilities = new HashMap<>();
+
+    /**
+     * Builders for each type of result exporter we support.
+     */
+    private static final ImmutableSet<IncrementalRyaSubGraphExporterFactory> factories = ImmutableSet
+            .<IncrementalRyaSubGraphExporterFactory> builder().add(new KafkaRyaSubGraphExporterFactory()).build();
+
+    /**
+     * The exporters that are configured.
+     */
+    private ImmutableSet<IncrementalRyaSubGraphExporter> exporters = null;
+
+    /**
+     * Before running, determine which exporters are configured and set them up.
+     * A factory that fails to build its exporter is logged and skipped.
+     *
+     * @param context - The observer context handed to every exporter factory.
+     */
+    @Override
+    public void init(final Context context) {
+        final ImmutableSet.Builder<IncrementalRyaSubGraphExporter> exportersBuilder = ImmutableSet.builder();
+
+        for (final IncrementalRyaSubGraphExporterFactory factory : factories) {
+            try {
+                log.debug("ConstructQueryResultObserver.init(): for each exportersBuilder=" + factory);
+
+                final Optional<IncrementalRyaSubGraphExporter> exporter = factory.build(context);
+                if (exporter.isPresent()) {
+                    exportersBuilder.add(exporter.get());
+                }
+            } catch (final IncrementalExporterFactoryException e) {
+                log.error("Could not initialize a result exporter.", e);
+            }
+        }
+
+        exporters = exportersBuilder.build();
+    }
+
+    @Override
+    public ObservedColumn getObservedColumn() {
+        return new ObservedColumn(FluoQueryColumns.CONSTRUCT_STATEMENTS, NotificationType.STRONG);
+    }
+
+    /**
+     * Reads the {@link RyaSubGraph} stored at the notified row and column, exports it
+     * through every configured exporter, and writes its statements back into Fluo.
+     * <p>
+     * When the subgraph has statements, the visibility of the first statement returned
+     * by the set's iterator is simplified and applied to every statement before export.
+     *
+     * @param tx - The transaction used to read the subgraph and write the triples.
+     * @param row - The row that triggered the notification; its string form is passed to the exporters.
+     * @param col - The column that triggered the notification.
+     */
+    @Override
+    public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
+        Bytes bytes = tx.get(row, col);
+        RyaSubGraph subgraph = serializer.fromBytes(bytes.toArray());
+        Set<RyaStatement> statements = subgraph.getStatements();
+        if (!statements.isEmpty()) {
+            byte[] visibility = statements.iterator().next().getColumnVisibility();
+            visibility = simplifyVisibilities(visibility);
+            for(RyaStatement statement: statements) {
+                statement.setColumnVisibility(visibility);
+            }
+            subgraph.setStatements(statements);
+
+            for (IncrementalRyaSubGraphExporter exporter : exporters) {
+                exporter.export(row.toString(), subgraph);
+            }
+        }
+        //add generated triples back into Fluo for chaining queries together
+        insertTriples(tx, subgraph.getStatements());
+    }
+
+    /**
+     * Closes every configured exporter. A failure to close one exporter is logged
+     * and does not prevent the remaining exporters from being closed.
+     */
+    @Override
+    public void close() {
+        if(exporters != null) {
+            for(final IncrementalRyaSubGraphExporter exporter : exporters) {
+                try {
+                    exporter.close();
+                } catch(final Exception e) {
+                    log.warn("Problem encountered while closing one of the exporters.", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Simplifies a visibility expression, caching the simplified form for reuse.
+     *
+     * @param visibilityBytes - The UTF-8 encoded visibility expression.
+     * @return The UTF-8 encoded simplified expression.
+     * @throws UnsupportedEncodingException Declared by the charset-name based String conversions.
+     */
+    private byte[] simplifyVisibilities(byte[] visibilityBytes) throws UnsupportedEncodingException {
+        // Simplify the result's visibilities and cache new simplified
+        // visibilities
+        String visibility = new String(visibilityBytes, "UTF-8");
+        if (!simplifiedVisibilities.containsKey(visibility)) {
+            String simplified = VisibilitySimplifier.simplify(visibility);
+            simplifiedVisibilities.put(visibility, simplified);
+        }
+        return simplifiedVisibilities.get(visibility).getBytes("UTF-8");
+    }
+
+    /**
+     * Writes each triple into Fluo, keyed by its SPO row, with its visibility as the value
+     * (an empty value when the triple has no visibility). A triple that cannot be converted
+     * is logged and skipped.
+     *
+     * @param tx - The transaction the triples are written with.
+     * @param triples - The triples to write.
+     */
+    private void insertTriples(TransactionBase tx, final Collection<RyaStatement> triples) {
+
+        for (final RyaStatement triple : triples) {
+            Optional<byte[]> visibility = Optional.fromNullable(triple.getColumnVisibility());
+            try {
+                tx.set(Bytes.of(spoFormat(triple)), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(new byte[0])));
+            } catch (final TripleRowResolverException e) {
+                // Hand the exception to the logger so the cause of the failed conversion is kept.
+                log.error("Could not convert a Triple into the SPO format: " + triple, e);
+            }
+        }
+    }
+
+
+    /**
+     * Converts a triple into a byte[] holding the Rya SPO representation of it.
+     *
+     * @param triple - The triple to convert. (not null)
+     * @return The Rya SPO representation of the triple.
+     * @throws TripleRowResolverException The triple could not be converted.
+     */
+    public static byte[] spoFormat(final RyaStatement triple) throws TripleRowResolverException {
+        checkNotNull(triple);
+        final Map<TABLE_LAYOUT, TripleRow> serialized = TRIPLE_RESOLVER.serialize(triple);
+        final TripleRow spoRow = serialized.get(TABLE_LAYOUT.SPO);
+        return spoRow.getRow();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index 28c92af..b675ba7 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -30,12 +30,12 @@ import org.apache.fluo.api.observer.AbstractObserver;
 import org.apache.log4j.Logger;
 import org.apache.rya.accumulo.utils.VisibilitySimplifier;
 import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter.ResultExportException;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory.IncrementalExporterFactoryException;
-import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaResultExporterFactory;
-import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaResultExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaBindingSetExporterFactory;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 
@@ -58,16 +58,16 @@ public class QueryResultObserver extends AbstractObserver {
     /**
      * Builders for each type of result exporter we support.
      */
-    private static final ImmutableSet<IncrementalResultExporterFactory> factories =
-            ImmutableSet.<IncrementalResultExporterFactory>builder()
-                .add(new RyaResultExporterFactory())
-                .add(new KafkaResultExporterFactory())
+    private static final ImmutableSet<IncrementalBindingSetExporterFactory> factories =
+            ImmutableSet.<IncrementalBindingSetExporterFactory>builder()
+                .add(new RyaBindingSetExporterFactory())
+                .add(new KafkaBindingSetExporterFactory())
                 .build();
 
     /**
      * The exporters that are configured.
      */
-    private ImmutableSet<IncrementalResultExporter> exporters = null;
+    private ImmutableSet<IncrementalBindingSetExporter> exporters = null;
 
     @Override
     public ObservedColumn getObservedColumn() {
@@ -79,13 +79,13 @@ public class QueryResultObserver extends AbstractObserver {
      */
     @Override
     public void init(final Context context) {
-        final ImmutableSet.Builder<IncrementalResultExporter> exportersBuilder = ImmutableSet.builder();
-
-        for(final IncrementalResultExporterFactory builder : factories) {
-        	log.debug("QueryResultObserver.init(): for each exportersBuilder=" + builder);
+        final ImmutableSet.Builder<IncrementalBindingSetExporter> exportersBuilder = ImmutableSet.builder();
 
+        for(final IncrementalBindingSetExporterFactory builder : factories) {
             try {
-                final Optional<IncrementalResultExporter> exporter = builder.build(context);
+                log.debug("QueryResultObserver.init(): for each exportersBuilder=" + builder);
+
+                final Optional<IncrementalBindingSetExporter> exporter = builder.build(context);
                 if(exporter.isPresent()) {
                     exportersBuilder.add(exporter.get());
                 }
@@ -117,7 +117,7 @@ public class QueryResultObserver extends AbstractObserver {
         result.setVisibility( simplifiedVisibilities.get(visibility) );
 
         // Export the result using each of the provided exporters.
-        for(final IncrementalResultExporter exporter : exporters) {
+        for(final IncrementalBindingSetExporter exporter : exporters) {
             try {
                 exporter.export(tx, queryId, result);
             } catch (final ResultExportException e) {
@@ -129,7 +129,7 @@ public class QueryResultObserver extends AbstractObserver {
     @Override
     public void close() {
         if(exporters != null) {
-            for(final IncrementalResultExporter exporter : exporters) {
+            for(final IncrementalBindingSetExporter exporter : exporters) {
                 try {
                     exporter.close();
                 } catch(final Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java
new file mode 100644
index 0000000..e836c5d
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java
@@ -0,0 +1,192 @@
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.openrdf.query.BindingSet;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Metadata describing a Construct Query Node found in a SPARQL query: the
+ * child node it consumes, the {@link ConstructGraph} it projects results
+ * onto, and the SPARQL string of the construct query.
+ */
+public class ConstructQueryMetadata extends CommonNodeMetadata {
+
+    private final String childNodeId;
+    private final ConstructGraph graph;
+    private final String sparql;
+
+    /**
+     * Creates ConstructQueryMetadata object from the provided metadata arguments.
+     * The node's variable order is always fixed to (subject, predicate, object).
+     *
+     * @param nodeId - id for the ConstructQueryNode
+     * @param childNodeId - id for the child of the ConstructQueryNode (not null)
+     * @param graph - {@link ConstructGraph} used to project {@link BindingSet}s onto sets of statements representing the construct graph (not null)
+     * @param sparql - SPARQL query containing the construct graph (not null)
+     */
+    public ConstructQueryMetadata(String nodeId, String childNodeId, ConstructGraph graph, String sparql) {
+        super(nodeId, new VariableOrder("subject", "predicate", "object"));
+        // checkNotNull returns its argument, so validation and assignment happen together.
+        this.childNodeId = Preconditions.checkNotNull(childNodeId);
+        this.graph = Preconditions.checkNotNull(graph);
+        this.sparql = Preconditions.checkNotNull(sparql);
+    }
+
+    /**
+     * @return sparql query string representing this construct query
+     */
+    public String getSparql() {
+        return sparql;
+    }
+
+    /**
+     * @return The node whose results are projected onto the given
+     *         {@link ConstructGraph}.
+     */
+    public String getChildNodeId() {
+        return childNodeId;
+    }
+
+    /**
+     * @return The ConstructGraph used to form statement {@link BindingSet}s for
+     *         this Construct Query
+     */
+    public ConstructGraph getConstructGraph() {
+        return graph;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(super.getNodeId(), super.getVariableOrder(), childNodeId, graph, sparql);
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof ConstructQueryMetadata)) {
+            return false;
+        }
+        final ConstructQueryMetadata other = (ConstructQueryMetadata) o;
+        return super.equals(other)
+                && new EqualsBuilder()
+                        .append(childNodeId, other.childNodeId)
+                        .append(graph, other.graph)
+                        .append(sparql, other.sparql)
+                        .isEquals();
+    }
+
+    @Override
+    public String toString() {
+        return "Construct Query Metadata {\n"
+                + "    Node ID: " + super.getNodeId() + "\n"
+                + "    SPARQL QUERY: " + sparql + "\n"
+                + "    Variable Order: " + super.getVariableOrder() + "\n"
+                + "    Child Node ID: " + childNodeId + "\n"
+                + "    Construct Graph: " + graph.getProjections() + "\n"
+                + "}";
+    }
+
+    /**
+     * Creates a new {@link Builder} for this class.
+     *
+     * @return A new {@link Builder} for this class.
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builds instances of {@link ConstructQueryMetadata}.
+     */
+    @DefaultAnnotation(NonNull.class)
+    public static final class Builder {
+
+        private String nodeId;
+        private ConstructGraph graph;
+        private String childNodeId;
+        private String sparql;
+
+        /**
+         * Set the node Id that identifies this Construct Query Node
+         *
+         * @param nodeId - id for this node
+         * @return This builder so that method invocations may be chained.
+         */
+        public Builder setNodeId(String nodeId) {
+            this.nodeId = nodeId;
+            return this;
+        }
+
+        /**
+         * Set the SPARQL String representing this construct query
+         *
+         * @param sparql - SPARQL string representing this construct query
+         * @return This builder so that method invocations may be chained.
+         */
+        public Builder setSparql(String sparql) {
+            this.sparql = sparql;
+            return this;
+        }
+
+        /**
+         * Set the ConstructGraph used to form statement {@link BindingSet}s for
+         * this Construct Query
+         *
+         * @param graph - ConstructGraph to project {@link BindingSet}s onto RDF statements
+         * @return This builder so that method invocations may be chained.
+         */
+        public Builder setConstructGraph(ConstructGraph graph) {
+            this.graph = graph;
+            return this;
+        }
+
+        /**
+         * Set the node whose results are projected onto the given
+         * {@link ConstructGraph}.
+         *
+         * @param childNodeId - The node whose results are projected onto the given
+         *            {@link ConstructGraph}.
+         * @return This builder so that method invocations may be chained.
+         */
+        public Builder setChildNodeId(String childNodeId) {
+            this.childNodeId = childNodeId;
+            return this;
+        }
+
+        /**
+         * @return An instance of {@link ConstructQueryMetadata} built using this builder's values.
+         * @throws NullPointerException if the child node ID, construct graph, or SPARQL was never set.
+         */
+        public ConstructQueryMetadata build() {
+            return new ConstructQueryMetadata(nodeId, childNodeId, graph, sparql);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
index 3230a5d..a701052 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang3.builder.EqualsBuilder;
 
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -43,11 +44,14 @@ import net.jcip.annotations.Immutable;
 @DefaultAnnotation(NonNull.class)
 public class FluoQuery {
 
-    private final QueryMetadata queryMetadata;
+    private final Optional<QueryMetadata> queryMetadata;
+    private final Optional<ConstructQueryMetadata> constructMetadata;
     private final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata;
     private final ImmutableMap<String, FilterMetadata> filterMetadata;
     private final ImmutableMap<String, JoinMetadata> joinMetadata;
     private final ImmutableMap<String, AggregationMetadata> aggregationMetadata;
+    private final QueryType type;
+    public static enum QueryType {Projection, Construct};
 
     /**
      * Constructs an instance of {@link FluoQuery}. Private because applications
@@ -67,21 +71,65 @@ public class FluoQuery {
             final QueryMetadata queryMetadata,
             final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata,
             final ImmutableMap<String, FilterMetadata> filterMetadata,
+            final ImmutableMap<String, JoinMetadata> joinMetadata, 
+            final ImmutableMap<String, AggregationMetadata> aggregationMetadata) {
+                this.aggregationMetadata = requireNonNull(aggregationMetadata);
+        this.queryMetadata = Optional.of(requireNonNull(queryMetadata));
+        this.constructMetadata = Optional.absent();
+        this.statementPatternMetadata = requireNonNull(statementPatternMetadata);
+        this.filterMetadata = requireNonNull(filterMetadata);
+        this.joinMetadata = requireNonNull(joinMetadata);
+        this.type = QueryType.Projection;
+    }
+    
+    
+    /**
+     * Constructs an instance of {@link FluoQuery}. Private because applications
+     * must use {@link Builder} instead.
+     *
+     * @param constructMetadata - The root node of a query that is updated in Fluo. (not null)
+     * @param statementPatternMetadata - A map from Node ID to Statement Pattern metadata as
+     *   it is represented within the Fluo app. (not null)
+     * @param filterMetadata A map from Node ID to Filter metadata as it is represented
+     *   within the Fluo app. (not null)
+     * @param joinMetadata A map from Node ID to Join metadata as it is represented
+     *   within the Fluo app. (not null)
+     * @param aggregationMetadata - A map from Node ID to Aggregation metadata as it is
+     *   represented within the Fluo app. (not null)
+     */
+    private FluoQuery(
+            final ConstructQueryMetadata constructMetadata,
+            final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata,
+            final ImmutableMap<String, FilterMetadata> filterMetadata,
             final ImmutableMap<String, JoinMetadata> joinMetadata,
             final ImmutableMap<String, AggregationMetadata> aggregationMetadata) {
-        this.queryMetadata = requireNonNull(queryMetadata);
+        this.constructMetadata = Optional.of(requireNonNull(constructMetadata));
+        this.queryMetadata = Optional.absent();
         this.statementPatternMetadata = requireNonNull(statementPatternMetadata);
         this.filterMetadata = requireNonNull(filterMetadata);
         this.joinMetadata = requireNonNull(joinMetadata);
-        this.aggregationMetadata = requireNonNull(aggregationMetadata);
+        this.aggregationMetadata = requireNonNull(aggregationMetadata);
+        this.type = QueryType.Construct;
+    }
+    
+    /**
+     * Returns the {@link QueryType} of this query
+     * @return the QueryType of this query (either Construct or Projection)
+     */
+    public QueryType getQueryType() {
+        return type;
     }
 
     /**
      * @return Metadata about the root node of a query that is updated within the Fluo app.
      */
-    public QueryMetadata getQueryMetadata() {
+    public Optional<QueryMetadata> getQueryMetadata() {
         return queryMetadata;
     }
+    /** @return Metadata about the root Construct Query node; absent when this query was built as a Projection query. */
+    public Optional<ConstructQueryMetadata> getConstructQueryMetadata() {
+        return constructMetadata;
+    }
 
     /**
      * Get a Statement Pattern node's metadata.
@@ -175,6 +223,7 @@ public class FluoQuery {
             final FluoQuery fluoQuery = (FluoQuery)o;
             return new EqualsBuilder()
                     .append(queryMetadata, fluoQuery.queryMetadata)
+                    .append(constructMetadata,  fluoQuery.constructMetadata)
                     .append(statementPatternMetadata, fluoQuery.statementPatternMetadata)
                     .append(filterMetadata, fluoQuery.filterMetadata)
                     .append(joinMetadata, fluoQuery.joinMetadata)
@@ -189,8 +238,13 @@ public class FluoQuery {
     public String toString() {
         final StringBuilder builder = new StringBuilder();
 
-        if(queryMetadata != null) {
-            builder.append( queryMetadata.toString() );
+        if(queryMetadata.isPresent()) {
+            builder.append( queryMetadata.get().toString() );
+            builder.append("\n");
+        }
+        
+        if(constructMetadata.isPresent()) {
+            builder.append( constructMetadata.get().toString() );
             builder.append("\n");
         }
 
@@ -231,6 +285,7 @@ public class FluoQuery {
     public static final class Builder {
 
         private QueryMetadata.Builder queryBuilder = null;
+        private ConstructQueryMetadata.Builder constructBuilder = null;
         private final Map<String, StatementPatternMetadata.Builder> spBuilders = new HashMap<>();
         private final Map<String, FilterMetadata.Builder> filterBuilders = new HashMap<>();
         private final Map<String, JoinMetadata.Builder> joinBuilders = new HashMap<>();
@@ -239,11 +294,11 @@ public class FluoQuery {
         /**
          * Sets the {@link QueryMetadata.Builder} that is used by this builder.
          *
-         * @param queryMetadata - The builder representing the query's results.
+         * @param queryBuilder - The builder representing the query's results.
          * @return This builder so that method invocation may be chained.
          */
-        public Builder setQueryMetadata(@Nullable final QueryMetadata.Builder queryMetadata) {
-            this.queryBuilder = queryMetadata;
+        public Builder setQueryMetadata(@Nullable final QueryMetadata.Builder queryBuilder) {
+            this.queryBuilder = queryBuilder;
             return this;
         }
 
@@ -253,6 +308,26 @@ public class FluoQuery {
         public Optional<QueryMetadata.Builder> getQueryBuilder() {
             return Optional.fromNullable( queryBuilder );
         }
+        
+        /**
+         * Sets the {@link ConstructQueryMetadata.Builder} that is used by this builder.
+         * When {@link #build()} is called, exactly one of this builder and the
+         * {@link QueryMetadata.Builder} must be non-null.
+         * @param constructBuilder - The builder representing the construct query's results.
+         * @return This builder so that method invocation may be chained.
+         */
+        public Builder setConstructQueryMetadata(@Nullable final ConstructQueryMetadata.Builder constructBuilder) {
+            this.constructBuilder = constructBuilder;
+            return this;
+        }
+
+        /**
+         * @return The Construct Query metadata builder if one has been set.
+         */
+        public Optional<ConstructQueryMetadata.Builder> getConstructQueryBuilder() {
+            return Optional.fromNullable( constructBuilder );
+        }
+        
 
         /**
          * Adds a new {@link StatementPatternMetadata.Builder} to this builder.
@@ -345,12 +420,14 @@ public class FluoQuery {
             requireNonNull(nodeId);
             return Optional.fromNullable( joinBuilders.get(nodeId) );
         }
+        
 
         /**
          * @return Creates a {@link FluoQuery} using the values that have been supplied to this builder.
          */
         public FluoQuery build() {
-            final QueryMetadata queryMetadata = queryBuilder.build();
+            Preconditions.checkArgument(
+                    (queryBuilder != null && constructBuilder == null) || (queryBuilder == null && constructBuilder != null));
 
             final ImmutableMap.Builder<String, StatementPatternMetadata> spMetadata = ImmutableMap.builder();
             for(final Entry<String, StatementPatternMetadata.Builder> entry : spBuilders.entrySet()) {
@@ -372,7 +449,14 @@ public class FluoQuery {
                 aggregateMetadata.put(entry.getKey(), entry.getValue().build());
             }
 
-            return new FluoQuery(queryMetadata, spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
+            if(queryBuilder != null) {
+                return new FluoQuery(queryBuilder.build(), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
+            }
+            //constructBuilder non-null in this case, but no need to check
+            else {
+                return new FluoQuery(constructBuilder.build(), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
+            }
+            
         }
     }
 }
\ No newline at end of file