You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/04/24 15:06:27 UTC

[9/9] incubator-rya git commit: RYA-260 Fluo PCJ application has had Aggregation support added to it. Also fixed a bunch of resource leaks that were causing integration tests to fail. Closes #156.

RYA-260 Fluo PCJ application has had Aggregation support added to it. Also fixed a bunch of resource leaks that were causing integration tests to fail. Closes #156.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/c941aea8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/c941aea8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/c941aea8

Branch: refs/heads/master
Commit: c941aea8b65acb99b451757c48279914d9488c85
Parents: be9ea9a
Author: Kevin Chilton <ke...@parsons.com>
Authored: Fri Apr 7 15:57:57 2017 -0400
Committer: Caleb Meier <ca...@parsons.com>
Committed: Mon Apr 24 07:58:25 2017 -0700

----------------------------------------------------------------------
 .../AccumuloRyaInstanceDetailsRepository.java   |  20 +-
 .../accumulo/utils/VisibilitySimplifier.java    |  30 +-
 .../utils/VisibilitySimplifierTest.java         |  24 +
 .../rya/accumulo/utils/RyaTableNames.java       |  11 +-
 .../client/accumulo/AccumuloBatchUpdatePCJ.java |  87 ++-
 .../api/client/accumulo/AccumuloCreatePCJ.java  |  86 ++-
 .../api/client/accumulo/AccumuloDeletePCJ.java  |  38 +-
 .../pcj/matching/AccumuloIndexSetProvider.java  |  62 +-
 .../accumulo/AccumuloBatchUpdatePCJIT.java      |  10 +-
 .../client/accumulo/AccumuloCreatePCJIT.java    |  72 +-
 .../client/accumulo/AccumuloDeletePCJIT.java    |  46 +-
 .../benchmark/query/QueryBenchmarkRunIT.java    |  29 +-
 .../pcj/storage/PrecomputedJoinStorage.java     |  22 +-
 .../storage/accumulo/AccumuloPcjSerializer.java |  20 -
 .../storage/accumulo/AccumuloPcjStorage.java    |   8 +-
 .../storage/accumulo/BindingSetConverter.java   |   9 +-
 .../accumulo/BindingSetStringConverter.java     |  53 +-
 .../pcj/storage/accumulo/PcjTables.java         |  39 +-
 .../accumulo/ScannerBindingSetIterator.java     |  18 +-
 .../pcj/storage/accumulo/VariableOrder.java     |  15 +-
 .../pcj/update/PrecomputedJoinUpdater.java      |  10 +-
 .../accumulo/AccumuloPcjSerializerTest.java     | 185 +++++
 .../accumulo/AccumuloPcjSerialzerTest.java      | 175 -----
 .../accumulo/BindingSetStringConverterTest.java |  42 +-
 .../accumulo/PcjTablesIntegrationTest.java      |  26 +-
 .../accumulo/accumulo/AccumuloPcjStorageIT.java | 284 ++++----
 .../rya/indexing/pcj/fluo/api/CreatePcj.java    | 277 ++++----
 .../rya/indexing/pcj/fluo/api/DeletePcj.java    |  28 +-
 extras/rya.pcj.fluo/pcj.fluo.app/pom.xml        |  10 +-
 .../pcj/fluo/app/AggregationResultUpdater.java  | 572 +++++++++++++++
 .../indexing/pcj/fluo/app/BindingSetRow.java    |  11 +-
 .../pcj/fluo/app/FilterResultUpdater.java       |  57 +-
 .../rya/indexing/pcj/fluo/app/IncUpdateDAO.java |  95 +--
 .../fluo/app/IncrementalUpdateConstants.java    |   1 +
 .../pcj/fluo/app/JoinResultUpdater.java         | 131 ++--
 .../rya/indexing/pcj/fluo/app/NodeType.java     |  11 +-
 .../pcj/fluo/app/QueryResultUpdater.java        |  69 +-
 .../pcj/fluo/app/VisibilityBindingSetSerDe.java |  77 +++
 .../app/export/IncrementalResultExporter.java   |  12 +-
 .../app/export/kafka/KafkaResultExporter.java   |  32 +-
 .../fluo/app/export/rya/RyaResultExporter.java  |  16 +-
 .../fluo/app/observers/AggregationObserver.java |  74 ++
 .../fluo/app/observers/BindingSetUpdater.java   |  53 +-
 .../pcj/fluo/app/observers/FilterObserver.java  |  25 +-
 .../pcj/fluo/app/observers/JoinObserver.java    |  24 +-
 .../fluo/app/observers/QueryResultObserver.java |  54 +-
 .../app/observers/StatementPatternObserver.java |  25 +-
 .../pcj/fluo/app/observers/TripleObserver.java  | 158 +++--
 .../pcj/fluo/app/query/AggregationMetadata.java | 371 ++++++++++
 .../indexing/pcj/fluo/app/query/FluoQuery.java  | 111 ++-
 .../pcj/fluo/app/query/FluoQueryColumns.java    |  60 +-
 .../fluo/app/query/FluoQueryMetadataDAO.java    | 186 ++++-
 .../fluo/app/query/SparqlFluoQueryBuilder.java  | 116 +++-
 .../pcj/fluo/app/util/BindingSetUtil.java       |  54 ++
 .../indexing/pcj/fluo/app/util/RowKeyUtil.java  |  69 ++
 .../fluo/app/VisibilityBindingSetSerDeTest.java |  51 ++
 .../fluo/client/command/NewQueryCommand.java    |   2 +-
 .../pcj/fluo/demo/FluoAndHistoricPcjsDemo.java  |  38 +-
 .../rya.pcj.fluo/pcj.fluo.integration/pom.xml   |  13 +-
 .../apache/rya/indexing/pcj/fluo/ITBase.java    | 443 ------------
 .../indexing/pcj/fluo/KafkaExportITBase.java    | 315 +++++++++
 .../rya/indexing/pcj/fluo/RyaExportITBase.java  | 182 +++++
 .../pcj/fluo/api/CountStatementsIT.java         |  54 +-
 .../indexing/pcj/fluo/api/GetPcjMetadataIT.java |  91 +--
 .../indexing/pcj/fluo/api/GetQueryReportIT.java | 107 +--
 .../indexing/pcj/fluo/api/ListQueryIdsIT.java   |  35 +-
 .../fluo/app/query/FluoQueryMetadataDAOIT.java  | 204 ++++--
 .../pcj/fluo/integration/CreateDeleteIT.java    | 166 +++--
 .../indexing/pcj/fluo/integration/InputIT.java  | 275 +++++---
 .../pcj/fluo/integration/KafkaExportIT.java     | 693 ++++++++++++-------
 .../indexing/pcj/fluo/integration/QueryIT.java  | 580 ++++++++--------
 .../pcj/fluo/integration/RyaExportIT.java       | 101 +--
 .../RyaInputIncrementalUpdateIT.java            | 245 ++++---
 .../pcj/fluo/integration/StreamingTestIT.java   | 140 ++--
 .../HistoricStreamingVisibilityIT.java          |  80 ++-
 .../pcj/fluo/visibility/PcjVisibilityIT.java    | 199 +++---
 .../rya.pcj.fluo/rya.pcj.functions.geo/pom.xml  |  44 +-
 .../rya/indexing/pcj/fluo/RyaExportITBase.java  | 182 +++++
 .../pcj/functions/geo/GeoFunctionsIT.java       | 471 ++++++-------
 pom.xml                                         |  16 +
 80 files changed, 5719 insertions(+), 3208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java
