You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/18 18:51:30 UTC

[5/5] incubator-rya git commit: RYA-282-Nested-Query. Closes #192.

RYA-282-Nested-Query. Closes #192.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/e387818b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/e387818b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/e387818b

Branch: refs/heads/master
Commit: e387818ba22b07a07432d62eea172da6c33d793f
Parents: 6ce0b00
Author: Caleb Meier <ca...@parsons.com>
Authored: Thu Jul 20 06:57:38 2017 -0700
Committer: Caleb Meier <ca...@parsons.com>
Committed: Fri Aug 18 11:50:36 2017 -0700

----------------------------------------------------------------------
 .../api/client/accumulo/AccumuloCreatePCJ.java  |   3 +-
 .../api/client/accumulo/AccumuloDeletePCJ.java  |   4 +-
 .../client/accumulo/AccumuloCreatePCJIT.java    |   1 +
 .../rya/api/client/accumulo/FluoITBase.java     |   4 +
 .../indexing/pcj/fluo/api/CreateFluoPcj.java    | 385 ++++++++++++++
 .../rya/indexing/pcj/fluo/api/CreatePcj.java    | 450 ----------------
 .../indexing/pcj/fluo/api/DeleteFluoPcj.java    | 312 +++++++++++
 .../rya/indexing/pcj/fluo/api/DeletePcj.java    | 305 -----------
 .../pcj/fluo/app/ConstructProjection.java       |   4 -
 .../fluo/app/ConstructQueryResultUpdater.java   |  46 +-
 .../pcj/fluo/app/FilterResultUpdater.java       |  23 +-
 .../pcj/fluo/app/FluoStringConverter.java       |   2 -
 .../fluo/app/IncrementalUpdateConstants.java    |   4 +
 .../pcj/fluo/app/JoinResultUpdater.java         |  31 +-
 .../rya/indexing/pcj/fluo/app/NodeType.java     |  57 +-
 .../pcj/fluo/app/PeriodicQueryUpdater.java      |   1 -
 .../pcj/fluo/app/ProjectionResultUpdater.java   |  89 ++++
 .../pcj/fluo/app/QueryResultUpdater.java        |  20 +-
 .../export/kafka/KafkaRyaSubGraphExporter.java  |   2 -
 .../fluo/app/observers/BindingSetUpdater.java   |  14 +-
 .../fluo/app/observers/ProjectionObserver.java  |  65 +++
 .../fluo/app/observers/QueryResultObserver.java |   2 +-
 .../pcj/fluo/app/query/AggregationMetadata.java |   8 +-
 .../pcj/fluo/app/query/CommonNodeMetadata.java  |  14 +
 .../fluo/app/query/ConstructQueryMetadata.java  |  94 ++--
 .../pcj/fluo/app/query/FilterMetadata.java      |  11 +-
 .../indexing/pcj/fluo/app/query/FluoQuery.java  | 234 ++++++---
 .../pcj/fluo/app/query/FluoQueryColumns.java    |  45 +-
 .../fluo/app/query/FluoQueryMetadataDAO.java    | 148 ++++--
 .../pcj/fluo/app/query/JoinMetadata.java        |  17 +-
 .../fluo/app/query/PeriodicQueryMetadata.java   |   9 +-
 .../pcj/fluo/app/query/ProjectionMetadata.java  | 236 +++++++++
 .../fluo/app/query/QueryBuilderVisitorBase.java | 119 +++++
 .../pcj/fluo/app/query/QueryMetadata.java       |  98 +++-
 .../app/query/QueryMetadataVisitorBase.java     | 113 ++++
 .../fluo/app/query/SparqlFluoQueryBuilder.java  | 444 +++++++++++-----
 .../app/query/StatementPatternMetadata.java     |   7 +-
 .../pcj/fluo/app/util/FluoQueryUtils.java       |  62 +++
 .../pcj/fluo/app/util/PeriodicQueryUtil.java    |  76 +--
 .../app/util/VariableOrderUpdateVisitor.java    | 166 ++++++
 .../fluo/app/query/PeriodicQueryUtilTest.java   |  10 +-
 .../fluo/app/query/QueryBuilderVisitorTest.java | 105 ++++
 .../app/query/QueryMetadataVisitorTest.java     | 109 ++++
 .../fluo/client/command/NewQueryCommand.java    |   4 +-
 .../fluo/client/util/QueryReportRenderer.java   |  41 +-
 .../pcj/fluo/demo/FluoAndHistoricPcjsDemo.java  |   4 +-
 .../pcj/fluo/ConstructGraphTestUtils.java       |  15 +-
 .../indexing/pcj/fluo/api/GetPcjMetadataIT.java |   4 +-
 .../indexing/pcj/fluo/api/GetQueryReportIT.java |   4 +-
 .../fluo/app/query/FluoQueryMetadataDAOIT.java  | 163 +++++-
 .../pcj/fluo/integration/BatchDeleteIT.java     |   8 +-
 .../pcj/fluo/integration/CreateDeleteIT.java    |  10 +-
 .../indexing/pcj/fluo/integration/InputIT.java  |  10 +-
 .../pcj/fluo/integration/KafkaExportIT.java     | 163 +++++-
 .../integration/KafkaRyaSubGraphExportIT.java   |  86 ++-
 .../indexing/pcj/fluo/integration/QueryIT.java  | 517 ++++++++++++-------
 .../pcj/fluo/integration/RyaExportIT.java       |   4 +-
 .../RyaInputIncrementalUpdateIT.java            |   8 +-
 .../pcj/fluo/integration/StreamingTestIT.java   |   4 +-
 .../HistoricStreamingVisibilityIT.java          |   4 +-
 .../pcj/fluo/visibility/PcjVisibilityIT.java    |   4 +-
 .../pcj/fluo/test/base/KafkaExportITBase.java   |   4 +-
 .../rya/pcj/fluo/test/base/RyaExportITBase.java |   2 +
 .../PeriodicNotificationProviderIT.java         |   9 +-
 .../notification/api/CreatePeriodicQuery.java   |  10 +-
 .../recovery/PeriodicNotificationProvider.java  |   9 +-
 66 files changed, 3569 insertions(+), 1467 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
index 3fe1042..644189a 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
@@ -38,6 +38,7 @@ import org.apache.rya.api.instance.RyaDetailsUpdater;
 import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator;
 import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
 import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
 import org.apache.rya.indexing.pcj.storage.PcjException;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
