You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/02 21:01:57 UTC

[4/9] incubator-rya git commit: RYA-280-Periodic Query Service. Closes #177.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
index 05dfd32..f330825 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
@@ -29,12 +29,12 @@ import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaURI;
-import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
 import org.junit.Test;
 import org.openrdf.model.Statement;
 import org.openrdf.model.ValueFactory;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
index 219e079..ab7610d 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
@@ -32,9 +32,9 @@ import java.util.UUID;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.rya.indexing.pcj.fluo.KafkaExportITBase;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase;
 import org.junit.Test;
 import org.openrdf.model.Statement;
 import org.openrdf.model.ValueFactory;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
index c8167c7..7a4ed8d 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
@@ -46,7 +46,6 @@ import org.apache.rya.api.domain.RyaSubGraph;
 import org.apache.rya.api.domain.RyaURI;
 import org.apache.rya.api.resolver.RdfToRyaConversions;
 import org.apache.rya.indexing.pcj.fluo.ConstructGraphTestUtils;
-import org.apache.rya.indexing.pcj.fluo.KafkaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
 import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
@@ -57,6 +56,7 @@ import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase;
 import org.junit.Test;
 import org.openrdf.model.Statement;
 import org.openrdf.model.ValueFactory;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
index f815a55..85c5030 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
@@ -22,20 +22,27 @@ import static java.util.Objects.requireNonNull;
 import static org.junit.Assert.assertEquals;
 
 import java.math.BigDecimal;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Set;
 
 import javax.xml.datatype.DatatypeFactory;
 
 import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.core.client.FluoClientImpl;
 import org.apache.rya.api.client.RyaClient;
-import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
 import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
-import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
 import org.junit.Test;
 import org.openrdf.model.Literal;
 import org.openrdf.model.Statement;
@@ -51,6 +58,7 @@ import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException;
 import org.openrdf.query.algebra.evaluation.function.Function;
 import org.openrdf.query.algebra.evaluation.function.FunctionRegistry;
 import org.openrdf.query.impl.MapBindingSet;
+import org.openrdf.repository.RepositoryException;
 import org.openrdf.repository.sail.SailRepositoryConnection;
 
 import com.google.common.collect.Sets;