index be8e12c..dcd64de 100644
--- a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java
@@ -22,9 +22,6 @@ import static java.util.Objects.requireNonNull;
 
 import java.util.Map.Entry;
 
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
@@ -48,6 +45,9 @@ import org.apache.hadoop.io.Text;
 import org.apache.rya.api.instance.RyaDetails;
 import org.apache.rya.api.instance.RyaDetailsRepository;
 
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
 /**
  * An implementation of {@link RyaDetailsRepository} that stores a Rya
  * instance's {@link RyaDetails} in an Accumulo table.
@@ -89,12 +89,17 @@ public class AccumuloRyaInstanceDetailsRepository implements RyaDetailsRepositor
 
     @Override
     public boolean isInitialized() throws RyaDetailsRepositoryException {
+        Scanner scanner = null;
         try {
-            final Scanner scanner = connector.createScanner(detailsTableName, new Authorizations());
+            scanner = connector.createScanner(detailsTableName, new Authorizations());
             scanner.fetchColumn(COL_FAMILY, COL_QUALIFIER);
             return scanner.iterator().hasNext();
         } catch (final TableNotFoundException e) {
             return false;
+        } finally {
+            if(scanner != null) {
+                scanner.close();
+            }
         }
     }
 
@@ -157,9 +162,10 @@ public class AccumuloRyaInstanceDetailsRepository implements RyaDetailsRepositor
         }
 
         // Read it from the table.
+        Scanner scanner = null;
         try {
             // Fetch the value from the table.
-            final Scanner scanner = connector.createScanner(detailsTableName, new Authorizations());
+            scanner = connector.createScanner(detailsTableName, new Authorizations());
             scanner.fetchColumn(COL_FAMILY, COL_QUALIFIER);
             final Entry<Key, Value> entry = scanner.iterator().next();
 
@@ -169,6 +175,10 @@ public class AccumuloRyaInstanceDetailsRepository implements RyaDetailsRepositor
 
         } catch (final TableNotFoundException e) {
             throw new RyaDetailsRepositoryException("Could not get the details from the table.", e);
+        } finally {
+            if(scanner != null) {
+                scanner.close();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/utils/VisibilitySimplifier.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/utils/VisibilitySimplifier.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/utils/VisibilitySimplifier.java
index 98c6abd..8fa3b0e 100644
--- a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/utils/VisibilitySimplifier.java
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/utils/VisibilitySimplifier.java
@@ -20,13 +20,13 @@ package org.apache.rya.accumulo.utils;
 
 import static java.util.Objects.requireNonNull;
 
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
 import org.apache.accumulo.core.security.ColumnVisibility;
 
 import com.google.common.base.Charsets;
 
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
 /**
  * Simplifies Accumulo visibility expressions.
  */