@@ -145,7 +146,7 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ {
                 cd.getZookeepers(),
                 fluoAppName);) {
             // Initialize the PCJ within the Fluo application.
-            final org.apache.rya.indexing.pcj.fluo.api.CreatePcj fluoCreatePcj = new org.apache.rya.indexing.pcj.fluo.api.CreatePcj();
+            final CreateFluoPcj fluoCreatePcj = new CreateFluoPcj();
             fluoCreatePcj.withRyaIntegration(pcjId, pcjStorage, fluoClient, getConnector(), ryaInstance);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java
index 96e6d58..eb2b2d7 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java
@@ -31,7 +31,7 @@ import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
 import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
 import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
 import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
-import org.apache.rya.indexing.pcj.fluo.api.DeletePcj;
+import org.apache.rya.indexing.pcj.fluo.api.DeleteFluoPcj;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
@@ -123,7 +123,7 @@ public class AccumuloDeletePCJ extends AccumuloCommand implements DeletePCJ {
                 cd.getZookeepers(),
                 fluoAppName)) {
             // Delete the PCJ from the Fluo App.
-            new DeletePcj(1000).deletePcj(fluoClient, pcjId);
+            new DeleteFluoPcj(1000).deletePcj(fluoClient, pcjId);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java
index 9bbf01f..3463a02 100644
--- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java
@@ -34,6 +34,7 @@ import org.apache.rya.api.instance.RyaDetails;
 import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
 import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
 import org.apache.rya.indexing.pcj.fluo.api.ListQueryIds;
+import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO;
 import org.apache.rya.indexing.pcj.storage.PcjMetadata;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java
index 2e16412..113b397 100644
--- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java
@@ -39,8 +39,10 @@ import org.apache.rya.accumulo.MiniAccumuloClusterInstance;
 import org.apache.rya.accumulo.MiniAccumuloSingleton;
 import org.apache.rya.accumulo.RyaTestInstanceRule;
 import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;
+import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.ProjectionObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
@@ -219,6 +221,8 @@ public abstract class FluoITBase {
         observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
         observers.add(new ObserverSpecification(JoinObserver.class.getName()));
         observers.add(new ObserverSpecification(FilterObserver.class.getName()));
+        observers.add(new ObserverSpecification(ProjectionObserver.class.getName()));
+        observers.add(new ObserverSpecification(ConstructQueryResultObserver.class.getName()));
 
         // Provide export parameters child test classes may provide to the
         // export observer.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
new file mode 100644
index 0000000..e450960
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.api;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.api.persist.query.BatchRyaQuery;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
+import org.apache.rya.indexing.pcj.storage.PcjException;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.calrissian.mango.collect.CloseableIterable;
+import org.openrdf.model.Resource;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.StatementPattern;
+
+import com.google.common.base.Preconditions;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Sets up a new Pre Computed Join (PCJ) in Fluo from a SPARQL query.
+ * <p>
+ * This is a two phase process.
+ * <ol>
+ *   <li>Setup metadata about each node of the query using a single Fluo transaction. </li>
+ *   <li>Scan Rya for binding sets that match each Statement Pattern from the query
+ *       and use a separate Fluo transaction for each batch that is inserted. This
+ *       ensure historic triples will be included in the query's results.</li>
+ * </ol>
+ * After the first step is finished, any new Triples that are added to the Fluo
+ * application will be matched against statement patterns, the final results
+ * will percolate to the top of the query, and those results will be exported to
+ * Rya's query system.
+ */
+@DefaultAnnotation(NonNull.class)
+public class CreateFluoPcj {
+    private static final Logger log = Logger.getLogger(CreateFluoPcj.class);
+
+    /**
+     * The default Statement Pattern batch insert size is 1000.
+     */
+    private static final int DEFAULT_SP_INSERT_BATCH_SIZE = 1000;
+
+    /**
+     * The maximum number of binding sets that will be inserted into each Statement
+     * Pattern's result set per Fluo transaction.
+     */
+    private final int spInsertBatchSize;
+
+    /**
+     * Constructs an instance of {@link CreateFluoPcj} that uses
+     * {@link #DEFAULT_SP_INSERT_BATCH_SIZE} as the default batch insert size.
+     */
+    public CreateFluoPcj() {
+        this(DEFAULT_SP_INSERT_BATCH_SIZE);
+    }
+
+    /**
+     * Constructs an instance of {@link CreateFluoPcj}.
+     *
+     * @param spInsertBatchSize - The maximum number of binding sets that will be
+     *   inserted into each Statement Pattern's result set per Fluo transaction.
+     */
+    public CreateFluoPcj(final int spInsertBatchSize) {
+        checkArgument(spInsertBatchSize > 0, "The SP insert batch size '" + spInsertBatchSize + "' must be greater than 0.");
+        this.spInsertBatchSize = spInsertBatchSize;
+    }
+    
+
+    /**
+     * Tells the Fluo PCJ Updater application to maintain a new PCJ. This method
+     * creates the FluoQuery (metadata) inside of Fluo so that results can be incrementally generated
+     * inside of Fluo.  This method assumes that the user will export the results to Kafka or
+     * some other external resource.  The export id is equivalent to the queryId that is returned,
+     * which is in contrast to the other createPcj methods in this class which accept an external pcjId
+     * that is used to identify the Accumulo table or Kafka topic for exporting results.
+     *
+     * @param sparql - sparql query String to be registered with Fluo
+     * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
+     * @return The metadata that was written to the Fluo application for the PCJ.
+     * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
+     * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
+     */
+    public FluoQuery createPcj(String sparql, FluoClient fluo) throws MalformedQueryException {
+        Preconditions.checkNotNull(sparql);
+        Preconditions.checkNotNull(fluo);
+        
+        String pcjId = UUID.randomUUID().toString().replaceAll("-", "");
+        return createPcj(pcjId, sparql, fluo);
+    }
+    
+    /**
+     * Tells the Fluo PCJ Updater application to maintain a new PCJ.  This method provides
+     * no guarantees that a PCJ with the given pcjId exists outside of Fluo. This method merely
+     * creates the FluoQuery (metadata) inside of Fluo so that results and be incrementally generated
+     * inside of Fluo.  This method assumes that the user will export the results to Kafka or
+     * some other external resource.
+     *
+     * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
+     * @param sparql - sparql query String to be registered with Fluo
+     * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
+     * @return The metadata that was written to the Fluo application for the PCJ.
+     * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
+     */
+    public FluoQuery createPcj(
+            final String pcjId,
+            final String sparql,
+            final FluoClient fluo) throws MalformedQueryException {
+        requireNonNull(pcjId);
+        requireNonNull(sparql);
+        requireNonNull(fluo);
+
+        FluoQuery fluoQuery = makeFluoQuery(sparql, pcjId);
+        writeFluoQuery(fluo, fluoQuery, pcjId);
+
+        return fluoQuery;
+    }
+    
+    private FluoQuery makeFluoQuery(String sparql, String pcjId) throws MalformedQueryException {
+        
+        String queryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId);
+        
+        SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder();
+        builder.setFluoQueryId(queryId);
+        builder.setSparql(sparql);
+        
+        return builder.build();
+    }
+    
+    private void writeFluoQuery(FluoClient fluo, FluoQuery fluoQuery, String pcjId) {
+        try (Transaction tx = fluo.newTransaction()) {
+            // Write the query's structure to Fluo.
+            new FluoQueryMetadataDAO().write(tx, fluoQuery);
+            
+            // Flush the changes to Fluo.
+            tx.commit();
+        }
+    }
+
+    
+    /**
+     * Tells the Fluo PCJ Updater application to maintain a new PCJ.  The method takes in an
+     * instance of {@link PrecomputedJoinStorage} to verify that a PCJ with the given pcjId exists.
+     *
+     * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
+     * @param pcjStorage - Provides access to the PCJ index. (not null)
+     * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
+     * @return The metadata that was written to the Fluo application for the PCJ.
+     * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
+     * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
+     */
+    public FluoQuery createPcj(
+            final String pcjId,
+            final PrecomputedJoinStorage pcjStorage,
+            final FluoClient fluo) throws MalformedQueryException, PcjException {
+        requireNonNull(pcjId);
+        requireNonNull(pcjStorage);
+        requireNonNull(fluo);
+
+        // Parse the query's structure for the metadata that will be written to fluo.
+        final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
+        final String sparql = pcjMetadata.getSparql();
+        return createPcj(pcjId, sparql, fluo);
+    }
+    
+    /**
+     * Tells the Fluo PCJ Updater application to maintain a new PCJ.
+     * <p>
+     * This call scans Rya for Statement Pattern matches and inserts them into
+     * the Fluo application. This method does not verify that a PcjTable with the
+     * the given pcjId actually exists. It is assumed that results for any query registered
+     * using this method will be exported to Kafka or some other external service.
+     *
+     * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
+     * @param sparql - sparql query that will registered with Fluo. (not null)
+     * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
+     * @param queryEngine - QueryEngine for a given Rya Instance, (not null)
+     * @return The Fluo application's Query ID of the query that was created.
+     * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
+     * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
+     * @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}.
+     */
+    public String withRyaIntegration(
+            final String pcjId,
+            final String sparql,
+            final FluoClient fluo,
+            final Connector accumulo,
+            final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException {
+        requireNonNull(pcjId);
+        requireNonNull(sparql);
+        requireNonNull(fluo);
+        requireNonNull(accumulo);
+        requireNonNull(ryaInstance);
+
+        
+        // Write the SPARQL query's structure to the Fluo Application.
+        final FluoQuery fluoQuery = createPcj(pcjId, sparql, fluo);
+        //import results already ingested into Rya that match query
+        importHistoricResultsIntoFluo(fluo, fluoQuery, accumulo, ryaInstance);
+        // return queryId to the caller for later monitoring from the export.
+        return fluoQuery.getQueryMetadata().getNodeId();
+    }
+    
+
+    /**
+     * Tells the Fluo PCJ Updater application to maintain a new PCJ.
+     * <p>
+     * This call scans Rya for Statement Pattern matches and inserts them into
+     * the Fluo application. The Fluo application will then maintain the intermediate
+     * results as new triples are inserted and export any new query results to the
+     * {@code pcjId} within the provided {@code pcjStorage}.  This method requires that a
+     * PCJ table already exist for the query corresponding to the pcjId.  Results will be exported
+     * to this table.
+     *
+     * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
+     * @param pcjStorage - Provides access to the PCJ index. (not null)
+     * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
+     * @param queryEngine - QueryEngine for a given Rya Instance, (not null)
+     * @return The Fluo application's Query ID of the query that was created.
+     * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
+     * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
+     * @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}.
+     */
+    public String withRyaIntegration(
+            final String pcjId,
+            final PrecomputedJoinStorage pcjStorage,
+            final FluoClient fluo,
+            final Connector accumulo,
+            final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException {
+        requireNonNull(pcjId);
+        requireNonNull(pcjStorage);
+        requireNonNull(fluo);
+        requireNonNull(accumulo);
+        requireNonNull(ryaInstance);
+        
+        // Parse the query's structure for the metadata that will be written to fluo.
+        final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
+        final String sparql = pcjMetadata.getSparql();
+        
+        return withRyaIntegration(pcjId, sparql, fluo, accumulo, ryaInstance);
+    }
+    
+    private void importHistoricResultsIntoFluo(FluoClient fluo, FluoQuery fluoQuery, Connector accumulo, String ryaInstance)
+            throws RyaDAOException {
+        // Reuse the same set object while performing batch inserts.
+        final Set<RyaStatement> queryBatch = new HashSet<>();
+
+        // Iterate through each of the statement patterns and insert their
+        // historic matches into Fluo.
+        for (final StatementPatternMetadata patternMetadata : fluoQuery.getStatementPatternMetadata()) {
+            // Get an iterator over all of the binding sets that match the
+            // statement pattern.
+            final StatementPattern pattern = FluoStringConverter.toStatementPattern(patternMetadata.getStatementPattern());
+            queryBatch.add(spToRyaStatement(pattern));
+        }
+
+        // Create AccumuloRyaQueryEngine to query for historic results
+        final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix(ryaInstance);
+        conf.setAuths(getAuths(accumulo));
+
+        try (final AccumuloRyaQueryEngine queryEngine = new AccumuloRyaQueryEngine(accumulo, conf);
+                CloseableIterable<RyaStatement> queryIterable = queryEngine.query(new BatchRyaQuery(queryBatch))) {
+            final Set<RyaStatement> triplesBatch = new HashSet<>();
+
+            // Insert batches of the binding sets into Fluo.
+            for (final RyaStatement ryaStatement : queryIterable) {
+                if (triplesBatch.size() == spInsertBatchSize) {
+                    writeBatch(fluo, triplesBatch);
+                    triplesBatch.clear();
+                }
+
+                triplesBatch.add(ryaStatement);
+            }
+
+            if (!triplesBatch.isEmpty()) {
+                writeBatch(fluo, triplesBatch);
+                triplesBatch.clear();
+            }
+        } catch (final IOException e) {
+            log.warn("Ignoring IOException thrown while closing the AccumuloRyaQueryEngine used by CreatePCJ.", e);
+        }
+    }
+
+    private static void writeBatch(final FluoClient fluo, final Set<RyaStatement> batch) {
+        checkNotNull(fluo);
+        checkNotNull(batch);
+        new InsertTriples().insert(fluo, batch);
+    }
+
+    private static RyaStatement spToRyaStatement(final StatementPattern sp) {
+        final Value subjVal = sp.getSubjectVar().getValue();
+        final Value predVal = sp.getPredicateVar().getValue();
+        final Value objVal = sp.getObjectVar().getValue();
+
+        RyaURI subjURI = null;
+        RyaURI predURI = null;
+        RyaType objType = null;
+
+        if(subjVal != null) {
+            if(!(subjVal instanceof Resource)) {
+                throw new AssertionError("Subject must be a Resource.");
+            }
+            subjURI = RdfToRyaConversions.convertResource((Resource) subjVal);
+        }
+
+        if (predVal != null) {
+            if(!(predVal instanceof URI)) {
+                throw new AssertionError("Predicate must be a URI.");
+            }
+            predURI = RdfToRyaConversions.convertURI((URI) predVal);
+        }
+
+        if (objVal != null ) {
+            objType = RdfToRyaConversions.convertValue(objVal);
+        }
+
+        return new RyaStatement(subjURI, predURI, objType);
+    }
+
+    private String[] getAuths(final Connector accumulo) {
+        Authorizations auths;
+        try {
+            auths = accumulo.securityOperations().getUserAuthorizations(accumulo.whoami());
+            final List<byte[]> authList = auths.getAuthorizations();
+            final String[] authArray = new String[authList.size()];
+            for(int i = 0; i < authList.size(); i++){
+                authArray[i] = new String(authList.get(i), "UTF-8");
+            }
+            return authArray;
+        } catch (AccumuloException | AccumuloSecurityException | UnsupportedEncodingException e) {
+            throw new RuntimeException("Cannot read authorizations for user: " + accumulo.whoami());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
deleted file mode 100644
index 767d9d2..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
+++ /dev/null
@@ -1,450 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.api;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.util.Objects.requireNonNull;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.Transaction;
-import org.apache.log4j.Logger;
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine;
-import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.api.domain.RyaType;
-import org.apache.rya.api.domain.RyaURI;
-import org.apache.rya.api.persist.RyaDAOException;
-import org.apache.rya.api.persist.query.BatchRyaQuery;
-import org.apache.rya.api.resolver.RdfToRyaConversions;
-import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
-import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder;
-import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds;
-import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
-import org.apache.rya.indexing.pcj.storage.PcjException;
-import org.apache.rya.indexing.pcj.storage.PcjMetadata;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.calrissian.mango.collect.CloseableIterable;
-import org.openrdf.model.Resource;
-import org.openrdf.model.URI;
-import org.openrdf.model.Value;
-import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.algebra.StatementPattern;
-import org.openrdf.query.parser.ParsedQuery;
-import org.openrdf.query.parser.sparql.SPARQLParser;
-
-import com.google.common.base.Preconditions;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
-/**
- * Sets up a new Pre Computed Join (PCJ) in Fluo from a SPARQL query.
- * <p>
- * This is a two phase process.
- * <ol>
- *   <li>Setup metadata about each node of the query using a single Fluo transaction. </li>
- *   <li>Scan Rya for binding sets that match each Statement Pattern from the query
- *       and use a separate Fluo transaction for each batch that is inserted. This
- *       ensure historic triples will be included in the query's results.</li>
- * </ol>
- * After the first step is finished, any new Triples that are added to the Fluo
- * application will be matched against statement patterns, the final results
- * will percolate to the top of the query, and those results will be exported to
- * Rya's query system.
- */
-@DefaultAnnotation(NonNull.class)
-public class CreatePcj {
-    private static final Logger log = Logger.getLogger(CreatePcj.class);
-
-    /**
-     * The default Statement Pattern batch insert size is 1000.
-     */
-    private static final int DEFAULT_SP_INSERT_BATCH_SIZE = 1000;
-
-    /**
-     * The maximum number of binding sets that will be inserted into each Statement
-     * Pattern's result set per Fluo transaction.
-     */
-    private final int spInsertBatchSize;
-
-    /**
-     * Constructs an instance of {@link CreatePcj} that uses
-     * {@link #DEFAULT_SP_INSERT_BATCH_SIZE} as the default batch insert size.
-     */
-    public CreatePcj() {
-        this(DEFAULT_SP_INSERT_BATCH_SIZE);
-    }
-
-    /**
-     * Constructs an instance of {@link CreatePcj}.
-     *
-     * @param spInsertBatchSize - The maximum number of binding sets that will be
-     *   inserted into each Statement Pattern's result set per Fluo transaction.
-     */
-    public CreatePcj(final int spInsertBatchSize) {
-        checkArgument(spInsertBatchSize > 0, "The SP insert batch size '" + spInsertBatchSize + "' must be greater than 0.");
-        this.spInsertBatchSize = spInsertBatchSize;
-    }
-    
-    
-    /**
-     * Tells the Fluo PCJ Updater application to maintain a new PCJ.  This method does not
-     * require a pcjId and does not require a PCJ table to have already been created via {@link PrecomputedJoinStorage}.
-     * This method only adds the metadata to the Fluo table to incrementally generate query results.  Since there
-     * is no PCJ table, the incremental results must be exported to some external queuing service such as Kafka.
-     * This method currently only supports SPARQL COSNTRUCT queries, as they only export to Kafka by default. 
-     *
-     * @param sparql - SPARQL query whose results will be updated in the Fluo table
-     * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
-     * @return The metadata that was written to the Fluo application for the PCJ.
-     * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
-     * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
-     * @throws RuntimeException If SPARQL query is not a CONSTRUCT query.
-     */
-    public FluoQuery createFluoPcj(final FluoClient fluo, String sparql) throws MalformedQueryException, PcjException {
-        requireNonNull(sparql);
-        requireNonNull(fluo);
-
-        // Keeps track of the IDs that are assigned to each of the query's nodes in Fluo.
-        // We use these IDs later when scanning Rya for historic Statement Pattern matches
-        // as well as setting up automatic exports.
-        final NodeIds nodeIds = new NodeIds();
-        final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, null);
-        final FluoQuery fluoQuery = new SparqlFluoQueryBuilder().make(parsedQuery, nodeIds);
-        checkArgument(fluoQuery.getConstructQueryMetadata().isPresent(), "Sparql query: " + sparql + " must begin with a construct.");
-
-        try (Transaction tx = fluo.newTransaction()) {
-            // Write the query's structure to Fluo.
-            new FluoQueryMetadataDAO().write(tx, fluoQuery);
-            tx.commit();
-        }
-
-        return fluoQuery;
-    }
-
-    
-
-
-    /**
-     * Tells the Fluo PCJ Updater application to maintain a new PCJ. This method
-     * creates the FluoQuery (metadata) inside of Fluo so that results can be incrementally generated
-     * inside of Fluo.  This method assumes that the user will export the results to Kafka or
-     * some other external resource.  The export id is equivalent to the queryId that is returned,
-     * which is in contrast to the other createPcj methods in this class which accept an external pcjId
-     * that is used to identify the Accumulo table or Kafka topic for exporting results.
-     *
-     * @param sparql - sparql query String to be registered with Fluo
-     * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
-     * @return queryId - The id of the root of the query metadata tree in Fluo
-     * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
-     * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
-     */
-    public String createPcj(String sparql, FluoClient fluo) throws MalformedQueryException {
-        Preconditions.checkNotNull(sparql);
-        Preconditions.checkNotNull(fluo);
-        
-        FluoQuery fluoQuery = makeFluoQuery(sparql);
-        String queryId = null;
-        if(fluoQuery.getQueryMetadata().isPresent()) {
-            queryId = fluoQuery.getQueryMetadata().get().getNodeId();
-            queryId = queryId.split(IncrementalUpdateConstants.QUERY_PREFIX)[1];
-        } else {
-            queryId = fluoQuery.getConstructQueryMetadata().get().getNodeId();
-            queryId = queryId.split(IncrementalUpdateConstants.CONSTRUCT_PREFIX)[1];
-        }
-        
-        String[] idArray = queryId.split("_");
-        String id = idArray[idArray.length - 1];
-        
-        writeFluoQuery(fluo, fluoQuery, id);
-        return id;
-    }
-    
-    /**
-     * Tells the Fluo PCJ Updater application to maintain a new PCJ.  This method provides
-     * no guarantees that a PCJ with the given pcjId exists outside of Fluo. This method merely
-     * creates the FluoQuery (metadata) inside of Fluo so that results and be incrementally generated
-     * inside of Fluo.  This method assumes that the user will export the results to Kafka or
-     * some other external resource.
-     *
-     * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
-     * @param sparql - sparql query String to be registered with Fluo
-     * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
-     * @return The metadata that was written to the Fluo application for the PCJ.
-     * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
-     * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
-     */
-    public FluoQuery createPcj(
-            final String pcjId,
-            final String sparql,
-            final FluoClient fluo) throws MalformedQueryException, PcjException {
-        requireNonNull(pcjId);
-        requireNonNull(sparql);
-        requireNonNull(fluo);
-
-        FluoQuery fluoQuery = makeFluoQuery(sparql);
-        writeFluoQuery(fluo, fluoQuery, pcjId);
-
-        return fluoQuery;
-    }
-    
-    private FluoQuery makeFluoQuery(String sparql) throws MalformedQueryException {
-        
-        // Keeps track of the IDs that are assigned to each of the query's nodes in Fluo.
-        // We use these IDs later when scanning Rya for historic Statement Pattern matches
-        // as well as setting up automatic exports.
-        final NodeIds nodeIds = new NodeIds();
-
-        // Parse the query's structure for the metadata that will be written to fluo.
-        final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, null);
-        return new SparqlFluoQueryBuilder().make(parsedQuery, nodeIds);
-    }
-    
-    private void writeFluoQuery(FluoClient fluo, FluoQuery fluoQuery, String pcjId) {
-        try (Transaction tx = fluo.newTransaction()) {
-            // Write the query's structure to Fluo.
-            new FluoQueryMetadataDAO().write(tx, fluoQuery);
-
-            // The results of the query are eventually exported to an instance
-            // of Rya, so store the Rya ID for the PCJ.
-            QueryMetadata metadata = fluoQuery.getQueryMetadata().orNull();
-            if (metadata != null) {
-                String queryId = metadata.getNodeId();
-                tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId);
-                tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId);
-            }
-
-            // Flush the changes to Fluo.
-            tx.commit();
-        }
-    }
-
-    
-    /**
-     * Tells the Fluo PCJ Updater application to maintain a new PCJ.  The method takes in an
-     * instance of {@link PrecomputedJoinStorage} to verify that a PCJ with the given pcjId exists.
-     *
-     * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
-     * @param pcjStorage - Provides access to the PCJ index. (not null)
-     * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
-     * @return The metadata that was written to the Fluo application for the PCJ.
-     * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
-     * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
-     */
-    public FluoQuery createPcj(
-            final String pcjId,
-            final PrecomputedJoinStorage pcjStorage,
-            final FluoClient fluo) throws MalformedQueryException, PcjException {
-        requireNonNull(pcjId);
-        requireNonNull(pcjStorage);
-        requireNonNull(fluo);
-
-        // Parse the query's structure for the metadata that will be written to fluo.
-        final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
-        final String sparql = pcjMetadata.getSparql();
-        return createPcj(pcjId, sparql, fluo);
-    }
-    
-    /**
-     * Tells the Fluo PCJ Updater application to maintain a new PCJ.
-     * <p>
-     * This call scans Rya for Statement Pattern matches and inserts them into
-     * the Fluo application. This method does not verify that a PcjTable with the
-     * the given pcjId actually exists. It is assumed that results for any query registered
-     * using this method will be exported to Kafka or some other external service.
-     *
-     * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
-     * @param sparql - sparql query that will registered with Fluo. (not null)
-     * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
-     * @param queryEngine - QueryEngine for a given Rya Instance, (not null)
-     * @return The Fluo application's Query ID of the query that was created.
-     * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
-     * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
-     * @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}.
-     */
-    public String withRyaIntegration(
-            final String pcjId,
-            final String sparql,
-            final FluoClient fluo,
-            final Connector accumulo,
-            final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException {
-        requireNonNull(pcjId);
-        requireNonNull(sparql);
-        requireNonNull(fluo);
-        requireNonNull(accumulo);
-        requireNonNull(ryaInstance);
-
-        
-        // Write the SPARQL query's structure to the Fluo Application.
-        final FluoQuery fluoQuery = createPcj(pcjId, sparql, fluo);
-        //import results already ingested into Rya that match query
-        importHistoricResultsIntoFluo(fluo, fluoQuery, accumulo, ryaInstance);
-        // return queryId to the caller for later monitoring from the export.
-        return fluoQuery.getQueryMetadata().get().getNodeId();
-    }
-    
-
-    /**
-     * Tells the Fluo PCJ Updater application to maintain a new PCJ.
-     * <p>
-     * This call scans Rya for Statement Pattern matches and inserts them into
-     * the Fluo application. The Fluo application will then maintain the intermediate
-     * results as new triples are inserted and export any new query results to the
-     * {@code pcjId} within the provided {@code pcjStorage}.  This method requires that a
-     * PCJ table already exist for the query corresponding to the pcjId.  Results will be exported
-     * to this table.
-     *
-     * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
-     * @param pcjStorage - Provides access to the PCJ index. (not null)
-     * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
-     * @param queryEngine - QueryEngine for a given Rya Instance, (not null)
-     * @return The Fluo application's Query ID of the query that was created.
-     * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
-     * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
-     * @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}.
-     */
-    public String withRyaIntegration(
-            final String pcjId,
-            final PrecomputedJoinStorage pcjStorage,
-            final FluoClient fluo,
-            final Connector accumulo,
-            final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException {
-        requireNonNull(pcjId);
-        requireNonNull(pcjStorage);
-        requireNonNull(fluo);
-        requireNonNull(accumulo);
-        requireNonNull(ryaInstance);
-        
-        // Parse the query's structure for the metadata that will be written to fluo.
-        final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
-        final String sparql = pcjMetadata.getSparql();
-        
-        return withRyaIntegration(pcjId, sparql, fluo, accumulo, ryaInstance);
-    }
-    
-    private void importHistoricResultsIntoFluo(FluoClient fluo, FluoQuery fluoQuery, Connector accumulo, String ryaInstance)
-            throws RyaDAOException {
-        // Reuse the same set object while performing batch inserts.
-        final Set<RyaStatement> queryBatch = new HashSet<>();
-
-        // Iterate through each of the statement patterns and insert their
-        // historic matches into Fluo.
-        for (final StatementPatternMetadata patternMetadata : fluoQuery.getStatementPatternMetadata()) {
-            // Get an iterator over all of the binding sets that match the
-            // statement pattern.
-            final StatementPattern pattern = FluoStringConverter.toStatementPattern(patternMetadata.getStatementPattern());
-            queryBatch.add(spToRyaStatement(pattern));
-        }
-
-        // Create AccumuloRyaQueryEngine to query for historic results
-        final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
-        conf.setTablePrefix(ryaInstance);
-        conf.setAuths(getAuths(accumulo));
-
-        try (final AccumuloRyaQueryEngine queryEngine = new AccumuloRyaQueryEngine(accumulo, conf);
-                CloseableIterable<RyaStatement> queryIterable = queryEngine.query(new BatchRyaQuery(queryBatch))) {
-            final Set<RyaStatement> triplesBatch = new HashSet<>();
-
-            // Insert batches of the binding sets into Fluo.
-            for (final RyaStatement ryaStatement : queryIterable) {
-                if (triplesBatch.size() == spInsertBatchSize) {
-                    writeBatch(fluo, triplesBatch);
-                    triplesBatch.clear();
-                }
-
-                triplesBatch.add(ryaStatement);
-            }
-
-            if (!triplesBatch.isEmpty()) {
-                writeBatch(fluo, triplesBatch);
-                triplesBatch.clear();
-            }
-        } catch (final IOException e) {
-            log.warn("Ignoring IOException thrown while closing the AccumuloRyaQueryEngine used by CreatePCJ.", e);
-        }
-    }
-
-    private static void writeBatch(final FluoClient fluo, final Set<RyaStatement> batch) {
-        checkNotNull(fluo);
-        checkNotNull(batch);
-        new InsertTriples().insert(fluo, batch);
-    }
-
-    private static RyaStatement spToRyaStatement(final StatementPattern sp) {
-        final Value subjVal = sp.getSubjectVar().getValue();
-        final Value predVal = sp.getPredicateVar().getValue();
-        final Value objVal = sp.getObjectVar().getValue();
-
-        RyaURI subjURI = null;
-        RyaURI predURI = null;
-        RyaType objType = null;
-
-        if(subjVal != null) {
-            if(!(subjVal instanceof Resource)) {
-                throw new AssertionError("Subject must be a Resource.");
-            }
-            subjURI = RdfToRyaConversions.convertResource((Resource) subjVal);
-        }
-
-        if (predVal != null) {
-            if(!(predVal instanceof URI)) {
-                throw new AssertionError("Predicate must be a URI.");
-            }
-            predURI = RdfToRyaConversions.convertURI((URI) predVal);
-        }
-
-        if (objVal != null ) {
-            objType = RdfToRyaConversions.convertValue(objVal);
-        }
-
-        return new RyaStatement(subjURI, predURI, objType);
-    }
-
-    private String[] getAuths(final Connector accumulo) {
-        Authorizations auths;
-        try {
-            auths = accumulo.securityOperations().getUserAuthorizations(accumulo.whoami());
-            final List<byte[]> authList = auths.getAuthorizations();
-            final String[] authArray = new String[authList.size()];
-            for(int i = 0; i < authList.size(); i++){
-                authArray[i] = new String(authList.get(i), "UTF-8");
-            }
-            return authArray;
-        } catch (AccumuloException | AccumuloSecurityException | UnsupportedEncodingException e) {
-            throw new RuntimeException("Cannot read authorizations for user: " + accumulo.whoami());
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java
new file mode 100644
index 0000000..58a52fb
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.api;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
+import org.openrdf.query.BindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Deletes a Pre-computed Join (PCJ) from Fluo.
+ * <p>
+ * This is a two phase process.
+ * <ol>
+ * <li>Delete metadata about each node of the query using a single Fluo
+ * transaction. This prevents new {@link BindingSet}s from being created when
+ * new triples are inserted.</li>
+ * <li>Delete BindingSets associated with each node of the query. This is done
+ * in a batch fashion to guard against large delete transactions that don't fit
+ * into memory.</li>
+ * </ol>
+ */
+@DefaultAnnotation(NonNull.class)
+public class DeleteFluoPcj {
+
+    private final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+    private final int batchSize;
+
+    /**
+     * Constructs an instance of {@link DeleteFluoPcj}.
+     *
+     * @param batchSize - The number of entries that will be deleted at a time. (> 0)
+     */
+    public DeleteFluoPcj(final int batchSize) {
+        checkArgument(batchSize > 0);
+        this.batchSize = batchSize;
+    }
+
+    /**
+     * Deletes all metadata and {@link BindingSet}s associated with a Rya
+     * Precomputed Join Index from the Fluo application that is incrementally
+     * updating it.
+     *
+     * @param client - Connects to the Fluo application that is updating the PCJ
+     *            Index. (not null)
+     * @param pcjId - The PCJ ID for the query that will removed from the Fluo
+     *            application. (not null)
+     */
+    public void deletePcj(final FluoClient client, final String pcjId) {
+        requireNonNull(client);
+        requireNonNull(pcjId);
+
+        final Transaction tx = client.newTransaction();
+
+        // Delete the query's metadata. This halts input.
+        final List<String> nodeIds = getNodeIds(tx, pcjId);
+        deleteMetadata(tx, nodeIds, pcjId);
+
+        // Delete the binding sets associated with the query's nodes.
+        for (final String nodeId : nodeIds) {
+            deleteData(client, nodeId);
+        }
+    }
+
+    /**
+     * This method retrieves all of the nodeIds that are part of the query with
+     * specified pcjId.
+     *
+     * @param tx - Transaction of a given Fluo table. (not null)
+     * @param pcjId - Id of query. (not null)
+     * @return list of Node IDs associated with the query {@code pcjId}.
+     */
+    private List<String> getNodeIds(Transaction tx, String pcjId) {
+        requireNonNull(tx);
+        requireNonNull(pcjId);
+
+        // Get the ID that tracks the query within the Fluo application.
+        final String queryId = getQueryIdFromPcjId(tx, pcjId);
+
+        // Get the query's children nodes.
+        final List<String> nodeIds = new ArrayList<>();
+        nodeIds.add(queryId);
+        getChildNodeIds(tx, queryId, nodeIds);
+        return nodeIds;
+    }
+
+    /**
+     * Recursively navigate query tree to extract all of the nodeIds.
+     *
+     * @param tx - Transaction of a given Fluo table. (not null)
+     * @param nodeId - Current node in query tree. (not null)
+     * @param nodeIds - The Node IDs extracted from query tree. (not null)
+     */
+    private void getChildNodeIds(final Transaction tx, final String nodeId, final List<String> nodeIds) {
+        requireNonNull(tx);
+        requireNonNull(nodeId);
+        requireNonNull(nodeIds);
+
+        final NodeType type = NodeType.fromNodeId(nodeId).get();
+        switch (type) {
+            case QUERY:
+                final QueryMetadata queryMeta = dao.readQueryMetadata(tx, nodeId);
+                final String queryChild = queryMeta.getChildNodeId();
+                nodeIds.add(queryChild);
+                getChildNodeIds(tx, queryChild, nodeIds);
+                break;
+            case CONSTRUCT:
+                final ConstructQueryMetadata constructMeta = dao.readConstructQueryMetadata(tx, nodeId);
+                final String constructChild = constructMeta.getChildNodeId();
+                nodeIds.add(constructChild);
+                getChildNodeIds(tx, constructChild, nodeIds);
+                break;
+            case JOIN:
+                final JoinMetadata joinMeta = dao.readJoinMetadata(tx, nodeId);
+                final String lchild = joinMeta.getLeftChildNodeId();
+                final String rchild = joinMeta.getRightChildNodeId();
+                nodeIds.add(lchild);
+                nodeIds.add(rchild);
+                getChildNodeIds(tx, lchild, nodeIds);
+                getChildNodeIds(tx, rchild, nodeIds);
+                break;
+            case FILTER:
+                final FilterMetadata filterMeta = dao.readFilterMetadata(tx, nodeId);
+                final String filterChild = filterMeta.getChildNodeId();
+                nodeIds.add(filterChild);
+                getChildNodeIds(tx, filterChild, nodeIds);
+                break;
+            case AGGREGATION:
+                final AggregationMetadata aggMeta = dao.readAggregationMetadata(tx, nodeId);
+                final String aggChild = aggMeta.getChildNodeId();
+                nodeIds.add(aggChild);
+                getChildNodeIds(tx, aggChild, nodeIds);
+                break;
+            case PERIODIC_QUERY:
+                final PeriodicQueryMetadata periodicMeta = dao.readPeriodicQueryMetadata(tx, nodeId);
+                final String periodicChild = periodicMeta.getChildNodeId();
+                nodeIds.add(periodicChild);
+                getChildNodeIds(tx, periodicChild, nodeIds);
+                break;
+            case PROJECTION:
+                final ProjectionMetadata projectionMetadata = dao.readProjectionMetadata(tx, nodeId);
+                final String projectionChild = projectionMetadata.getChildNodeId();
+                nodeIds.add(projectionChild);
+                getChildNodeIds(tx, projectionChild, nodeIds);
+                break;
+            case STATEMENT_PATTERN:
+                break;
+        }
+    }
+
+    /**
+     * Deletes metadata for all nodeIds associated with a given queryId in a
+     * single transaction. Prevents additional BindingSets from being created as
+     * new triples are added.
+     *
+     * @param tx - Transaction of a given Fluo table. (not null)
+     * @param nodeIds - Nodes whose metatdata will be deleted. (not null)
+     * @param pcjId - The PCJ ID of the query whose will be deleted. (not null)
+     */
+    private void deleteMetadata(final Transaction tx, final List<String> nodeIds, final String pcjId) {
+        requireNonNull(tx);
+        requireNonNull(nodeIds);
+        requireNonNull(pcjId);
+
+        try (final Transaction typeTx = tx) {
+            deletePcjIdAndSparqlMetadata(typeTx, pcjId);
+
+            for (final String nodeId : nodeIds) {
+                final NodeType type = NodeType.fromNodeId(nodeId).get();
+                deleteMetadataColumns(typeTx, nodeId, type.getMetaDataColumns());
+            }
+            typeTx.commit();
+        }
+    }
+
+    /**
+     * Deletes all metadata for a Query Node.
+     *
+     * @param tx - Transaction the deletes will be performed with. (not null)
+     * @param nodeId - The Node ID of the query node to delete. (not null)
+     * @param columns - The columns that will be deleted. (not null)
+     */
+    private void deleteMetadataColumns(final Transaction tx, final String nodeId, final List<Column> columns) {
+        requireNonNull(tx);
+        requireNonNull(columns);
+        requireNonNull(nodeId);
+
+        final Bytes row = Bytes.of(nodeId);
+        for (final Column column : columns) {
+            tx.delete(row, column);
+        }
+    }
+
+    /**
+     * Deletes high level query meta for converting from queryId to pcjId and
+     * vice versa, as well as converting from sparql to queryId.
+     *
+     * @param tx - Transaction the deletes will be performed with. (not null)
+     * @param pcjId - The PCJ whose metadata will be deleted. (not null)
+     */
+    private void deletePcjIdAndSparqlMetadata(final Transaction tx, final String pcjId) {
+        requireNonNull(tx);
+        requireNonNull(pcjId);
+
+        final String queryId = getQueryIdFromPcjId(tx, pcjId);
+        final String sparql = getSparqlFromQueryId(tx, queryId);
+        tx.delete(queryId, FluoQueryColumns.RYA_PCJ_ID);
+        tx.delete(sparql, FluoQueryColumns.QUERY_ID);
+        tx.delete(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID);
+    }
+
+    /**
+     * Deletes all results (BindingSets or Statements) associated with the specified nodeId.
+     *
+     * @param nodeId - nodeId whose {@link BindingSet}s will be deleted. (not null)
+     * @param client - Used to delete the data. (not null)
+     */
+    private void deleteData(final FluoClient client, final String nodeId) {
+        requireNonNull(client);
+        requireNonNull(nodeId);
+
+        final NodeType type = NodeType.fromNodeId(nodeId).get();
+        Transaction tx = client.newTransaction();
+        while (deleteDataBatch(tx, getIterator(tx, nodeId, type.getResultColumn()), type.getResultColumn())) {
+            tx = client.newTransaction();
+        }
+    }
+
+    private CellScanner getIterator(final Transaction tx, final String nodeId, final Column column) {
+        requireNonNull(tx);
+        requireNonNull(nodeId);
+        requireNonNull(column);
+
+        return tx.scanner().fetch(column).over(Span.prefix(nodeId)).build();
+    }
+
+    private boolean deleteDataBatch(final Transaction tx, final CellScanner scanner, final Column column) {
+        requireNonNull(tx);
+        requireNonNull(scanner);
+        requireNonNull(column);
+
+        try (Transaction ntx = tx) {
+            int count = 0;
+            final Iterator<RowColumnValue> iter = scanner.iterator();
+            while (iter.hasNext() && count < batchSize) {
+                final Bytes row = iter.next().getRow();
+                count++;
+                tx.delete(row, column);
+            }
+
+            final boolean hasNext = iter.hasNext();
+            tx.commit();
+            return hasNext;
+        }
+    }
+
+    private String getQueryIdFromPcjId(final Transaction tx, final String pcjId) {
+        requireNonNull(tx);
+        requireNonNull(pcjId);
+
+        final Bytes queryIdBytes = tx.get(Bytes.of(pcjId), FluoQueryColumns.PCJ_ID_QUERY_ID);
+        return queryIdBytes.toString();
+    }
+
+    private String getSparqlFromQueryId(final Transaction tx, final String queryId) {
+        requireNonNull(tx);
+        requireNonNull(queryId);
+
+        final QueryMetadata metadata = dao.readQueryMetadata(tx, queryId);
+        return metadata.getSparql();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
deleted file mode 100644
index 3052c1d..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.api;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.Objects.requireNonNull;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.Transaction;
-import org.apache.fluo.api.client.scanner.CellScanner;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.RowColumnValue;
-import org.apache.fluo.api.data.Span;
-import org.apache.rya.indexing.pcj.fluo.app.NodeType;
-import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
-import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
-import org.openrdf.query.BindingSet;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
-/**
- * Deletes a Pre-computed Join (PCJ) from Fluo.
- * <p>
- * This is a two phase process.
- * <ol>
- * <li>Delete metadata about each node of the query using a single Fluo
- * transaction. This prevents new {@link BindingSet}s from being created when
- * new triples are inserted.</li>
- * <li>Delete BindingSets associated with each node of the query. This is done
- * in a batch fashion to guard against large delete transactions that don't fit
- * into memory.</li>
- * </ol>
- */
-@DefaultAnnotation(NonNull.class)
-public class DeletePcj {
-
-    private final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
-    private final int batchSize;
-
-    /**
-     * Constructs an instance of {@link DeletePcj}.
-     *
-     * @param batchSize - The number of entries that will be deleted at a time. (> 0)
-     */
-    public DeletePcj(final int batchSize) {
-        checkArgument(batchSize > 0);
-        this.batchSize = batchSize;
-    }
-
-    /**
-     * Deletes all metadata and {@link BindingSet}s associated with a Rya
-     * Precomputed Join Index from the Fluo application that is incrementally
-     * updating it.
-     *
-     * @param client - Connects to the Fluo application that is updating the PCJ
-     *            Index. (not null)
-     * @param pcjId - The PCJ ID for the query that will removed from the Fluo
-     *            application. (not null)
-     */
-    public void deletePcj(final FluoClient client, final String pcjId) {
-        requireNonNull(client);
-        requireNonNull(pcjId);
-
-        final Transaction tx = client.newTransaction();
-
-        // Delete the query's metadata. This halts input.
-        final List<String> nodeIds = getNodeIds(tx, pcjId);
-        deleteMetadata(tx, nodeIds, pcjId);
-
-        // Delete the binding sets associated with the query's nodes.
-        for (final String nodeId : nodeIds) {
-            deleteData(client, nodeId);
-        }
-    }
-
-    /**
-     * This method retrieves all of the nodeIds that are part of the query with
-     * specified pcjId.
-     *
-     * @param tx - Transaction of a given Fluo table. (not null)
-     * @param pcjId - Id of query. (not null)
-     * @return list of Node IDs associated with the query {@code pcjId}.
-     */
-    private List<String> getNodeIds(Transaction tx, String pcjId) {
-        requireNonNull(tx);
-        requireNonNull(pcjId);
-
-        // Get the ID that tracks the query within the Fluo application.
-        final String queryId = getQueryIdFromPcjId(tx, pcjId);
-
-        // Get the query's children nodes.
-        final List<String> nodeIds = new ArrayList<>();
-        nodeIds.add(queryId);
-        getChildNodeIds(tx, queryId, nodeIds);
-        return nodeIds;
-    }
-
-    /**
-     * Recursively navigate query tree to extract all of the nodeIds.
-     *
-     * @param tx - Transaction of a given Fluo table. (not null)
-     * @param nodeId - Current node in query tree. (not null)
-     * @param nodeIds - The Node IDs extracted from query tree. (not null)
-     */
-    private void getChildNodeIds(final Transaction tx, final String nodeId, final List<String> nodeIds) {
-        requireNonNull(tx);
-        requireNonNull(nodeId);
-        requireNonNull(nodeIds);
-
-        final NodeType type = NodeType.fromNodeId(nodeId).get();
-        switch (type) {
-            case QUERY:
-                final QueryMetadata queryMeta = dao.readQueryMetadata(tx, nodeId);
-                final String queryChild = queryMeta.getChildNodeId();
-                nodeIds.add(queryChild);
-                getChildNodeIds(tx, queryChild, nodeIds);
-                break;
-            case CONSTRUCT:
-                final ConstructQueryMetadata constructMeta = dao.readConstructQueryMetadata(tx, nodeId);
-                final String constructChild = constructMeta.getChildNodeId();
-                nodeIds.add(constructChild);
-                getChildNodeIds(tx, constructChild, nodeIds);
-                break;
-            case JOIN:
-                final JoinMetadata joinMeta = dao.readJoinMetadata(tx, nodeId);
-                final String lchild = joinMeta.getLeftChildNodeId();
-                final String rchild = joinMeta.getRightChildNodeId();
-                nodeIds.add(lchild);
-                nodeIds.add(rchild);
-                getChildNodeIds(tx, lchild, nodeIds);
-                getChildNodeIds(tx, rchild, nodeIds);
-                break;
-            case FILTER:
-                final FilterMetadata filterMeta = dao.readFilterMetadata(tx, nodeId);
-                final String filterChild = filterMeta.getChildNodeId();
-                nodeIds.add(filterChild);
-                getChildNodeIds(tx, filterChild, nodeIds);
-                break;
-            case AGGREGATION:
-                final AggregationMetadata aggMeta = dao.readAggregationMetadata(tx, nodeId);
-                final String aggChild = aggMeta.getChildNodeId();
-                nodeIds.add(aggChild);
-                getChildNodeIds(tx, aggChild, nodeIds);
-                break;
-            case PERIODIC_QUERY:
-                final PeriodicQueryMetadata periodicMeta = dao.readPeriodicQueryMetadata(tx, nodeId);
-                final String periodicChild = periodicMeta.getChildNodeId();
-                nodeIds.add(periodicChild);
-                getChildNodeIds(tx, periodicChild, nodeIds);
-                break;
-            case STATEMENT_PATTERN:
-                break;
-        }
-    }
-
-    /**
-     * Deletes metadata for all nodeIds associated with a given queryId in a
-     * single transaction. Prevents additional BindingSets from being created as
-     * new triples are added.
-     *
-     * @param tx - Transaction of a given Fluo table. (not null)
-     * @param nodeIds - Nodes whose metatdata will be deleted. (not null)
-     * @param pcjId - The PCJ ID of the query whose will be deleted. (not null)
-     */
-    private void deleteMetadata(final Transaction tx, final List<String> nodeIds, final String pcjId) {
-        requireNonNull(tx);
-        requireNonNull(nodeIds);
-        requireNonNull(pcjId);
-
-        try (final Transaction typeTx = tx) {
-            deletePcjIdAndSparqlMetadata(typeTx, pcjId);
-
-            for (final String nodeId : nodeIds) {
-                final NodeType type = NodeType.fromNodeId(nodeId).get();
-                deleteMetadataColumns(typeTx, nodeId, type.getMetaDataColumns());
-            }
-            typeTx.commit();
-        }
-    }
-
-    /**
-     * Deletes all metadata for a Query Node.
-     *
-     * @param tx - Transaction the deletes will be performed with. (not null)
-     * @param nodeId - The Node ID of the query node to delete. (not null)
-     * @param columns - The columns that will be deleted. (not null)
-     */
-    private void deleteMetadataColumns(final Transaction tx, final String nodeId, final List<Column> columns) {
-        requireNonNull(tx);
-        requireNonNull(columns);
-        requireNonNull(nodeId);
-
-        final Bytes row = Bytes.of(nodeId);
-        for (final Column column : columns) {
-            tx.delete(row, column);
-        }
-    }
-
-    /**
-     * Deletes high level query meta for converting from queryId to pcjId and
-     * vice versa, as well as converting from sparql to queryId.
-     *
-     * @param tx - Transaction the deletes will be performed with. (not null)
-     * @param pcjId - The PCJ whose metadata will be deleted. (not null)
-     */
-    private void deletePcjIdAndSparqlMetadata(final Transaction tx, final String pcjId) {
-        requireNonNull(tx);
-        requireNonNull(pcjId);
-
-        final String queryId = getQueryIdFromPcjId(tx, pcjId);
-        final String sparql = getSparqlFromQueryId(tx, queryId);
-        tx.delete(queryId, FluoQueryColumns.RYA_PCJ_ID);
-        tx.delete(sparql, FluoQueryColumns.QUERY_ID);
-        tx.delete(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID);
-    }
-
-    /**
-     * Deletes all results (BindingSets or Statements) associated with the specified nodeId.
-     *
-     * @param nodeId - nodeId whose {@link BindingSet}s will be deleted. (not null)
-     * @param client - Used to delete the data. (not null)
-     */
-    private void deleteData(final FluoClient client, final String nodeId) {
-        requireNonNull(client);
-        requireNonNull(nodeId);
-
-        final NodeType type = NodeType.fromNodeId(nodeId).get();
-        Transaction tx = client.newTransaction();
-        while (deleteDataBatch(tx, getIterator(tx, nodeId, type.getResultColumn()), type.getResultColumn())) {
-            tx = client.newTransaction();
-        }
-    }
-
-    private CellScanner getIterator(final Transaction tx, final String nodeId, final Column column) {
-        requireNonNull(tx);
-        requireNonNull(nodeId);
-        requireNonNull(column);
-
-        return tx.scanner().fetch(column).over(Span.prefix(nodeId)).build();
-    }
-
-    private boolean deleteDataBatch(final Transaction tx, final CellScanner scanner, final Column column) {
-        requireNonNull(tx);
-        requireNonNull(scanner);
-        requireNonNull(column);
-
-        try (Transaction ntx = tx) {
-            int count = 0;
-            final Iterator<RowColumnValue> iter = scanner.iterator();
-            while (iter.hasNext() && count < batchSize) {
-                final Bytes row = iter.next().getRow();
-                count++;
-                tx.delete(row, column);
-            }
-
-            final boolean hasNext = iter.hasNext();
-            tx.commit();
-            return hasNext;
-        }
-    }
-
-    private String getQueryIdFromPcjId(final Transaction tx, final String pcjId) {
-        requireNonNull(tx);
-        requireNonNull(pcjId);
-
-        final Bytes queryIdBytes = tx.get(Bytes.of(pcjId), FluoQueryColumns.PCJ_ID_QUERY_ID);
-        return queryIdBytes.toString();
-    }
-
-    private String getSparqlFromQueryId(final Transaction tx, final String queryId) {
-        requireNonNull(tx);
-        requireNonNull(queryId);
-
-        final QueryMetadata metadata = dao.readQueryMetadata(tx, queryId);
-        return metadata.getSparql();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjection.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjection.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjection.java
index 6c1aa01..76b62d8 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjection.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjection.java
@@ -19,12 +19,8 @@ package org.apache.rya.indexing.pcj.fluo.app;
  * under the License.
  */
 import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.UUID;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.log4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java
index d8d60b5..6642780 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java
@@ -23,12 +23,13 @@ import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.log4j.Logger;
-import org.apache.rya.api.domain.RyaSchema;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaSubGraph;
 import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
 import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 
 /**
@@ -53,39 +54,26 @@ public class ConstructQueryResultUpdater {
     public void updateConstructQueryResults(TransactionBase tx, VisibilityBindingSet bs, ConstructQueryMetadata metadata) {
         
         String nodeId = metadata.getNodeId();
+        VariableOrder varOrder = metadata.getVariableOrder();
         Column column = FluoQueryColumns.CONSTRUCT_STATEMENTS;
         ConstructGraph graph = metadata.getConstructGraph();
+        String parentId = metadata.getParentNodeId();
         
-        try {
+        // Create the Row Key for the emitted binding set. It does not contain visibilities.
+        final Bytes resultRow = RowKeyUtil.makeRowKey(nodeId, varOrder, bs);
+
+        // If this is a new binding set, then emit it.
+        if(tx.get(resultRow, column) == null || varOrder.getVariableOrders().size() < bs.size()) {
             Set<RyaStatement> statements = graph.createGraphFromBindingSet(bs);
-            RyaSubGraph subgraph = new RyaSubGraph(metadata.getNodeId(), statements);
-            String resultId = nodeId + "_" + getSubGraphId(subgraph);
-            tx.set(Bytes.of(resultId), column, Bytes.of(serializer.toBytes(subgraph)));
-        } catch (Exception e) {
-            log.trace("Unable to serialize RyaStatement generated by ConstructGraph: " + graph + " from BindingSet: " + bs );
-        }
-    }
-    
-    /**
-     * Generates a simple hash used as an id for the subgraph.  Id generated as hash as opposed
-     * to UUID to avoid the same subgraph result being stored under multiple UUID.  
-     * @param subgraph - subgraph that an id is need for
-     * @return - hash of subgraph used as an id
-     */
-    private int getSubGraphId(RyaSubGraph subgraph) {
-        int id = 17;
-        id = 31*id + subgraph.getId().hashCode();
-        for(RyaStatement statement: subgraph.getStatements()) {
-            int statementId = 7;
-            if(!statement.getSubject().getData().startsWith(RyaSchema.BNODE_NAMESPACE)) {
-                statementId = 17*statementId + statement.getSubject().hashCode();
-            }
-            statementId = 17*statementId + statement.getPredicate().hashCode();
-            statementId = 17*statementId + statement.getObject().hashCode();
-            id += statementId;
+            RyaSubGraph subgraph = new RyaSubGraph(parentId, statements);
+            final Bytes nodeValueBytes = Bytes.of(serializer.toBytes(subgraph));
+
+            log.trace(
+                    "Transaction ID: " + tx.getStartTimestamp() + "\n" +
+                    "New Binding Set: " + subgraph + "\n");
+
+            tx.set(resultRow, column, nodeValueBytes);
         }
-        return Math.abs(id);
     }
-    
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
index 1c99051..7cfa216 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
@@ -25,7 +25,6 @@ import org.apache.fluo.api.data.Bytes;
 import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.util.BindingSetUtil;
 import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer;
 import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
@@ -46,8 +45,6 @@ import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException;
 import org.openrdf.query.algebra.evaluation.impl.EvaluationStrategyImpl;
 import org.openrdf.query.algebra.evaluation.util.QueryEvaluationUtil;
 
-import com.google.common.base.Optional;
-
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
 import info.aduna.iteration.CloseableIteration;
@@ -114,24 +111,16 @@ public class FilterResultUpdater {
         // Evaluate whether the child BindingSet satisfies the filter's condition.
         final ValueExpr condition = filter.getCondition();
         if (isTrue(condition, childBindingSet)) {
-            // Create the Filter's binding set from the child's.
-            final VariableOrder filterVarOrder = filterMetadata.getVariableOrder();
-            final BindingSet filterBindingSet = BindingSetUtil.keepBindings(filterVarOrder, childBindingSet);
 
             // Create the Row Key for the emitted binding set. It does not contain visibilities.
-            final Bytes resultRow = RowKeyUtil.makeRowKey(filterMetadata.getNodeId(), filterVarOrder, filterBindingSet);
-
-            // If this is a new binding set, then emit it.
-            if(tx.get(resultRow, FluoQueryColumns.FILTER_BINDING_SET) == null) {
-                final VisibilityBindingSet visBindingSet = new VisibilityBindingSet(filterBindingSet, childBindingSet.getVisibility());
-                final Bytes nodeValueBytes = BS_SERDE.serialize(visBindingSet);
+            final VariableOrder filterVarOrder = filterMetadata.getVariableOrder();
+            final Bytes resultRow = RowKeyUtil.makeRowKey(filterMetadata.getNodeId(), filterVarOrder, childBindingSet);
 
-                log.trace(
-                        "Transaction ID: " + tx.getStartTimestamp() + "\n" +
-                        "New Binding Set: " + visBindingSet + "\n");
+            // Serialize and emit BindingSet
+            final Bytes nodeValueBytes = BS_SERDE.serialize(childBindingSet);
+            log.trace("Transaction ID: " + tx.getStartTimestamp() + "\n" + "New Binding Set: " + childBindingSet + "\n");
 
-                tx.set(resultRow, FluoQueryColumns.FILTER_BINDING_SET, nodeValueBytes);
-            }
+            tx.set(resultRow, FluoQueryColumns.FILTER_BINDING_SET, nodeValueBytes);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
index 05a8d1c..43a36de 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
@@ -23,8 +23,6 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DE
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.TYPE_DELIM;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.URI_TYPE;
 
-import java.util.UUID;
-
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;