@@ -60,6 +68,8 @@ import com.google.common.collect.Sets;
  */
 public class QueryIT extends RyaExportITBase {
 
+    private enum ExporterType {Pcj, Periodic};
+    
     @Test
     public void optionalStatements() throws Exception {
         // A query that has optional statement patterns. This query is looking for all
@@ -100,7 +110,7 @@ public class QueryIT extends RyaExportITBase {
         expectedResults.add(bs);
 
         // Verify the end results of the query match the expected results.
-        runTest(sparql, statements, expectedResults);
+        runTest(sparql, statements, expectedResults, ExporterType.Pcj);
     }
 
     /**
@@ -181,7 +191,7 @@ public class QueryIT extends RyaExportITBase {
         expectedResults.add(bs);
 
         // Verify the end results of the query match the expected results.
-        runTest(sparql, statements, expectedResults);
+        runTest(sparql, statements, expectedResults, ExporterType.Pcj);
     }
 
     @Test
@@ -241,7 +251,7 @@ public class QueryIT extends RyaExportITBase {
         expectedResults.add(bs);
 
         // Verify the end results of the query match the expected results.
-        runTest(sparql, statements, expectedResults);
+        runTest(sparql, statements, expectedResults, ExporterType.Pcj);
     }
 
     @Test
@@ -283,7 +293,7 @@ public class QueryIT extends RyaExportITBase {
         expectedResults.add(bs);
 
         // Verify the end results of the query match the expected results.
-        runTest(sparql, statements, expectedResults);
+        runTest(sparql, statements, expectedResults, ExporterType.Pcj);
     }
 
     @Test
@@ -368,7 +378,7 @@ public class QueryIT extends RyaExportITBase {
         expectedResults.add(bs);
 
         // Verify the end results of the query match the expected results.
-        runTest(sparql, statements, expectedResults);
+        runTest(sparql, statements, expectedResults, ExporterType.Pcj);
     }
 
     @Test
@@ -430,10 +440,295 @@ public class QueryIT extends RyaExportITBase {
         expectedResults.add(bs);
 
         // Verify the end results of the query match the expected results.
-        runTest(sparql, statements, expectedResults);
+        runTest(sparql, statements, expectedResults, ExporterType.Pcj);
     }
+    
+    
+    @Test
+    public void periodicQueryTestWithoutAggregation() throws Exception {
+        String query = "prefix function: <http://org.apache.rya/function#> " //n
+                + "prefix time: <http://www.w3.org/2006/time#> " //n
+                + "select ?id where {" //n
+                + "Filter(function:periodic(?time, 2, .5, time:hours)) " //n
+                + "?obs <uri:hasTime> ?time. " //n
+                + "?obs <uri:hasId> ?id }"; //n
+
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final DatatypeFactory dtf = DatatypeFactory.newInstance();
+        ZonedDateTime time = ZonedDateTime.now();
+        long currentTime = time.toInstant().toEpochMilli();
+        
+        ZonedDateTime zTime1 = time.minusMinutes(30);
+        String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
+        
+        ZonedDateTime zTime2 = zTime1.minusMinutes(30);
+        String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
+        
+        ZonedDateTime zTime3 = zTime2.minusMinutes(30);
+        String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
+        
+        ZonedDateTime zTime4 = zTime3.minusMinutes(30);
+        String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT);
+        
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")),
+                vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+                vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")),
+                vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
+                vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4"))
+                );
+
+        // Create the expected results of the periodic query: one binding set per (id, periodicBinId) pair read back from the periodic result storage.
+        final Set<BindingSet> expectedResults = new HashSet<>();
+
+        long period = 1800000;
+        long binId = (currentTime/period)*period;
+        
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId));
+        expectedResults.add(bs);
+        
+        bs = new MapBindingSet();
+        bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
+        expectedResults.add(bs);
+        
+        bs = new MapBindingSet();
+        bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period));
+        expectedResults.add(bs);
+        
+        bs = new MapBindingSet();
+        bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + 3*period));
+        expectedResults.add(bs);
 
-    public void runTest(final String sparql, final Collection<Statement> statements, final Collection<BindingSet> expectedResults) throws Exception {
+        bs = new MapBindingSet();
+        bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId));
+        expectedResults.add(bs);
+        
+        bs = new MapBindingSet();
+        bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
+        expectedResults.add(bs);
+        
+        bs = new MapBindingSet();
+        bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period));
+        expectedResults.add(bs);
+        
+        bs = new MapBindingSet();
+        bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId));
+        expectedResults.add(bs);
+        
+        bs = new MapBindingSet();
+        bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
+        expectedResults.add(bs);
+        
+        bs = new MapBindingSet();
+        bs.addBinding("id", vf.createLiteral("id_4", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId));
+        expectedResults.add(bs);
+
+        // Verify the end results of the query match the expected results.
+        runTest(query, statements, expectedResults, ExporterType.Periodic);
+    }
+    
+    
+    @Test
+    public void periodicQueryTestWithAggregation() throws Exception {
+        String query = "prefix function: <http://org.apache.rya/function#> " //n
+                + "prefix time: <http://www.w3.org/2006/time#> " //n
+                + "select (count(?obs) as ?total) where {" //n
+                + "Filter(function:periodic(?time, 2, .5, time:hours)) " //n
+                + "?obs <uri:hasTime> ?time. " //n
+                + "?obs <uri:hasId> ?id }"; //n
+
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final DatatypeFactory dtf = DatatypeFactory.newInstance();
+        ZonedDateTime time = ZonedDateTime.now();
+        long currentTime = time.toInstant().toEpochMilli();
+        
+        ZonedDateTime zTime1 = time.minusMinutes(30);
+        String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
+        
+        ZonedDateTime zTime2 = zTime1.minusMinutes(30);
+        String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
+        
+        ZonedDateTime zTime3 = zTime2.minusMinutes(30);
+        String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
+        
+        ZonedDateTime zTime4 = zTime3.minusMinutes(30);
+        String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT);
+        
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")),
+                vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+                vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")),
+                vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
+                vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4"))
+                );
+
+        // Create the expected results of the periodic query: one (total, periodicBinId) binding set per bin read back from the periodic result storage.
+        final Set<BindingSet> expectedResults = new HashSet<>();
+
+        long period = 1800000;
+        long binId = (currentTime/period)*period;
+        
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("4", XMLSchema.INTEGER));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId));
+        expectedResults.add(bs);
+        
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("3", XMLSchema.INTEGER));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
+        expectedResults.add(bs);
+        
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period));
+        expectedResults.add(bs);
+        
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + 3*period));
+        expectedResults.add(bs);
+
+
+        // Verify the end results of the query match the expected results.
+        runTest(query, statements, expectedResults, ExporterType.Periodic);
+    }
+    
+    @Test
+    public void periodicQueryTestWithAggregationAndGroupBy() throws Exception {
+        String query = "prefix function: <http://org.apache.rya/function#> " //n
+                + "prefix time: <http://www.w3.org/2006/time#> " //n
+                + "select ?id (count(?obs) as ?total) where {" //n
+                + "Filter(function:periodic(?time, 2, .5, time:hours)) " //n
+                + "?obs <uri:hasTime> ?time. " //n
+                + "?obs <uri:hasId> ?id } group by ?id"; //n
+
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final DatatypeFactory dtf = DatatypeFactory.newInstance();
+        ZonedDateTime time = ZonedDateTime.now();
+        long currentTime = time.toInstant().toEpochMilli();
+        
+        ZonedDateTime zTime1 = time.minusMinutes(30);
+        String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
+        
+        ZonedDateTime zTime2 = zTime1.minusMinutes(30);
+        String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
+        
+        ZonedDateTime zTime3 = zTime2.minusMinutes(30);
+        String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
+        
+        ZonedDateTime zTime4 = zTime3.minusMinutes(30);
+        String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT);
+        
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")),
+                vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+                vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")),
+                vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
+                vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4")),
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2"))
+                );
+
+        // Create the expected results of the periodic query: one (total, id, periodicBinId) binding set per group and bin read back from the periodic result storage.
+        final Set<BindingSet> expectedResults = new HashSet<>();
+
+        long period = 1800000;
+        long binId = (currentTime/period)*period;
+        
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId));
+        expectedResults.add(bs);
+        
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId));
+        expectedResults.add(bs);
+        
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId));
+        expectedResults.add(bs);
+        
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_4", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId));
+        expectedResults.add(bs);
+        
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
+        expectedResults.add(bs);
+        
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
+        expectedResults.add(bs);
+        
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
+        expectedResults.add(bs);
+        
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period));
+        expectedResults.add(bs);
+        
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period));
+        expectedResults.add(bs);
+        
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + 3*period));
+        expectedResults.add(bs);
+
+        // Verify the end results of the query match the expected results.
+        runTest(query, statements, expectedResults, ExporterType.Periodic);
+    }
+    
+    
+    
+    
+
+    public void runTest(final String sparql, final Collection<Statement> statements, final Collection<BindingSet> expectedResults, ExporterType type ) throws Exception {
         requireNonNull(sparql);
         requireNonNull(statements);
         requireNonNull(expectedResults);
@@ -443,9 +738,38 @@ public class QueryIT extends RyaExportITBase {
 
         final RyaClient ryaClient = AccumuloRyaClientFactory.build(createConnectionDetails(), accumuloConn);
 
-        ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql);
-
-        // Write the data to Rya.
+        switch (type) {
+        case Pcj:
+            ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql);
+            addStatementsAndWait(statements);
+            // Fetch the value that is stored within the PCJ table.
+            try (final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName())) {
+                final String pcjId = pcjStorage.listPcjs().get(0);
+                final Set<BindingSet> results = Sets.newHashSet(pcjStorage.listResults(pcjId));
+                // Ensure the result of the query matches the expected result.
+                assertEquals(expectedResults, results);
+            }
+            break;
+        case Periodic:
+            PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(accumuloConn, getRyaInstanceName());
+            String periodicId = periodicStorage.createPeriodicQuery(sparql);
+            try (FluoClient fluo = new FluoClientImpl(super.getFluoConfiguration())) {
+                new CreatePcj().createPcj(periodicId, sparql, fluo);
+            }
+            addStatementsAndWait(statements);
+            final Set<BindingSet> results = Sets.newHashSet();
+            try (CloseableIterator<BindingSet> resultIter = periodicStorage.listResults(periodicId, Optional.empty())) {
+                while (resultIter.hasNext()) {
+                    results.add(resultIter.next());
+                }
+            }
+            assertEquals(expectedResults, results);
+            break;
+        }
+    }
+    
+    private void addStatementsAndWait(final Collection<Statement> statements) throws RepositoryException, Exception {
+        // Write the data to Rya.
         final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection();
         ryaConn.begin();
         ryaConn.add(statements);
@@ -454,14 +778,5 @@ public class QueryIT extends RyaExportITBase {
 
         // Wait for the Fluo application to finish computing the end result.
         super.getMiniFluo().waitForObservers();
-
-        // Fetch the value that is stored within the PCJ table.
-        try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName())) {
-            final String pcjId = pcjStorage.listPcjs().get(0);
-            final Set<BindingSet> results = Sets.newHashSet( pcjStorage.listResults(pcjId) );
-
-            // Ensure the result of the query matches the expected result.
-            assertEquals(expectedResults, results);
-        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
index 9c21afd..12c69ca 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
@@ -28,11 +28,11 @@ import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaURI;
-import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
 import org.junit.Test;
 import org.openrdf.model.ValueFactory;
 import org.openrdf.model.impl.ValueFactoryImpl;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
index a8d470f..e6d287e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
@@ -28,12 +28,12 @@ import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.rya.accumulo.AccumuloRyaDAO;
 import org.apache.rya.indexing.external.PrecomputedJoinIndexer;
-import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
 import org.junit.Test;
 import org.openrdf.model.Statement;
 import org.openrdf.model.ValueFactory;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
index 72759bb..3f51311 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
@@ -28,11 +28,11 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
 import org.junit.Test;
 import org.openrdf.model.Resource;
 import org.openrdf.model.Statement;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java
index 150492f..ab42e89 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java
@@ -32,10 +32,10 @@ import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.resolver.RdfToRyaConversions;
 import org.apache.rya.indexing.accumulo.ConfigUtils;
-import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
 import org.junit.Assert;
 import org.junit.Test;
 import org.openrdf.model.Statement;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
index 46bc7b0..dc2f859 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
@@ -44,19 +44,18 @@ import org.apache.hadoop.io.Text;
 import org.apache.rya.accumulo.AccumuloRdfConfiguration;
 import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
 import org.apache.rya.api.client.RyaClient;
-import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
 import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaURI;
 import org.apache.rya.indexing.accumulo.ConfigUtils;
 import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
-import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
 import org.apache.rya.rdftriplestore.RyaSailRepository;
 import org.apache.rya.sail.config.RyaSailFactory;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.test.base/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.test.base/pom.xml
new file mode 100644
index 0000000..67bd0f0
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/pom.xml
@@ -0,0 +1,108 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+  <modelVersion>4.0.0</modelVersion>
+  
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.pcj.fluo.parent</artifactId>
+        <version>3.2.11-incubating-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>rya.pcj.fluo.test.base</artifactId>
+
+    <name>Apache Rya Integration Base</name>
+    <description>Base classes for Integration tests.</description>
+
+    <dependencies>
+        <!-- Rya Runtime Dependencies. -->
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.pcj.fluo.api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.pcj.fluo.client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.indexing</artifactId>
+        </dependency>
+         <dependency>
+            <groupId>org.apache.fluo</groupId>
+            <artifactId>fluo-api</artifactId>
+        </dependency>
+
+        <!-- Testing dependencies. -->
+        <dependency>
+            <groupId>org.apache.fluo</groupId>
+            <artifactId>fluo-mini</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>compile</scope>
+        </dependency>
+         <dependency>
+            <groupId>org.apache.fluo</groupId>
+            <artifactId>fluo-api</artifactId>
+        </dependency>
+
+        <dependency>
+          <groupId>org.apache.kafka</groupId>
+          <artifactId>kafka-clients</artifactId>
+          <version>0.10.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>0.10.1.0</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <!-- Kafka test-classifier jar and Fluo test recipes (compile scope). -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>0.10.1.0</version>
+            <classifier>test</classifier>
+            <scope>compile</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+             <groupId>org.apache.fluo</groupId>
+            <artifactId>fluo-recipes-test</artifactId>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java
new file mode 100644
index 0000000..b9be828
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.base;
+
+import java.nio.file.Files;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.junit.After;
+import org.junit.Before;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.Time;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+
+public class KafkaITBase {
+    // Base class that starts an embedded ZooKeeper and one Kafka broker before each test and stops them afterwards.
+    private static final String ZKHOST = "127.0.0.1";
+    private static final String BROKERHOST = "127.0.0.1";
+    private static final String BROKERPORT = "9092";
+    private KafkaServer kafkaServer;
+    private EmbeddedZookeeper zkServer;
+    private ZkClient zkClient;
+    
+    @Before
+    public void setupKafka() throws Exception {
+        // Runs before every test. The broker always listens on BROKERHOST:BROKERPORT.
+        // Start the embedded ZooKeeper and connect a client to the port it reports.
+        zkServer = new EmbeddedZookeeper();
+        final String zkConnect = ZKHOST + ":" + zkServer.port();
+        zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
+        ZkUtils.apply(zkClient, false); // NOTE(review): the returned ZkUtils is discarded - confirm the call is needed.
+
+        // Configure and start a single broker (id 0) that logs to a fresh temporary directory.
+        final Properties brokerProps = new Properties();
+        brokerProps.setProperty("zookeeper.connect", zkConnect);
+        brokerProps.setProperty("broker.id", "0");
+        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
+        final KafkaConfig config = new KafkaConfig(brokerProps);
+        final Time mock = new MockTime();
+        kafkaServer = TestUtils.createServer(config, mock);
+    }
+    
+    /**
+     * Shuts down the Kafka broker, closes the ZooKeeper client and then shuts
+     * down the embedded ZooKeeper server, in that order, after every test.
+     * NOTE(review): no field is null-checked, so a partially failed setupKafka() would make this throw.
+     */
+    @After
+    public void teardownKafka() {
+        kafkaServer.shutdown();
+        zkClient.close();
+        zkServer.shutdown();
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java
new file mode 100644
index 0000000..32ee962
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.pcj.fluo.test.base;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.net.UnknownHostException;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.MiniAccumuloClusterInstance;
+import org.apache.rya.accumulo.MiniAccumuloSingleton;
+import org.apache.rya.accumulo.RyaTestInstanceRule;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloInstall;
+import org.apache.zookeeper.ClientCnxn;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailException;
+
+import org.apache.fluo.api.client.FluoAdmin;
+import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.mini.MiniFluo;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.Install;
+import org.apache.rya.api.client.Install.DuplicateInstanceNameException;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
+import org.apache.rya.rdftriplestore.RyaSailRepository;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+
+/**
+ * Abstract base class for integration tests that need a Mini Fluo and a Rya
+ * instance on the Mini Accumulo cluster obtained from MiniAccumuloSingleton.
+ * <p>
+ * It declares no tests of its own; subclasses supply the test methods.
+ */
+public abstract class FluoITBase {
+    private static final Logger log = Logger.getLogger(FluoITBase.class);
+
+    // Mini Accumulo Cluster (static state, assigned in beforeClass()).
+    private static MiniAccumuloClusterInstance clusterInstance = MiniAccumuloSingleton.getInstance();
+    private static MiniAccumuloCluster cluster;
+
+    private static String instanceName = null;
+    private static String zookeepers = null;
+
+    protected static Connector accumuloConn = null;
+
+    // Fluo data store and connections (assigned per test in setupMiniResources()).
+    protected MiniFluo fluo = null;
+    protected FluoConfiguration fluoConfig = null;
+    protected FluoClient fluoClient = null;
+
+    // Rya data store and connections (assigned per test in setupMiniResources()).
+    protected RyaSailRepository ryaRepo = null;
+    protected RepositoryConnection ryaConn = null;
+
+    @Rule
+    public RyaTestInstanceRule testInstance = new RyaTestInstanceRule(false);
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        Logger.getLogger(ClientCnxn.class).setLevel(Level.ERROR); // Only log errors from the ZooKeeper ClientCnxn logger.
+
+        // Get the Mini Accumulo cluster from the singleton instance.
+        cluster = clusterInstance.getCluster();
+
+        // Record the instance name and ZooKeepers, then open a connector to the Mini Accumulo.
+        instanceName = cluster.getInstanceName();
+        zookeepers = cluster.getZooKeepers();
+
+        final Instance instance = new ZooKeeperInstance(instanceName, zookeepers);
+        accumuloConn = instance.getConnector(clusterInstance.getUsername(), new PasswordToken(clusterInstance.getPassword()));
+    }
+
+    @Before
+    public void setupMiniResources() throws Exception {
+        // Build the Fluo config, run the pre-init hook, (re)initialize Fluo with cleared table/ZooKeeper, run the post-init hook, then start Mini Fluo.
+        fluoConfig = createFluoConfig();
+        preFluoInitHook();
+        FluoFactory.newAdmin(fluoConfig).initialize(new FluoAdmin.InitializationOptions()
+                .setClearTable(true)
+                .setClearZookeeper(true));
+        postFluoInitHook();
+        fluo = FluoFactory.newMiniFluo(fluoConfig);
+        fluoClient = FluoFactory.newClient(fluo.getClientConfiguration());
+
+        // Initialize the Rya that will be used by the tests.
+        ryaRepo = setupRya();
+        ryaConn = ryaRepo.getConnection();
+    }
+
+    @After
+    public void shutdownMiniResources() { // Best effort: each failure is logged and the remaining resources are still closed.
+        if (ryaConn != null) {
+            try {
+                log.info("Shutting down Rya Connection.");
+                ryaConn.close();
+                log.info("Rya Connection shut down.");
+            } catch (final Exception e) {
+                log.error("Could not shut down the Rya Connection.", e);
+            }
+        }
+
+        if (ryaRepo != null) {
+            try {
+                log.info("Shutting down Rya Repo.");
+                ryaRepo.shutDown();
+                log.info("Rya Repo shut down.");
+            } catch (final Exception e) {
+                log.error("Could not shut down the Rya Repo.", e);
+            }
+        }
+
+        if (fluoClient != null) {
+            try {
+                log.info("Shutting down Fluo Client.");
+                fluoClient.close();
+                log.info("Fluo Client shut down.");
+            } catch (final Exception e) {
+                log.error("Could not shut down the Fluo Client.", e);
+            }
+        }
+
+        if (fluo != null) {
+            try {
+                log.info("Shutting down Mini Fluo.");
+                fluo.close();
+                log.info("Mini Fluo shut down.");
+            } catch (final Exception e) {
+                log.error("Could not shut down the Mini Fluo.", e);
+            }
+        }
+    }
+
+    protected void preFluoInitHook() throws Exception {
+        // Intentionally empty. Called by setupMiniResources() after fluoConfig is created and before Fluo is initialized.
+    }
+
+    protected void postFluoInitHook() throws Exception {
+        // Intentionally empty. Called by setupMiniResources() after Fluo is initialized and before Mini Fluo is started.
+    }
+
+    protected MiniAccumuloCluster getMiniAccumuloCluster() {
+        return cluster;
+    }
+
+    protected MiniFluo getMiniFluo() {
+        return fluo;
+    }
+
+    public RyaSailRepository getRyaSailRepository() {
+        return ryaRepo;
+    }
+
+    public Connector getAccumuloConnector() {
+        return accumuloConn;
+    }
+
+    public String getRyaInstanceName() {
+        return testInstance.getRyaInstanceName();
+    }
+
+    protected String getUsername() {
+        return clusterInstance.getUsername();
+    }
+
+    protected String getPassword() {
+        return clusterInstance.getPassword();
+    }
+
+    protected FluoConfiguration getFluoConfiguration() {
+        return fluoConfig;
+    }
+
+    public AccumuloConnectionDetails createConnectionDetails() {
+        return new AccumuloConnectionDetails(
+                clusterInstance.getUsername(),
+                clusterInstance.getPassword().toCharArray(),
+                clusterInstance.getInstanceName(),
+                clusterInstance.getZookeepers());
+    }
+
+    private FluoConfiguration createFluoConfig() {
+        // Point Fluo at the already running Mini Accumulo instead of letting Mini Fluo start its own.
+        final FluoConfiguration config = new FluoConfiguration();
+        config.setMiniStartAccumulo(false);
+        config.setAccumuloInstance(instanceName);
+        config.setAccumuloUser(clusterInstance.getUsername());
+        config.setAccumuloPassword(clusterInstance.getPassword());
+        config.setInstanceZookeepers(zookeepers + "/fluo");
+        config.setAccumuloZookeepers(zookeepers);
+
+        config.setApplicationName(getRyaInstanceName()); // The Fluo application and table are named after the Rya test instance.
+        config.setAccumuloTable("fluo" + getRyaInstanceName());
+        return config;
+    }
+
+    /**
+     * Installs a Rya instance named by {@link #getRyaInstanceName()} and returns a repository connected to it.
+     */
+    protected RyaSailRepository setupRya()
+            throws AccumuloException, AccumuloSecurityException, RepositoryException, RyaDAOException,
+            NumberFormatException, UnknownHostException, InferenceEngineException, AlreadyInitializedException,
+            RyaDetailsRepositoryException, DuplicateInstanceNameException, RyaClientException, SailException {
+        checkNotNull(instanceName);
+        checkNotNull(zookeepers);
+
+        // Setup Rya configuration values: real (non-mock) Accumulo, PCJ indexing stored in Accumulo and updated by Fluo.
+        final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix(getRyaInstanceName());
+        conf.setDisplayQueryPlan(true);
+        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, false);
+        conf.set(ConfigUtils.CLOUDBASE_USER, clusterInstance.getUsername());
+        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, clusterInstance.getPassword());
+        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, clusterInstance.getInstanceName());
+        conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, clusterInstance.getZookeepers());
+        conf.set(ConfigUtils.USE_PCJ, "true");
+        conf.set(ConfigUtils.FLUO_APP_NAME, getRyaInstanceName());
+        conf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString());
+        conf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString());
+        conf.set(ConfigUtils.CLOUDBASE_AUTHS, "");
+
+        // Install the test instance of Rya using the shared Accumulo connector.
+        final Install install = new AccumuloInstall(createConnectionDetails(), accumuloConn);
+
+        final InstallConfiguration installConfig = InstallConfiguration.builder()
+                .setEnableTableHashPrefix(true)
+                .setEnableEntityCentricIndex(true)
+                .setEnableFreeTextIndex(true)
+                .setEnableTemporalIndex(true)
+                .setEnablePcjIndex(true)
+                .setEnableGeoIndex(true)
+                .setFluoPcjAppName(getRyaInstanceName())
+                .build();
+        install.install(getRyaInstanceName(), installConfig);
+
+        // Connect to the instance of Rya that was just installed.
+        final Sail sail = RyaSailFactory.getInstance(conf);
+        final RyaSailRepository ryaRepo = new RyaSailRepository(sail);
+
+        return ryaRepo;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
new file mode 100644
index 0000000..85da422
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.pcj.fluo.test.base;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertEquals;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.fluo.api.config.ObserverSpecification;
+import org.apache.fluo.recipes.test.AccumuloExportITBase;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.AccumuloRyaDAO;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
+import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.rdftriplestore.RyaSailRepository;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.repository.sail.SailRepositoryConnection;
+import org.openrdf.sail.Sail;
+
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.Time;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+
+/**
+ * The base Integration Test class used for Fluo applications that export to a
+ * Kafka topic.
+ */
+public class KafkaExportITBase extends AccumuloExportITBase {
+
+    protected static final String RYA_INSTANCE_NAME = "test_";
+
+    private static final String ZKHOST = "127.0.0.1";
+    private static final String BROKERHOST = "127.0.0.1";
+    private static final String BROKERPORT = "9092";
+    private ZkUtils zkUtils;
+    private KafkaServer kafkaServer;
+    private EmbeddedZookeeper zkServer;
+    private ZkClient zkClient;
+
+    // The Rya instance statements are written to that will be fed into the Fluo
+    // app.
+    private RyaSailRepository ryaSailRepo = null;
+    private AccumuloRyaDAO dao = null;
+
+    /**
+     * Registers the PCJ observers with the Fluo configuration, giving the two result observers their Kafka producer settings.
+     */
+    @Override
+    protected void preFluoInitHook() throws Exception {
+        // Setup the observers that will be used by the Fluo PCJ Application.
+        final List<ObserverSpecification> observers = new ArrayList<>();
+        observers.add(new ObserverSpecification(TripleObserver.class.getName()));
+        observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
+        observers.add(new ObserverSpecification(JoinObserver.class.getName()));
+        observers.add(new ObserverSpecification(FilterObserver.class.getName()));
+        observers.add(new ObserverSpecification(AggregationObserver.class.getName()));
+
+        // Configure the export observer to export new PCJ results to Kafka
+        // (the broker at BROKERHOST:BROKERPORT).
+        final HashMap<String, String> exportParams = new HashMap<>();
+
+        final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams);
+        kafkaParams.setExportToKafka(true);
+
+        // Configure the Kafka Producer
+        final Properties producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
+        kafkaParams.addAllProducerConfig(producerConfig);
+
+        final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
+        observers.add(exportObserverConfig);
+        
+        // Create the construct query observer. NOTE(review): an earlier comment said it should NOT
+        // export to Kafka, but setExportToKafka(true) is passed below - confirm which is intended.
+        HashMap<String, String> constructParams = new HashMap<>();
+        final KafkaExportParameters kafkaConstructParams = new KafkaExportParameters(constructParams);
+        kafkaConstructParams.setExportToKafka(true);
+        
+        // Configure the Kafka Producer for construct results (String keys, RyaSubGraphKafkaSerDe value serializer)
+        final Properties constructProducerConfig = new Properties();
+        constructProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
+        constructProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        constructProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, RyaSubGraphKafkaSerDe.class.getName());
+        kafkaConstructParams.addAllProducerConfig(constructProducerConfig);
+
+        final ObserverSpecification constructExportObserverConfig = new ObserverSpecification(ConstructQueryResultObserver.class.getName(),
+                constructParams);
+        observers.add(constructExportObserverConfig);
+
+        // Add the observers to the Fluo Configuration.
+        super.getFluoConfiguration().addObservers(observers);
+    }
+
+    /**
+     * Installs the Rya instance, then starts an embedded Zookeeper and a
+     * single Kafka broker listening on {@code BROKERHOST:BROKERPORT}.
+     */
+    @Before
+    public void setupKafka() throws Exception {
+        // The Rya instance has to exist on the Accumulo cluster first.
+        installRyaInstance();
+
+        // Start the embedded Zookeeper and connect to it.
+        zkServer = new EmbeddedZookeeper();
+        final String zookeeperAddress = ZKHOST + ":" + zkServer.port();
+        zkClient = new ZkClient(zookeeperAddress, 30000, 30000, ZKStringSerializer$.MODULE$);
+        zkUtils = ZkUtils.apply(zkClient, false);
+
+        // Describe the broker: id 0, logging to a fresh temporary directory.
+        final Properties brokerSettings = new Properties();
+        brokerSettings.setProperty("zookeeper.connect", zookeeperAddress);
+        brokerSettings.setProperty("broker.id", "0");
+        brokerSettings.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+        brokerSettings.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
+
+        // Start the broker using a mock clock.
+        final Time brokerClock = new MockTime();
+        kafkaServer = TestUtils.createServer(new KafkaConfig(brokerSettings), brokerClock);
+    }
+
+    /**
+     * Uninstalls the Rya instance, shuts down the Sail repository and destroys
+     * the DAO. Teardown is best effort: each step runs in its own try block so
+     * that a failure in one step is reported without skipping the remaining
+     * steps.
+     */
+    @After
+    public void teardownRya() {
+        // Uninstall the instance of Rya.
+        try {
+            final MiniAccumuloCluster cluster = getMiniAccumuloCluster();
+            final String instanceName = cluster.getInstanceName();
+            final String zookeepers = cluster.getZooKeepers();
+
+            final RyaClient ryaClient = AccumuloRyaClientFactory.build(
+                    new AccumuloConnectionDetails(ACCUMULO_USER, ACCUMULO_PASSWORD.toCharArray(), instanceName, zookeepers),
+                    super.getAccumuloConnector());
+            ryaClient.getUninstall().uninstall(RYA_INSTANCE_NAME);
+        } catch (final Exception e) {
+            System.out.println("Encountered the following Exception when shutting down Rya: " + e.getMessage());
+        }
+
+        // Shutdown the repo, even if the uninstall failed.
+        try {
+            if (ryaSailRepo != null) {
+                ryaSailRepo.shutDown();
+            }
+        } catch (final Exception e) {
+            System.out.println("Encountered the following Exception when shutting down Rya: " + e.getMessage());
+        }
+
+        // Destroy the DAO, even if an earlier step failed.
+        try {
+            if (dao != null) {
+                dao.destroy();
+            }
+        } catch (final Exception e) {
+            System.out.println("Encountered the following Exception when shutting down Rya: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Installs a Rya instance named {@code RYA_INSTANCE_NAME} on the mini
+     * Accumulo cluster, then creates the {@link AccumuloRyaDAO} and the
+     * {@link RyaSailRepository} that are connected to it.
+     */
+    private void installRyaInstance() throws Exception {
+        final MiniAccumuloCluster miniCluster = super.getMiniAccumuloCluster();
+        final String accumuloInstanceName = miniCluster.getInstanceName();
+        final String zookeeperServers = miniCluster.getZooKeepers();
+
+        // Build a client for the mini accumulo cluster.
+        final AccumuloConnectionDetails connectionDetails =
+                new AccumuloConnectionDetails(ACCUMULO_USER, ACCUMULO_PASSWORD.toCharArray(), accumuloInstanceName, zookeeperServers);
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, super.getAccumuloConnector());
+
+        // The free text, entity centric, geo and temporal indexes are disabled;
+        // only the PCJ index is enabled, bound to this test's Fluo application.
+        ryaClient.getInstall().install(RYA_INSTANCE_NAME,
+                InstallConfiguration.builder()
+                        .setEnableTableHashPrefix(false)
+                        .setEnableFreeTextIndex(false)
+                        .setEnableEntityCentricIndex(false)
+                        .setEnableGeoIndex(false)
+                        .setEnableTemporalIndex(false)
+                        .setEnablePcjIndex(true)
+                        .setFluoPcjAppName(super.getFluoConfiguration().getApplicationName())
+                        .build());
+
+        // Open the Sail, the DAO and the repository on the new instance.
+        final AccumuloRdfConfiguration ryaConf = makeConfig(accumuloInstanceName, zookeeperServers);
+        final Sail ryaSail = RyaSailFactory.getInstance(ryaConf);
+        dao = RyaSailFactory.getAccumuloDAOWithUpdatedConfig(ryaConf);
+        ryaSailRepo = new RyaSailRepository(ryaSail);
+    }
+
+    /**
+     * Builds the configuration used to connect to the installed Rya instance.
+     * The Accumulo instance name and Zookeepers are read from the Accumulo
+     * connector; the two parameters are currently not consulted.
+     *
+     * @param instanceName - The Accumulo instance name. (unused)
+     * @param zookeepers - The Zookeeper servers. (unused)
+     * @return A configuration with the PCJ index enabled, Accumulo as the PCJ
+     *         storage type and Fluo as the PCJ updater type.
+     */
+    protected AccumuloRdfConfiguration makeConfig(final String instanceName, final String zookeepers) {
+        final Instance accumuloInstance = super.getAccumuloConnector().getInstance();
+        final String fluoAppName = super.getFluoConfiguration().getApplicationName();
+
+        final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration();
+        ryaConf.setTablePrefix(RYA_INSTANCE_NAME);
+
+        // How to reach Accumulo.
+        ryaConf.setAccumuloUser(AccumuloExportITBase.ACCUMULO_USER);
+        ryaConf.setAccumuloPassword(AccumuloExportITBase.ACCUMULO_PASSWORD);
+        ryaConf.setAccumuloInstance(accumuloInstance.getInstanceName());
+        ryaConf.setAccumuloZookeepers(accumuloInstance.getZooKeepers());
+        ryaConf.setAuths("");
+
+        // How PCJs are stored and kept up to date.
+        ryaConf.set(ConfigUtils.USE_PCJ, "true");
+        ryaConf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true");
+        ryaConf.set(ConfigUtils.FLUO_APP_NAME, fluoAppName);
+        ryaConf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString());
+        ryaConf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString());
+
+        ryaConf.setDisplayQueryPlan(true);
+        return ryaConf;
+    }
+
+    /**
+     * @return The {@link RyaSailRepository} connected to the Rya instance that
+     *         statements are loaded into. It is {@code null} until
+     *         {@code installRyaInstance()} has assigned it.
+     */
+    protected RyaSailRepository getRyaSailRepository() throws Exception {
+        return this.ryaSailRepo;
+    }
+
+    /**
+     * @return The {@link AccumuloRyaDAO} through which RyaStatements with
+     *         distinct visibilities can be added to the Rya instance. It is
+     *         {@code null} until {@code installRyaInstance()} has assigned it.
+     */
+    protected AccumuloRyaDAO getRyaDAO() {
+        return this.dao;
+    }
+
+    /**
+     * Shuts down the mini Kafka server, then closes the Zookeeper client and
+     * shuts down the mini Zookeeper. Anything that was never started is
+     * skipped.
+     */
+    @After
+    public void teardownKafka() {
+        if (kafkaServer != null) {
+            kafkaServer.shutdown();
+        }
+        if (zkClient != null) {
+            zkClient.close();
+        }
+        if (zkServer != null) {
+            zkServer.shutdown();
+        }
+    }
+
+    /**
+     * Test Kafka without Rya code to make sure Kafka works in this environment.
+     * If this test fails then it's a testing environment issue, not an issue
+     * with Rya.
+     * Source: https://github.com/asmaier/mini-kafka
+     */
+    @Test
+    public void embeddedKafkaTest() throws Exception {
+        // create topic
+        final String topic = "testTopic";
+        AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+
+        // setup producer
+        final Properties producerProps = new Properties();
+        producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
+        producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
+        producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+
+        // setup consumer
+        final Properties consumerProps = new Properties();
+        consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
+        consumerProps.setProperty("group.id", "group0");
+        consumerProps.setProperty("client.id", "consumer0");
+        consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+        consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
+        // to make sure the consumer starts from the beginning of the topic
+        consumerProps.put("auto.offset.reset", "earliest");
+
+        // The consumer is closed in a finally block so that a failed assertion
+        // does not leak it into the tests that run afterwards.
+        final KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
+        try {
+            consumer.subscribe(Arrays.asList(topic));
+
+            // send message; the producer is closed before polling, as it was
+            // originally, and is now also closed when the send fails
+            final KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps);
+            try {
+                final ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(topic, 42, "test-message".getBytes(StandardCharsets.UTF_8));
+                producer.send(data);
+            } finally {
+                producer.close();
+            }
+
+            // starting consumer
+            final ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
+            assertEquals(1, records.count());
+            final Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator();
+            final ConsumerRecord<Integer, byte[]> record = recordIterator.next();
+            assertEquals(42, (int) record.key());
+            assertEquals("test-message", new String(record.value(), StandardCharsets.UTF_8));
+        } finally {
+            consumer.close();
+        }
+    }
+
+    /**
+     * Creates a Kafka consumer that is subscribed to a single topic and reads
+     * it from the beginning.
+     *
+     * NOTE(review): keys are read with the Integer deserializer, while the
+     * producer configured in preFluoInitHook() writes keys with the byte array
+     * serializer - confirm that keys are never inspected by callers.
+     *
+     * @param TopicName - The name of the topic to subscribe to.
+     * @return A consumer of {@link VisibilityBindingSet}s subscribed to the topic.
+     */
+    protected KafkaConsumer<Integer, VisibilityBindingSet> makeConsumer(final String TopicName) {
+        final Properties settings = new Properties();
+
+        // Where the broker is and who is asking.
+        settings.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
+        settings.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
+        settings.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
+
+        // How keys and values are decoded.
+        settings.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.IntegerDeserializer");
+        settings.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
+
+        // Start at the beginning of the topic rather than at its end.
+        settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        final KafkaConsumer<Integer, VisibilityBindingSet> bindingSetConsumer = new KafkaConsumer<>(settings);
+        bindingSetConsumer.subscribe(Arrays.asList(TopicName));
+        return bindingSetConsumer;
+    }
+
+    /**
+     * Registers a PCJ for the query with Rya, writes the statements to Rya and
+     * waits for the Fluo observers to finish processing them.
+     *
+     * @param sparql - The SPARQL query the PCJ is created for. (not null)
+     * @param statements - The statements to write to Rya. (not null)
+     * @return The PCJ Id, which is the topic name the results will be written to.
+     * @throws Exception The PCJ could not be created or the statements could
+     *         not be written.
+     */
+    protected String loadData(final String sparql, final Collection<Statement> statements) throws Exception {
+        requireNonNull(sparql);
+        requireNonNull(statements);
+
+        // Register the PCJ with Rya.
+        final Instance accInstance = super.getAccumuloConnector().getInstance();
+        final Connector accumuloConn = super.getAccumuloConnector();
+
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails(ACCUMULO_USER,
+                ACCUMULO_PASSWORD.toCharArray(), accInstance.getInstanceName(), accInstance.getZooKeepers()), accumuloConn);
+
+        final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql);
+
+        // Write the data to Rya. The connection is closed in a finally block
+        // so that it is released even when the write fails.
+        final SailRepositoryConnection ryaConn = getRyaSailRepository().getConnection();
+        try {
+            ryaConn.begin();
+            ryaConn.add(statements);
+            ryaConn.commit();
+        } finally {
+            ryaConn.close();
+        }
+
+        // Wait for the Fluo application to finish computing the end result.
+        super.getMiniFluo().waitForObservers();
+
+        // The PCJ Id is the topic name the results will be written to.
+        return pcjId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java
new file mode 100644
index 0000000..6feadff
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.pcj.fluo.test.base;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.fluo.api.config.ObserverSpecification;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchObserver;
+import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;
+import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.PeriodicQueryObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
+import org.junit.BeforeClass;
+
+/**
+ * Integration test base for Fluo PCJ applications whose query results are exported to a Rya PCJ index.
+ */
+public class RyaExportITBase extends FluoITBase {
+
+    /** Applies log4j's basic configuration and limits root logging to {@code ERROR}. */
+    @BeforeClass
+    public static void setupLogging() {
+        BasicConfigurator.configure();
+        Logger.getRootLogger().setLevel(Level.ERROR);
+    }
+
+    /** Registers the PCJ observers, including a Rya-exporting {@link QueryResultObserver}, with Fluo. */
+    @Override
+    protected void preFluoInitHook() throws Exception {
+        // Parameters that point the export observer at the Rya instance on the mini accumulo cluster.
+        final HashMap<String, String> ryaExportSettings = new HashMap<>();
+        final RyaExportParameters ryaExport = new RyaExportParameters(ryaExportSettings);
+        ryaExport.setExportToRya(true);
+        ryaExport.setRyaInstanceName(getRyaInstanceName());
+        ryaExport.setAccumuloInstanceName(super.getMiniAccumuloCluster().getInstanceName());
+        ryaExport.setZookeeperServers(super.getMiniAccumuloCluster().getZooKeepers());
+        ryaExport.setExporterUsername(getUsername());
+        ryaExport.setExporterPassword(getPassword());
+
+        // The observers of the Fluo PCJ Application, with the export observer registered last.
+        final List<ObserverSpecification> pcjObservers = new ArrayList<>();
+        pcjObservers.add(new ObserverSpecification(BatchObserver.class.getName()));
+        pcjObservers.add(new ObserverSpecification(TripleObserver.class.getName()));
+        pcjObservers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
+        pcjObservers.add(new ObserverSpecification(JoinObserver.class.getName()));
+        pcjObservers.add(new ObserverSpecification(FilterObserver.class.getName()));
+        pcjObservers.add(new ObserverSpecification(AggregationObserver.class.getName()));
+        pcjObservers.add(new ObserverSpecification(PeriodicQueryObserver.class.getName()));
+        pcjObservers.add(new ObserverSpecification(QueryResultObserver.class.getName(), ryaExportSettings));
+
+        // Hand the complete list of observers to the Fluo Configuration.
+        super.getFluoConfiguration().addObservers(pcjObservers);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pom.xml b/extras/rya.pcj.fluo/pom.xml
index 54a22fc..6979768 100644
--- a/extras/rya.pcj.fluo/pom.xml
+++ b/extras/rya.pcj.fluo/pom.xml
@@ -38,6 +38,7 @@
         <module>pcj.fluo.app</module>
         <module>pcj.fluo.client</module>
         <module>pcj.fluo.integration</module>
+        <module>pcj.fluo.test.base</module>
         <module>pcj.fluo.demo</module>
     </modules>
     <profiles>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml b/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml
new file mode 100644
index 0000000..bcd60aa
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml
@@ -0,0 +1,77 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
+		contributor license agreements. See the NOTICE file distributed with this 
+		work for additional information regarding copyright ownership. The ASF licenses 
+		this file to you under the Apache License, Version 2.0 (the "License"); you 
+		may not use this file except in compliance with the License. You may obtain 
+		a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
+		required by applicable law or agreed to in writing, software distributed 
+		under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+		OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+		the specific language governing permissions and limitations under the License. -->
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.rya</groupId>
+		<artifactId>rya.periodic.service</artifactId>
+		<version>3.2.11-incubating-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>rya.periodic.service.integration.tests</artifactId>
+	
+	<name>Apache Rya Periodic Service Integration Tests</name>
+    <description>Integration Tests for Rya Periodic Service</description>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.rya</groupId>
+			<artifactId>rya.pcj.fluo.test.base</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<artifactId>log4j-1.2-api</artifactId>
+					<groupId>org.apache.logging.log4j</groupId>
+				</exclusion>
+				<exclusion>
+					<artifactId>log4j-api</artifactId>
+					<groupId>org.apache.logging.log4j</groupId>
+				</exclusion>
+				<exclusion>
+					<artifactId>log4j-core</artifactId>
+					<groupId>org.apache.logging.log4j</groupId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.rya</groupId>
+			<artifactId>rya.periodic.service.notification</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<artifactId>logback-classic</artifactId>
+					<groupId>ch.qos.logback</groupId>
+				</exclusion>
+				<exclusion>
+					<artifactId>logback-core</artifactId>
+					<groupId>ch.qos.logback</groupId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<configuration>
+					<encoding>UTF-8</encoding>
+					<source>1.8</source>
+					<target>1.8</target>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
\ No newline at end of file