You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/02 21:02:02 UTC

[9/9] incubator-rya git commit: RYA-280-Periodic Query Service. Closes #177.

RYA-280-Periodic Query Service. Closes #177.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/2ca85427
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/2ca85427
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/2ca85427

Branch: refs/heads/master
Commit: 2ca854271c2eb928e9ccd10d40599c8c535952fa
Parents: ab8035a
Author: Caleb Meier <ca...@parsons.com>
Authored: Fri Apr 14 19:20:25 2017 -0700
Committer: Caleb Meier <ca...@parsons.com>
Committed: Wed Aug 2 14:00:45 2017 -0700

----------------------------------------------------------------------
 common/rya.api/pom.xml                          |   6 +-
 .../client/accumulo/AccumuloCreatePCJIT.java    |   1 +
 extras/pom.xml                                  |   1 +
 .../pcj/storage/PeriodicQueryResultStorage.java | 115 +++++
 .../storage/PeriodicQueryStorageException.java  |  49 ++
 .../storage/PeriodicQueryStorageMetadata.java   |  99 ++++
 .../AccumuloPeriodicQueryResultStorage.java     | 270 ++++++++++
 .../AccumuloValueBindingSetIterator.java        |  73 +++
 .../pcj/storage/accumulo/PcjTables.java         |   5 +-
 .../accumulo/PeriodicQueryTableNameFactory.java |  55 ++
 .../accumulo/VisibilityBindingSetSerDe.java     |  76 +++
 .../accumulo/VisibilityBindingSetSerDeTest.java |  52 ++
 .../accumulo/accumulo/AccumuloPcjStorageIT.java | 276 ----------
 .../integration/AccumuloPcjStorageIT.java       | 276 ++++++++++
 .../AccumuloPeriodicQueryResultStorageIT.java   | 271 ++++++++++
 extras/rya.pcj.fluo/README.md                   |  25 +-
 .../rya/indexing/pcj/fluo/api/CreatePcj.java    | 184 +++++--
 .../rya/indexing/pcj/fluo/api/DeletePcj.java    |  53 +-
 extras/rya.pcj.fluo/pcj.fluo.app/pom.xml        | 101 ++--
 .../rya/indexing/pcj/fluo/app/FilterFinder.java |  84 ---
 .../pcj/fluo/app/FilterResultUpdater.java       |  14 +-
 .../fluo/app/IncrementalUpdateConstants.java    |   5 +
 .../pcj/fluo/app/JoinResultUpdater.java         |   1 +
 .../rya/indexing/pcj/fluo/app/NodeType.java     |   4 +
 .../pcj/fluo/app/PeriodicQueryUpdater.java      | 138 +++++
 .../pcj/fluo/app/QueryResultUpdater.java        |   3 +-
 .../pcj/fluo/app/VisibilityBindingSetSerDe.java |  77 ---
 .../batch/AbstractBatchBindingSetUpdater.java   |  55 ++
 .../app/batch/AbstractSpanBatchInformation.java | 101 ++++
 .../fluo/app/batch/BasicBatchInformation.java   |  81 +++
 .../fluo/app/batch/BatchBindingSetUpdater.java  |  43 ++
 .../pcj/fluo/app/batch/BatchInformation.java    |  57 +++
 .../pcj/fluo/app/batch/BatchInformationDAO.java |  59 +++
 .../pcj/fluo/app/batch/BatchObserver.java       |  63 +++
 .../pcj/fluo/app/batch/BatchRowKeyUtil.java     |  68 +++
 .../app/batch/JoinBatchBindingSetUpdater.java   | 184 +++++++
 .../fluo/app/batch/JoinBatchInformation.java    | 255 ++++++++++
 .../app/batch/SpanBatchBindingSetUpdater.java   | 128 +++++
 .../app/batch/SpanBatchDeleteInformation.java   |  95 ++++
 .../serializer/BatchInformationSerializer.java  |  58 +++
 .../serializer/BatchInformationTypeAdapter.java |  73 +++
 .../BatchInformationTypeAdapterFactory.java     |  65 +++
 .../JoinBatchInformationTypeAdapter.java        |  94 ++++
 .../SpanBatchInformationTypeAdapter.java        |  69 +++
 .../export/kafka/KafkaBindingSetExporter.java   |   8 +-
 .../app/export/rya/RyaBindingSetExporter.java   |  15 +-
 .../rya/RyaBindingSetExporterFactory.java       |   7 +-
 .../fluo/app/observers/BindingSetUpdater.java   |  16 +-
 .../pcj/fluo/app/observers/FilterObserver.java  |   2 +-
 .../pcj/fluo/app/observers/JoinObserver.java    |   2 +-
 .../app/observers/PeriodicQueryObserver.java    |  72 +++
 .../fluo/app/observers/QueryResultObserver.java |   2 +-
 .../app/observers/StatementPatternObserver.java |   2 +-
 .../pcj/fluo/app/observers/TripleObserver.java  |   2 +-
 .../pcj/fluo/app/query/AggregationMetadata.java |  18 +
 .../pcj/fluo/app/query/FilterMetadata.java      |  67 +--
 .../indexing/pcj/fluo/app/query/FluoQuery.java  |  81 ++-
 .../pcj/fluo/app/query/FluoQueryColumns.java    |  66 ++-
 .../fluo/app/query/FluoQueryMetadataDAO.java    | 119 ++++-
 .../fluo/app/query/PeriodicQueryMetadata.java   | 287 +++++++++++
 .../pcj/fluo/app/query/PeriodicQueryNode.java   | 154 ++++++
 .../pcj/fluo/app/query/QueryMetadata.java       |  15 +
 .../fluo/app/query/SparqlFluoQueryBuilder.java  |  89 +++-
 .../pcj/fluo/app/util/FilterSerializer.java     | 127 +++++
 .../pcj/fluo/app/util/FluoClientFactory.java    |  56 ++
 .../pcj/fluo/app/util/PeriodicQueryUtil.java    | 381 ++++++++++++++
 .../indexing/pcj/fluo/app/FilterFinderTest.java |  84 ---
 .../fluo/app/VisibilityBindingSetSerDeTest.java |  51 --
 .../BatchInformationSerializerTest.java         |  73 +++
 .../fluo/app/query/PeriodicQueryUtilTest.java   | 229 +++++++++
 .../fluo/client/util/QueryReportRenderer.java   |   3 +-
 .../rya.pcj.fluo/pcj.fluo.integration/pom.xml   |   5 +
 .../rya/indexing/pcj/fluo/FluoITBase.java       | 282 ----------
 .../indexing/pcj/fluo/KafkaExportITBase.java    | 370 --------------
 .../rya/indexing/pcj/fluo/RyaExportITBase.java  |  85 ----
 .../pcj/fluo/api/CountStatementsIT.java         |   2 +-
 .../indexing/pcj/fluo/api/GetPcjMetadataIT.java |   2 +-
 .../indexing/pcj/fluo/api/GetQueryReportIT.java |   2 +-
 .../indexing/pcj/fluo/api/ListQueryIdsIT.java   |   2 +-
 .../fluo/app/query/FluoQueryMetadataDAOIT.java  |  46 +-
 .../pcj/fluo/integration/BatchDeleteIT.java     | 316 ++++++++++++
 .../pcj/fluo/integration/CreateDeleteIT.java    |   2 +-
 .../indexing/pcj/fluo/integration/InputIT.java  |   2 +-
 .../pcj/fluo/integration/KafkaExportIT.java     |   2 +-
 .../integration/KafkaRyaSubGraphExportIT.java   |   2 +-
 .../indexing/pcj/fluo/integration/QueryIT.java  | 359 ++++++++++++-
 .../pcj/fluo/integration/RyaExportIT.java       |   2 +-
 .../RyaInputIncrementalUpdateIT.java            |   2 +-
 .../pcj/fluo/integration/StreamingTestIT.java   |   2 +-
 .../HistoricStreamingVisibilityIT.java          |   2 +-
 .../pcj/fluo/visibility/PcjVisibilityIT.java    |   3 +-
 extras/rya.pcj.fluo/pcj.fluo.test.base/pom.xml  | 108 ++++
 .../org/apache/rya/kafka/base/KafkaITBase.java  |  78 +++
 .../rya/pcj/fluo/test/base/FluoITBase.java      | 300 +++++++++++
 .../pcj/fluo/test/base/KafkaExportITBase.java   | 370 ++++++++++++++
 .../rya/pcj/fluo/test/base/RyaExportITBase.java |  80 +++
 extras/rya.pcj.fluo/pom.xml                     |   1 +
 .../periodic.service.integration.tests/pom.xml  |  77 +++
 .../PeriodicNotificationApplicationIT.java      | 509 +++++++++++++++++++
 .../PeriodicNotificationProviderIT.java         |  68 +++
 .../PeriodicNotificationExporterIT.java         | 130 +++++
 .../PeriodicNotificationProcessorIT.java        | 121 +++++
 .../pruner/PeriodicNotificationBinPrunerIT.java | 286 +++++++++++
 .../PeriodicCommandNotificationConsumerIT.java  | 120 +++++
 .../src/test/resources/notification.properties  |  35 ++
 .../periodic.service.notification/pom.xml       | 107 ++++
 .../periodic/notification/api/BinPruner.java    |  40 ++
 .../notification/api/BindingSetExporter.java    |  38 ++
 .../notification/api/CreatePeriodicQuery.java   | 113 ++++
 .../periodic/notification/api/LifeCycle.java    |  45 ++
 .../rya/periodic/notification/api/NodeBin.java  |  77 +++
 .../periodic/notification/api/Notification.java |  34 ++
 .../api/NotificationCoordinatorExecutor.java    |  41 ++
 .../notification/api/NotificationProcessor.java |  41 ++
 .../api/PeriodicNotificationClient.java         |  64 +++
 .../PeriodicApplicationException.java           |  47 ++
 .../PeriodicNotificationApplication.java        | 207 ++++++++
 ...dicNotificationApplicationConfiguration.java | 254 +++++++++
 .../PeriodicNotificationApplicationFactory.java | 140 +++++
 ...PeriodicNotificationCoordinatorExecutor.java | 159 ++++++
 .../notification/exporter/BindingSetRecord.java |  80 +++
 .../exporter/KafkaExporterExecutor.java         | 109 ++++
 .../KafkaPeriodicBindingSetExporter.java        |  98 ++++
 .../notification/BasicNotification.java         |  76 +++
 .../notification/CommandNotification.java       |  99 ++++
 .../notification/PeriodicNotification.java      | 178 +++++++
 .../notification/TimestampedNotification.java   |  69 +++
 .../NotificationProcessorExecutor.java          | 114 +++++
 .../TimestampedNotificationProcessor.java       | 203 ++++++++
 .../notification/pruner/AccumuloBinPruner.java  |  66 +++
 .../notification/pruner/FluoBinPruner.java      |  76 +++
 .../pruner/PeriodicQueryPruner.java             | 108 ++++
 .../pruner/PeriodicQueryPrunerExecutor.java     | 104 ++++
 .../recovery/PeriodicNotificationProvider.java  | 138 +++++
 .../kafka/KafkaNotificationProvider.java        | 123 +++++
 .../KafkaNotificationRegistrationClient.java    |  80 +++
 .../kafka/PeriodicNotificationConsumer.java     |  88 ++++
 .../BasicNotificationTypeAdapter.java           |  55 ++
 .../serialization/BindingSetSerDe.java          | 105 ++++
 .../CommandNotificationSerializer.java          |  76 +++
 .../CommandNotificationTypeAdapter.java         |  89 ++++
 .../PeriodicNotificationTypeAdapter.java        |  73 +++
 .../CommandNotificationSerializerTest.java      |  60 +++
 extras/rya.periodic.service/pom.xml             |  39 ++
 144 files changed, 11783 insertions(+), 1593 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/common/rya.api/pom.xml