@@ -34,12 +34,34 @@ import com.google.common.base.Charsets;
 public class VisibilitySimplifier {
 
     /**
+     * Unions two visibility equations and then simplifies the result.
+     *
+     * @param vis1 - The first visibility equation that will be unioned. (not null)
+     * @param vis2 - The other visibility equation that will be unioned. (not null)
+     * @return A simplified form of the unioned visibility equations.
+     */
+    public static String unionAndSimplify(final String vis1, final String vis2) {
+        requireNonNull(vis1);
+        requireNonNull(vis2);
+
+        if(vis1.isEmpty()) {
+            return vis2;
+        }
+
+        if(vis2.isEmpty()) {
+            return vis1;
+        }
+
+        return simplify("(" + vis1 + ")&(" + vis2 + ")");
+    }
+
+    /**
      * Simplifies an Accumulo visibility expression.
      *
      * @param visibility - The expression to simplify. (not null)
      * @return A simplified form of {@code visibility}.
      */
-    public String simplify(final String visibility) {
+    public static String simplify(final String visibility) {
         requireNonNull(visibility);
 
         String last = visibility;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/dao/accumulo.rya/src/test/java/org/apache/rya/accumulo/utils/VisibilitySimplifierTest.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/test/java/org/apache/rya/accumulo/utils/VisibilitySimplifierTest.java b/dao/accumulo.rya/src/test/java/org/apache/rya/accumulo/utils/VisibilitySimplifierTest.java
index a9a03ce..0adb325 100644
--- a/dao/accumulo.rya/src/test/java/org/apache/rya/accumulo/utils/VisibilitySimplifierTest.java
+++ b/dao/accumulo.rya/src/test/java/org/apache/rya/accumulo/utils/VisibilitySimplifierTest.java
@@ -50,4 +50,28 @@ public class VisibilitySimplifierTest {
         final String simplified = new VisibilitySimplifier().simplify("(a|b)|(a|b)|a|b");
         assertEquals("a|b", simplified);
     }
+
+    @Test
+    public void unionAndSimplify() {
+        final String simplified = new VisibilitySimplifier().unionAndSimplify("u&b", "u");
+        assertEquals("b&u", simplified);
+    }
+
+    @Test
+    public void unionAndSimplify_firstIsEmpty() {
+        final String simplified = new VisibilitySimplifier().unionAndSimplify("", "u");
+        assertEquals("u", simplified);
+    }
+
+    @Test
+    public void unionAndSimplify_secondIsEmpty() {
+        final String simplified = new VisibilitySimplifier().unionAndSimplify("u", "");
+        assertEquals("u", simplified);
+    }
+
+    @Test
+    public void unionAndSimplify_bothAreEmpty() {
+        final String simplified = new VisibilitySimplifier().unionAndSimplify("", "");
+        assertEquals("", simplified);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/indexing/src/main/java/org/apache/rya/accumulo/utils/RyaTableNames.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/accumulo/utils/RyaTableNames.java b/extras/indexing/src/main/java/org/apache/rya/accumulo/utils/RyaTableNames.java
index faeebbb..cd17cbc 100644
--- a/extras/indexing/src/main/java/org/apache/rya/accumulo/utils/RyaTableNames.java
+++ b/extras/indexing/src/main/java/org/apache/rya/accumulo/utils/RyaTableNames.java
@@ -34,6 +34,7 @@ import org.apache.rya.api.layout.TablePrefixLayoutStrategy;
 import org.apache.rya.indexing.accumulo.entity.EntityCentricIndex;
 import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
 import org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory;
@@ -93,11 +94,13 @@ public class RyaTableNames {
  */
 
         if(details.getPCJIndexDetails().isEnabled()) {
-            final List<String> pcjIds = new AccumuloPcjStorage(conn, ryaInstanceName).listPcjs();
+            try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(conn, ryaInstanceName)) {
+                final List<String> pcjIds = pcjStorage.listPcjs();
 
-            final PcjTableNameFactory tableNameFactory = new PcjTableNameFactory();
-            for(final String pcjId : pcjIds) {
-                tables.add( tableNameFactory.makeTableName(ryaInstanceName, pcjId) );
+                final PcjTableNameFactory tableNameFactory = new PcjTableNameFactory();
+                for(final String pcjId : pcjIds) {
+                    tables.add( tableNameFactory.makeTableName(ryaInstanceName, pcjId) );
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
index 790fe80..76aad02 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
@@ -28,23 +28,6 @@ import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.storage.PcjMetadata;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
-import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.QueryEvaluationException;
-import org.openrdf.query.parser.ParsedQuery;
-import org.openrdf.query.parser.sparql.SPARQLParser;
-import org.openrdf.sail.Sail;
-import org.openrdf.sail.SailConnection;
-import org.openrdf.sail.SailException;
-
-import com.google.common.base.Optional;
-
-import info.aduna.iteration.CloseableIteration;
 import org.apache.rya.accumulo.AccumuloRdfConfiguration;
 import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
 import org.apache.rya.api.client.BatchUpdatePCJ;
@@ -62,8 +45,25 @@ import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator;
 import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
 import org.apache.rya.api.persist.RyaDAOException;
 import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
 import org.apache.rya.sail.config.RyaSailFactory;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailConnection;
+import org.openrdf.sail.SailException;
+
+import com.google.common.base.Optional;
+
+import info.aduna.iteration.CloseableIteration;
 
 /**
  * Uses an in memory Rya Client to batch update a PCJ index.
@@ -126,12 +126,11 @@ public class AccumuloBatchUpdatePCJ extends AccumuloCommand implements BatchUpda
         SailConnection sailConn = null;
         CloseableIteration<? extends BindingSet, QueryEvaluationException> results = null;
 
-        try {
+        try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(super.getConnector(), ryaInstanceName)) {
             // Create an instance of Sail backed by the Rya instance.
             sail = connectToRya(ryaInstanceName);
 
             // Purge the old results from the PCJ.
-            final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(super.getConnector(), ryaInstanceName);
             try {
                 pcjStorage.purge(pcjId);
             } catch (final PCJStorageException e) {
@@ -139,37 +138,35 @@ public class AccumuloBatchUpdatePCJ extends AccumuloCommand implements BatchUpda
                         "results could not be purged from it.", e);
             }
 
-            try {
-                // Parse the PCJ's SPARQL query.
-                final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
-                final String sparql = metadata.getSparql();
-                final SPARQLParser parser = new SPARQLParser();
-                final ParsedQuery parsedQuery = parser.parseQuery(sparql, null);
-
-                // Execute the query.
-                sailConn = sail.getConnection();
-                results = sailConn.evaluate(parsedQuery.getTupleExpr(), null, null, false);
-
-                // Load the results into the PCJ table.
-                final List<VisibilityBindingSet> batch = new ArrayList<>(1000);
-
-                while(results.hasNext()) {
-                    final VisibilityBindingSet result = new VisibilityBindingSet(results.next(), "");
-                    batch.add(result);
-
-                    if(batch.size() == 1000) {
-                        pcjStorage.addResults(pcjId, batch);
-                        batch.clear();
-                    }
-                }
+            // Parse the PCJ's SPARQL query.
+            final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
+            final String sparql = metadata.getSparql();
+            final SPARQLParser parser = new SPARQLParser();
+            final ParsedQuery parsedQuery = parser.parseQuery(sparql, null);
 
-                if(!batch.isEmpty()) {
+            // Execute the query.
+            sailConn = sail.getConnection();
+            results = sailConn.evaluate(parsedQuery.getTupleExpr(), null, null, false);
+
+            // Load the results into the PCJ table.
+            final List<VisibilityBindingSet> batch = new ArrayList<>(1000);
+
+            while(results.hasNext()) {
+                final VisibilityBindingSet result = new VisibilityBindingSet(results.next(), "");
+                batch.add(result);
+
+                if(batch.size() == 1000) {
                     pcjStorage.addResults(pcjId, batch);
                     batch.clear();
                 }
-            } catch(final MalformedQueryException | PCJStorageException | SailException | QueryEvaluationException e) {
-                throw new RyaClientException("Fail to batch load new results into the PCJ with ID '" + pcjId + "'.", e);
             }
+
+            if(!batch.isEmpty()) {
+                pcjStorage.addResults(pcjId, batch);
+                batch.clear();
+            }
+        } catch(final MalformedQueryException | PCJStorageException | SailException | QueryEvaluationException e) {
+            throw new RyaClientException("Fail to batch load new results into the PCJ with ID '" + pcjId + "'.", e);
         } finally {
             if(results != null) {
                 try {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
index ac8da66..3fe1042 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
@@ -90,47 +90,46 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ {
 
         // Create the PCJ table that will receive the index results.
         final String pcjId;
-        final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getConnector(), instanceName);
-        try {
+        try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getConnector(), instanceName)) {
             pcjId = pcjStorage.createPcj(sparql);
-        } catch (final PCJStorageException e) {
-            throw new RyaClientException("Problem while initializing the PCJ table.", e);
-        }
 
-        // If a Fluo application is being used, task it with updating the PCJ.
-        final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails();
-        if(fluoDetailsHolder.isPresent()) {
-            final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName();
-            try {
-                updateFluoApp(instanceName, fluoAppName, pcjStorage, pcjId);
-            } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException | RyaDAOException e) {
-                throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e);
+            // If a Fluo application is being used, task it with updating the PCJ.
+            final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails();
+            if(fluoDetailsHolder.isPresent()) {
+                final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName();
+                try {
+                    updateFluoApp(instanceName, fluoAppName, pcjStorage, pcjId);
+                } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException | RyaDAOException e) {
+                    throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e);
+                }
+
+                // Update the Rya Details to indicate the PCJ is being updated incrementally.
+                final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(getConnector(), instanceName);
+                try {
+                    new RyaDetailsUpdater(detailsRepo).update(new RyaDetailsMutator() {
+                        @Override
+                        public RyaDetails mutate(final RyaDetails originalDetails) throws CouldNotApplyMutationException {
+                            // Update the original PCJ Details to indicate they are incrementally updated.
+                            final PCJDetails originalPCJDetails = originalDetails.getPCJIndexDetails().getPCJDetails().get(pcjId);
+                            final PCJDetails.Builder mutatedPCJDetails = PCJDetails.builder( originalPCJDetails )
+                                    .setUpdateStrategy( PCJUpdateStrategy.INCREMENTAL );
+
+                            // Replace the old PCJ Details with the updated ones.
+                            final RyaDetails.Builder builder = RyaDetails.builder(originalDetails);
+                            builder.getPCJIndexDetails().addPCJDetails( mutatedPCJDetails );
+                            return builder.build();
+                        }
+                    });
+                } catch (RyaDetailsRepositoryException | CouldNotApplyMutationException e) {
+                    throw new RyaClientException("Problem while updating the Rya instance's Details to indicate the PCJ is being incrementally updated.", e);
+                }
             }
 
-            // Update the Rya Details to indicate the PCJ is being updated incrementally.
-            final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(getConnector(), instanceName);
-            try {
-                new RyaDetailsUpdater(detailsRepo).update(new RyaDetailsMutator() {
-                    @Override
-                    public RyaDetails mutate(final RyaDetails originalDetails) throws CouldNotApplyMutationException {
-                        // Update the original PCJ Details to indicate they are incrementally updated.
-                        final PCJDetails originalPCJDetails = originalDetails.getPCJIndexDetails().getPCJDetails().get(pcjId);
-                        final PCJDetails.Builder mutatedPCJDetails = PCJDetails.builder( originalPCJDetails )
-                            .setUpdateStrategy( PCJUpdateStrategy.INCREMENTAL );
-
-                        // Replace the old PCJ Details with the updated ones.
-                        final RyaDetails.Builder builder = RyaDetails.builder(originalDetails);
-                        builder.getPCJIndexDetails().addPCJDetails( mutatedPCJDetails );
-                        return builder.build();
-                    }
-                });
-            } catch (RyaDetailsRepositoryException | CouldNotApplyMutationException e) {
-                throw new RyaClientException("Problem while updating the Rya instance's Details to indicate the PCJ is being incrementally updated.", e);
-            }
+            // Return the ID that was assigned to the PCJ.
+            return pcjId;
+        } catch (final PCJStorageException e) {
+            throw new RyaClientException("Problem while initializing the PCJ table.", e);
         }
-
-        // Return the ID that was assigned to the PCJ.
-        return pcjId;
     }
 
     private void updateFluoApp(final String ryaInstance, final String fluoAppName, final PrecomputedJoinStorage pcjStorage, final String pcjId) throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, RyaDAOException {
@@ -139,16 +138,15 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ {
 
         // Connect to the Fluo application that is updating this instance's PCJs.
         final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails();
-        final FluoClient fluoClient = new FluoClientFactory().connect(
+        try(final FluoClient fluoClient = new FluoClientFactory().connect(
                 cd.getUsername(),
                 new String(cd.getPassword()),
                 cd.getInstanceName(),
                 cd.getZookeepers(),
-                fluoAppName);
-
-        // Initialize the PCJ within the Fluo application.
-        final org.apache.rya.indexing.pcj.fluo.api.CreatePcj fluoCreatePcj = new org.apache.rya.indexing.pcj.fluo.api.CreatePcj();
-        fluoCreatePcj.withRyaIntegration(pcjId, pcjStorage, fluoClient, getConnector(), ryaInstance);
+                fluoAppName);) {
+            // Initialize the PCJ within the Fluo application.
+            final org.apache.rya.indexing.pcj.fluo.api.CreatePcj fluoCreatePcj = new org.apache.rya.indexing.pcj.fluo.api.CreatePcj();
+            fluoCreatePcj.withRyaIntegration(pcjId, pcjStorage, fluoClient, getConnector(), ryaInstance);
+        }
     }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java
index b6728ec..96e6d58 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java
@@ -20,19 +20,7 @@ package org.apache.rya.api.client.accumulo;
 
 import static java.util.Objects.requireNonNull;
 
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
 import org.apache.accumulo.core.client.Connector;
-import org.apache.rya.indexing.pcj.fluo.api.DeletePcj;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
-import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Optional;
-
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.rya.api.client.DeletePCJ;
 import org.apache.rya.api.client.GetInstanceDetails;
@@ -43,6 +31,17 @@ import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
 import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
 import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
 import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
+import org.apache.rya.indexing.pcj.fluo.api.DeletePcj;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
 
 /**
  * An Accumulo implementation of the {@link DeletePCJ} command.
@@ -104,8 +103,7 @@ public class AccumuloDeletePCJ extends AccumuloCommand implements DeletePCJ {
         }
 
         // Drop the table that holds the PCJ results from Accumulo.
-        final PrecomputedJoinStorage pcjs = new AccumuloPcjStorage(getConnector(), instanceName);
-        try {
+        try(final PrecomputedJoinStorage pcjs = new AccumuloPcjStorage(getConnector(), instanceName)) {
             pcjs.dropPcj(pcjId);
         } catch (final PCJStorageException e) {
             throw new RyaClientException("Could not drop the PCJ's table from Accumulo.", e);
@@ -118,14 +116,14 @@ public class AccumuloDeletePCJ extends AccumuloCommand implements DeletePCJ {
 
         // Connect to the Fluo application that is updating this instance's PCJs.
         final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails();
-        final FluoClient fluoClient = new FluoClientFactory().connect(
+        try(final FluoClient fluoClient = new FluoClientFactory().connect(
                 cd.getUsername(),
                 new String(cd.getPassword()),
                 cd.getInstanceName(),
                 cd.getZookeepers(),
-                fluoAppName);
-
-        // Delete the PCJ from the Fluo App.
-        new DeletePcj(1000).deletePcj(fluoClient, pcjId);
+                fluoAppName)) {
+            // Delete the PCJ from the Fluo App.
+            new DeletePcj(1000).deletePcj(fluoClient, pcjId);
+        }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
index 828ee4b..1940e64 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
@@ -17,24 +17,6 @@
  * under the License.
  */
 package org.apache.rya.indexing.pcj.matching;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
 
 import static java.util.Objects.requireNonNull;
 
@@ -184,29 +166,31 @@ public class AccumuloIndexSetProvider implements ExternalSetProvider<ExternalTup
         }
         // this maps associates pcj table name with pcj sparql query
         final Map<String, String> indexTables = Maps.newLinkedHashMap();
-        final PrecomputedJoinStorage storage = new AccumuloPcjStorage(conn, tablePrefix);
-        final PcjTableNameFactory pcjFactory = new PcjTableNameFactory();
 
-        final boolean tablesProvided = tables != null && !tables.isEmpty();
+        try(final PrecomputedJoinStorage storage = new AccumuloPcjStorage(conn, tablePrefix)) {
+            final PcjTableNameFactory pcjFactory = new PcjTableNameFactory();
 
-        if (tablesProvided) {
-            // if tables provided, associate table name with sparql
-            for (final String table : tables) {
-                indexTables.put(table, storage.getPcjMetadata(pcjFactory.getPcjId(table)).getSparql());
-            }
-        } else if (hasRyaDetails(tablePrefix, conn)) {
-            // If this is a newer install of Rya, and it has PCJ Details, then
-            // use those.
-            final List<String> ids = storage.listPcjs();
-            for (final String id : ids) {
-                indexTables.put(pcjFactory.makeTableName(tablePrefix, id), storage.getPcjMetadata(id).getSparql());
-            }
-        } else {
-            // Otherwise figure it out by scanning tables.
-            final PcjTables pcjTables = new PcjTables();
-            for (final String table : conn.tableOperations().list()) {
-                if (table.startsWith(tablePrefix + "INDEX")) {
-                    indexTables.put(table, pcjTables.getPcjMetadata(conn, table).getSparql());
+            final boolean tablesProvided = tables != null && !tables.isEmpty();
+
+            if (tablesProvided) {
+                // if tables provided, associate table name with sparql
+                for (final String table : tables) {
+                    indexTables.put(table, storage.getPcjMetadata(pcjFactory.getPcjId(table)).getSparql());
+                }
+            } else if (hasRyaDetails(tablePrefix, conn)) {
+                // If this is a newer install of Rya, and it has PCJ Details, then
+                // use those.
+                final List<String> ids = storage.listPcjs();
+                for (final String id : ids) {
+                    indexTables.put(pcjFactory.makeTableName(tablePrefix, id), storage.getPcjMetadata(id).getSparql());
+                }
+            } else {
+                // Otherwise figure it out by scanning tables.
+                final PcjTables pcjTables = new PcjTables();
+                for (final String table : conn.tableOperations().list()) {
+                    if (table.startsWith(tablePrefix + "INDEX")) {
+                        indexTables.put(table, pcjTables.getPcjMetadata(conn, table).getSparql());
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
index 30eb4ca..5a2e69d 100644
--- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
@@ -31,6 +31,7 @@ import org.apache.rya.indexing.accumulo.ConfigUtils;
 import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
 import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.sail.config.RyaSailFactory;
 import org.junit.Test;
@@ -63,7 +64,7 @@ public class AccumuloBatchUpdatePCJIT extends AccumuloITBase {
                 .build());
 
         Sail sail = null;
-        try {
+        try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(super.getConnector(), RYA_INSTANCE_NAME)) {
             // Get a Sail connection backed by the installed Rya instance.
             final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration();
             ryaConf.setTablePrefix(RYA_INSTANCE_NAME);
@@ -102,7 +103,6 @@ public class AccumuloBatchUpdatePCJIT extends AccumuloITBase {
             sailConn.close();
 
             // Create a PCJ for a SPARQL query.
-            final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(super.getConnector(), RYA_INSTANCE_NAME);
             final String sparql = "SELECT ?name WHERE { ?name <urn:likes> <urn:icecream> . ?name <urn:hasEyeColor> <urn:blue> . }";
             final String pcjId = pcjStorage.createPcj(sparql);
 
@@ -137,8 +137,10 @@ public class AccumuloBatchUpdatePCJIT extends AccumuloITBase {
             expectedResults.add(bs);
 
             final Set<BindingSet> results = new HashSet<>();
-            for(final BindingSet result : pcjStorage.listResults(pcjId)) {
-                results.add( result );
+            try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
+                while(resultsIt.hasNext()) {
+                    results.add( resultsIt.next() );
+                }
             }
 
             assertEquals(expectedResults, results);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java
index 6c9bf5e..f900837 100644
--- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java
@@ -24,6 +24,15 @@ import static org.junit.Assert.assertFalse;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.rya.api.client.CreatePCJ;
+import org.apache.rya.api.client.Install;
+import org.apache.rya.api.client.Install.DuplicateInstanceNameException;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
 import org.apache.rya.indexing.pcj.fluo.api.ListQueryIds;
 import org.apache.rya.indexing.pcj.storage.PcjMetadata;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
@@ -36,16 +45,6 @@ import org.openrdf.query.impl.MapBindingSet;
 import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
 
-import org.apache.rya.api.client.CreatePCJ;
-import org.apache.rya.api.client.Install;
-import org.apache.rya.api.client.Install.DuplicateInstanceNameException;
-import org.apache.rya.api.client.Install.InstallConfiguration;
-import org.apache.rya.api.client.InstanceDoesNotExistException;
-import org.apache.rya.api.client.RyaClientException;
-import org.apache.rya.api.instance.RyaDetails;
-import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
-import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
-
 /**
  * Integration tests the methods of {@link AccumuloCreatePCJ}.
  */
@@ -80,42 +79,43 @@ public class AccumuloCreatePCJIT extends FluoITBase {
         assertEquals(PCJUpdateStrategy.INCREMENTAL, pcjDetails.getUpdateStrategy().get());
 
         // Verify the PCJ's metadata was initialized.
-        final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
-        final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
-        assertEquals(sparql, pcjMetadata.getSparql());
-        assertEquals(0L, pcjMetadata.getCardinality());
+        try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME)) {
+            final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
+            assertEquals(sparql, pcjMetadata.getSparql());
+            assertEquals(0L, pcjMetadata.getCardinality());
 
-        // Verify a Query ID was added for the query within the Fluo app.
-        final List<String> fluoQueryIds = new ListQueryIds().listQueryIds(fluoClient);
-        assertEquals(1, fluoQueryIds.size());
+            // Verify a Query ID was added for the query within the Fluo app.
+            final List<String> fluoQueryIds = new ListQueryIds().listQueryIds(fluoClient);
+            assertEquals(1, fluoQueryIds.size());
 
-        // Insert some statements into Rya.
-        final ValueFactory vf = ryaRepo.getValueFactory();
-        ryaConn.add(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve"));
-        ryaConn.add(vf.createURI("http://Bob"), vf.createURI("http://talksTo"), vf.createURI("http://Eve"));
-        ryaConn.add(vf.createURI("http://Charlie"), vf.createURI("http://talksTo"), vf.createURI("http://Eve"));
+            // Insert some statements into Rya.
+            final ValueFactory vf = ryaRepo.getValueFactory();
+            ryaConn.add(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve"));
+            ryaConn.add(vf.createURI("http://Bob"), vf.createURI("http://talksTo"), vf.createURI("http://Eve"));
+            ryaConn.add(vf.createURI("http://Charlie"), vf.createURI("http://talksTo"), vf.createURI("http://Eve"));
 
-        ryaConn.add(vf.createURI("http://Eve"), vf.createURI("http://helps"), vf.createURI("http://Kevin"));
+            ryaConn.add(vf.createURI("http://Eve"), vf.createURI("http://helps"), vf.createURI("http://Kevin"));
 
-        ryaConn.add(vf.createURI("http://Bob"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint"));
-        ryaConn.add(vf.createURI("http://Charlie"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint"));
-        ryaConn.add(vf.createURI("http://Eve"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint"));
-        ryaConn.add(vf.createURI("http://David"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint"));
+            ryaConn.add(vf.createURI("http://Bob"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint"));
+            ryaConn.add(vf.createURI("http://Charlie"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint"));
+            ryaConn.add(vf.createURI("http://Eve"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint"));
+            ryaConn.add(vf.createURI("http://David"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint"));
 
-        // Verify the correct results were exported.
-        fluo.waitForObservers();
+            // Verify the correct results were exported.
+            fluo.waitForObservers();
 
-        final Set<BindingSet> results = Sets.newHashSet( pcjStorage.listResults(pcjId) );
+            final Set<BindingSet> results = Sets.newHashSet( pcjStorage.listResults(pcjId) );
 
-        final MapBindingSet bob = new MapBindingSet();
-        bob.addBinding("x", vf.createURI("http://Bob"));
+            final MapBindingSet bob = new MapBindingSet();
+            bob.addBinding("x", vf.createURI("http://Bob"));
 
-        final MapBindingSet charlie = new MapBindingSet();
-        charlie.addBinding("x", vf.createURI("http://Charlie"));
+            final MapBindingSet charlie = new MapBindingSet();
+            charlie.addBinding("x", vf.createURI("http://Charlie"));
 
-        final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(bob, charlie);
+            final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(bob, charlie);
 
-        assertEquals(expected, results);
+            assertEquals(expected, results);
+        }
     }
 
     @Test(expected = InstanceDoesNotExistException.class)

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJIT.java
index 573fccd..fd75167 100644
--- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJIT.java
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJIT.java
@@ -24,6 +24,10 @@ import static org.junit.Assert.assertTrue;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.rya.api.client.CreatePCJ;
+import org.apache.rya.api.client.DeletePCJ;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.RyaClientException;
 import org.apache.rya.indexing.pcj.fluo.api.ListQueryIds;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
@@ -36,11 +40,6 @@ import org.openrdf.repository.RepositoryException;
 
 import com.google.common.collect.Sets;
 
-import org.apache.rya.api.client.CreatePCJ;
-import org.apache.rya.api.client.DeletePCJ;
-import org.apache.rya.api.client.InstanceDoesNotExistException;
-import org.apache.rya.api.client.RyaClientException;
-
 /**
  * Integration tests the methods of {@link AccumuloCreatePCJ}.
  */
@@ -86,31 +85,32 @@ public class AccumuloDeletePCJIT extends FluoITBase {
         // Verify the correct results were exported.
         fluo.waitForObservers();
 
-        final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
-        final Set<BindingSet> results = Sets.newHashSet( pcjStorage.listResults(pcjId) );
+        try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME)) {
+            final Set<BindingSet> results = Sets.newHashSet( pcjStorage.listResults(pcjId) );
 
-        final MapBindingSet bob = new MapBindingSet();
-        bob.addBinding("x", vf.createURI("http://Bob"));
+            final MapBindingSet bob = new MapBindingSet();
+            bob.addBinding("x", vf.createURI("http://Bob"));
 
-        final MapBindingSet charlie = new MapBindingSet();
-        charlie.addBinding("x", vf.createURI("http://Charlie"));
+            final MapBindingSet charlie = new MapBindingSet();
+            charlie.addBinding("x", vf.createURI("http://Charlie"));
 
-        final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(bob, charlie);
-        assertEquals(expected, results);
+            final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(bob, charlie);
+            assertEquals(expected, results);
 
-        // Delete the PCJ.
-        final DeletePCJ deletePCJ = new AccumuloDeletePCJ(connectionDetails, accumuloConn);
-        deletePCJ.deletePCJ(RYA_INSTANCE_NAME, pcjId);
+            // Delete the PCJ.
+            final DeletePCJ deletePCJ = new AccumuloDeletePCJ(connectionDetails, accumuloConn);
+            deletePCJ.deletePCJ(RYA_INSTANCE_NAME, pcjId);
 
-        // Ensure the PCJ's metadata has been removed from the storage.
-        assertTrue( pcjStorage.listPcjs().isEmpty() );
+            // Ensure the PCJ's metadata has been removed from the storage.
+            assertTrue( pcjStorage.listPcjs().isEmpty() );
 
-        // Ensure the PCJ has been removed from the Fluo application.
-        fluo.waitForObservers();
+            // Ensure the PCJ has been removed from the Fluo application.
+            fluo.waitForObservers();
 
-        // Verify a Query ID was added for the query within the Fluo app.
-        fluoQueryIds = new ListQueryIds().listQueryIds(fluoClient);
-        assertEquals(0, fluoQueryIds.size());
+            // Verify a Query ID was added for the query within the Fluo app.
+            fluoQueryIds = new ListQueryIds().listQueryIds(fluoClient);
+            assertEquals(0, fluoQueryIds.size());
+        }
     }
 
     @Test(expected = InstanceDoesNotExistException.class)

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java b/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java
index c36cb2c..dd5fe68 100644
--- a/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java
+++ b/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java
@@ -25,10 +25,19 @@ import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.accumulo.minicluster.MiniAccumuloConfig;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
 import org.apache.rya.benchmark.query.QueryBenchmark.QueryBenchmarkRun;
 import org.apache.rya.benchmark.query.QueryBenchmark.QueryBenchmarkRun.NotEnoughResultsException;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
+import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.sail.config.RyaSailFactory;
 import org.apache.zookeeper.ClientCnxn;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -38,16 +47,6 @@ import org.openrdf.sail.Sail;
 import org.openrdf.sail.SailConnection;
 import org.openrdf.sail.SailException;
 
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.api.client.Install.InstallConfiguration;
-import org.apache.rya.api.client.RyaClient;
-import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
-import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
-import org.apache.rya.indexing.accumulo.ConfigUtils;
-import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
-import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType;
-import org.apache.rya.sail.config.RyaSailFactory;
-
 /**
  * Integration tests {@link QueryBenchmarkRun}.
  */
@@ -145,12 +144,12 @@ public class QueryBenchmarkRunIT {
 
     private static void createTestPCJ(final RyaClient ryaClient) throws Exception {
         // Create an empty PCJ within the Rya instance's PCJ storage for the test query.
-        final PrecomputedJoinStorage pcjs = new AccumuloPcjStorage(cluster.getConnector(ACCUMULO_USER, ACCUMULO_PASSWORD), RYA_INSTANCE_NAME);
-        final String pcjId = pcjs.createPcj(SPARQL_QUERY);
+        try(final PrecomputedJoinStorage pcjs = new AccumuloPcjStorage(cluster.getConnector(ACCUMULO_USER, ACCUMULO_PASSWORD), RYA_INSTANCE_NAME)) {
+            final String pcjId = pcjs.createPcj(SPARQL_QUERY);
 
-
-        // Batch update the PCJ using the Rya Client.
-        ryaClient.getBatchUpdatePCJ().batchUpdate(RYA_INSTANCE_NAME, pcjId);
+            // Batch update the PCJ using the Rya Client.
+            ryaClient.getBatchUpdatePCJ().batchUpdate(RYA_INSTANCE_NAME, pcjId);
+        }
     }
 
     @AfterClass

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java
index 16653ee..38ae1b2 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java
@@ -22,17 +22,17 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.openrdf.query.BindingSet;
 
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
 /**
  * Functions that create and maintain the PCJ tables that are used by Rya.
  */
 @DefaultAnnotation(NonNull.class)
-public interface PrecomputedJoinStorage {
+public interface PrecomputedJoinStorage extends AutoCloseable {
 
     /**
      * Get a list of all Precomputed Join indices that are being maintained.
@@ -75,7 +75,7 @@ public interface PrecomputedJoinStorage {
      *   results for the PCJ.
      * @throws PCJStorageException The scan couldn't be performed.
      */
-    public Iterable<BindingSet> listResults(String pcjId) throws PCJStorageException;
+    public CloseableIterator<BindingSet> listResults(String pcjId) throws PCJStorageException;
 
     /**
      * Clears all values from a Precomputed Join index. The index will remain,
@@ -94,15 +94,25 @@ public interface PrecomputedJoinStorage {
      */
     public void dropPcj(final String pcjId) throws PCJStorageException;
 
-
     /**
      * Releases and resources that are being used by the storage.
      *
      * @throws PCJStorageException Indicates the resources could not be released.
      */
+    @Override
     public void close() throws PCJStorageException;
 
     /**
+     * An {@link Iterator} that also extends {@link AutoCloseable} because it has reference to resources
+     * that need to be released once you are done iterating.
+     *
+     * @param <T> The type of object that is iterated over.
+     */
+    public static interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
+
+    }
+
+    /**
      * An operation of {@link PrecomputedJoinStorage} failed.
      */
     public static class PCJStorageException extends PcjException {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java
index 4769758..999b26f 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java
@@ -34,7 +34,6 @@ import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
 
 import org.openrdf.model.Value;
-import org.openrdf.query.Binding;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.algebra.evaluation.QueryBindingSet;
 
@@ -57,7 +56,6 @@ public class AccumuloPcjSerializer implements BindingSetConverter<byte[]> {
     public byte[] convert(BindingSet bindingSet, VariableOrder varOrder) throws BindingSetConversionException {
         checkNotNull(bindingSet);
         checkNotNull(varOrder);
-        checkBindingsSubsetOfVarOrder(bindingSet, varOrder);
 
         // A list that holds all of the byte segments that will be concatenated at the end.
         // This minimizes byte[] construction.
@@ -112,24 +110,6 @@ public class AccumuloPcjSerializer implements BindingSetConverter<byte[]> {
         }
     }
 
-    /**
-     * Checks to see if the names of all the {@link Binding}s in the {@link BindingSet}
-     * are a subset of the variables names in {@link VariableOrder}.
-     *
-     * @param bindingSet - The binding set whose Bindings will be inspected. (not null)
-     * @param varOrder - The names of the bindings that may appear in the BindingSet. (not null)
-     * @throws IllegalArgumentException Indicates the names of the bindings are
-     *   not a subset of the variable order.
-     */
-    private static void checkBindingsSubsetOfVarOrder(BindingSet bindingSet, VariableOrder varOrder) throws IllegalArgumentException {
-        checkNotNull(bindingSet);
-        checkNotNull(varOrder);
-
-        Set<String> bindingNames = bindingSet.getBindingNames();
-        List<String> varNames = varOrder.getVariableOrders();
-        checkArgument(varNames.containsAll(bindingNames), "The BindingSet contains a Binding whose name is not part of the VariableOrder.");
-    }
-
     private static final byte[] concat(Iterable<byte[]> byteSegments) {
         checkNotNull(byteSegments);
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java
index 6024d12..b8974e6 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java
@@ -25,9 +25,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Connector;
@@ -47,6 +44,9 @@ import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.MalformedQueryException;
 
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
 /**
  * An Accumulo backed implementation of {@link PrecomputedJoinStorage}.
  */
@@ -156,7 +156,7 @@ public class AccumuloPcjStorage implements PrecomputedJoinStorage {
     }
 
     @Override
-    public Iterable<BindingSet> listResults(final String pcjId) throws PCJStorageException {
+    public CloseableIterator<BindingSet> listResults(final String pcjId) throws PCJStorageException {
         requireNonNull(pcjId);
 
         try {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetConverter.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetConverter.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetConverter.java
index d2cf366..c920824 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetConverter.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetConverter.java
@@ -18,12 +18,12 @@
  */
 package org.apache.rya.indexing.pcj.storage.accumulo;
 
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
 import org.openrdf.query.Binding;
 import org.openrdf.query.BindingSet;
 
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
 /**
  * Converts {@link BindingSet}s into other representations. This library is
  * intended to convert between BindingSet and whatever format it is being
@@ -52,8 +52,7 @@ public interface BindingSetConverter<T> {
     *   resulting model. (not null)
     * @return The BindingSet formatted as the target model.
     * @throws BindingSetConversionException The BindingSet was unable to be
-    *   converted into the target model. This will happen if the BindingSet has
-    *   a binding whose name is not in the VariableOrder or if one of the values
+    *   converted into the target model. This will happen if one of the values
     *   could not be converted into the target model.
     */
    public T convert(BindingSet bindingSet, VariableOrder varOrder) throws BindingSetConversionException;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java
index b2d04e1..4120fd9 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java
@@ -19,29 +19,27 @@
 package org.apache.rya.indexing.pcj.storage.accumulo;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
 
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
 import org.openrdf.model.URI;
 import org.openrdf.model.Value;
 import org.openrdf.model.ValueFactory;
 import org.openrdf.model.impl.URIImpl;
 import org.openrdf.model.impl.ValueFactoryImpl;
 import org.openrdf.model.vocabulary.XMLSchema;
-import org.openrdf.query.Binding;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.impl.MapBindingSet;
 
 import com.google.common.base.Joiner;
 
-import org.apache.rya.api.domain.RyaType;
-import org.apache.rya.api.resolver.RdfToRyaConversions;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
 
 /**
  * Converts {@link BindingSet}s to Strings and back again. The Strings do not
@@ -58,7 +56,8 @@ public class BindingSetStringConverter implements BindingSetConverter<String> {
 
     @Override
     public String convert(final BindingSet bindingSet, final VariableOrder varOrder) {
-        checkBindingsSubsetOfVarOrder(bindingSet, varOrder);
+        requireNonNull(bindingSet);
+        requireNonNull(varOrder);
 
         // Convert each Binding to a String.
         final List<String> bindingStrings = new ArrayList<>();
@@ -79,38 +78,26 @@ public class BindingSetStringConverter implements BindingSetConverter<String> {
         return Joiner.on(BINDING_DELIM).join(bindingStrings);
     }
 
-    /**
-     * Checks to see if the names of all the {@link Binding}s in the {@link BindingSet}
-     * are a subset of the variables names in {@link VariableOrder}.
-     *
-     * @param bindingSet - The binding set whose Bindings will be inspected. (not null)
-     * @param varOrder - The names of the bindings that may appear in the BindingSet. (not null)
-     * @throws IllegalArgumentException Indicates the names of the bindings are
-     *   not a subset of the variable order.
-     */
-    private static void checkBindingsSubsetOfVarOrder(final BindingSet bindingSet, final VariableOrder varOrder) throws IllegalArgumentException {
-        checkNotNull(bindingSet);
-        checkNotNull(varOrder);
-
-        final Set<String> bindingNames = bindingSet.getBindingNames();
-        final List<String> varNames = varOrder.getVariableOrders();
-        checkArgument(varNames.containsAll(bindingNames), "The BindingSet contains a Binding whose name is not part of the VariableOrder.");
-    }
-
     @Override
     public BindingSet convert(final String bindingSetString, final VariableOrder varOrder) {
-        checkNotNull(bindingSetString);
-        checkNotNull(varOrder);
+        requireNonNull(bindingSetString);
+        requireNonNull(varOrder);
+
+        // If both are empty, return an empty binding set.
+        if(bindingSetString.isEmpty() && varOrder.toString().isEmpty()) {
+            return new MapBindingSet();
+        }
 
+        // Otherwise parse it.
         final String[] bindingStrings = bindingSetString.split(BINDING_DELIM);
-        final String[] varOrrderArr = varOrder.toArray();
-        checkArgument(varOrrderArr.length == bindingStrings.length, "The number of Bindings must match the length of the VariableOrder.");
+        final String[] varOrderArr = varOrder.toArray();
+        checkArgument(varOrderArr.length == bindingStrings.length, "The number of Bindings must match the length of the VariableOrder.");
 
         final QueryBindingSet bindingSet = new QueryBindingSet();
         for(int i = 0; i < bindingStrings.length; i++) {
             final String bindingString = bindingStrings[i];
             if(!NULL_VALUE_STRING.equals(bindingString)) {
-                final String name = varOrrderArr[i];
+                final String name = varOrderArr[i];
                 final Value value = toValue(bindingStrings[i]);
                 bindingSet.addBinding(name, value);
             }
@@ -125,7 +112,7 @@ public class BindingSetStringConverter implements BindingSetConverter<String> {
      * @return The {@link Value} representation of the String.
      */
     protected static Value toValue(final String valueString) {
-        checkNotNull(valueString);
+        requireNonNull(valueString);
 
         // Split the String that was stored in Fluo into its Value and Type parts.
         final String[] valueAndType = valueString.split(TYPE_DELIM);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
index ce3e5d1..5d13597 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
@@ -30,9 +30,6 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
@@ -59,6 +56,7 @@ import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
 import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
 import org.openrdf.query.BindingSet;
@@ -72,6 +70,9 @@ import org.openrdf.repository.RepositoryException;
 
 import com.google.common.base.Optional;
 
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
 /**
  * Functions that create and maintain the PCJ tables that are used by Rya.
  */
@@ -157,6 +158,7 @@ public class PcjTables {
 
         final TableOperations tableOps = accumuloConn.tableOperations();
         if(!tableOps.exists(pcjTableName)) {
+            BatchWriter writer = null;
             try {
                 // Create the new table in Accumulo.
                 tableOps.create(pcjTableName);
@@ -165,14 +167,21 @@ public class PcjTables {
                 final PcjMetadata pcjMetadata = new PcjMetadata(sparql, 0L, varOrders);
                 final List<Mutation> mutations = makeWriteMetadataMutations(pcjMetadata);
 
-                final BatchWriter writer = accumuloConn.createBatchWriter(pcjTableName, new BatchWriterConfig());
+                writer = accumuloConn.createBatchWriter(pcjTableName, new BatchWriterConfig());
                 writer.addMutations(mutations);
-                writer.close();
             } catch (final TableExistsException e) {
                 log.warn("Something else just created the Rya PCJ export table named '" + pcjTableName
                         + "'. This is unexpected, but we will continue as normal.");
             } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
                 throw new PCJStorageException("Could not create a new PCJ named: " + pcjTableName, e);
+            } finally {
+                if(writer != null) {
+                    try {
+                        writer.close();
+                    } catch (final MutationsRejectedException e) {
+                        log.error("Mutations rejected while creating the PCJ table.", e);
+                    }
+                }
             }
         }
     }
@@ -231,9 +240,10 @@ public class PcjTables {
         checkNotNull(accumuloConn);
         checkNotNull(pcjTableName);
 
+        Scanner scanner = null;
         try {
             // Create an Accumulo scanner that iterates through the metadata entries.
-            final Scanner scanner = accumuloConn.createScanner(pcjTableName, new Authorizations());
+            scanner = accumuloConn.createScanner(pcjTableName, new Authorizations());
             final Iterator<Entry<Key, Value>> entries = scanner.iterator();
 
             // No metadata has been stored in the table yet.
@@ -266,6 +276,10 @@ public class PcjTables {
 
         } catch (final TableNotFoundException e) {
             throw new PCJStorageException("Could not add results to a PCJ because the PCJ table does not exist.", e);
+        } finally {
+            if(scanner != null) {
+                scanner.close();
+            }
         }
     }
 
@@ -310,7 +324,7 @@ public class PcjTables {
      *   results for the PCJ.
      * @throws PCJStorageException The binding sets could not be fetched.
      */
-    public Iterable<BindingSet> listResults(final Connector accumuloConn, final String pcjTableName, final Authorizations auths) throws PCJStorageException {
+    public CloseableIterator<BindingSet> listResults(final Connector accumuloConn, final String pcjTableName, final Authorizations auths) throws PCJStorageException {
         requireNonNull(pcjTableName);
 
         // Fetch the Variable Orders for the binding sets and choose one of them. It
@@ -324,12 +338,7 @@ public class PcjTables {
             scanner.fetchColumnFamily( new Text(varOrder.toString()) );
 
             // Return an Iterator that uses that scanner.
-            return new Iterable<BindingSet>() {
-                @Override
-                public Iterator<BindingSet> iterator() {
-                    return new ScannerBindingSetIterator(scanner, varOrder);
-                }
-            };
+            return new ScannerBindingSetIterator(scanner, varOrder);
 
         } catch (final TableNotFoundException e) {
             throw new PCJStorageException(String.format("PCJ Table does not exist for name '%s'.", pcjTableName), e);
@@ -398,10 +407,10 @@ public class PcjTables {
         for(final VariableOrder varOrder : varOrders) {
             try {
                 // Serialize the result to the variable order.
-                final byte[] serializedResult = converter.convert(result, varOrder);
+                final byte[] rowKey = converter.convert(result, varOrder);
 
                 // Row ID = binding set values, Column Family = variable order of the binding set.
-                final Mutation addResult = new Mutation(serializedResult);
+                final Mutation addResult = new Mutation(rowKey);
                 final String visibility = result.getVisibility();
                 addResult.put(varOrder.toString(), "", new ColumnVisibility(visibility), "");
                 mutations.add(addResult);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java
index d0fd7bf..26fd8c9 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java
@@ -20,27 +20,30 @@ package org.apache.rya.indexing.pcj.storage.accumulo;
 
 import static java.util.Objects.requireNonNull;
 
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
 import org.openrdf.query.BindingSet;
 
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
 /**
  * Iterates over the results of a {@link Scanner} assuming the results are
  * binding sets that can be converted using a {@link AccumuloPcjSerializer}.
  */
 @DefaultAnnotation(NonNull.class)
-public class ScannerBindingSetIterator implements Iterator<BindingSet> {
+public class ScannerBindingSetIterator implements CloseableIterator<BindingSet> {
 
     private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
 
+    private final Scanner scanner;
     private final Iterator<Entry<Key, Value>> accEntries;
     private final VariableOrder varOrder;
 
@@ -51,7 +54,7 @@ public class ScannerBindingSetIterator implements Iterator<BindingSet> {
      * @param varOrder - The variable order of the binding sets the scanner returns. (not null)
      */
     public ScannerBindingSetIterator(final Scanner scanner, final VariableOrder varOrder) {
-        requireNonNull(scanner);
+        this.scanner = requireNonNull(scanner);
         this.accEntries = scanner.iterator();
         this.varOrder = requireNonNull(varOrder);
     }
@@ -71,4 +74,9 @@ public class ScannerBindingSetIterator implements Iterator<BindingSet> {
             throw new RuntimeException("Could not deserialize a BindingSet from Accumulo.", e);
         }
     }
+
+    @Override
+    public void close() throws IOException {
+        scanner.close();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VariableOrder.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VariableOrder.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VariableOrder.java
index 6ec801e..151db50 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VariableOrder.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VariableOrder.java
@@ -23,15 +23,15 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import java.util.Collection;
 import java.util.Iterator;
 
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-import net.jcip.annotations.Immutable;
-
 import org.openrdf.query.BindingSet;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import net.jcip.annotations.Immutable;
+
 /**
  * An ordered list of {@link BindingSet} variable names. These are used to
  * specify the order {@link Binding}s within the set are serialized to Accumulo.
@@ -46,6 +46,13 @@ public final class VariableOrder implements Iterable<String> {
     private final ImmutableList<String> variableOrder;
 
     /**
+     * Constructs an instance of {@link VariableOrder} when there are no variables.
+     */
+    public VariableOrder() {
+        variableOrder = ImmutableList.of();
+    }
+
+    /**
      * Constructs an instance of {@link VariableOrder}.
      *
      * @param varOrder - An ordered array of Binding Set variables. (not null)

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java
index 2baa52e..f67110e 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java
@@ -20,19 +20,18 @@ package org.apache.rya.indexing.pcj.update;
 
 import java.util.Collection;
 
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
+import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.indexing.pcj.storage.PcjException;
 
-import org.apache.rya.api.domain.RyaStatement;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
 
 /**
  * Updates the state of all PCJ indices whenever {@link RyaStatement}s are
  * added to or removed from the system.
  */
 @DefaultAnnotation(NonNull.class)
-public interface PrecomputedJoinUpdater {
+public interface PrecomputedJoinUpdater extends AutoCloseable {
 
     /**
      * The PCJ indices will be updated to include new statements within
@@ -80,6 +79,7 @@ public interface PrecomputedJoinUpdater {
      *
      * @throws PcjUpdateException The updater could not be closed.
      */
+    @Override
     public void close() throws PcjUpdateException;
 
     /**