You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/04/24 15:06:22 UTC

[4/9] incubator-rya git commit: RYA-260 Fluo PCJ application has had Aggregation support added to it. Also fixed a bunch of resource leaks that were causing integration tests to fail. Closes #156.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
index 3bb54c4..3a42a23 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
@@ -24,58 +24,34 @@ import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.rya.indexing.pcj.fluo.ITBase;
-import org.junit.Test;
-
-import com.google.common.base.Optional;
-
-import org.apache.fluo.api.client.FluoAdmin;
-import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
-import org.apache.fluo.api.client.FluoAdmin.TableExistsException;
+import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.fluo.api.mini.MiniFluo;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
+import org.junit.Test;
+
+import com.google.common.base.Optional;
 
 /**
  * Tests the methods of {@link CountStatements}.
  */
-public class CountStatementsIT extends ITBase {
+public class CountStatementsIT extends RyaExportITBase {
 
     /**
      * Overriden so that no Observers will be started. This ensures whatever
      * statements are inserted as part of the test will not be consumed.
-     *
-     * @return A Mini Fluo cluster.
      */
     @Override
-    protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExistsException {
+    protected void preFluoInitHook() throws Exception {
         // Setup the observers that will be used by the Fluo PCJ Application.
         final List<ObserverSpecification> observers = new ArrayList<>();
 
-        // Configure how the mini fluo will run.
-        final FluoConfiguration config = new FluoConfiguration();
-        config.setMiniStartAccumulo(false);
-        config.setAccumuloInstance(instanceName);
-        config.setAccumuloUser(ACCUMULO_USER);
-        config.setAccumuloPassword(ACCUMULO_PASSWORD);
-        config.setInstanceZookeepers(zookeepers + "/fluo");
-        config.setAccumuloZookeepers(zookeepers);
-
-        config.setApplicationName(FLUO_APP_NAME);
-        config.setAccumuloTable("fluo" + FLUO_APP_NAME);
-
-        config.addObservers(observers);
-
-        FluoFactory.newAdmin(config).initialize(
-                new FluoAdmin.InitializationOptions().setClearTable(true).setClearZookeeper(true) );
-        final MiniFluo miniFluo = FluoFactory.newMiniFluo(config);
-        return miniFluo;
+        // Add the observers to the Fluo Configuration.
+        super.getFluoConfiguration().addObservers(observers);
     }
 
-
     @Test
     public void countStatements() {
         // Insert some Triples into the Fluo app.
@@ -86,12 +62,14 @@ public class CountStatementsIT extends ITBase {
         triples.add( RyaStatement.builder().setSubject(new RyaURI("http://David")).setPredicate(new RyaURI("http://talksTo")).setObject(new RyaURI("http://Bob")).build() );
         triples.add( RyaStatement.builder().setSubject(new RyaURI("http://Eve")).setPredicate(new RyaURI("http://talksTo")).setObject(new RyaURI("http://Bob")).build() );
 
-        new InsertTriples().insert(fluoClient, triples, Optional.<String>absent());
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            new InsertTriples().insert(fluoClient, triples, Optional.<String>absent());
 
-        // Load some statements into the Fluo app.
-        final BigInteger count = new CountStatements().countStatements(fluoClient);
+            // Count the statements that were inserted into the Fluo app.
+            final BigInteger count = new CountStatements().countStatements(fluoClient);
 
-        // Ensure the count matches the expected values.
-        assertEquals(BigInteger.valueOf(5), count);
+            // Ensure the count matches the expected values.
+            assertEquals(BigInteger.valueOf(5), count);
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
index 82b61bd..0aceaa3 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
@@ -26,8 +26,11 @@ import java.util.Set;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
 import org.apache.rya.api.persist.RyaDAOException;
-import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInAccumuloException;
 import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInFluoException;
 import org.apache.rya.indexing.pcj.storage.PcjException;
@@ -47,7 +50,7 @@ import com.google.common.collect.Sets;
 /**
  * Integration tests the methods of {@link GetPcjMetadata}.
  */
-public class GetPcjMetadataIT extends ITBase {
+public class GetPcjMetadataIT extends RyaExportITBase {
 
     @Test
     public void getMetadataByQueryId() throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException, RyaDAOException {
@@ -59,56 +62,60 @@ public class GetPcjMetadataIT extends ITBase {
                 "}";
 
         // Create the PCJ table.
+        final Connector accumuloConn = super.getAccumuloConnector();
         final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
         final String pcjId = pcjStorage.createPcj(sparql);
 
-        // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Tell the Fluo app to maintain the PCJ.
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
 
-        // Fetch the PCJ's Metadata through the GetPcjMetadata interactor.
-        final String queryId = new ListQueryIds().listQueryIds(fluoClient).get(0);
-        final PcjMetadata metadata = new GetPcjMetadata().getMetadata(pcjStorage, fluoClient, queryId);
+            // Fetch the PCJ's Metadata through the GetPcjMetadata interactor.
+            final String queryId = new ListQueryIds().listQueryIds(fluoClient).get(0);
+            final PcjMetadata metadata = new GetPcjMetadata().getMetadata(pcjStorage, fluoClient, queryId);
 
-        // Ensure the command returns the correct metadata.
-        final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql);
-        final PcjMetadata expected = new PcjMetadata(sparql, 0L, varOrders);
-        assertEquals(expected, metadata);
+            // Ensure the command returns the correct metadata.
+            final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql);
+            final PcjMetadata expected = new PcjMetadata(sparql, 0L, varOrders);
+            assertEquals(expected, metadata);
+        }
     }
 
     @Test
     public void getAllMetadata() throws MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException, AccumuloException, AccumuloSecurityException, RyaDAOException {
-
-        final CreatePcj createPcj = new CreatePcj();
-
+        final Connector accumuloConn = super.getAccumuloConnector();
         final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
 
-        // Add a couple of queries to Accumulo.
-        final String q1Sparql =
-                "SELECT ?x " +
-                  "WHERE { " +
-                  "?x <http://talksTo> <http://Eve>. " +
-                  "?x <http://worksAt> <http://Chipotle>." +
-                "}";
-        final String q1PcjId = pcjStorage.createPcj(q1Sparql);
-        createPcj.withRyaIntegration(q1PcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
-
-        final String q2Sparql =
-                "SELECT ?x ?y " +
-                  "WHERE { " +
-                  "?x <http://talksTo> ?y. " +
-                  "?y <http://worksAt> <http://Chipotle>." +
-                "}";
-        final String q2PcjId = pcjStorage.createPcj(q2Sparql);
-        createPcj.withRyaIntegration(q2PcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
-
-        // Ensure the command returns the correct metadata.
-        final Set<PcjMetadata> expected = new HashSet<>();
-        final Set<VariableOrder> q1VarOrders = new ShiftVarOrderFactory().makeVarOrders(q1Sparql);
-        final Set<VariableOrder> q2VarOrders = new ShiftVarOrderFactory().makeVarOrders(q2Sparql);
-        expected.add(new PcjMetadata(q1Sparql, 0L, q1VarOrders));
-        expected.add(new PcjMetadata(q2Sparql, 0L, q2VarOrders));
-
-        final Map<String, PcjMetadata> metadata = new GetPcjMetadata().getMetadata(pcjStorage, fluoClient);
-        assertEquals(expected, Sets.newHashSet( metadata.values() ));
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Add a couple of queries to Accumulo.
+            final String q1Sparql =
+                    "SELECT ?x " +
+                            "WHERE { " +
+                            "?x <http://talksTo> <http://Eve>. " +
+                            "?x <http://worksAt> <http://Chipotle>." +
+                            "}";
+            final String q1PcjId = pcjStorage.createPcj(q1Sparql);
+            final CreatePcj createPcj = new CreatePcj();
+            createPcj.withRyaIntegration(q1PcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+
+            final String q2Sparql =
+                    "SELECT ?x ?y " +
+                            "WHERE { " +
+                            "?x <http://talksTo> ?y. " +
+                            "?y <http://worksAt> <http://Chipotle>." +
+                            "}";
+            final String q2PcjId = pcjStorage.createPcj(q2Sparql);
+            createPcj.withRyaIntegration(q2PcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+
+            // Ensure the command returns the correct metadata.
+            final Set<PcjMetadata> expected = new HashSet<>();
+            final Set<VariableOrder> q1VarOrders = new ShiftVarOrderFactory().makeVarOrders(q1Sparql);
+            final Set<VariableOrder> q2VarOrders = new ShiftVarOrderFactory().makeVarOrders(q2Sparql);
+            expected.add(new PcjMetadata(q1Sparql, 0L, q1VarOrders));
+            expected.add(new PcjMetadata(q2Sparql, 0L, q2VarOrders));
+
+            final Map<String, PcjMetadata> metadata = new GetPcjMetadata().getMetadata(pcjStorage, fluoClient);
+            assertEquals(expected, Sets.newHashSet( metadata.values() ));
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
index 85c31a0..10f2319 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
@@ -26,8 +26,12 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
 import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
 import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
@@ -42,7 +46,7 @@ import com.google.common.collect.Sets;
 /**
  * Integration tests the methods of {@link GetQueryReportl}.
  */
-public class GetQueryReportIT extends ITBase {
+public class GetQueryReportIT extends RyaExportITBase {
 
     @Test
     public void getReport() throws Exception {
@@ -56,69 +60,72 @@ public class GetQueryReportIT extends ITBase {
 
         // Triples that will be streamed into Fluo after the PCJ has been created.
         final Set<RyaStatement> streamedTriples = Sets.newHashSet(
-                makeRyaStatement("http://Alice", "http://worksAt", "http://Taco Shop"),
-                makeRyaStatement("http://Alice", "http://worksAt", "http://Burger Join"),
-                makeRyaStatement("http://Alice", "http://worksAt", "http://Pastery Shop"),
-                makeRyaStatement("http://Alice", "http://worksAt", "http://Burrito Place"),
-                makeRyaStatement("http://Alice", "http://livesIn", "http://Lost County"),
-                makeRyaStatement("http://Alice", "http://livesIn", "http://Big City"),
-                makeRyaStatement("http://Bob", "http://worksAt", "http://Burrito Place"),
-                makeRyaStatement("http://Bob", "http://livesIn", "http://Big City"),
-                makeRyaStatement("http://Charlie", "http://worksAt", "http://Burrito Place"),
-                makeRyaStatement("http://Charlie", "http://livesIn", "http://Big City"),
-                makeRyaStatement("http://David", "http://worksAt", "http://Burrito Place"),
-                makeRyaStatement("http://David", "http://livesIn", "http://Lost County"),
-                makeRyaStatement("http://Eve", "http://worksAt", "http://Burrito Place"),
-                makeRyaStatement("http://Eve", "http://livesIn", "http://Big City"),
-                makeRyaStatement("http://Frank", "http://worksAt", "http://Burrito Place"),
-                makeRyaStatement("http://Frank", "http://livesIn", "http://Lost County"));
+                new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://worksAt"), new RyaURI("http://Taco Shop")),
+                new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://worksAt"), new RyaURI("http://Burger Join")),
+                new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://worksAt"), new RyaURI("http://Pastery Shop")),
+                new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://worksAt"), new RyaURI("http://Burrito Place")),
+                new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://livesIn"), new RyaURI("http://Lost County")),
+                new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://livesIn"), new RyaURI("http://Big City")),
+                new RyaStatement(new RyaURI("http://Bob"), new RyaURI("http://worksAt"), new RyaURI("http://Burrito Place")),
+                new RyaStatement(new RyaURI("http://Bob"), new RyaURI("http://livesIn"), new RyaURI("http://Big City")),
+                new RyaStatement(new RyaURI("http://Charlie"), new RyaURI("http://worksAt"), new RyaURI("http://Burrito Place")),
+                new RyaStatement(new RyaURI("http://Charlie"), new RyaURI("http://livesIn"), new RyaURI("http://Big City")),
+                new RyaStatement(new RyaURI("http://David"), new RyaURI("http://worksAt"), new RyaURI("http://Burrito Place")),
+                new RyaStatement(new RyaURI("http://David"), new RyaURI("http://livesIn"), new RyaURI("http://Lost County")),
+                new RyaStatement(new RyaURI("http://Eve"), new RyaURI("http://worksAt"), new RyaURI("http://Burrito Place")),
+                new RyaStatement(new RyaURI("http://Eve"), new RyaURI("http://livesIn"), new RyaURI("http://Big City")),
+                new RyaStatement(new RyaURI("http://Frank"), new RyaURI("http://worksAt"), new RyaURI("http://Burrito Place")),
+                new RyaStatement(new RyaURI("http://Frank"), new RyaURI("http://livesIn"), new RyaURI("http://Lost County")));
 
         // Create the PCJ table.
+        final Connector accumuloConn = super.getAccumuloConnector();
         final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
         final String pcjId = pcjStorage.createPcj(sparql);
 
-        // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Tell the Fluo app to maintain the PCJ.
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
 
-        // Stream the data into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
+            // Stream the data into Fluo.
+            new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
 
-        // Wait for the results to finish processing.
-        fluo.waitForObservers();
+            // Wait for the results to finish processing.
+            super.getMiniFluo().waitForObservers();
 
-        // Fetch the report.
-        final Map<String, PcjMetadata> metadata = new GetPcjMetadata().getMetadata(pcjStorage, fluoClient);
-        final Set<String> queryIds = metadata.keySet();
-        assertEquals(1, queryIds.size());
-        final String queryId = queryIds.iterator().next();
+            // Fetch the report.
+            final Map<String, PcjMetadata> metadata = new GetPcjMetadata().getMetadata(pcjStorage, fluoClient);
+            final Set<String> queryIds = metadata.keySet();
+            assertEquals(1, queryIds.size());
+            final String queryId = queryIds.iterator().next();
 
-        final QueryReport report = new GetQueryReport().getReport(fluoClient, queryId);
+            final QueryReport report = new GetQueryReport().getReport(fluoClient, queryId);
 
-        // Build the expected counts map.
-        final Map<String, BigInteger> expectedCounts = new HashMap<>();
+            // Build the expected counts map.
+            final Map<String, BigInteger> expectedCounts = new HashMap<>();
 
-        final FluoQuery fluoQuery = report.getFluoQuery();
+            final FluoQuery fluoQuery = report.getFluoQuery();
 
-        final String queryNodeId = fluoQuery.getQueryMetadata().getNodeId();
-        expectedCounts.put(queryNodeId, BigInteger.valueOf(8));
+            final String queryNodeId = fluoQuery.getQueryMetadata().getNodeId();
+            expectedCounts.put(queryNodeId, BigInteger.valueOf(8));
 
-        final String filterNodeId = fluoQuery.getFilterMetadata().iterator().next().getNodeId();
-        expectedCounts.put(filterNodeId, BigInteger.valueOf(8));
+            final String filterNodeId = fluoQuery.getFilterMetadata().iterator().next().getNodeId();
+            expectedCounts.put(filterNodeId, BigInteger.valueOf(8));
 
-        final String joinNodeId = fluoQuery.getJoinMetadata().iterator().next().getNodeId();
-        expectedCounts.put(joinNodeId, BigInteger.valueOf(13));
+            final String joinNodeId = fluoQuery.getJoinMetadata().iterator().next().getNodeId();
+            expectedCounts.put(joinNodeId, BigInteger.valueOf(13));
 
-        final Iterator<StatementPatternMetadata> patterns = fluoQuery.getStatementPatternMetadata().iterator();
-        final StatementPatternMetadata sp1 = patterns.next();
-        final StatementPatternMetadata sp2 = patterns.next();
-        if(sp1.getStatementPattern().contains("http://worksAt")) {
-            expectedCounts.put(sp1.getNodeId(), BigInteger.valueOf(9));
-            expectedCounts.put(sp2.getNodeId(), BigInteger.valueOf(7));
-        } else {
-            expectedCounts.put(sp2.getNodeId(), BigInteger.valueOf(9));
-            expectedCounts.put(sp1.getNodeId(), BigInteger.valueOf(7));
-        }
+            final Iterator<StatementPatternMetadata> patterns = fluoQuery.getStatementPatternMetadata().iterator();
+            final StatementPatternMetadata sp1 = patterns.next();
+            final StatementPatternMetadata sp2 = patterns.next();
+            if(sp1.getStatementPattern().contains("http://worksAt")) {
+                expectedCounts.put(sp1.getNodeId(), BigInteger.valueOf(9));
+                expectedCounts.put(sp2.getNodeId(), BigInteger.valueOf(7));
+            } else {
+                expectedCounts.put(sp2.getNodeId(), BigInteger.valueOf(9));
+                expectedCounts.put(sp1.getNodeId(), BigInteger.valueOf(7));
+            }
 
-        assertEquals(expectedCounts, report.getCounts());
+            assertEquals(expectedCounts, report.getCounts());
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
index 19bc272..ec301ba 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
@@ -26,17 +26,18 @@ import java.util.List;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.junit.Test;
 
 import com.beust.jcommander.internal.Lists;
 
-import org.apache.fluo.api.client.Transaction;
-
 /**
  * Integration tests the methods of {@link ListQueryIds}.
  */
-public class ListQueryIdsIT extends ITBase {
+public class ListQueryIdsIT extends RyaExportITBase {
 
     /**
      * This test ensures that when there are PCJ tables in Accumulo as well as
@@ -45,18 +46,20 @@ public class ListQueryIdsIT extends ITBase {
      */
     @Test
     public void getQueryIds() throws AccumuloException, AccumuloSecurityException, TableExistsException {
-        // Store a few SPARQL/Query ID pairs in the Fluo table.
-        try(Transaction tx = fluoClient.newTransaction()) {
-            tx.set("SPARQL_3", QUERY_ID, "ID_3");
-            tx.set("SPARQL_1", QUERY_ID, "ID_1");
-            tx.set("SPARQL_4", QUERY_ID, "ID_4");
-            tx.set("SPARQL_2", QUERY_ID, "ID_2");
-            tx.commit();
-        }
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Store a few SPARQL/Query ID pairs in the Fluo table.
+            try(Transaction tx = fluoClient.newTransaction()) {
+                tx.set("SPARQL_3", QUERY_ID, "ID_3");
+                tx.set("SPARQL_1", QUERY_ID, "ID_1");
+                tx.set("SPARQL_4", QUERY_ID, "ID_4");
+                tx.set("SPARQL_2", QUERY_ID, "ID_2");
+                tx.commit();
+            }
 
-        // Ensure the correct list of Query IDs is retured.
-        final List<String> expected = Lists.newArrayList("ID_1", "ID_2", "ID_3", "ID_4");
-        final List<String> queryIds = new ListQueryIds().listQueryIds(fluoClient);
-        assertEquals(expected, queryIds);
+            // Ensure the correct list of Query IDs is returned.
+            final List<String> expected = Lists.newArrayList("ID_1", "ID_2", "ID_3", "ID_4");
+            final List<String> queryIds = new ListQueryIds().listQueryIds(fluoClient);
+            assertEquals(expected, queryIds);
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
index d5ed447..082f46d 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
@@ -20,7 +20,13 @@ package org.apache.rya.indexing.pcj.fluo.app.query;
 
 import static org.junit.Assert.assertEquals;
 
-import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
+import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
+import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
 import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
@@ -30,13 +36,10 @@ import org.openrdf.query.parser.ParsedQuery;
 import org.openrdf.query.parser.sparql.SPARQLParser;
 import org.openrdf.repository.RepositoryException;
 
-import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.Transaction;
-
 /**
  * Integration tests the methods of {@link FluoQueryMetadataDAO}.
  */
-public class FluoQueryMetadataDAOIT extends ITBase {
+public class FluoQueryMetadataDAOIT extends RyaExportITBase {
 
     @Test
     public void statementPatternMetadataTest() throws RepositoryException {
@@ -49,20 +52,22 @@ public class FluoQueryMetadataDAOIT extends ITBase {
         builder.setParentNodeId("parentNodeId");
         final StatementPatternMetadata originalMetadata = builder.build();
 
-        // Write it to the Fluo table.
-        try(Transaction tx = fluoClient.newTransaction()) {
-            dao.write(tx, originalMetadata);
-            tx.commit();
-        }
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Write it to the Fluo table.
+            try(Transaction tx = fluoClient.newTransaction()) {
+                dao.write(tx, originalMetadata);
+                tx.commit();
+            }
 
-        // Read it from the Fluo table.
-        StatementPatternMetadata storedMetadata = null;
-        try(Snapshot sx = fluoClient.newSnapshot()) {
-            storedMetadata = dao.readStatementPatternMetadata(sx, "nodeId");
-        }
+            // Read it from the Fluo table.
+            StatementPatternMetadata storedMetadata = null;
+            try(Snapshot sx = fluoClient.newSnapshot()) {
+                storedMetadata = dao.readStatementPatternMetadata(sx, "nodeId");
+            }
 
-        // Ensure the deserialized object is the same as the serialized one.
-        assertEquals(originalMetadata, storedMetadata);
+            // Ensure the deserialized object is the same as the serialized one.
+            assertEquals(originalMetadata, storedMetadata);
+        }
     }
 
     @Test
@@ -78,20 +83,22 @@ public class FluoQueryMetadataDAOIT extends ITBase {
         builder.setFilterIndexWithinSparql(2);
         final FilterMetadata originalMetadata = builder.build();
 
-        // Write it to the Fluo table.
-        try(Transaction tx = fluoClient.newTransaction()) {
-            dao.write(tx, originalMetadata);
-            tx.commit();
-        }
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Write it to the Fluo table.
+            try(Transaction tx = fluoClient.newTransaction()) {
+                dao.write(tx, originalMetadata);
+                tx.commit();
+            }
 
-        // Read it from the Fluo table.
-        FilterMetadata storedMetadata = null;
-        try(Snapshot sx = fluoClient.newSnapshot()) {
-            storedMetadata = dao.readFilterMetadata(sx, "nodeId");
-        }
+            // Read it from the Fluo table.
+            FilterMetadata storedMetadata = null;
+            try(Snapshot sx = fluoClient.newSnapshot()) {
+                storedMetadata = dao.readFilterMetadata(sx, "nodeId");
+            }
 
-        // Ensure the deserialized object is the same as the serialized one.
-        assertEquals(originalMetadata, storedMetadata);
+            // Ensure the deserialized object is the same as the serialized one.
+            assertEquals(originalMetadata, storedMetadata);
+        }
     }
 
     @Test
@@ -107,20 +114,22 @@ public class FluoQueryMetadataDAOIT extends ITBase {
         builder.setRightChildNodeId("rightChildNodeId");
         final JoinMetadata originalMetadata = builder.build();
 
-        // Write it to the Fluo table.
-        try(Transaction tx = fluoClient.newTransaction()) {
-            dao.write(tx, originalMetadata);
-            tx.commit();
-        }
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Write it to the Fluo table.
+            try(Transaction tx = fluoClient.newTransaction()) {
+                dao.write(tx, originalMetadata);
+                tx.commit();
+            }
 
-        // Read it from the Fluo table.
-        JoinMetadata storedMetadata = null;
-        try(Snapshot sx = fluoClient.newSnapshot()) {
-            storedMetadata = dao.readJoinMetadata(sx, "nodeId");
-        }
+            // Read it from the Fluo table.
+            JoinMetadata storedMetadata = null;
+            try(Snapshot sx = fluoClient.newSnapshot()) {
+                storedMetadata = dao.readJoinMetadata(sx, "nodeId");
+            }
 
-        // Ensure the deserialized object is the same as the serialized one.
-        assertEquals(originalMetadata, storedMetadata);
+            // Ensure the deserialized object is the same as the serialized one.
+            assertEquals(originalMetadata, storedMetadata);
+        }
     }
 
     @Test
@@ -134,20 +143,85 @@ public class FluoQueryMetadataDAOIT extends ITBase {
         builder.setChildNodeId("childNodeId");
         final QueryMetadata originalMetadata = builder.build();
 
-        // Write it to the Fluo table.
-        try(Transaction tx = fluoClient.newTransaction()) {
-            dao.write(tx, originalMetadata);
-            tx.commit();
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Write it to the Fluo table.
+            try(Transaction tx = fluoClient.newTransaction()) {
+                dao.write(tx, originalMetadata);
+                tx.commit();
+            }
+
+            // Read it from the Fluo table.
+            QueryMetadata storedMetdata = null;
+            try(Snapshot sx = fluoClient.newSnapshot()) {
+                storedMetdata = dao.readQueryMetadata(sx, "nodeId");
+            }
+
+            // Ensure the deserialized object is the same as the serialized one.
+            assertEquals(originalMetadata, storedMetdata);
         }
+    }
+
+    @Test
+    public void aggregationMetadataTest_withGroupByVarOrders() {
+        final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+        // Create the object that will be serialized.
+        final AggregationMetadata originalMetadata = AggregationMetadata.builder("nodeId")
+                .setVariableOrder(new VariableOrder("totalCount"))
+                .setParentNodeId("parentNodeId")
+                .setChildNodeId("childNodeId")
+                .setGroupByVariableOrder(new VariableOrder("a", "b", "c"))
+                .addAggregation(new AggregationElement(AggregationType.COUNT, "count", "totalCount"))
+                .addAggregation(new AggregationElement(AggregationType.AVERAGE, "privae", "avgPrice"))
+                .build();
+
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Write it to the Fluo table.
+            try(Transaction tx = fluoClient.newTransaction()) {
+                dao.write(tx, originalMetadata);
+                tx.commit();
+            }
 
-        // Read it from the Fluo table.
-        QueryMetadata storedMetdata = null;
-        try(Snapshot sx = fluoClient.newSnapshot()) {
-            storedMetdata = dao.readQueryMetadata(sx, "nodeId");
+            // Read it from the Fluo table.
+            AggregationMetadata storedMetadata = null;
+            try(Snapshot sx = fluoClient.newSnapshot()) {
+                storedMetadata = dao.readAggregationMetadata(sx, "nodeId");
+            }
+
+            // Ensure the deserialized object is the same as the serialized one.
+            assertEquals(originalMetadata, storedMetadata);
         }
+    }
+
+    @Test
+    public void aggregationMetadataTest_noGroupByVarOrders() {
+        final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+        // Create the object that will be serialized.
+        final AggregationMetadata originalMetadata = AggregationMetadata.builder("nodeId")
+                .setVariableOrder(new VariableOrder("totalCount"))
+                .setParentNodeId("parentNodeId")
+                .setChildNodeId("childNodeId")
+                .addAggregation(new AggregationElement(AggregationType.COUNT, "count", "totalCount"))
+                .addAggregation(new AggregationElement(AggregationType.AVERAGE, "privae", "avgPrice"))
+                .build();
+
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Write it to the Fluo table.
+            try(Transaction tx = fluoClient.newTransaction()) {
+                dao.write(tx, originalMetadata);
+                tx.commit();
+            }
+
+            // Read it from the Fluo table.
+            AggregationMetadata storedMetadata = null;
+            try(Snapshot sx = fluoClient.newSnapshot()) {
+                storedMetadata = dao.readAggregationMetadata(sx, "nodeId");
+            }
 
-        // Ensure the deserialized object is the same as the serialized one.
-        assertEquals(originalMetadata, storedMetdata);
+            // Ensure the deserialized object is the same as the serialized one.
+            assertEquals(originalMetadata, storedMetadata);
+        }
     }
 
     @Test
@@ -168,19 +242,21 @@ public class FluoQueryMetadataDAOIT extends ITBase {
         final ParsedQuery query = new SPARQLParser().parseQuery(sparql, null);
         final FluoQuery originalQuery = new SparqlFluoQueryBuilder().make(query, new NodeIds());
 
-        // Write it to the Fluo table.
-        try(Transaction tx = fluoClient.newTransaction()) {
-            dao.write(tx, originalQuery);
-            tx.commit();
-        }
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Write it to the Fluo table.
+            try(Transaction tx = fluoClient.newTransaction()) {
+                dao.write(tx, originalQuery);
+                tx.commit();
+            }
 
-        // Read it from the Fluo table.
-        FluoQuery storedQuery = null;
-        try(Snapshot sx = fluoClient.newSnapshot()) {
-            storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().getNodeId());
-        }
+            // Read it from the Fluo table.
+            FluoQuery storedQuery = null;
+            try(Snapshot sx = fluoClient.newSnapshot()) {
+                storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().getNodeId());
+            }
 
-        // Ensure the deserialized object is the same as the serialized one.
-        assertEquals(originalQuery, storedQuery);
+            // Ensure the deserialized object is the same as the serialized one.
+            assertEquals(originalQuery, storedQuery);
+        }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
index b4c8d69..21d7db0 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
@@ -18,100 +18,152 @@
  */
 package org.apache.rya.indexing.pcj.fluo.integration;
 
+import static java.util.Objects.requireNonNull;
 import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
 import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.client.scanner.ColumnScanner;
 import org.apache.fluo.api.client.scanner.RowScanner;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Span;
-import org.apache.rya.indexing.pcj.fluo.ITBase;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.DeletePcj;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.junit.Test;
 import org.openrdf.model.Statement;
-import org.openrdf.model.impl.URIImpl;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.impl.BindingImpl;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.repository.sail.SailRepositoryConnection;
 
 import com.google.common.collect.Sets;
 
-public class CreateDeleteIT extends ITBase {
+/**
+ * Tests that ensure the PCJ delete support works.
+ */
+public class CreateDeleteIT extends RyaExportITBase {
 
-    /**
-     * Ensure historic matches are included in the result.
-     */
     @Test
-    public void historicResults() throws Exception {
+    public void deletePCJ() throws Exception {
         // A query that finds people who talk to Eve and work at Chipotle.
-        final String sparql = "SELECT ?x " + "WHERE { " + "?x <http://talksTo> <http://Eve>. "
-                + "?x <http://worksAt> <http://Chipotle>." + "}";
+        final String sparql =
+                "SELECT ?x " + "WHERE { " +
+                    "?x <http://talksTo> <http://Eve>. " +
+                    "?x <http://worksAt> <http://Chipotle>." +
+                "}";
 
         // Triples that are loaded into Rya before the PCJ is created.
-        final Set<Statement> historicTriples = Sets.newHashSet(
-                makeStatement("http://Alice", "http://talksTo", "http://Eve"),
-                makeStatement("http://Bob", "http://talksTo", "http://Eve"),
-                makeStatement("http://Charlie", "http://talksTo", "http://Eve"),
-
-                makeStatement("http://Eve", "http://helps", "http://Kevin"),
-
-                makeStatement("http://Bob", "http://worksAt", "http://Chipotle"),
-                makeStatement("http://Charlie", "http://worksAt", "http://Chipotle"),
-                makeStatement("http://Eve", "http://worksAt", "http://Chipotle"),
-                makeStatement("http://David", "http://worksAt", "http://Chipotle"));
-
-        // The expected results of the SPARQL query once the PCJ has been
-        // computed.
-        final Set<BindingSet> expected = new HashSet<>();
-        expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Bob"))));
-        expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Charlie"))));
-
-        // Load the historic data into Rya.
-        for (final Statement triple : historicTriples) {
-            ryaConn.add(triple);
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Set<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")),
+                vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")),
+                vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")),
+
+                vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://helps"), vf.createURI("http://Kevin")),
+
+                vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")),
+                vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")),
+                vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")),
+                vf.createStatement(vf.createURI("http://David"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")));
+
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadData(sparql, statements);
+
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Ensure the data was loaded.
+            final List<Bytes> rows = getFluoTableEntries(fluoClient);
+            assertEquals(17, rows.size());
+
+            // Delete the PCJ from the Fluo application.
+            new DeletePcj(1).deletePcj(fluoClient, pcjId);
+
+            // Ensure all data related to the query has been removed.
+            final List<Bytes> empty_rows = getFluoTableEntries(fluoClient);
+            assertEquals(0, empty_rows.size());
         }
+    }
+
+    @Test
+    public void deleteAggregation() throws Exception {
+        // A query that finds the maximum price for an item within the inventory.
+        final String sparql =
+                "SELECT (max(?price) as ?maxPrice) { " +
+                    "?item <urn:price> ?price . " +
+                "}";
+
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:apple"), vf.createURI("urn:price"), vf.createLiteral(2.50)),
+                vf.createStatement(vf.createURI("urn:gum"), vf.createURI("urn:price"), vf.createLiteral(0.99)),
+                vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(4.99)));
+
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadData(sparql, statements);
+
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Ensure the data was loaded.
+            final List<Bytes> rows = getFluoTableEntries(fluoClient);
+            assertEquals(10, rows.size());
+
+            // Delete the PCJ from the Fluo application.
+            new DeletePcj(1).deletePcj(fluoClient, pcjId);
+
+            // Ensure all data related to the query has been removed.
+            final List<Bytes> empty_rows = getFluoTableEntries(fluoClient);
+            assertEquals(0, empty_rows.size());
+        }
+    }
 
-        // Create the PCJ table.
-        final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
-        final String pcjId = pcjStorage.createPcj(sparql);
+    private String loadData(final String sparql, final Collection<Statement> statements) throws Exception {
+        requireNonNull(sparql);
+        requireNonNull(statements);
 
-        // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+        // Register the PCJ with Rya.
+        final Instance accInstance = super.getAccumuloConnector().getInstance();
+        final Connector accumuloConn = super.getAccumuloConnector();
 
-        // Verify the end results of the query match the expected results.
-        fluo.waitForObservers();
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails(
+                ACCUMULO_USER,
+                ACCUMULO_PASSWORD.toCharArray(),
+                accInstance.getInstanceName(),
+                accInstance.getZooKeepers()), accumuloConn);
 
-        final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
-        assertEquals(expected, results);
+        final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql);
 
-        List<Bytes> rows = getFluoTableEntries(fluoClient);
-        assertEquals(17, rows.size());
+        // Write the data to Rya.
+        final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection();
+        ryaConn.begin();
+        ryaConn.add(statements);
+        ryaConn.commit();
+        ryaConn.close();
 
-        // Delete the PCJ from the Fluo application.
-        new DeletePcj(1).deletePcj(fluoClient, pcjId);
+        // Wait for the Fluo application to finish computing the end result.
+        super.getMiniFluo().waitForObservers();
 
-        // Ensure all data related to the query has been removed.
-        List<Bytes> empty_rows = getFluoTableEntries(fluoClient);
-        assertEquals(0, empty_rows.size());
+        // The PCJ Id is the topic name the results will be written to.
+        return pcjId;
     }
 
-    private List<Bytes> getFluoTableEntries(FluoClient fluoClient) {
+    private List<Bytes> getFluoTableEntries(final FluoClient fluoClient) {
         try (Snapshot snapshot = fluoClient.newSnapshot()) {
-            List<Bytes> rows = new ArrayList<>();
-            RowScanner rscanner = snapshot.scanner().over(Span.prefix("")).byRow().build();
+            final List<Bytes> rows = new ArrayList<>();
+            final RowScanner rscanner = snapshot.scanner().over(Span.prefix("")).byRow().build();
 
-            for(ColumnScanner cscanner: rscanner) {
+            for(final ColumnScanner cscanner: rscanner) {
             	rows.add(cscanner.getRow());
             }
-            
+
             return rows;
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
index dcab997..ab97bbd 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
@@ -19,32 +19,37 @@
 package org.apache.rya.indexing.pcj.fluo.integration;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
 import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.junit.Test;
 import org.openrdf.model.Statement;
-import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
 import org.openrdf.query.BindingSet;
-import org.openrdf.query.impl.BindingImpl;
+import org.openrdf.query.impl.MapBindingSet;
+import org.openrdf.repository.sail.SailRepositoryConnection;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
 
 /**
  * Performs integration tests over the Fluo application geared towards various types of input.
- * <p>
- * These tests are being ignore so that they will not run as unit tests while building the application.
  */
-public class InputIT extends ITBase {
+public class InputIT extends RyaExportITBase {
 
     /**
      * Ensure historic matches are included in the result.
@@ -53,49 +58,64 @@ public class InputIT extends ITBase {
     public void historicResults() throws Exception {
         // A query that finds people who talk to Eve and work at Chipotle.
         final String sparql =
-              "SELECT ?x " +
-                "WHERE { " +
+              "SELECT ?x WHERE { " +
                 "?x <http://talksTo> <http://Eve>. " +
                 "?x <http://worksAt> <http://Chipotle>." +
               "}";
 
         // Triples that are loaded into Rya before the PCJ is created.
+        final ValueFactory vf = new ValueFactoryImpl();
         final Set<Statement> historicTriples = Sets.newHashSet(
-                makeStatement("http://Alice", "http://talksTo", "http://Eve"),
-                makeStatement("http://Bob", "http://talksTo", "http://Eve"),
-                makeStatement("http://Charlie", "http://talksTo", "http://Eve"),
+                vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")),
+                vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")),
+                vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")),
 
-                makeStatement("http://Eve", "http://helps", "http://Kevin"),
+                vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://helps"), vf.createURI("http://Kevin")),
 
-                makeStatement("http://Bob", "http://worksAt", "http://Chipotle"),
-                makeStatement("http://Charlie", "http://worksAt", "http://Chipotle"),
-                makeStatement("http://Eve", "http://worksAt", "http://Chipotle"),
-                makeStatement("http://David", "http://worksAt", "http://Chipotle"));
+                vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")),
+                vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")),
+                vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")),
+                vf.createStatement(vf.createURI("http://David"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")));
 
         // The expected results of the SPARQL query once the PCJ has been computed.
         final Set<BindingSet> expected = new HashSet<>();
-        expected.add(makeBindingSet(
-                new BindingImpl("x", new URIImpl("http://Bob"))));
-        expected.add(makeBindingSet(
-                new BindingImpl("x", new URIImpl("http://Charlie"))));
+
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("x", vf.createURI("http://Bob"));
+        expected.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("x", vf.createURI("http://Charlie"));
+        expected.add(bs);
 
         // Load the historic data into Rya.
+        final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection();
         for(final Statement triple : historicTriples) {
             ryaConn.add(triple);
         }
+        ryaConn.close();
 
         // Create the PCJ table.
+        final Connector accumuloConn = super.getAccumuloConnector();
         final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
         final String pcjId = pcjStorage.createPcj(sparql);
 
-        // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Tell the Fluo app to maintain the PCJ.
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+
+            // Verify the end results of the query match the expected results.
+            super.getMiniFluo().waitForObservers();
 
-        // Verify the end results of the query match the expected results.
-        fluo.waitForObservers();
+            final Set<BindingSet> results = new HashSet<>();
+            try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
+                while(resultsIt.hasNext()) {
+                    results.add( resultsIt.next() );
+                }
+            }
 
-        final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
-        assertEquals(expected, results);
+            assertEquals(expected, results);
+        }
     }
 
     /**
@@ -105,51 +125,67 @@ public class InputIT extends ITBase {
     public void streamedResults() throws Exception {
         // A query that finds people who talk to Eve and work at Chipotle.
         final String sparql =
-              "SELECT ?x " +
-                "WHERE { " +
+              "SELECT ?x WHERE { " +
                 "?x <http://talksTo> <http://Eve>. " +
                 "?x <http://worksAt> <http://Chipotle>." +
               "}";
 
         // Triples that will be streamed into Fluo after the PCJ has been created.
         final Set<RyaStatement> streamedTriples = Sets.newHashSet(
-                makeRyaStatement("http://Alice", "http://talksTo", "http://Eve"),
-                makeRyaStatement("http://Bob", "http://talksTo", "http://Eve"),
-                makeRyaStatement("http://Charlie", "http://talksTo", "http://Eve"),
+                new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://talksTo"), new RyaURI("http://Eve")),
+                new RyaStatement(new RyaURI("http://Bob"), new RyaURI("http://talksTo"), new RyaURI("http://Eve")),
+                new RyaStatement(new RyaURI("http://Charlie"), new RyaURI("http://talksTo"), new RyaURI("http://Eve")),
 
-                makeRyaStatement("http://Eve", "http://helps", "http://Kevin"),
+                new RyaStatement(new RyaURI("http://Eve"), new RyaURI("http://helps"), new RyaURI("http://Kevin")),
 
-                makeRyaStatement("http://Bob", "http://worksAt", "http://Chipotle"),
-                makeRyaStatement("http://Charlie", "http://worksAt", "http://Chipotle"),
-                makeRyaStatement("http://Eve", "http://worksAt", "http://Chipotle"),
-                makeRyaStatement("http://David", "http://worksAt", "http://Chipotle"));
+                new RyaStatement(new RyaURI("http://Bob"), new RyaURI("http://worksAt"), new RyaURI("http://Chipotle")),
+                new RyaStatement(new RyaURI("http://Charlie"), new RyaURI("http://worksAt"), new RyaURI("http://Chipotle")),
+                new RyaStatement(new RyaURI("http://Eve"), new RyaURI("http://worksAt"), new RyaURI("http://Chipotle")),
+                new RyaStatement(new RyaURI("http://David"), new RyaURI("http://worksAt"), new RyaURI("http://Chipotle")));
 
         // The expected results of the SPARQL query once the PCJ has been computed.
+        final ValueFactory vf = new ValueFactoryImpl();
         final Set<BindingSet> expected = new HashSet<>();
-        expected.add(makeBindingSet(
-                new BindingImpl("x", new URIImpl("http://Bob"))));
-        expected.add(makeBindingSet(
-                new BindingImpl("x", new URIImpl("http://Charlie"))));
+
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("x", vf.createURI("http://Bob"));
+        expected.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("x", vf.createURI("http://Charlie"));
+        expected.add(bs);
 
         // Create the PCJ table.
+        final Connector accumuloConn = super.getAccumuloConnector();
         final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
         final String pcjId = pcjStorage.createPcj(sparql);
 
-        // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Tell the Fluo app to maintain the PCJ.
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
 
-        // Ensure the query has no results yet.
-        fluo.waitForObservers();
-        Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
-        assertTrue( results.isEmpty() );
+            // Ensure the query has no results yet.
+            super.getMiniFluo().waitForObservers();
 
-        // Stream the data into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
+            try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
+                assertFalse( resultsIt.hasNext() );
+            }
 
-        // Verify the end results of the query match the expected results.
-        fluo.waitForObservers();
-        results = getQueryBindingSetValues(fluoClient, sparql);
-        assertEquals(expected, results);
+            // Stream the data into Fluo.
+            new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
+
+            // Verify the end results of the query match the expected results.
+            super.getMiniFluo().waitForObservers();
+
+            final HashSet<BindingSet> results = new HashSet<>();
+            try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
+                while(resultsIt.hasNext()) {
+                    results.add( resultsIt.next() );
+                }
+            }
+
+            assertEquals(expected, results);
+        }
     }
 
     /**
@@ -162,53 +198,75 @@ public class InputIT extends ITBase {
     public void historicThenStreamedResults() throws Exception {
         // A query that finds people who talk to Eve and work at Chipotle.
         final String sparql =
-              "SELECT ?x " +
-                "WHERE { " +
+              "SELECT ?x WHERE { " +
                 "?x <http://talksTo> <http://Eve>. " +
                 "?x <http://worksAt> <http://Chipotle>." +
               "}";
 
         // Triples that are loaded into Rya before the PCJ is created.
+        final ValueFactory vf = new ValueFactoryImpl();
         final Set<Statement> historicTriples = Sets.newHashSet(
-                makeStatement("http://Alice", "http://talksTo", "http://Eve"),
-                makeStatement("http://Alice", "http://worksAt", "http://Chipotle"));
+                vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")),
+                vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")));
 
         // Triples that will be streamed into Fluo after the PCJ has been created.
         final Set<RyaStatement> streamedTriples = Sets.newHashSet(
-                makeRyaStatement("http://Frank", "http://talksTo", "http://Eve"),
-                makeRyaStatement("http://Frank", "http://worksAt", "http://Chipotle"));
+                new RyaStatement(new RyaURI("http://Frank"), new RyaURI("http://talksTo"), new RyaURI("http://Eve")),
+                new RyaStatement(new RyaURI("http://Frank"), new RyaURI("http://worksAt"), new RyaURI("http://Chipotle")));
 
         // Load the historic data into Rya.
+        final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection();
         for(final Statement triple : historicTriples) {
             ryaConn.add(triple);
         }
+        ryaConn.close();
 
         // Create the PCJ table.
+        final Connector accumuloConn = super.getAccumuloConnector();
         final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
         final String pcjId = pcjStorage.createPcj(sparql);
 
-        // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Tell the Fluo app to maintain the PCJ.
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
 
-        // Ensure Alice is a match.
-        fluo.waitForObservers();
-        final Set<BindingSet> expected = new HashSet<>();
-        expected.add(makeBindingSet(
-                new BindingImpl("x", new URIImpl("http://Alice"))));
+            // Ensure Alice is a match.
+            super.getMiniFluo().waitForObservers();
 
-        Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
-        assertEquals(expected, results);
+            final Set<BindingSet> expected = new HashSet<>();
 
-        // Stream the data into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
+            MapBindingSet bs = new MapBindingSet();
+            bs.addBinding("x", vf.createURI("http://Alice"));
+            expected.add(bs);
 
-        // Verify the end results of the query also include Frank.
-        fluo.waitForObservers();
-        expected.add(makeBindingSet(
-                new BindingImpl("x", new URIImpl("http://Frank"))));
+            Set<BindingSet> results = new HashSet<>();
+            try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
+                while(resultsIt.hasNext()) {
+                    results.add(resultsIt.next());
+                }
+            }
 
-        results = getQueryBindingSetValues(fluoClient, sparql);
-        assertEquals(expected, results);
+            assertEquals(expected, results);
+
+            // Stream the data into Fluo.
+            new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
+
+            // Verify the end results of the query also include Frank.
+            super.getMiniFluo().waitForObservers();
+
+            bs = new MapBindingSet();
+            bs.addBinding("x", vf.createURI("http://Frank"));
+            expected.add(bs);
+
+            results = new HashSet<>();
+            try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
+                while(resultsIt.hasNext()) {
+                    results.add(resultsIt.next());
+                }
+            }
+
+            assertEquals(expected, results);
+        }
     }
 
     /**
@@ -222,50 +280,69 @@ public class InputIT extends ITBase {
     public void historicAndStreamConflict() throws Exception {
         // A query that finds people who talk to Eve and work at Chipotle.
         final String sparql =
-              "SELECT ?x " +
-                "WHERE { " +
+              "SELECT ?x WHERE { " +
                 "?x <http://talksTo> <http://Eve>. " +
                 "?x <http://worksAt> <http://Chipotle>." +
               "}";
 
         // Triples that are loaded into Rya before the PCJ is created.
+        final ValueFactory vf = new ValueFactoryImpl();
         final Set<Statement> historicTriples = Sets.newHashSet(
-                makeStatement("http://Alice", "http://talksTo", "http://Eve"),
-                makeStatement("http://Alice", "http://worksAt", "http://Chipotle"));
+                vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")),
+                vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")));
 
         // Triples that will be streamed into Fluo after the PCJ has been created.
         final Set<RyaStatement> streamedTriples = Sets.newHashSet(
-                makeRyaStatement("http://Alice", "http://talksTo", "http://Eve"),
-                makeRyaStatement("http://Alice", "http://worksAt", "http://Chipotle"));
+                new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://talksTo"), new RyaURI("http://Eve")),
+                new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://worksAt"), new RyaURI("http://Chipotle")));
 
         // The expected final result.
         final Set<BindingSet> expected = new HashSet<>();
-        expected.add(makeBindingSet(
-                new BindingImpl("x", new URIImpl("http://Alice"))));
+
+        final MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("x", vf.createURI("http://Alice"));
+        expected.add(bs);
 
         // Load the historic data into Rya.
+        final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection();
         for(final Statement triple : historicTriples) {
             ryaConn.add(triple);
         }
+        ryaConn.close();
 
         // Create the PCJ table.
+        final Connector accumuloConn = super.getAccumuloConnector();
         final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
         final String pcjId = pcjStorage.createPcj(sparql);
 
-        // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
-
-        // Ensure Alice is a match.
-        fluo.waitForObservers();
-        Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
-        assertEquals(expected, results);
-
-        // Stream the same Alice triple into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
-
-        // Verify the end results of the query is stiill only Alice.
-        fluo.waitForObservers();
-        results = getQueryBindingSetValues(fluoClient, sparql);
-        assertEquals(expected, results);
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Tell the Fluo app to maintain the PCJ.
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+
+            // Ensure Alice is a match.
+            super.getMiniFluo().waitForObservers();
+
+            Set<BindingSet> results = new HashSet<>();
+            try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
+                while(resultsIt.hasNext()) {
+                    results.add( resultsIt.next() );
+                }
+            }
+            assertEquals(expected, results);
+
+            // Stream the same Alice triples into Fluo.
+            new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
+
+            // Verify the end result of the query is still only Alice.
+            super.getMiniFluo().waitForObservers();
+
+            results = new HashSet<>();
+            try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
+                while(resultsIt.hasNext()) {
+                    results.add( resultsIt.next() );
+                }
+            }
+            assertEquals(expected, results);
+        }
     }
 }
\ No newline at end of file