----------------------------------------------------------------------
diff --git a/common/rya.api/pom.xml b/common/rya.api/pom.xml
index 94f191d..3c80a13 100644
--- a/common/rya.api/pom.xml
+++ b/common/rya.api/pom.xml
@@ -71,9 +71,9 @@ under the License.
             <artifactId>jcip-annotations</artifactId>
         </dependency>
         <dependency>
-			<groupId>com.esotericsoftware.kryo</groupId>
-			<artifactId>kryo</artifactId>
-			<version>2.24.0</version>
+            <groupId>com.esotericsoftware.kryo</groupId>
+            <artifactId>kryo</artifactId>
+            <version>2.24.0</version>
 		</dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java
index cb4b29a..9bbf01f 100644
--- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java
@@ -80,6 +80,7 @@ public class AccumuloCreatePCJIT extends FluoITBase {
             assertEquals(sparql, pcjMetadata.getSparql());
             assertEquals(0L, pcjMetadata.getCardinality());
 
+
             // Verify a Query ID was added for the query within the Fluo app.
             final List<String> fluoQueryIds = new ListQueryIds().listQueryIds(fluoClient);
             assertEquals(1, fluoQueryIds.size());

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/pom.xml
----------------------------------------------------------------------
diff --git a/extras/pom.xml b/extras/pom.xml
index 6acb51f..a2c8d58 100644
--- a/extras/pom.xml
+++ b/extras/pom.xml
@@ -33,6 +33,7 @@ under the License.
     <modules>
         <module>rya.prospector</module>
         <module>rya.manual</module>
+        <module>rya.periodic.service</module>
         <module>rya.console</module>
         <module>indexing</module>
         <module>rya.indexing.pcj</module>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryResultStorage.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryResultStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryResultStorage.java
new file mode 100644
index 0000000..697b350
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryResultStorage.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.query.BindingSet;
+
+/**
+ * Interface for storing and retrieving Periodic Query Results.
+ */
+public interface PeriodicQueryResultStorage {
+
+    /**
+     * Binding name for the periodic bin id.
+     */
+    public static String PeriodicBinId = "periodicBinId";
+
+    /**
+     * Creates a PeriodicQuery result storage layer for the given SPARQL query.
+     * @param sparql - SPARQL query
+     * @return - id of the storage layer for the given SPARQL query
+     * @throws PeriodicQueryStorageException if the storage layer cannot be created
+     */
+    public String createPeriodicQuery(String sparql) throws PeriodicQueryStorageException;
+
+    /**
+     * Creates a PeriodicQuery result storage layer for the given SPARQL query with the given id.
+     * @param queryId - id of the storage layer for the given SPARQL query
+     * @param sparql - SPARQL query whose periodic results will be stored
+     * @return - id of the storage layer
+     * @throws PeriodicQueryStorageException if the storage layer cannot be created
+     */
+    public String createPeriodicQuery(String queryId, String sparql) throws PeriodicQueryStorageException;
+
+    /**
+     * Creates a PeriodicQuery result storage layer for the given SPARQL query with the given id
+     * whose results are written in the order indicated by the specified VariableOrder.
+     * @param queryId - id of the storage layer for the given SPARQL query
+     * @param sparql - SPARQL query whose periodic results will be stored
+     * @param varOrder - VariableOrder indicating the order that results will be written in
+     * @throws PeriodicQueryStorageException if the storage layer cannot be created
+     */
+    public void createPeriodicQuery(String queryId, String sparql, VariableOrder varOrder) throws PeriodicQueryStorageException;
+
+    /**
+     * Retrieve the {@link PeriodicQueryStorageMetadata} for the given query id.
+     * @param queryID - id of the query whose metadata will be returned
+     * @return PeriodicQueryStorageMetadata
+     * @throws PeriodicQueryStorageException if the metadata cannot be retrieved
+     */
+    public PeriodicQueryStorageMetadata getPeriodicQueryMetadata(String queryID) throws PeriodicQueryStorageException;
+
+    /**
+     * Add periodic query results to the storage layer indicated by the given query id.
+     * @param queryId - id indicating the storage layer that results will be added to
+     * @param results - query results to be added to storage
+     * @throws PeriodicQueryStorageException if the results cannot be added
+     */
+    public void addPeriodicQueryResults(String queryId, Collection<VisibilityBindingSet> results) throws PeriodicQueryStorageException;
+
+    /**
+     * Deletes periodic query results from the storage layer.
+     * @param queryId - id indicating the storage layer that results will be deleted from
+     * @param binID - bin id indicating the periodic id of results to be deleted
+     * @throws PeriodicQueryStorageException if the results cannot be deleted
+     */
+    public void deletePeriodicQueryResults(String queryId, long binID) throws PeriodicQueryStorageException;
+
+    /**
+     * Deletes all results for the storage layer indicated by the given query id.
+     * @param queryID - id indicating the storage layer whose results will be deleted
+     * @throws PeriodicQueryStorageException if the storage layer cannot be deleted
+     */
+    public void deletePeriodicQuery(String queryID) throws PeriodicQueryStorageException;
+
+    /**
+     * List results in the given storage layer indicated by the query id.
+     * @param queryId - id indicating the storage layer whose results will be listed
+     * @param binID - Optional id to indicate that only results with specific periodic id be listed
+     * @return a {@link CloseableIterator} over the BindingSets stored for the query
+     * @throws PeriodicQueryStorageException if the results cannot be listed
+     */
+    public CloseableIterator<BindingSet> listResults(String queryId, Optional<Long> binID) throws PeriodicQueryStorageException;
+
+    /**
+     * List all storage tables containing periodic results.
+     * @return List of Strings with names of all tables containing periodic results
+     */
+    public List<String> listPeriodicTables();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageException.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageException.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageException.java
new file mode 100644
index 0000000..f9e6969
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage;
+
+/**
+ * Signals that an operation on a {@link PeriodicQueryResultStorage} implementation
+ * could not be completed.
+ */
+public class PeriodicQueryStorageException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates a {@link PeriodicQueryStorageException} that wraps an underlying failure.
+     *
+     * @param message - Explanation of what went wrong.
+     * @param cause - The lower level exception that triggered this one.
+     */
+    public PeriodicQueryStorageException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Creates a {@link PeriodicQueryStorageException} with no underlying cause.
+     *
+     * @param message - Explanation of what went wrong.
+     */
+    public PeriodicQueryStorageException(String message) {
+        super(message);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageMetadata.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageMetadata.java
new file mode 100644
index 0000000..9ce3522
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageMetadata.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage;
+
+import java.util.Objects;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Metadata for a given PeriodicQueryStorage table: the SPARQL query whose results
+ * the table holds and the VariableOrder those results are written in.
+ * Both values are set once at construction and never reassigned.
+ */
+public class PeriodicQueryStorageMetadata {
+
+    private final String sparql;
+    private final VariableOrder varOrder;
+
+    /**
+     * Create a PeriodicQueryStorageMetadata object
+     * @param sparql - SPARQL query whose results are stored in table (not null)
+     * @param varOrder - order that BindingSet values are written in in table (not null)
+     */
+    public PeriodicQueryStorageMetadata(String sparql, VariableOrder varOrder) {
+        this.sparql = Preconditions.checkNotNull(sparql);
+        this.varOrder = Preconditions.checkNotNull(varOrder);
+    }
+
+    /**
+     * Creates a PeriodicQueryStorageMetadata from a {@link PcjMetadata} object, using its
+     * SPARQL query and the first VariableOrder returned by iterating its var orders.
+     * @param metadata - PcjMetadata whose data is copied; must contain at least one VariableOrder
+     */
+    public PeriodicQueryStorageMetadata(PcjMetadata metadata) {
+        this(metadata.getSparql(), metadata.getVarOrders().iterator().next());
+    }
+
+    /**
+     * @return SPARQL query whose results are stored in the table
+     */
+    public String getSparql() {
+        return sparql;
+    }
+
+    /**
+     * @return VariableOrder indicating the order that BindingSet Values are written in in table
+     */
+    public VariableOrder getVariableOrder() {
+        return varOrder;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(sparql, varOrder);
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (o == this) {
+            return true;
+        }
+
+        if (o instanceof PeriodicQueryStorageMetadata) {
+            PeriodicQueryStorageMetadata metadata = (PeriodicQueryStorageMetadata) o;
+            // Compares the same two fields that hashCode() hashes.
+            return new EqualsBuilder().append(sparql, metadata.sparql).append(varOrder, metadata.varOrder).isEquals();
+        }
+
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder()
+                .append("PeriodicQueryStorageMetadata {\n")
+                .append("    SPARQL: " + sparql + "\n")
+                .append("    Variable Order: " + varOrder + "\n")
+                .append("}")
+                .toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
new file mode 100644
index 0000000..d7a50a7
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.apache.rya.indexing.pcj.storage.PCJIdFactory;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.AggregateOperatorBase;
+import org.openrdf.query.algebra.ExtensionElem;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class is the Accumulo implementation of {@link PeriodicQueryResultStorage} for
+ * creating, deleting, and interacting with tables where PeriodicQuery results are stored.
+ */
+public class AccumuloPeriodicQueryResultStorage implements PeriodicQueryResultStorage {
+
+    private String ryaInstance; // Rya instance name; combined with a query id to build table names
+    private Connector accumuloConn; // connection used for every Accumulo operation in this class
+    private Authorizations auths; // authorizations of the connector's user, looked up in the constructor
+    private final PCJIdFactory pcjIdFactory = new PCJIdFactory(); // supplies ids when the caller does not provide one
+    private final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); // NOTE(review): not used in the visible part of the class; presumably for (de)serializing binding sets - confirm
+    private static final PcjTables pcjTables = new PcjTables(); // performs the create/get/add/purge/drop table operations
+    private static final PeriodicQueryTableNameFactory tableNameFactory = new PeriodicQueryTableNameFactory(); // maps (ryaInstance, queryId) to a table name
+
+    /**
+     * Creates a AccumuloPeriodicQueryResultStorage Object and looks up the
+     * authorizations of the user the connector is logged in as.
+     * @param accumuloConn - Accumulo Connector for connecting to an Accumulo instance
+     * @param ryaInstance - Rya Instance name for connecting to Rya
+     * @throws RuntimeException if the user's authorizations cannot be retrieved
+     */
+    public AccumuloPeriodicQueryResultStorage(Connector accumuloConn, String ryaInstance) {
+        this.accumuloConn = Preconditions.checkNotNull(accumuloConn);
+        this.ryaInstance = Preconditions.checkNotNull(ryaInstance);
+        String user = accumuloConn.whoami();
+        try {
+            this.auths = accumuloConn.securityOperations().getUserAuthorizations(user);
+        } catch (AccumuloException | AccumuloSecurityException e) {
+            // Chain the Accumulo exception so the underlying failure is not lost.
+            throw new RuntimeException("Unable to access authorizations for user: " + user, e);
+        }
+    }
+
+    @Override
+    public String createPeriodicQuery(String sparql) throws PeriodicQueryStorageException {
+        // Reject a null query before an id is taken from the factory.
+        Preconditions.checkNotNull(sparql);
+        return createPeriodicQuery(pcjIdFactory.nextId(), sparql);
+    }
+    
+    @Override
+    public String createPeriodicQuery(String queryId, String sparql) throws PeriodicQueryStorageException {
+        Set<String> bindingNames;
+        try {
+            bindingNames = new AggregateVariableRemover().getNonAggregationVariables(sparql);
+        } catch (MalformedQueryException e) {
+            // Keep the parse failure as the cause so its stack trace is preserved.
+            throw new PeriodicQueryStorageException(e.getMessage(), e);
+        }
+        // The periodic bin id always leads the variable order, followed by the
+        // non-aggregation variables of the query.
+        List<String> varOrderList = new ArrayList<>();
+        varOrderList.add(PeriodicQueryResultStorage.PeriodicBinId);
+        varOrderList.addAll(bindingNames);
+        createPeriodicQuery(queryId, sparql, new VariableOrder(varOrderList));
+        return queryId;
+    }
+
+    @Override
+    public void createPeriodicQuery(String queryId, String sparql, VariableOrder order) throws PeriodicQueryStorageException {
+        Preconditions.checkNotNull(sparql);
+        Preconditions.checkNotNull(queryId);
+        Preconditions.checkNotNull(order);
+        // Check for an empty order first so it is reported as an invalid argument
+        // rather than failing with an index error on get(0).
+        Preconditions.checkArgument(
+                !order.getVariableOrders().isEmpty()
+                        && PeriodicQueryResultStorage.PeriodicBinId.equals(order.getVariableOrders().get(0)),
+                "periodicBinId binding name must occur first in VariableOrder.");
+        String tableName = tableNameFactory.makeTableName(ryaInstance, queryId);
+        Set<VariableOrder> varOrders = new HashSet<>();
+        varOrders.add(order);
+        try {
+            pcjTables.createPcjTable(accumuloConn, tableName, varOrders, sparql);
+        } catch (Exception e) {
+            // Keep the original exception as the cause.
+            throw new PeriodicQueryStorageException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public PeriodicQueryStorageMetadata getPeriodicQueryMetadata(String queryId) throws PeriodicQueryStorageException {
+        try {
+            return new PeriodicQueryStorageMetadata(
+                    pcjTables.getPcjMetadata(accumuloConn, tableNameFactory.makeTableName(ryaInstance, queryId)));
+        } catch (Exception e) {
+            // Keep the original exception as the cause.
+            throw new PeriodicQueryStorageException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void addPeriodicQueryResults(String queryId, Collection<VisibilityBindingSet> results) throws PeriodicQueryStorageException {
+        // Every result must carry the periodic bin id binding before anything is written.
+        results.forEach(x -> Preconditions.checkArgument(x.hasBinding(PeriodicQueryResultStorage.PeriodicBinId),
+                "BindingSet must contain periodicBinId binding."));
+        try {
+            pcjTables.addResults(accumuloConn, tableNameFactory.makeTableName(ryaInstance, queryId), results);
+        } catch (Exception e) {
+            // Keep the original exception as the cause.
+            throw new PeriodicQueryStorageException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Deletes the stored results of a single bin of a periodic query. The rows to
+     * delete are selected with a prefix range built from the serialized bin id.
+     *
+     * @param queryId - id of the periodic query whose results are deleted
+     * @param binId - id of the bin whose results are deleted
+     * @throws PeriodicQueryStorageException if the delete fails; the underlying
+     *             failure is attached as the cause
+     */
+    @Override
+    public void deletePeriodicQueryResults(String queryId, long binId) throws PeriodicQueryStorageException {
+        String tableName = tableNameFactory.makeTableName(ryaInstance, queryId);
+        BatchDeleter deleter = null;
+        try {
+            Text prefix = getRowPrefix(binId);
+            deleter = accumuloConn.createBatchDeleter(tableName, auths, 1, new BatchWriterConfig());
+            deleter.setRanges(Collections.singleton(Range.prefix(prefix)));
+            deleter.delete();
+        } catch (Exception e) {
+            // Keep the original exception as the cause so its stack trace is not lost.
+            throw new PeriodicQueryStorageException(e.getMessage(), e);
+        } finally {
+            // Always release the deleter, whether or not the delete succeeded.
+            if (deleter != null) {
+                deleter.close();
+            }
+        }
+    }
+
+    /**
+     * Purges the table of a periodic query by delegating to
+     * {@code pcjTables.purgePcjTable}.
+     *
+     * @param queryId - id of the periodic query whose table is purged
+     * @throws PeriodicQueryStorageException if the purge fails; the underlying
+     *             failure is attached as the cause
+     */
+    public void deletePeriodicQueryResults(String queryId) throws PeriodicQueryStorageException {
+        try {
+            pcjTables.purgePcjTable(accumuloConn, tableNameFactory.makeTableName(ryaInstance, queryId));
+        } catch (Exception e) {
+            // Keep the original exception as the cause so its stack trace is not lost.
+            throw new PeriodicQueryStorageException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Drops the table of a periodic query by delegating to
+     * {@code pcjTables.dropPcjTable}.
+     *
+     * @param queryId - id of the periodic query whose table is dropped
+     * @throws PeriodicQueryStorageException if the table could not be dropped; the
+     *             underlying failure is attached as the cause
+     */
+    @Override
+    public void deletePeriodicQuery(String queryId) throws PeriodicQueryStorageException {
+        try {
+            pcjTables.dropPcjTable(accumuloConn, tableNameFactory.makeTableName(ryaInstance, queryId));
+        } catch (Exception e) {
+            // Keep the original exception as the cause so its stack trace is not lost.
+            throw new PeriodicQueryStorageException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Lists the results stored for a periodic query, optionally restricted to the
+     * rows of a single bin.
+     *
+     * @param queryId - id of the periodic query whose results are listed (not null)
+     * @param binId - when present, the scan is limited to the prefix range built
+     *            from the serialized bin id
+     * @return an iterator over the stored binding sets; closing it closes the
+     *         underlying Scanner
+     * @throws PeriodicQueryStorageException if the metadata cannot be read or the
+     *             scanner cannot be created and configured
+     */
+    @Override
+    public CloseableIterator<BindingSet> listResults(String queryId, Optional<Long> binId)
+            throws PeriodicQueryStorageException {
+        requireNonNull(queryId);
+
+        String tableName = tableNameFactory.makeTableName(ryaInstance, queryId);
+        // The VariableOrder recorded in the metadata names the column family
+        // under which the results were written.
+        final PeriodicQueryStorageMetadata metadata = getPeriodicQueryMetadata(queryId);
+        final VariableOrder varOrder = metadata.getVariableOrder();
+
+        Scanner scanner = null;
+        try {
+            // Fetch only the Binding Sets whose Variable Order matches the
+            // selected one.
+            scanner = accumuloConn.createScanner(tableName, auths);
+            scanner.fetchColumnFamily(new Text(varOrder.toString()));
+            if (binId.isPresent()) {
+                scanner.setRange(Range.prefix(getRowPrefix(binId.get())));
+            }
+            return new AccumuloValueBindingSetIterator(scanner);
+
+        } catch (Exception e) {
+            // No iterator was handed to the caller, so nothing else will close
+            // the scanner; release it here before reporting the failure.
+            if (scanner != null) {
+                scanner.close();
+            }
+            throw new PeriodicQueryStorageException(String.format("PCJ Table does not exist for name '%s'.", tableName), e);
+        }
+    }
+    
+    /**
+     * Builds the row prefix for a bin by serializing a binding set that holds only
+     * the bin id, using a VariableOrder that holds only the bin id variable.
+     *
+     * @param binId - id of the bin
+     * @return the serialized bin id binding as a Text
+     * @throws BindingSetConversionException if the binding set cannot be converted
+     */
+    private Text getRowPrefix(long binId) throws BindingSetConversionException {
+        final String binVariable = PeriodicQueryResultStorage.PeriodicBinId;
+        final QueryBindingSet binOnly = new QueryBindingSet();
+        binOnly.addBinding(binVariable, new LiteralImpl(Long.toString(binId), XMLSchema.LONG));
+        final VariableOrder binOrder = new VariableOrder(binVariable);
+        return new Text(converter.convert(binOnly, binOrder));
+    }
+
+    /**
+     * Lists the names of the tables that start with the Rya instance name followed
+     * by {@link PeriodicQueryTableNameFactory#PeriodicTableSuffix}.
+     *
+     * @return the names of the matching tables
+     */
+    @Override
+    public List<String> listPeriodicTables() {
+        final String periodicPrefix = ryaInstance + PeriodicQueryTableNameFactory.PeriodicTableSuffix;
+        final List<String> matches = new ArrayList<>();
+        boolean seenInstanceTable = false;
+
+        for (final String candidate : accumuloConn.tableOperations().list()) {
+            if (!candidate.startsWith(ryaInstance)) {
+                if (seenInstanceTable) {
+                    // The list is sorted, so once the run of tables belonging
+                    // to the target instance has ended there cannot be any
+                    // more of them further on.
+                    break;
+                }
+                continue;
+            }
+
+            // This table is part of the target Rya instance.
+            seenInstanceTable = true;
+            if (candidate.startsWith(periodicPrefix)) {
+                matches.add(candidate);
+            }
+        }
+        return matches;
+    }
+    
+    /**
+     * Visitor that strips the aggregate variables from the binding names of a
+     * parsed SPARQL query, so that only non-aggregation values end up in the
+     * Accumulo row. Non-aggregation values do not change between updates while
+     * aggregation values do; the aggregation values are instead carried in the
+     * serialized BindingSet in the Accumulo Value field, which is overwritten
+     * when an entry with the same Key and a different Value (an updated
+     * aggregation) is written to the table.
+     */
+    static class AggregateVariableRemover extends QueryModelVisitorBase<RuntimeException> {
+
+        // Binding names of the query being processed; entries are removed
+        // while the query is visited.
+        private Set<String> bindingNames;
+
+        /**
+         * Parses the query and returns its binding names minus those bound by
+         * an aggregate expression.
+         *
+         * @param sparql - SPARQL query to inspect
+         * @return the binding names that are not the result of an aggregation
+         * @throws MalformedQueryException if the query cannot be parsed
+         */
+        public Set<String> getNonAggregationVariables(String sparql) throws MalformedQueryException {
+            final TupleExpr parsedQuery = new SPARQLParser().parseQuery(sparql, null).getTupleExpr();
+            bindingNames = parsedQuery.getBindingNames();
+            parsedQuery.visit(this);
+            return bindingNames;
+        }
+
+        /**
+         * Removes the name of an extension element whose expression is an
+         * aggregate operator.
+         */
+        @Override
+        public void meet(ExtensionElem node) {
+            final boolean isAggregate = node.getExpr() instanceof AggregateOperatorBase;
+            if (!isAggregate) {
+                return;
+            }
+            bindingNames.remove(node.getName());
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloValueBindingSetIterator.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloValueBindingSetIterator.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloValueBindingSetIterator.java
new file mode 100644
index 0000000..946c712
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloValueBindingSetIterator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.openrdf.query.BindingSet;
+
+/**
+ * Implementation of CloseableIterator for retrieving results from a {@link PeriodicQueryResultStorage}
+ * table. Each result is deserialized from the Value of a scanned entry; the
+ * entry's Key is not used.
+ */
+public class AccumuloValueBindingSetIterator implements CloseableIterator<BindingSet>{
+
+    private final Scanner scanner;
+    private final Iterator<Entry<Key, Value>> iter;
+    private final VisibilityBindingSetSerDe bsSerDe = new VisibilityBindingSetSerDe();
+
+    /**
+     * Wraps a Scanner; the scan is driven by the iterator obtained from it here.
+     *
+     * @param scanner - scanner over the table to read; closed by {@link #close()}
+     */
+    public AccumuloValueBindingSetIterator(Scanner scanner) {
+        this.scanner = scanner;
+        iter = scanner.iterator();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return iter.hasNext();
+    }
+
+    /**
+     * Deserializes the Value of the next scanned entry.
+     *
+     * @return the binding set stored in the next entry's Value
+     * @throws java.util.NoSuchElementException if the scan has no more entries
+     * @throws RuntimeException if the Value cannot be deserialized
+     */
+    @Override
+    public BindingSet next() {
+        // Advance outside the try block so that NoSuchElementException reaches
+        // the caller unwrapped, as the Iterator contract specifies.
+        final Entry<Key, Value> entry = iter.next();
+        try {
+            return bsSerDe.deserialize(Bytes.of(entry.getValue().get())).set;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Closes the underlying Scanner.
+     */
+    @Override
+    public void close() {
+        scanner.close();
+    }
+
+    /**
+     * Not supported; results cannot be removed through this iterator.
+     */
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
index 5d13597..d5451ae 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
@@ -403,6 +403,7 @@ public class PcjTables {
 
         final Set<Mutation> mutations = new HashSet<>();
         final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
+        VisibilityBindingSetSerDe bsSerDe = new VisibilityBindingSetSerDe();
 
         for(final VariableOrder varOrder : varOrders) {
             try {
@@ -412,9 +413,9 @@ public class PcjTables {
                 // Row ID = binding set values, Column Family = variable order of the binding set.
                 final Mutation addResult = new Mutation(rowKey);
                 final String visibility = result.getVisibility();
-                addResult.put(varOrder.toString(), "", new ColumnVisibility(visibility), "");
+                addResult.put(varOrder.toString(), "", new ColumnVisibility(visibility), new Value(bsSerDe.serialize(result).toArray()));
                 mutations.add(addResult);
-            } catch(final BindingSetConversionException e) {
+            } catch(Exception e) {
                 throw new PCJStorageException("Could not serialize a result.", e);
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PeriodicQueryTableNameFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PeriodicQueryTableNameFactory.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PeriodicQueryTableNameFactory.java
new file mode 100644
index 0000000..561cad2
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PeriodicQueryTableNameFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+
+/**
+ * Class for creating the names of {@link PeriodicQueryResultStorage} tables
+ * and for recovering the query id from such a name.
+ *
+ */
+public class PeriodicQueryTableNameFactory {
+
+    // Despite its name, this marker sits between the Rya instance name and the
+    // query id in a table name.
+    public static final String PeriodicTableSuffix = "PERIODIC_QUERY_";
+
+    /**
+     * Creates the name of a table from the given Rya instance name and query id.
+     * Any '-' characters are removed from the query id.
+     * @param ryaInstance - name of rya instance table will belong to
+     * @param queryId - id of query whose results will be stored in this table
+     * @return - name of PeriodicQueryResultStorage table
+     */
+    public String makeTableName(final String ryaInstance, final String queryId) {
+        requireNonNull(ryaInstance);
+        requireNonNull(queryId);
+        // Literal replacement; no regular expression is needed to drop '-'.
+        return ryaInstance + PeriodicTableSuffix + queryId.replace("-", "");
+    }
+
+    /**
+     * Extract query id from PeriodicQueryResultStorage table name. The id is
+     * everything after the first occurrence of {@link #PeriodicTableSuffix}.
+     * @param periodTableName - name of table
+     * @return - query id whose results are stored in table
+     * @throws IllegalArgumentException if the name does not contain {@link #PeriodicTableSuffix}
+     */
+    public String getPeriodicQueryId(final String periodTableName) {
+        requireNonNull(periodTableName);
+        final int markerStart = periodTableName.indexOf(PeriodicTableSuffix);
+        if (markerStart < 0) {
+            throw new IllegalArgumentException(
+                    "Table name '" + periodTableName + "' does not contain '" + PeriodicTableSuffix + "'.");
+        }
+        return periodTableName.substring(markerStart + PeriodicTableSuffix.length());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetSerDe.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetSerDe.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetSerDe.java
new file mode 100644
index 0000000..ae43a9a
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetSerDe.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.fluo.api.data.Bytes;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Serializes and deserializes a {@link VisibilityBindingSet} to and from {@link Bytes} objects.
+ * <p>
+ * NOTE(review): this relies on Java's native object serialization, so
+ * {@link #deserialize(Bytes)} should only be given bytes from a trusted source.
+ */
+@DefaultAnnotation(NonNull.class)
+public class VisibilityBindingSetSerDe {
+
+    /**
+     * Serializes a {@link VisibilityBindingSet} into a {@link Bytes} object.
+     *
+     * @param bindingSet - The binding set that will be serialized. (not null)
+     * @return The serialized object.
+     * @throws Exception A problem was encountered while serializing the object.
+     */
+    public Bytes serialize(final VisibilityBindingSet bindingSet) throws Exception {
+        requireNonNull(bindingSet);
+
+        final ByteArrayOutputStream boas = new ByteArrayOutputStream();
+        // Closing the ObjectOutputStream flushes it before the bytes are read back.
+        try(final ObjectOutputStream oos = new ObjectOutputStream(boas)) {
+            oos.writeObject(bindingSet);
+        }
+
+        return Bytes.of(boas.toByteArray());
+    }
+
+    /**
+     * Deserializes a {@link VisibilityBindingSet} from a {@link Bytes} object.
+     *
+     * @param bytes - The bytes that will be deserialized. (not null)
+     * @return The deserialized object.
+     * @throws Exception A problem was encountered while deserializing the object,
+     *   or the bytes did not hold a {@link VisibilityBindingSet}.
+     */
+    public VisibilityBindingSet deserialize(final Bytes bytes) throws Exception {
+        requireNonNull(bytes);
+
+        try(final ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toArray()))) {
+            final Object o = ois.readObject();
+            if(o instanceof VisibilityBindingSet) {
+                return (VisibilityBindingSet) o;
+            }
+            // A serialized null is not an instance of anything; describe it
+            // without dereferencing it.
+            throw new Exception("Deserialized Object is not a VisibilityBindingSet. Was: " + (o == null ? "null" : o.getClass()));
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetSerDeTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetSerDeTest.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetSerDeTest.java
new file mode 100644
index 0000000..16f56c1
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetSerDeTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.impl.MapBindingSet;
+
+/**
+ * Tests the methods of {@link VisibilityBindingSetSerDe}.
+ */
+public class VisibilityBindingSetSerDeTest {
+
+    /**
+     * A binding set that is serialized and then deserialized must equal the original.
+     */
+    @Test
+    public void roundTrip() throws Exception {
+        final ValueFactory vf = new ValueFactoryImpl();
+
+        // Build a binding set with a string and a numeric literal plus a visibility.
+        final MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("name", vf.createLiteral("Alice"));
+        bs.addBinding("age", vf.createLiteral(5));
+        final VisibilityBindingSet original = new VisibilityBindingSet(bs, "u");
+
+        final VisibilityBindingSetSerDe serde = new VisibilityBindingSetSerDe();
+        final Bytes bytes = serde.serialize(original);
+        final VisibilityBindingSet result = serde.deserialize(bytes);
+
+        assertEquals(original, result);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java
deleted file mode 100644
index 98ed4c7..0000000
--- a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.storage.accumulo.accumulo;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.rya.accumulo.AccumuloRyaITBase;
-import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
-import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
-import org.apache.rya.api.instance.RyaDetailsRepository;
-import org.apache.rya.api.instance.RyaDetailsRepository.NotInitializedException;
-import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
-import org.apache.rya.indexing.pcj.storage.PcjMetadata;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
-import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import org.junit.Test;
-import org.openrdf.model.impl.URIImpl;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.impl.MapBindingSet;
-
-import com.google.common.collect.ImmutableMap;
-
-/**
- * Integration tests the methods of {@link AccumuloPcjStorage}.
- * </p>
- * These tests ensures that the PCJ tables are maintained and that these operations
- * also update the Rya instance's details.
- */
-public class AccumuloPcjStorageIT extends AccumuloRyaITBase {
-
-    @Test
-    public void createPCJ() throws AccumuloException, AccumuloSecurityException, PCJStorageException, NotInitializedException, RyaDetailsRepositoryException {
-        // Setup the PCJ storage that will be tested against.
-        final Connector connector = super.getClusterInstance().getConnector();
-        final String ryaInstanceName = super.getRyaInstanceName();
-        try(final PrecomputedJoinStorage pcjStorage =  new AccumuloPcjStorage(connector, ryaInstanceName)) {
-            // Create a PCJ.
-            final String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
-
-            // Ensure the Rya details have been updated to include the PCJ's ID.
-            final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(connector, ryaInstanceName);
-
-            final ImmutableMap<String, PCJDetails> detailsMap = detailsRepo.getRyaInstanceDetails()
-                    .getPCJIndexDetails()
-                    .getPCJDetails();
-
-            final PCJDetails expectedDetails = PCJDetails.builder()
-                    .setId( pcjId )
-                    .build();
-
-            assertEquals(expectedDetails, detailsMap.get(pcjId));
-        }
-    }
-
-    @Test
-    public void dropPCJ() throws AccumuloException, AccumuloSecurityException, PCJStorageException, NotInitializedException, RyaDetailsRepositoryException {
-        // Setup the PCJ storage that will be tested against.
-        final Connector connector = super.getClusterInstance().getConnector();
-        final String ryaInstanceName = super.getRyaInstanceName();
-        try(final PrecomputedJoinStorage pcjStorage =  new AccumuloPcjStorage(connector, ryaInstanceName)) {
-            // Create a PCJ.
-            final String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
-
-            // Delete the PCJ that was just created.
-            pcjStorage.dropPcj(pcjId);
-
-            // Ensure the Rya details have been updated to no longer include the PCJ's ID.
-            final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(connector, ryaInstanceName);
-
-            final ImmutableMap<String, PCJDetails> detailsMap = detailsRepo.getRyaInstanceDetails()
-                    .getPCJIndexDetails()
-                    .getPCJDetails();
-
-            assertFalse( detailsMap.containsKey(pcjId) );
-        }
-    }
-
-    @Test
-    public void listPcjs() throws AccumuloException, AccumuloSecurityException, PCJStorageException {
-        // Setup the PCJ storage that will be tested against.
-        final Connector connector = super.getClusterInstance().getConnector();
-        final String ryaInstanceName = super.getRyaInstanceName();
-        try(final PrecomputedJoinStorage pcjStorage =  new AccumuloPcjStorage(connector, ryaInstanceName)) {
-            // Create a few PCJs and hold onto their IDs.
-            final List<String> expectedIds = new ArrayList<>();
-
-            String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
-            expectedIds.add( pcjId );
-
-            pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
-            expectedIds.add( pcjId );
-
-            pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
-            expectedIds.add( pcjId );
-
-            // Fetch the PCJ names
-            final List<String> pcjIds = pcjStorage.listPcjs();
-
-            // Ensure the expected IDs match the fetched IDs.
-            Collections.sort(expectedIds);
-            Collections.sort(pcjIds);
-            assertEquals(expectedIds, pcjIds);
-        }
-    }
-
-    @Test
-    public void getPcjMetadata() throws AccumuloException, AccumuloSecurityException, PCJStorageException, MalformedQueryException {
-        // Setup the PCJ storage that will be tested against.
-        final Connector connector = super.getClusterInstance().getConnector();
-        final String ryaInstanceName = super.getRyaInstanceName();
-        try(final PrecomputedJoinStorage pcjStorage =  new AccumuloPcjStorage(connector, ryaInstanceName)) {
-            // Create a PCJ.
-            final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
-            final String pcjId = pcjStorage.createPcj(sparql);
-
-            // Fetch the PCJ's metadata.
-            final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
-
-            // Ensure it has the expected values.
-            final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql);
-            final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, varOrders);
-            assertEquals(expectedMetadata, metadata);
-        }
-    }
-
-    @Test
-    public void addResults() throws AccumuloException, AccumuloSecurityException, PCJStorageException, MalformedQueryException {
-        // Setup the PCJ storage that will be tested against.
-        final Connector connector = super.getClusterInstance().getConnector();
-        final String ryaInstanceName = super.getRyaInstanceName();
-        try(final PrecomputedJoinStorage pcjStorage =  new AccumuloPcjStorage(connector, ryaInstanceName)) {
-            // Create a PCJ.
-            final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
-            final String pcjId = pcjStorage.createPcj(sparql);
-
-            // Add some binding sets to it.
-            final Set<VisibilityBindingSet> results = new HashSet<>();
-
-            final MapBindingSet aliceBS = new MapBindingSet();
-            aliceBS.addBinding("a", new URIImpl("http://Alice"));
-            aliceBS.addBinding("b", new URIImpl("http://Person"));
-            results.add( new VisibilityBindingSet(aliceBS, "") );
-
-            final MapBindingSet charlieBS = new MapBindingSet();
-            charlieBS.addBinding("a", new URIImpl("http://Charlie"));
-            charlieBS.addBinding("b", new URIImpl("http://Comedian"));
-            results.add( new VisibilityBindingSet(charlieBS, "") );
-
-            pcjStorage.addResults(pcjId, results);
-
-            // Make sure the PCJ metadata was updated.
-            final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
-
-            final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql);
-            final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 2L, varOrders);
-            assertEquals(expectedMetadata, metadata);
-        }
-    }
-
-    @Test
-    public void listResults() throws Exception {
-        // Setup the PCJ storage that will be tested against.
-        final Connector connector = super.getClusterInstance().getConnector();
-        final String ryaInstanceName = super.getRyaInstanceName();
-        try(final PrecomputedJoinStorage pcjStorage =  new AccumuloPcjStorage(connector, ryaInstanceName)) {
-            // Create a PCJ.
-            final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
-            final String pcjId = pcjStorage.createPcj(sparql);
-
-            // Add some binding sets to it.
-            final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
-
-            final MapBindingSet aliceBS = new MapBindingSet();
-            aliceBS.addBinding("a", new URIImpl("http://Alice"));
-            aliceBS.addBinding("b", new URIImpl("http://Person"));
-            expectedResults.add( new VisibilityBindingSet(aliceBS, "") );
-
-            final MapBindingSet charlieBS = new MapBindingSet();
-            charlieBS.addBinding("a", new URIImpl("http://Charlie"));
-            charlieBS.addBinding("b", new URIImpl("http://Comedian"));
-            expectedResults.add( new VisibilityBindingSet(charlieBS, "") );
-
-            pcjStorage.addResults(pcjId, expectedResults);
-
-            // List the results that were stored.
-            final Set<BindingSet> results = new HashSet<>();
-            try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
-                while(resultsIt.hasNext()) {
-                    results.add( resultsIt.next() );
-                }
-            }
-
-            assertEquals(expectedResults, results);
-        }
-    }
-
-    @Test
-    public void purge() throws Exception {
-        // Setup the PCJ storage that will be tested against.
-        final Connector connector = super.getClusterInstance().getConnector();
-        final String ryaInstanceName = super.getRyaInstanceName();
-        try(final PrecomputedJoinStorage pcjStorage =  new AccumuloPcjStorage(connector, ryaInstanceName)) {
-            // Create a PCJ.
-            final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
-            final String pcjId = pcjStorage.createPcj(sparql);
-
-            // Add some binding sets to it.
-            final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
-
-            final MapBindingSet aliceBS = new MapBindingSet();
-            aliceBS.addBinding("a", new URIImpl("http://Alice"));
-            aliceBS.addBinding("b", new URIImpl("http://Person"));
-            expectedResults.add( new VisibilityBindingSet(aliceBS, "") );
-
-            final MapBindingSet charlieBS = new MapBindingSet();
-            charlieBS.addBinding("a", new URIImpl("http://Charlie"));
-            charlieBS.addBinding("b", new URIImpl("http://Comedian"));
-            expectedResults.add( new VisibilityBindingSet(charlieBS, "") );
-
-            pcjStorage.addResults(pcjId, expectedResults);
-
-            // Purge the PCJ.
-            pcjStorage.purge(pcjId);
-
-            // List the results that were stored.
-            final Set<BindingSet> results = new HashSet<>();
-            try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
-                while(resultsIt.hasNext()) {
-                    results.add( resultsIt.next() );
-                }
-            }
-
-            assertTrue( results.isEmpty() );
-
-            // Make sure the PCJ metadata was updated.
-            final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
-
-            final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql);
-            final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, varOrders);
-            assertEquals(expectedMetadata, metadata);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPcjStorageIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPcjStorageIT.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPcjStorageIT.java
new file mode 100644
index 0000000..2964d91
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPcjStorageIT.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo.integration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.rya.accumulo.AccumuloRyaITBase;
+import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
+import org.apache.rya.api.instance.RyaDetailsRepository;
+import org.apache.rya.api.instance.RyaDetailsRepository.NotInitializedException;
+import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.junit.Test;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Integration tests the methods of {@link AccumuloPcjStorage}.
+ * <p>
+ * These tests ensure that the PCJ tables are maintained and that these operations
+ * also update the Rya instance's details.
+ */
+public class AccumuloPcjStorageIT extends AccumuloRyaITBase {
+
+    @Test
+    public void createPCJ() throws AccumuloException, AccumuloSecurityException, PCJStorageException, NotInitializedException, RyaDetailsRepositoryException {
+        // Set up the PCJ storage under test; try-with-resources closes it when the block exits.
+        final Connector connector = super.getClusterInstance().getConnector();
+        final String ryaInstanceName = super.getRyaInstanceName();
+        try(final PrecomputedJoinStorage pcjStorage =  new AccumuloPcjStorage(connector, ryaInstanceName)) {
+            // Create a PCJ and keep the ID that createPcj returns.
+            final String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
+
+            // Ensure the Rya instance details now map that ID to a PCJDetails built with only the ID set.
+            final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(connector, ryaInstanceName);
+
+            final ImmutableMap<String, PCJDetails> detailsMap = detailsRepo.getRyaInstanceDetails()
+                    .getPCJIndexDetails()
+                    .getPCJDetails();
+
+            final PCJDetails expectedDetails = PCJDetails.builder()
+                    .setId( pcjId )
+                    .build();
+
+            assertEquals(expectedDetails, detailsMap.get(pcjId));
+        }
+    }
+
+    @Test
+    public void dropPCJ() throws AccumuloException, AccumuloSecurityException, PCJStorageException, NotInitializedException, RyaDetailsRepositoryException {
+        // Set up the PCJ storage under test; try-with-resources closes it when the block exits.
+        final Connector connector = super.getClusterInstance().getConnector();
+        final String ryaInstanceName = super.getRyaInstanceName();
+        try(final PrecomputedJoinStorage pcjStorage =  new AccumuloPcjStorage(connector, ryaInstanceName)) {
+            // Create a PCJ so that there is something to drop, keeping the returned ID.
+            final String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
+
+            // Drop the PCJ that was just created.
+            pcjStorage.dropPcj(pcjId);
+
+            // Ensure the Rya instance details no longer contain an entry keyed by the PCJ's ID.
+            final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(connector, ryaInstanceName);
+
+            final ImmutableMap<String, PCJDetails> detailsMap = detailsRepo.getRyaInstanceDetails()
+                    .getPCJIndexDetails()
+                    .getPCJDetails();
+
+            assertFalse( detailsMap.containsKey(pcjId) );
+        }
+    }
+
+    @Test
+    public void listPcjs() throws AccumuloException, AccumuloSecurityException, PCJStorageException {
+        // Set up the PCJ storage under test; try-with-resources closes it when the block exits.
+        final Connector connector = super.getClusterInstance().getConnector();
+        final String ryaInstanceName = super.getRyaInstanceName();
+        try(final PrecomputedJoinStorage pcjStorage =  new AccumuloPcjStorage(connector, ryaInstanceName)) {
+            // Create three PCJs from the same SPARQL and record the ID returned for each.
+            final List<String> expectedIds = new ArrayList<>();
+
+            String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
+            expectedIds.add( pcjId );
+
+            pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
+            expectedIds.add( pcjId );
+
+            pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
+            expectedIds.add( pcjId );
+
+            // Fetch the PCJ IDs that the storage lists.
+            final List<String> pcjIds = pcjStorage.listPcjs();
+
+            // Sort both lists so the comparison does not depend on the order the IDs are returned in.
+            Collections.sort(expectedIds);
+            Collections.sort(pcjIds);
+            assertEquals(expectedIds, pcjIds);
+        }
+    }
+
+    @Test
+    public void getPcjMetadata() throws AccumuloException, AccumuloSecurityException, PCJStorageException, MalformedQueryException {
+        // Set up the PCJ storage under test; try-with-resources closes it when the block exits.
+        final Connector connector = super.getClusterInstance().getConnector();
+        final String ryaInstanceName = super.getRyaInstanceName();
+        try(final PrecomputedJoinStorage pcjStorage =  new AccumuloPcjStorage(connector, ryaInstanceName)) {
+            // Create a PCJ, keeping the SPARQL so the expected metadata can be built from it.
+            final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
+            final String pcjId = pcjStorage.createPcj(sparql);
+
+            // Fetch the PCJ's metadata.
+            final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
+
+            // Expect the same SPARQL, a count of 0L, and the variable orders ShiftVarOrderFactory derives from it.
+            final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql);
+            final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, varOrders);
+            assertEquals(expectedMetadata, metadata);
+        }
+    }
+
+    @Test
+    public void addResults() throws AccumuloException, AccumuloSecurityException, PCJStorageException, MalformedQueryException {
+        // Set up the PCJ storage under test; try-with-resources closes it when the block exits.
+        final Connector connector = super.getClusterInstance().getConnector();
+        final String ryaInstanceName = super.getRyaInstanceName();
+        try(final PrecomputedJoinStorage pcjStorage =  new AccumuloPcjStorage(connector, ryaInstanceName)) {
+            // Create a PCJ, keeping the SPARQL so the expected metadata can be built from it.
+            final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
+            final String pcjId = pcjStorage.createPcj(sparql);
+
+            // Add two binding sets to it, each wrapped in a VisibilityBindingSet built with an empty string.
+            final Set<VisibilityBindingSet> results = new HashSet<>();
+
+            final MapBindingSet aliceBS = new MapBindingSet();
+            aliceBS.addBinding("a", new URIImpl("http://Alice"));
+            aliceBS.addBinding("b", new URIImpl("http://Person"));
+            results.add( new VisibilityBindingSet(aliceBS, "") );
+
+            final MapBindingSet charlieBS = new MapBindingSet();
+            charlieBS.addBinding("a", new URIImpl("http://Charlie"));
+            charlieBS.addBinding("b", new URIImpl("http://Comedian"));
+            results.add( new VisibilityBindingSet(charlieBS, "") );
+
+            pcjStorage.addResults(pcjId, results);
+
+            // Make sure the PCJ metadata now carries a count of 2L, matching the two binding sets added above.
+            final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
+
+            final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql);
+            final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 2L, varOrders);
+            assertEquals(expectedMetadata, metadata);
+        }
+    }
+
+    @Test
+    public void listResults() throws Exception {
+        // Set up the PCJ storage under test; try-with-resources closes it when the block exits.
+        final Connector connector = super.getClusterInstance().getConnector();
+        final String ryaInstanceName = super.getRyaInstanceName();
+        try(final PrecomputedJoinStorage pcjStorage =  new AccumuloPcjStorage(connector, ryaInstanceName)) {
+            // Create the PCJ whose results will be listed.
+            final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
+            final String pcjId = pcjStorage.createPcj(sparql);
+
+            // Add two binding sets to it; the same set doubles as the expected listing.
+            final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
+
+            final MapBindingSet aliceBS = new MapBindingSet();
+            aliceBS.addBinding("a", new URIImpl("http://Alice"));
+            aliceBS.addBinding("b", new URIImpl("http://Person"));
+            expectedResults.add( new VisibilityBindingSet(aliceBS, "") );
+
+            final MapBindingSet charlieBS = new MapBindingSet();
+            charlieBS.addBinding("a", new URIImpl("http://Charlie"));
+            charlieBS.addBinding("b", new URIImpl("http://Comedian"));
+            expectedResults.add( new VisibilityBindingSet(charlieBS, "") );
+
+            pcjStorage.addResults(pcjId, expectedResults);
+
+            // Drain the results iterator into a set; try-with-resources closes the iterator afterwards.
+            final Set<BindingSet> results = new HashSet<>();
+            try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
+                while(resultsIt.hasNext()) {
+                    results.add( resultsIt.next() );
+                }
+            }
+
+            assertEquals(expectedResults, results);
+        }
+    }
+
+    @Test
+    public void purge() throws Exception {
+        // Set up the PCJ storage under test; try-with-resources closes it when the block exits.
+        final Connector connector = super.getClusterInstance().getConnector();
+        final String ryaInstanceName = super.getRyaInstanceName();
+        try(final PrecomputedJoinStorage pcjStorage =  new AccumuloPcjStorage(connector, ryaInstanceName)) {
+            // Create a PCJ, keeping the SPARQL so the expected metadata can be built from it.
+            final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
+            final String pcjId = pcjStorage.createPcj(sparql);
+
+            // Add two binding sets to it so that the purge has something to remove.
+            final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
+
+            final MapBindingSet aliceBS = new MapBindingSet();
+            aliceBS.addBinding("a", new URIImpl("http://Alice"));
+            aliceBS.addBinding("b", new URIImpl("http://Person"));
+            expectedResults.add( new VisibilityBindingSet(aliceBS, "") );
+
+            final MapBindingSet charlieBS = new MapBindingSet();
+            charlieBS.addBinding("a", new URIImpl("http://Charlie"));
+            charlieBS.addBinding("b", new URIImpl("http://Comedian"));
+            expectedResults.add( new VisibilityBindingSet(charlieBS, "") );
+
+            pcjStorage.addResults(pcjId, expectedResults);
+
+            // Purge the PCJ.
+            pcjStorage.purge(pcjId);
+
+            // List whatever results remain after the purge; none are expected.
+            final Set<BindingSet> results = new HashSet<>();
+            try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
+                while(resultsIt.hasNext()) {
+                    results.add( resultsIt.next() );
+                }
+            }
+
+            assertTrue( results.isEmpty() );
+
+            // Make sure the metadata count is back to 0L while the SPARQL and variable orders are unchanged.
+            final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
+
+            final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql);
+            final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, varOrders);
+            assertEquals(expectedMetadata, metadata);
+        }
+    }
+}
\ No newline at end of file