Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/02 21:01:58 UTC

[5/9] incubator-rya git commit: RYA-280-Periodic Query Service. Closes #177.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinderTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinderTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinderTest.java
deleted file mode 100644
index 8b38923..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinderTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-
-import org.junit.Test;
-import org.openrdf.model.impl.LiteralImpl;
-import org.openrdf.model.impl.URIImpl;
-import org.openrdf.model.vocabulary.XMLSchema;
-import org.openrdf.query.algebra.Compare;
-import org.openrdf.query.algebra.Compare.CompareOp;
-import org.openrdf.query.algebra.Filter;
-import org.openrdf.query.algebra.ValueConstant;
-import org.openrdf.query.algebra.ValueExpr;
-import org.openrdf.query.algebra.Var;
-
-import com.google.common.base.Optional;
-
-/**
- * Tests the methods of {@link FilterFinder}.
- */
-public class FilterFinderTest {
-
-    @Test
-    public void manyFilters() throws Exception {
-        // The query that will be searched.
-        final String sparql =
-                "SELECT ?person ?age " +
-                "{" +
-                  "FILTER(?age < 30) . " +
-                  "FILTER(?person = <http://Alice>)" +
-                  "?person <http://hasAge> ?age" +
-                "}";
-
-        // Create the expected result.
-        final ValueExpr[] expected = new ValueExpr[2];
-        expected[0] =  new Compare(new Var("person"), new ValueConstant( new URIImpl("http://Alice") ));
-        expected[1] = new Compare(new Var("age"), new ValueConstant( new LiteralImpl("30", XMLSchema.INTEGER) ), CompareOp.LT);
-
-        // Run the test.
-        final FilterFinder finder = new FilterFinder();
-        final ValueExpr[] conditions = new ValueExpr[2];
-        conditions[0] = finder.findFilter(sparql, 0).get().getCondition();
-        conditions[1] = finder.findFilter(sparql, 1).get().getCondition();
-        assertTrue( Arrays.equals(expected, conditions) );
-    }
-
-    @Test
-    public void noFilterAtIndex() throws Exception {
-        // The query that will be searched.
-        final String sparql =
-                "SELECT ?person ?age " +
-                "{" +
-                  "FILTER(?age < 30) . " +
-                  "FILTER(?person = <http://Alice>)" +
-                  "?person <http://hasAge> ?age" +
-                "}";
-
-        // Run the test.
-        final FilterFinder finder = new FilterFinder();
-        final Optional<Filter> filter = finder.findFilter(sparql, 4);
-        assertFalse( filter.isPresent() );
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDeTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDeTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDeTest.java
deleted file mode 100644
index 99791ee..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDeTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.rya.indexing.pcj.fluo.app;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.fluo.api.data.Bytes;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import org.junit.Test;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.impl.MapBindingSet;
-
-/**
- * Tests the methods of {@link VisibilityBindingSetSerDe}.
- */
-public class VisibilityBindingSetSerDeTest {
-
-    @Test
-    public void roundTrip() throws Exception {
-        final ValueFactory vf = new ValueFactoryImpl();
-
-        final MapBindingSet bs = new MapBindingSet();
-        bs.addBinding("name", vf.createLiteral("Alice"));
-        bs.addBinding("age", vf.createLiteral(5));
-        final VisibilityBindingSet original = new VisibilityBindingSet(bs, "u");
-
-        final VisibilityBindingSetSerDe serde = new VisibilityBindingSetSerDe();
-        final Bytes bytes = serde.serialize(original);
-        final VisibilityBindingSet result = serde.deserialize(bytes);
-
-        assertEquals(original, result);
-    }
-}
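
For context, the test removed above exercises a straightforward round trip through VisibilityBindingSetSerDe: a binding set plus its visibility string is serialized to Fluo Bytes and read back unchanged. Below is a minimal standalone sketch of that same usage, assuming the serialize/deserialize API shown in the deleted test remains available after this commit; the example class name and the "a&b" visibility label are illustrative only.

    import org.apache.fluo.api.data.Bytes;
    import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe;
    import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    import org.openrdf.model.ValueFactory;
    import org.openrdf.model.impl.ValueFactoryImpl;
    import org.openrdf.query.impl.MapBindingSet;

    public class VisibilityBindingSetSerDeExample {
        public static void main(final String[] args) throws Exception {
            final ValueFactory vf = new ValueFactoryImpl();

            // Build a binding set and attach an Accumulo-style visibility expression.
            final MapBindingSet bs = new MapBindingSet();
            bs.addBinding("name", vf.createLiteral("Bob"));
            final VisibilityBindingSet original = new VisibilityBindingSet(bs, "a&b");

            // Round trip through the SerDe, e.g. before writing to and after reading from Fluo.
            final VisibilityBindingSetSerDe serde = new VisibilityBindingSetSerDe();
            final Bytes bytes = serde.serialize(original);
            final VisibilityBindingSet copy = serde.deserialize(bytes);

            System.out.println(original.equals(copy)); // expected: true
        }
    }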

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java
new file mode 100644
index 0000000..fe89325
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.batch.serializer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.Optional;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
+import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation;
+import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.junit.Test;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+public class BatchInformationSerializerTest {
+
+    @Test
+    public void testSpanBatchInformationSerialization() {
+
+        SpanBatchDeleteInformation batch = SpanBatchDeleteInformation.builder().setBatchSize(1000)
+                .setColumn(FluoQueryColumns.PERIODIC_QUERY_BINDING_SET).setSpan(Span.prefix(Bytes.of("prefix"))).build();
+        System.out.println(batch);
+        byte[] batchBytes = BatchInformationSerializer.toBytes(batch);
+        Optional<BatchInformation> decodedBatch = BatchInformationSerializer.fromBytes(batchBytes);
+        System.out.println(decodedBatch);
+        assertEquals(batch, decodedBatch.get());
+    }
+
+    @Test
+    public void testJoinBatchInformationSerialization() {
+
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("a", new URIImpl("urn:123"));
+        bs.addBinding("b", new URIImpl("urn:456"));
+        VisibilityBindingSet vBis = new VisibilityBindingSet(bs, "FOUO");
+        
+        JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1000).setTask(Task.Update)
+                .setColumn(FluoQueryColumns.PERIODIC_QUERY_BINDING_SET).setSpan(Span.prefix(Bytes.of("prefix346")))
+                .setJoinType(JoinType.LEFT_OUTER_JOIN).setSide(Side.RIGHT).setVarOrder(new VariableOrder(Arrays.asList("a", "b")))
+                .setBs(vBis).build();
+        
+        byte[] batchBytes = BatchInformationSerializer.toBytes(batch);
+        Optional<BatchInformation> decodedBatch = BatchInformationSerializer.fromBytes(batchBytes);
+        assertEquals(batch, decodedBatch.get());
+    }
+
+}
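
BatchInformationSerializer.fromBytes returns an Optional, which the tests above unwrap with get() because the input bytes are known to be valid. Below is a minimal sketch of how a caller might handle the empty case instead, using only the builder and serializer calls shown in testSpanBatchInformationSerialization; the example class name is hypothetical.

    import java.util.Optional;

    import org.apache.fluo.api.data.Bytes;
    import org.apache.fluo.api.data.Span;
    import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation;
    import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
    import org.apache.rya.indexing.pcj.fluo.app.batch.serializer.BatchInformationSerializer;
    import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;

    public class BatchInformationRoundTripExample {
        public static void main(final String[] args) {
            // Build a span-delete batch the same way the test does.
            final BatchInformation batch = SpanBatchDeleteInformation.builder()
                    .setBatchSize(1000)
                    .setColumn(FluoQueryColumns.PERIODIC_QUERY_BINDING_SET)
                    .setSpan(Span.prefix(Bytes.of("prefix")))
                    .build();

            final byte[] batchBytes = BatchInformationSerializer.toBytes(batch);

            // fromBytes returns Optional<BatchInformation>, so a caller can skip
            // entries that cannot be decoded rather than fail outright.
            final Optional<BatchInformation> decoded = BatchInformationSerializer.fromBytes(batchBytes);
            if (decoded.isPresent()) {
                System.out.println("Decoded batch: " + decoded.get());
            } else {
                System.out.println("Could not decode batch information; skipping.");
            }
        }
    }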

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java
new file mode 100644
index 0000000..c8ca6af
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil.PeriodicQueryNodeRelocator;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil.PeriodicQueryNodeVisitor;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.junit.Assert;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.algebra.FunctionCall;
+import org.openrdf.query.algebra.Join;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.QueryModelNode;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.ValueExpr;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+public class PeriodicQueryUtilTest {
+
+    private static final ValueFactory vf = new ValueFactoryImpl();
+
+   
+    
+    @Test
+    public void periodicNodeNotPresentTest() throws Exception {
+        
+        List<ValueExpr> values = Arrays.asList(new Var("time"), new ValueConstant(vf.createLiteral(12.0)), new ValueConstant(vf.createLiteral(6.0)), new ValueConstant(vf.createURI(PeriodicQueryUtil.temporalNameSpace + "hours")));
+        FunctionCall func = new FunctionCall("uri:func", values);
+        Optional<PeriodicQueryNode> node1 = PeriodicQueryUtil.getPeriodicQueryNode(func, new Join());
+        Assert.assertEquals(false, node1.isPresent());
+    }
+
+    
+    
+    @Test
+    public void periodicNodePresentTest() throws Exception {
+        
+        List<ValueExpr> values = Arrays.asList(new Var("time"), new ValueConstant(vf.createLiteral(12.0)), new ValueConstant(vf.createLiteral(6.0)), new ValueConstant(vf.createURI(PeriodicQueryUtil.temporalNameSpace + "hours")));
+        FunctionCall func = new FunctionCall(PeriodicQueryUtil.PeriodicQueryURI, values);
+        Optional<PeriodicQueryNode> node1 = PeriodicQueryUtil.getPeriodicQueryNode(func, new Join());
+        Assert.assertEquals(true, node1.isPresent());
+        
+        PeriodicQueryNode node2 = new PeriodicQueryNode(12*60*60*1000L, 6*3600*1000L, TimeUnit.MILLISECONDS, "time", new Join());
+        
+        Assert.assertEquals(true, periodicNodesEqualIgnoreArg(node1.get(), node2));
+    }
+    
+    
+    @Test
+    public void periodicNodeFractionalDurationTest() throws Exception {
+        
+        List<ValueExpr> values = Arrays.asList(new Var("time"), new ValueConstant(vf.createLiteral(1)), new ValueConstant(vf.createLiteral(.5)), new ValueConstant(vf.createURI(PeriodicQueryUtil.temporalNameSpace + "hours")));
+        FunctionCall func = new FunctionCall(PeriodicQueryUtil.PeriodicQueryURI, values);
+        Optional<PeriodicQueryNode> node1 = PeriodicQueryUtil.getPeriodicQueryNode(func, new Join());
+        Assert.assertEquals(true, node1.isPresent());
+        
+        double window = 1*60*60*1000;
+        double period = .5*3600*1000;
+        
+        PeriodicQueryNode node2 = new PeriodicQueryNode((long) window, (long) period, TimeUnit.MILLISECONDS, "time", new Join());
+        
+        Assert.assertEquals(true, periodicNodesEqualIgnoreArg(node1.get(), node2));
+    }
+    
+    @Test
+    public void testPeriodicNodePlacement() throws MalformedQueryException {
+         String query = "prefix function: <http://org.apache.rya/function#> " //n
+                + "prefix time: <http://www.w3.org/2006/time#> " //n
+                + "prefix fn: <http://www.w3.org/2006/fn#> " //n
+                + "select ?obs ?time ?lat where {" //n
+                + "Filter(function:periodic(?time, 12.0, 6.0,time:hours)) " //n
+                + "Filter(fn:test(?lat, 25)) " //n
+                + "?obs <uri:hasTime> ?time. " //n
+                + "?obs <uri:hasLattitude> ?lat }"; //n
+         
+         SPARQLParser parser = new SPARQLParser();
+         ParsedQuery pq = parser.parseQuery(query, null);
+         TupleExpr te = pq.getTupleExpr();
+         te.visit(new PeriodicQueryNodeVisitor());
+         
+         PeriodicNodeCollector collector = new PeriodicNodeCollector();
+         te.visit(collector);
+         
+         PeriodicQueryNode node2 = new PeriodicQueryNode(12*60*60*1000L, 6*3600*1000L, TimeUnit.MILLISECONDS, "time", new Join());
+
+         Assert.assertEquals(true, periodicNodesEqualIgnoreArg(node2, collector.getPeriodicQueryNode()));
+         
+    }
+    
+    @Test
+    public void testPeriodicNodeLocation() throws MalformedQueryException {
+         String query = "prefix function: <http://org.apache.rya/function#> " //n
+                + "prefix time: <http://www.w3.org/2006/time#> " //n
+                + "prefix fn: <http://www.w3.org/2006/fn#> " //n
+                + "select ?obs ?time ?lat where {" //n
+                + "Filter(function:periodic(?time, 1,.5,time:hours)) " //n
+                + "Filter(fn:test(?lat, 25)) " //n
+                + "?obs <uri:hasTime> ?time. " //n
+                + "?obs <uri:hasLattitude> ?lat }"; //n
+         
+         SPARQLParser parser = new SPARQLParser();
+         ParsedQuery pq = parser.parseQuery(query, null);
+         TupleExpr te = pq.getTupleExpr();
+         te.visit(new PeriodicQueryNodeVisitor());
+        
+         PeriodicNodeCollector collector = new PeriodicNodeCollector();
+         te.visit(collector);
+         Assert.assertEquals(2, collector.getPos());
+         
+         te.visit(new PeriodicQueryNodeRelocator());
+         collector.resetCount();
+         te.visit(collector);
+         Assert.assertEquals(1, collector.getPos());
+         
+         double window = 1*60*60*1000;
+         double period = .5*3600*1000;
+         PeriodicQueryNode node2 = new PeriodicQueryNode((long) window, (long) period, TimeUnit.MILLISECONDS, "time", new Join());
+         Assert.assertEquals(true, periodicNodesEqualIgnoreArg(node2, collector.getPeriodicQueryNode()));
+         
+    }
+    
+    @Test
+    public void testFluoQueryVarOrders() throws MalformedQueryException {
+        String query = "prefix function: <http://org.apache.rya/function#> " //n
+                + "prefix time: <http://www.w3.org/2006/time#> " //n
+                + "select (count(?obs) as ?total) where {" //n
+                + "Filter(function:periodic(?time, 12.4, 6.2,time:hours)) " //n
+                + "?obs <uri:hasTime> ?time. " //n
+                + "?obs <uri:hasLattitude> ?lat }"; //n
+         
+         SPARQLParser parser = new SPARQLParser();
+         ParsedQuery pq = parser.parseQuery(query, null);
+         SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder();
+         FluoQuery fluoQuery = builder.make(pq, new NodeIds());
+         
+         PeriodicQueryMetadata periodicMeta = fluoQuery.getPeriodicQueryMetadata().orNull();
+         Assert.assertEquals(true, periodicMeta != null);
+         VariableOrder periodicVars = periodicMeta.getVariableOrder();
+         Assert.assertEquals(IncrementalUpdateConstants.PERIODIC_BIN_ID, periodicVars.getVariableOrders().get(0));
+         
+         QueryMetadata queryMeta = fluoQuery.getQueryMetadata().get();
+         VariableOrder queryVars = queryMeta.getVariableOrder();
+         Assert.assertEquals(IncrementalUpdateConstants.PERIODIC_BIN_ID, queryVars.getVariableOrders().get(0));
+         
+         Collection<AggregationMetadata> aggMetaCollection = fluoQuery.getAggregationMetadata();
+         Assert.assertEquals(1, aggMetaCollection.size());
+         AggregationMetadata aggMeta = aggMetaCollection.iterator().next();
+         VariableOrder aggVars = aggMeta.getVariableOrder();
+         Assert.assertEquals(IncrementalUpdateConstants.PERIODIC_BIN_ID, aggVars.getVariableOrders().get(0));
+         
+         System.out.println(fluoQuery);
+    }
+
+    private boolean periodicNodesEqualIgnoreArg(PeriodicQueryNode node1, PeriodicQueryNode node2) {
+        return new EqualsBuilder().append(node1.getPeriod(), node2.getPeriod()).append(node1.getWindowSize(), node2.getWindowSize())
+                .append(node1.getTemporalVariable(), node2.getTemporalVariable()).append(node1.getUnit(), node2.getUnit()).build();
+    }
+    
+    private static class PeriodicNodeCollector extends QueryModelVisitorBase<RuntimeException>{
+        
+        private PeriodicQueryNode periodicNode;
+        int count = 0;
+        
+        public PeriodicQueryNode getPeriodicQueryNode() {
+            return periodicNode;
+        }
+        
+        public int getPos() {
+            return count;
+        }
+        
+        public void resetCount() {
+            count = 0;
+        }
+        
+        public void meet(Filter node) {
+            count++;
+            node.getArg().visit(this);
+        }
+        
+        public void meet(Projection node) {
+            count++;
+            node.getArg().visit(this);
+        }
+        
+        @Override
+        public void meetOther(QueryModelNode node) {
+            if(node instanceof PeriodicQueryNode) {
+                periodicNode = (PeriodicQueryNode) node;
+            }
+        }
+        
+    }
+
+}
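
The periodic filter exercised above takes four arguments: the temporal variable, the window size, the period, and the time unit, e.g. function:periodic(?time, 12.0, 6.0, time:hours) for a 12-hour window that advances every 6 hours. Below is a minimal standalone sketch of turning such a FunctionCall into a PeriodicQueryNode, built only from the calls used in periodicNodePresentTest; the example class name is hypothetical.

    import java.util.Arrays;
    import java.util.List;
    import java.util.Optional;

    import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryNode;
    import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
    import org.openrdf.model.ValueFactory;
    import org.openrdf.model.impl.ValueFactoryImpl;
    import org.openrdf.query.algebra.FunctionCall;
    import org.openrdf.query.algebra.Join;
    import org.openrdf.query.algebra.ValueConstant;
    import org.openrdf.query.algebra.ValueExpr;
    import org.openrdf.query.algebra.Var;

    public class PeriodicQueryNodeExample {
        public static void main(final String[] args) {
            final ValueFactory vf = new ValueFactoryImpl();

            // function:periodic(?time, 12.0, 6.0, time:hours)
            final List<ValueExpr> values = Arrays.asList(
                    new Var("time"),
                    new ValueConstant(vf.createLiteral(12.0)),
                    new ValueConstant(vf.createLiteral(6.0)),
                    new ValueConstant(vf.createURI(PeriodicQueryUtil.temporalNameSpace + "hours")));
            final FunctionCall periodic = new FunctionCall(PeriodicQueryUtil.PeriodicQueryURI, values);

            // The util converts the FunctionCall into a PeriodicQueryNode; per the test,
            // the 12 hour window and 6 hour period come back normalized to milliseconds.
            final Optional<PeriodicQueryNode> node = PeriodicQueryUtil.getPeriodicQueryNode(periodic, new Join());
            if (node.isPresent()) {
                System.out.println("window=" + node.get().getWindowSize()
                        + " period=" + node.get().getPeriod()
                        + " unit=" + node.get().getUnit());
            }
        }
    }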

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
index 99ccc58..7a73b41 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
@@ -83,8 +83,7 @@ public class QueryReportRenderer {
             builder.appendItem( new ReportItem("FILTER NODE") );
             builder.appendItem( new ReportItem("Node ID", filterMetadata.getNodeId()) );
             builder.appendItem( new ReportItem("Variable Order", filterMetadata.getVariableOrder().toString()) );
-            builder.appendItem( new ReportItem("Original SPARQL", prettyFormatSparql(  filterMetadata.getOriginalSparql()) ) );
-            builder.appendItem( new ReportItem("Filter Index", "" + filterMetadata.getFilterIndexWithinSparql()) );
+            builder.appendItem( new ReportItem("Filter SPARQL", prettyFormatSparql(  filterMetadata.getFilterSparql())));
             builder.appendItem( new ReportItem("Parent Node ID", filterMetadata.getParentNodeId()) );
             builder.appendItem( new ReportItem("Child Node ID", filterMetadata.getChildNodeId()) );
             builder.appendItem( new ReportItem("Count", "" + queryReport.getCount(filterMetadata.getNodeId())) );

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
index 6467191..9591e55 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
@@ -43,6 +43,11 @@
             <artifactId>rya.indexing</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.pcj.fluo.test.base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+         <dependency>
             <groupId>org.apache.fluo</groupId>
             <artifactId>fluo-api</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/FluoITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/FluoITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/FluoITBase.java
deleted file mode 100644
index b5d9428..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/FluoITBase.java
+++ /dev/null
@@ -1,282 +0,0 @@
-package org.apache.rya.indexing.pcj.fluo;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.net.UnknownHostException;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.minicluster.MiniAccumuloCluster;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.rya.accumulo.MiniAccumuloClusterInstance;
-import org.apache.rya.accumulo.MiniAccumuloSingleton;
-import org.apache.rya.accumulo.RyaTestInstanceRule;
-import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
-import org.apache.rya.api.client.accumulo.AccumuloInstall;
-import org.apache.zookeeper.ClientCnxn;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.openrdf.repository.RepositoryConnection;
-import org.openrdf.repository.RepositoryException;
-import org.openrdf.sail.Sail;
-import org.openrdf.sail.SailException;
-
-import org.apache.fluo.api.client.FluoAdmin;
-import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.mini.MiniFluo;
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.api.client.RyaClientException;
-import org.apache.rya.api.client.Install;
-import org.apache.rya.api.client.Install.DuplicateInstanceNameException;
-import org.apache.rya.api.client.Install.InstallConfiguration;
-import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
-import org.apache.rya.api.persist.RyaDAOException;
-import org.apache.rya.indexing.accumulo.ConfigUtils;
-import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
-import org.apache.rya.rdftriplestore.RyaSailRepository;
-import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
-import org.apache.rya.sail.config.RyaSailFactory;
-
-/**
- * Integration tests that ensure the Fluo application processes PCJ results
- * correctly.
- * <p>
- * This class is being ignored because it doesn't contain any unit tests.
- */
-public abstract class FluoITBase {
-    private static final Logger log = Logger.getLogger(FluoITBase.class);
-
-    // Mini Accumulo Cluster
-    private static MiniAccumuloClusterInstance clusterInstance = MiniAccumuloSingleton.getInstance();
-    private static MiniAccumuloCluster cluster;
-
-    private static String instanceName = null;
-    private static String zookeepers = null;
-
-    protected static Connector accumuloConn = null;
-
-    // Fluo data store and connections.
-    protected MiniFluo fluo = null;
-    protected FluoConfiguration fluoConfig = null;
-    protected FluoClient fluoClient = null;
-
-    // Rya data store and connections.
-    protected RyaSailRepository ryaRepo = null;
-    protected RepositoryConnection ryaConn = null;
-
-    @Rule
-    public RyaTestInstanceRule testInstance = new RyaTestInstanceRule(false);
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        Logger.getLogger(ClientCnxn.class).setLevel(Level.ERROR);
-
-        // Setup and start the Mini Accumulo.
-        cluster = clusterInstance.getCluster();
-
-        // Store a connector to the Mini Accumulo.
-        instanceName = cluster.getInstanceName();
-        zookeepers = cluster.getZooKeepers();
-
-        final Instance instance = new ZooKeeperInstance(instanceName, zookeepers);
-        accumuloConn = instance.getConnector(clusterInstance.getUsername(), new PasswordToken(clusterInstance.getPassword()));
-    }
-
-    @Before
-    public void setupMiniResources() throws Exception {
-        // Initialize the Mini Fluo that will be used to store created queries.
-        fluoConfig = createFluoConfig();
-        preFluoInitHook();
-        FluoFactory.newAdmin(fluoConfig).initialize(new FluoAdmin.InitializationOptions()
-                .setClearTable(true)
-                .setClearZookeeper(true));
-        postFluoInitHook();
-        fluo = FluoFactory.newMiniFluo(fluoConfig);
-        fluoClient = FluoFactory.newClient(fluo.getClientConfiguration());
-
-        // Initialize the Rya that will be used by the tests.
-        ryaRepo = setupRya();
-        ryaConn = ryaRepo.getConnection();
-    }
-
-    @After
-    public void shutdownMiniResources() {
-        if (ryaConn != null) {
-            try {
-                log.info("Shutting down Rya Connection.");
-                ryaConn.close();
-                log.info("Rya Connection shut down.");
-            } catch (final Exception e) {
-                log.error("Could not shut down the Rya Connection.", e);
-            }
-        }
-
-        if (ryaRepo != null) {
-            try {
-                log.info("Shutting down Rya Repo.");
-                ryaRepo.shutDown();
-                log.info("Rya Repo shut down.");
-            } catch (final Exception e) {
-                log.error("Could not shut down the Rya Repo.", e);
-            }
-        }
-
-        if (fluoClient != null) {
-            try {
-                log.info("Shutting down Fluo Client.");
-                fluoClient.close();
-                log.info("Fluo Client shut down.");
-            } catch (final Exception e) {
-                log.error("Could not shut down the Fluo Client.", e);
-            }
-        }
-
-        if (fluo != null) {
-            try {
-                log.info("Shutting down Mini Fluo.");
-                fluo.close();
-                log.info("Mini Fluo shut down.");
-            } catch (final Exception e) {
-                log.error("Could not shut down the Mini Fluo.", e);
-            }
-        }
-    }
-
-    protected void preFluoInitHook() throws Exception {
-
-    }
-
-    protected void postFluoInitHook() throws Exception {
-
-    }
-
-    protected MiniAccumuloCluster getMiniAccumuloCluster() {
-        return cluster;
-    }
-
-    protected MiniFluo getMiniFluo() {
-        return fluo;
-    }
-
-    public RyaSailRepository getRyaSailRepository() {
-        return ryaRepo;
-    }
-
-    public Connector getAccumuloConnector() {
-        return accumuloConn;
-    }
-
-    public String getRyaInstanceName() {
-        return testInstance.getRyaInstanceName();
-    }
-
-    protected String getUsername() {
-        return clusterInstance.getUsername();
-    }
-
-    protected String getPassword() {
-        return clusterInstance.getPassword();
-    }
-
-    protected FluoConfiguration getFluoConfiguration() {
-        return fluoConfig;
-    }
-
-    public AccumuloConnectionDetails createConnectionDetails() {
-        return new AccumuloConnectionDetails(
-                clusterInstance.getUsername(),
-                clusterInstance.getPassword().toCharArray(),
-                clusterInstance.getInstanceName(),
-                clusterInstance.getZookeepers());
-    }
-
-    private FluoConfiguration createFluoConfig() {
-        // Configure how the mini fluo will run.
-        final FluoConfiguration config = new FluoConfiguration();
-        config.setMiniStartAccumulo(false);
-        config.setAccumuloInstance(instanceName);
-        config.setAccumuloUser(clusterInstance.getUsername());
-        config.setAccumuloPassword(clusterInstance.getPassword());
-        config.setInstanceZookeepers(zookeepers + "/fluo");
-        config.setAccumuloZookeepers(zookeepers);
-
-        config.setApplicationName(getRyaInstanceName());
-        config.setAccumuloTable("fluo" + getRyaInstanceName());
-        return config;
-    }
-
-    /**
-     * Sets up a Rya instance.
-     */
-    protected RyaSailRepository setupRya()
-            throws AccumuloException, AccumuloSecurityException, RepositoryException, RyaDAOException,
-            NumberFormatException, UnknownHostException, InferenceEngineException, AlreadyInitializedException,
-            RyaDetailsRepositoryException, DuplicateInstanceNameException, RyaClientException, SailException {
-        checkNotNull(instanceName);
-        checkNotNull(zookeepers);
-
-        // Setup Rya configuration values.
-        final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
-        conf.setTablePrefix(getRyaInstanceName());
-        conf.setDisplayQueryPlan(true);
-        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, false);
-        conf.set(ConfigUtils.CLOUDBASE_USER, clusterInstance.getUsername());
-        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, clusterInstance.getPassword());
-        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, clusterInstance.getInstanceName());
-        conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, clusterInstance.getZookeepers());
-        conf.set(ConfigUtils.USE_PCJ, "true");
-        conf.set(ConfigUtils.FLUO_APP_NAME, getRyaInstanceName());
-        conf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString());
-        conf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString());
-        conf.set(ConfigUtils.CLOUDBASE_AUTHS, "");
-
-        // Install the test instance of Rya.
-        final Install install = new AccumuloInstall(createConnectionDetails(), accumuloConn);
-
-        final InstallConfiguration installConfig = InstallConfiguration.builder()
-                .setEnableTableHashPrefix(true)
-                .setEnableEntityCentricIndex(true)
-                .setEnableFreeTextIndex(true)
-                .setEnableTemporalIndex(true)
-                .setEnablePcjIndex(true)
-                .setEnableGeoIndex(true)
-                .setFluoPcjAppName(getRyaInstanceName())
-                .build();
-        install.install(getRyaInstanceName(), installConfig);
-
-        // Connect to the instance of Rya that was just installed.
-        final Sail sail = RyaSailFactory.getInstance(conf);
-        final RyaSailRepository ryaRepo = new RyaSailRepository(sail);
-
-        return ryaRepo;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
deleted file mode 100644
index 452dd27..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo;
-
-import static java.util.Objects.requireNonNull;
-import static org.junit.Assert.assertEquals;
-
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.minicluster.MiniAccumuloCluster;
-import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.fluo.recipes.test.AccumuloExportITBase;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.accumulo.AccumuloRyaDAO;
-import org.apache.rya.api.client.Install.InstallConfiguration;
-import org.apache.rya.api.client.RyaClient;
-import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
-import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
-import org.apache.rya.indexing.accumulo.ConfigUtils;
-import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
-import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
-import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
-import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import org.apache.rya.rdftriplestore.RyaSailRepository;
-import org.apache.rya.sail.config.RyaSailFactory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.openrdf.model.Statement;
-import org.openrdf.repository.sail.SailRepositoryConnection;
-import org.openrdf.sail.Sail;
-
-
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.MockTime;
-import kafka.utils.TestUtils;
-import kafka.utils.Time;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import kafka.zk.EmbeddedZookeeper;
-
-/**
- * The base Integration Test class used for Fluo applications that export to a
- * Kafka topic.
- */
-public class KafkaExportITBase extends AccumuloExportITBase {
-
-    protected static final String RYA_INSTANCE_NAME = "test_";
-
-    private static final String ZKHOST = "127.0.0.1";
-    private static final String BROKERHOST = "127.0.0.1";
-    private static final String BROKERPORT = "9092";
-    private ZkUtils zkUtils;
-    private KafkaServer kafkaServer;
-    private EmbeddedZookeeper zkServer;
-    private ZkClient zkClient;
-
-    // The Rya instance statements are written to that will be fed into the Fluo
-    // app.
-    private RyaSailRepository ryaSailRepo = null;
-    private AccumuloRyaDAO dao = null;
-
-    /**
-     * Add info about the Kafka queue/topic to receive the export.
-     */
-    @Override
-    protected void preFluoInitHook() throws Exception {
-        // Setup the observers that will be used by the Fluo PCJ Application.
-        final List<ObserverSpecification> observers = new ArrayList<>();
-        observers.add(new ObserverSpecification(TripleObserver.class.getName()));
-        observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
-        observers.add(new ObserverSpecification(JoinObserver.class.getName()));
-        observers.add(new ObserverSpecification(FilterObserver.class.getName()));
-        observers.add(new ObserverSpecification(AggregationObserver.class.getName()));
-
-        // Configure the export observer to export new PCJ results to the mini
-        // accumulo cluster.
-        final HashMap<String, String> exportParams = new HashMap<>();
-
-        final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams);
-        kafkaParams.setExportToKafka(true);
-
-        // Configure the Kafka Producer
-        final Properties producerConfig = new Properties();
-        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
-        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
-        kafkaParams.addAllProducerConfig(producerConfig);
-
-        final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
-        observers.add(exportObserverConfig);
-        
-        //create construct query observer and tell it not to export to Kafka
-        //it will only add results back into Fluo
-        HashMap<String, String> constructParams = new HashMap<>();
-        final KafkaExportParameters kafkaConstructParams = new KafkaExportParameters(constructParams);
-        kafkaConstructParams.setExportToKafka(true);
-        
-        // Configure the Kafka Producer
-        final Properties constructProducerConfig = new Properties();
-        constructProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
-        constructProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        constructProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, RyaSubGraphKafkaSerDe.class.getName());
-        kafkaConstructParams.addAllProducerConfig(constructProducerConfig);
-
-        final ObserverSpecification constructExportObserverConfig = new ObserverSpecification(ConstructQueryResultObserver.class.getName(),
-                constructParams);
-        observers.add(constructExportObserverConfig);
-
-        // Add the observers to the Fluo Configuration.
-        super.getFluoConfiguration().addObservers(observers);
-    }
-
-    /**
-     * Set up mini Kafka, then call the superclass to set up mini Fluo.
-     */
-    @Before
-    public void setupKafka() throws Exception {
-        // Install an instance of Rya on the Accumulo cluster.
-        installRyaInstance();
-
-        // Setup Kafka.
-        zkServer = new EmbeddedZookeeper();
-        final String zkConnect = ZKHOST + ":" + zkServer.port();
-        zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
-        zkUtils = ZkUtils.apply(zkClient, false);
-
-        // setup Broker
-        final Properties brokerProps = new Properties();
-        brokerProps.setProperty("zookeeper.connect", zkConnect);
-        brokerProps.setProperty("broker.id", "0");
-        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
-        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
-        final KafkaConfig config = new KafkaConfig(brokerProps);
-        final Time mock = new MockTime();
-        kafkaServer = TestUtils.createServer(config, mock);
-    }
-
-    @After
-    public void teardownRya() {
-        final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster();
-        final String instanceName = cluster.getInstanceName();
-        final String zookeepers = cluster.getZooKeepers();
-
-        // Uninstall the instance of Rya.
-        final RyaClient ryaClient = AccumuloRyaClientFactory.build(
-                new AccumuloConnectionDetails(ACCUMULO_USER, ACCUMULO_PASSWORD.toCharArray(), instanceName, zookeepers),
-                super.getAccumuloConnector());
-
-        try {
-            ryaClient.getUninstall().uninstall(RYA_INSTANCE_NAME);
-            // Shutdown the repo.
-            if(ryaSailRepo != null) {ryaSailRepo.shutDown();}
-            if(dao != null ) {dao.destroy();}
-        } catch (Exception e) {
-            System.out.println("Encountered the following Exception when shutting down Rya: " + e.getMessage());
-        }
-    }
-
-    private void installRyaInstance() throws Exception {
-        final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster();
-        final String instanceName = cluster.getInstanceName();
-        final String zookeepers = cluster.getZooKeepers();
-
-        // Install the Rya instance to the mini accumulo cluster.
-        final RyaClient ryaClient = AccumuloRyaClientFactory.build(
-                new AccumuloConnectionDetails(ACCUMULO_USER, ACCUMULO_PASSWORD.toCharArray(), instanceName, zookeepers),
-                super.getAccumuloConnector());
-
-        ryaClient.getInstall().install(RYA_INSTANCE_NAME,
-                InstallConfiguration.builder().setEnableTableHashPrefix(false).setEnableFreeTextIndex(false)
-                        .setEnableEntityCentricIndex(false).setEnableGeoIndex(false).setEnableTemporalIndex(false).setEnablePcjIndex(true)
-                        .setFluoPcjAppName(super.getFluoConfiguration().getApplicationName()).build());
-
-        // Connect to the Rya instance that was just installed.
-        final AccumuloRdfConfiguration conf = makeConfig(instanceName, zookeepers);
-        final Sail sail = RyaSailFactory.getInstance(conf);
-        dao = RyaSailFactory.getAccumuloDAOWithUpdatedConfig(conf);
-        ryaSailRepo = new RyaSailRepository(sail);
-    }
-
-    protected AccumuloRdfConfiguration makeConfig(final String instanceName, final String zookeepers) {
-        final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
-        conf.setTablePrefix(RYA_INSTANCE_NAME);
-
-        // Accumulo connection information.
-        conf.setAccumuloUser(AccumuloExportITBase.ACCUMULO_USER);
-        conf.setAccumuloPassword(AccumuloExportITBase.ACCUMULO_PASSWORD);
-        conf.setAccumuloInstance(super.getAccumuloConnector().getInstance().getInstanceName());
-        conf.setAccumuloZookeepers(super.getAccumuloConnector().getInstance().getZooKeepers());
-        conf.setAuths("");
-
-        // PCJ configuration information.
-        conf.set(ConfigUtils.USE_PCJ, "true");
-        conf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true");
-        conf.set(ConfigUtils.FLUO_APP_NAME, super.getFluoConfiguration().getApplicationName());
-        conf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString());
-        conf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString());
-
-        conf.setDisplayQueryPlan(true);
-
-        return conf;
-    }
-
-    /**
-     * @return A {@link RyaSailRepository} that is connected to the Rya instance
-     *         that statements are loaded into.
-     */
-    protected RyaSailRepository getRyaSailRepository() throws Exception {
-        return ryaSailRepo;
-    }
-
-    /**
-     * @return A {@link AccumuloRyaDAO} so that RyaStatements with distinct
-     *         visibilities can be added to the Rya Instance
-     */
-    protected AccumuloRyaDAO getRyaDAO() {
-        return dao;
-    }
-
-    /**
-     * Close the Kafka mini server and the embedded ZooKeeper.
-     */
-    @After
-    public void teardownKafka() {
-        if(kafkaServer != null) {kafkaServer.shutdown();}
-        if(zkClient != null) {zkClient.close();}
-        if(zkServer != null) {zkServer.shutdown();}
-    }
-
-    /**
-     * Test Kafka without Rya code to make sure Kafka works in this environment.
-     * If this test fails then it's a testing environment issue, not a Rya issue.
-     * Source: https://github.com/asmaier/mini-kafka
-     */
-    @Test
-    public void embeddedKafkaTest() throws Exception {
-        // create topic
-        final String topic = "testTopic";
-        AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
-
-        // setup producer
-        final Properties producerProps = new Properties();
-        producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
-        producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
-        producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-        final KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps);
-
-        // setup consumer
-        final Properties consumerProps = new Properties();
-        consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
-        consumerProps.setProperty("group.id", "group0");
-        consumerProps.setProperty("client.id", "consumer0");
-        consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
-        consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-
-        // to make sure the consumer starts from the beginning of the topic
-        consumerProps.put("auto.offset.reset", "earliest");
-
-        final KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
-        consumer.subscribe(Arrays.asList(topic));
-
-        // send message
-        final ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(topic, 42, "test-message".getBytes(StandardCharsets.UTF_8));
-        producer.send(data);
-        producer.close();
-
-        // starting consumer
-        final ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
-        assertEquals(1, records.count());
-        final Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator();
-        final ConsumerRecord<Integer, byte[]> record = recordIterator.next();
-        assertEquals(42, (int) record.key());
-        assertEquals("test-message", new String(record.value(), StandardCharsets.UTF_8));
-        consumer.close();
-    }
-
-    protected KafkaConsumer<Integer, VisibilityBindingSet> makeConsumer(final String TopicName) {
-        // setup consumer
-        final Properties consumerProps = new Properties();
-        consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
-        consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
-        consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
-        consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                "org.apache.kafka.common.serialization.IntegerDeserializer");
-        consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
-
-        // to make sure the consumer starts from the beginning of the topic
-        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        final KafkaConsumer<Integer, VisibilityBindingSet> consumer = new KafkaConsumer<>(consumerProps);
-        consumer.subscribe(Arrays.asList(TopicName));
-        return consumer;
-    }
-
-    protected String loadData(final String sparql, final Collection<Statement> statements) throws Exception {
-        requireNonNull(sparql);
-        requireNonNull(statements);
-
-        // Register the PCJ with Rya.
-        final Instance accInstance = super.getAccumuloConnector().getInstance();
-        final Connector accumuloConn = super.getAccumuloConnector();
-
-        final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails(ACCUMULO_USER,
-                ACCUMULO_PASSWORD.toCharArray(), accInstance.getInstanceName(), accInstance.getZooKeepers()), accumuloConn);
-
-        final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql);
-
-        // Write the data to Rya.
-        final SailRepositoryConnection ryaConn = getRyaSailRepository().getConnection();
-        ryaConn.begin();
-        ryaConn.add(statements);
-        ryaConn.commit();
-        ryaConn.close();
-
-        // Wait for the Fluo application to finish computing the end result.
-        super.getMiniFluo().waitForObservers();
-
-        // The PCJ Id is the topic name the results will be written to.
-        return pcjId;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
deleted file mode 100644
index 4eab0f6..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.log4j.BasicConfigurator;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
-import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;
-import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
-import org.junit.BeforeClass;
-
-/**
- * The base Integration Test class used for Fluo applications that export to a Rya PCJ Index.
- */
-public class RyaExportITBase extends FluoITBase {
-
-    @BeforeClass
-    public static void setupLogging() {
-        BasicConfigurator.configure();
-        Logger.getRootLogger().setLevel(Level.ERROR);
-    }
-
-    @Override
-    protected void preFluoInitHook() throws Exception {
-        // Setup the observers that will be used by the Fluo PCJ Application.
-        final List<ObserverSpecification> observers = new ArrayList<>();
-        observers.add(new ObserverSpecification(TripleObserver.class.getName()));
-        observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
-        observers.add(new ObserverSpecification(JoinObserver.class.getName()));
-        observers.add(new ObserverSpecification(FilterObserver.class.getName()));
-        observers.add(new ObserverSpecification(AggregationObserver.class.getName()));
-
-        // Configure the export observer to export new PCJ results to the mini accumulo cluster.
-        final HashMap<String, String> exportParams = new HashMap<>();
-        final RyaExportParameters ryaParams = new RyaExportParameters(exportParams);
-        ryaParams.setExportToRya(true);
-        ryaParams.setRyaInstanceName(getRyaInstanceName());
-        ryaParams.setAccumuloInstanceName(super.getMiniAccumuloCluster().getInstanceName());
-        ryaParams.setZookeeperServers(super.getMiniAccumuloCluster().getZooKeepers());
-        ryaParams.setExporterUsername(getUsername());
-        ryaParams.setExporterPassword(getPassword());
-
-        final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
-        observers.add(exportObserverConfig);
-        
-        final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams);
-        kafkaParams.setExportToKafka(false);
-
-        final ObserverSpecification constructExportObserverConfig = new ObserverSpecification(ConstructQueryResultObserver.class.getName(),
-                exportParams);
-        observers.add(constructExportObserverConfig);
-
-        // Add the observers to the Fluo Configuration.
-        super.getFluoConfiguration().addObservers(observers);
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
index 3a42a23..cb34d06 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
@@ -29,7 +29,7 @@ import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaURI;
-import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
 import org.junit.Test;
 
 import com.google.common.base.Optional;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
index 9a1c285..d5c0e5f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
@@ -30,7 +30,6 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.rya.api.persist.RyaDAOException;
-import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInAccumuloException;
 import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInFluoException;
 import org.apache.rya.indexing.pcj.storage.PcjException;
@@ -39,6 +38,7 @@ import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
 import org.junit.Test;
 import org.openrdf.query.MalformedQueryException;
 import org.openrdf.query.QueryEvaluationException;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
index d19646e..965a7b9 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
@@ -31,13 +31,13 @@ import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaURI;
-import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
 import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
 import org.apache.rya.indexing.pcj.storage.PcjMetadata;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
 import org.junit.Test;
 
 import com.google.common.base.Optional;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
index ec301ba..e3914bd 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
@@ -29,7 +29,7 @@ import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.client.Transaction;
-import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
 import org.junit.Test;
 
 import com.beust.jcommander.internal.Lists;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
index accabbf..d403404 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
@@ -21,12 +21,12 @@ package org.apache.rya.indexing.pcj.fluo.app.query;
 import static org.junit.Assert.assertEquals;
 
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.client.Transaction;
-import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
@@ -34,6 +34,7 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery.QueryType;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
 import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
 import org.junit.Test;
 import org.openrdf.query.MalformedQueryException;
 import org.openrdf.query.algebra.StatementPattern;
@@ -42,8 +43,6 @@ import org.openrdf.query.parser.ParsedQuery;
 import org.openrdf.query.parser.sparql.SPARQLParser;
 import org.openrdf.repository.RepositoryException;
 
-import com.google.common.base.Optional;
-
 /**
  * Integration tests the methods of {@link FluoQueryMetadataDAO}.
  */
@@ -87,8 +86,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
         builder.setVarOrder(new VariableOrder("e;f"));
         builder.setParentNodeId("parentNodeId");
         builder.setChildNodeId("childNodeId");
-        builder.setOriginalSparql("originalSparql");
-        builder.setFilterIndexWithinSparql(2);
+        builder.setFilterSparql("originalSparql");
         final FilterMetadata originalMetadata = builder.build();
 
         try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
@@ -232,11 +230,10 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
                 storedMetadata = dao.readAggregationMetadata(sx, "nodeId");
             }
 
-            // Ensure the deserialized object is the same as the serialized one.
-            assertEquals(originalMetadata, storedMetadata);
         }
     }
 
+
     @Test
     public void aggregationMetadataTest_noGroupByVarOrders() {
         final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
@@ -267,6 +264,41 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
             assertEquals(originalMetadata, storedMetadata);
         }
     }
+    
+    @Test
+    public void periodicQueryMetadataTest() {
+        final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+        // Create the object that will be serialized.
+        PeriodicQueryMetadata originalMetadata = PeriodicQueryMetadata.builder()
+            .setNodeId("nodeId")
+            .setParentNodeId("parentNodeId")
+            .setVarOrder(new VariableOrder("a","b","c"))
+            .setChildNodeId("childNodeId")
+            .setPeriod(10)
+            .setWindowSize(20)
+            .setUnit(TimeUnit.DAYS)
+            .setTemporalVariable("a")
+            .build();
+            
+
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Write it to the Fluo table.
+            try(Transaction tx = fluoClient.newTransaction()) {
+                dao.write(tx, originalMetadata);
+                tx.commit();
+            }
+
+            // Read it from the Fluo table.
+            PeriodicQueryMetadata storedMetadata = null;
+            try(Snapshot sx = fluoClient.newSnapshot()) {
+                storedMetadata = dao.readPeriodicQueryMetadata(sx, "nodeId");
+            }
+
+            // Ensure the deserialized object is the same as the serialized one.
+            assertEquals(originalMetadata, storedMetadata);
+        }
+    }
 
     @Test
     public void fluoQueryTest() throws MalformedQueryException {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java
new file mode 100644
index 0000000..0cd7cfb
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.integration;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO;
+import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation;
+import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
+import org.junit.Test;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
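+/**
+ * Integration tests for the batch tasks (span deletes and join batch adds and
+ * deletes) that are registered through {@link BatchInformationDAO} and processed
+ * by the Fluo application.
+ */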
+public class BatchDeleteIT extends RyaExportITBase {
+
+    private static final Logger log = Logger.getLogger(BatchDeleteIT.class);
+    private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+    @Test
+    public void simpleScanDelete() throws Exception {
+
+        final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
+                + " <urn:predicate_2> ?object2 } ";
+        try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
+
+            RyaURI subj = new RyaURI("urn:subject_1");
+            RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
+            RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
+            Set<RyaStatement> statements1 = getRyaStatements(statement1, 10);
+            Set<RyaStatement> statements2 = getRyaStatements(statement2, 10);
+
+            // Create the PCJ table.
+            final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName());
+            final String pcjId = pcjStorage.createPcj(sparql);
+
+            // Tell the Fluo app to maintain the PCJ.
+            String queryId = new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName());
+
+            List<String> ids = getNodeIdStrings(fluoClient, queryId);
+            List<String> prefixes = Arrays.asList("urn:subject_1", "urn:object", "urn:subject_1", "urn:subject_1");
+
+            // Stream the data into Fluo.
+            InsertTriples inserter = new InsertTriples();
+            inserter.insert(fluoClient, statements1, Optional.<String> absent());
+            inserter.insert(fluoClient, statements2, Optional.<String> absent());
+
+            // Verify the end results of the query match the expected results.
+            getMiniFluo().waitForObservers();
+
+            verifyCounts(fluoClient, ids, Arrays.asList(100, 100, 10, 10));
+
+            createSpanBatches(fluoClient, ids, prefixes, 10);
+            getMiniFluo().waitForObservers();
+
+            verifyCounts(fluoClient, ids, Arrays.asList(0, 0, 0, 0));
+        }
+    }
+
+    @Test
+    public void simpleJoinDelete() throws Exception {
+        final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
+                + " <urn:predicate_2> ?object2 } ";
+        try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
+
+            RyaURI subj = new RyaURI("urn:subject_1");
+            RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
+            RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
+            Set<RyaStatement> statements1 = getRyaStatements(statement1, 5);
+            Set<RyaStatement> statements2 = getRyaStatements(statement2, 5);
+
+            // Create the PCJ table.
+            final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName());
+            final String pcjId = pcjStorage.createPcj(sparql);
+
+            // Tell the Fluo app to maintain the PCJ.
+            String queryId = new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName());
+
+            List<String> ids = getNodeIdStrings(fluoClient, queryId);
+            String joinId = ids.get(1);
+            String rightSp = ids.get(3);
+            QueryBindingSet bs = new QueryBindingSet();
+            bs.addBinding("subject", new URIImpl("urn:subject_1"));
+            bs.addBinding("object1", new URIImpl("urn:object_0"));
+            VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
+            Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1"));
+            VariableOrder varOrder = new VariableOrder(Arrays.asList("subject", "object2"));
+
+            // Stream the data into Fluo.
+            InsertTriples inserter = new InsertTriples();
+            inserter.insert(fluoClient, statements1, Optional.<String> absent());
+            inserter.insert(fluoClient, statements2, Optional.<String> absent());
+
+            getMiniFluo().waitForObservers();
+            verifyCounts(fluoClient, ids, Arrays.asList(25, 25, 5, 5));
+
+            JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1)
+                    .setColumn(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).setSpan(span).setTask(Task.Delete)
+                    .setJoinType(JoinType.NATURAL_JOIN).setSide(Side.LEFT).setBs(vBs).setVarOrder(varOrder).build();
+            // Submit the batch delete task to the join node, then re-check the counts.
+            createSpanBatch(fluoClient, joinId, batch);
+
+            getMiniFluo().waitForObservers();
+            verifyCounts(fluoClient, ids, Arrays.asList(25, 20, 5, 5));
+        }
+    }
+
+    @Test
+    public void simpleJoinAdd() throws Exception {
+        final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
+                + " <urn:predicate_2> ?object2 } ";
+        try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
+
+            RyaURI subj = new RyaURI("urn:subject_1");
+            RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
+            Set<RyaStatement> statements2 = getRyaStatements(statement2, 5);
+
+            // Create the PCJ table.
+            final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName());
+            final String pcjId = pcjStorage.createPcj(sparql);
+
+            // Tell the Fluo app to maintain the PCJ.
+            String queryId = new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName());
+
+            List<String> ids = getNodeIdStrings(fluoClient, queryId);
+            String joinId = ids.get(1);
+            String rightSp = ids.get(3);
+            QueryBindingSet bs = new QueryBindingSet();
+            bs.addBinding("subject", new URIImpl("urn:subject_1"));
+            bs.addBinding("object1", new URIImpl("urn:object_0"));
+            VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
+            Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1"));
+            VariableOrder varOrder = new VariableOrder(Arrays.asList("subject", "object2"));
+
+            // Stream the data into Fluo.
+            InsertTriples inserter = new InsertTriples();
+            inserter.insert(fluoClient, statements2, Optional.<String> absent());
+
+            getMiniFluo().waitForObservers();
+            verifyCounts(fluoClient, ids, Arrays.asList(0, 0, 0, 5));
+
+            JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1)
+                    .setColumn(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).setSpan(span).setTask(Task.Add)
+                    .setJoinType(JoinType.NATURAL_JOIN).setSide(Side.LEFT).setBs(vBs).setVarOrder(varOrder).build();
+            // Submit the batch add task to the join node, then re-check the counts.
+            createSpanBatch(fluoClient, joinId, batch);
+
+            getMiniFluo().waitForObservers();
+            verifyCounts(fluoClient, ids, Arrays.asList(5, 5, 0, 5));
+        }
+    }
+
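+    // Builds numTriples statements from the template, filling any null subject,
+    // predicate, or object with a numbered URI.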
+    private Set<RyaStatement> getRyaStatements(RyaStatement statement, int numTriples) {
+
+        Set<RyaStatement> statements = new HashSet<>();
+        final String subject = "urn:subject_";
+        final String predicate = "urn:predicate_";
+        final String object = "urn:object_";
+
+        for (int i = 0; i < numTriples; i++) {
+            RyaStatement stmnt = new RyaStatement(statement.getSubject(), statement.getPredicate(), statement.getObject());
+            if (stmnt.getSubject() == null) {
+                stmnt.setSubject(new RyaURI(subject + i));
+            }
+            if (stmnt.getPredicate() == null) {
+                stmnt.setPredicate(new RyaURI(predicate + i));
+            }
+            if (stmnt.getObject() == null) {
+                stmnt.setObject(new RyaURI(object + i));
+            }
+            statements.add(stmnt);
+        }
+        return statements;
+    }
+
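+    // Returns the query node id followed by, for each join, the join node id and
+    // its left and right child node ids.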
+    private List<String> getNodeIdStrings(FluoClient fluoClient, String queryId) {
+        List<String> nodeStrings = new ArrayList<>();
+        try (Snapshot sx = fluoClient.newSnapshot()) {
+            FluoQuery query = dao.readFluoQuery(sx, queryId);
+            nodeStrings.add(queryId);
+            Collection<JoinMetadata> jMeta = query.getJoinMetadata();
+            for (JoinMetadata meta : jMeta) {
+                nodeStrings.add(meta.getNodeId());
+                nodeStrings.add(meta.getLeftChildNodeId());
+                nodeStrings.add(meta.getRightChildNodeId());
+            }
+        }
+        return nodeStrings;
+    }
+
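+    // Registers a span delete batch for each node id that removes the binding sets
+    // whose rows start with the corresponding prefix.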
+    private void createSpanBatches(FluoClient fluoClient, List<String> ids, List<String> prefixes, int batchSize) {
+
+        Preconditions.checkArgument(ids.size() == prefixes.size());
+
+        try (Transaction tx = fluoClient.newTransaction()) {
+            for (int i = 0; i < ids.size(); i++) {
+                String id = ids.get(i);
+                String bsPrefix = prefixes.get(i);
+                NodeType type = NodeType.fromNodeId(id).get();
+                Column bsCol = type.getResultColumn();
+                String row = id + IncrementalUpdateConstants.NODEID_BS_DELIM + bsPrefix;
+                Span span = Span.prefix(Bytes.of(row));
+                BatchInformation batch = SpanBatchDeleteInformation.builder().setBatchSize(batchSize).setColumn(bsCol).setSpan(span)
+                        .build();
+                BatchInformationDAO.addBatch(tx, id, batch);
+            }
+            tx.commit();
+        }
+    }
+
+    private void createSpanBatch(FluoClient fluoClient, String nodeId, BatchInformation batch) {
+        try (Transaction tx = fluoClient.newTransaction()) {
+            BatchInformationDAO.addBatch(tx, nodeId, batch);
+            tx.commit();
+        }
+    }
+
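+    // Counts the entries stored in the given binding set column under the given node id.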
+    private int countResults(FluoClient fluoClient, String nodeId, Column bsColumn) {
+        try (Transaction tx = fluoClient.newTransaction()) {
+            int count = 0;
+            RowScanner scanner = tx.scanner().over(Span.prefix(nodeId)).fetch(bsColumn).byRow().build();
+            Iterator<ColumnScanner> colScanners = scanner.iterator();
+            while (colScanners.hasNext()) {
+                ColumnScanner colScanner = colScanners.next();
+                Iterator<ColumnValue> vals = colScanner.iterator();
+                while (vals.hasNext()) {
+                    vals.next();
+                    count++;
+                }
+            }
+            tx.commit();
+            return count;
+        }
+    }
+
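+    // Asserts that each statement pattern, join, and query node holds the expected
+    // number of results.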
+    private void verifyCounts(FluoClient fluoClient, List<String> ids, List<Integer> expectedCounts) {
+        Preconditions.checkArgument(ids.size() == expectedCounts.size());
+        for (int i = 0; i < ids.size(); i++) {
+            String id = ids.get(i);
+            int expected = expectedCounts.get(i);
+            NodeType type = NodeType.fromNodeId(id).get();
+            int count = countResults(fluoClient, id, type.getResultColumn());
+            log.trace("NodeId: " + id + " Count: " + count + " Expected: " + expected);
+            switch (type) {
+            case STATEMENT_PATTERN:
+            case JOIN:
+            case QUERY:
+                assertEquals(expected, count);
+                break;
+            default:
+                break;
+            }
+        }
+    }
+
+}
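
The new BatchDeleteIT above registers its batch work through BatchInformationDAO. As a standalone reference, the sketch below pulls that span-delete registration pattern out of the test. It is a minimal illustration that assumes the SpanBatchDeleteInformation, BatchInformationDAO, NodeType, and IncrementalUpdateConstants APIs exactly as they appear in the diff; the SpanBatchDeleteExample class and registerSpanDelete method are hypothetical names introduced only for this sketch.

    import org.apache.fluo.api.client.FluoClient;
    import org.apache.fluo.api.client.Transaction;
    import org.apache.fluo.api.data.Bytes;
    import org.apache.fluo.api.data.Column;
    import org.apache.fluo.api.data.Span;
    import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
    import org.apache.rya.indexing.pcj.fluo.app.NodeType;
    import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation;
    import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO;
    import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;

    public class SpanBatchDeleteExample {

        /**
         * Registers a batch task that deletes the binding sets stored under the given
         * node whose rows begin with the supplied binding set prefix.
         */
        public static void registerSpanDelete(final FluoClient fluoClient, final String nodeId,
                final String bindingSetPrefix, final int batchSize) {
            // The column that stores a node's results depends on the node's type.
            final Column resultColumn = NodeType.fromNodeId(nodeId).get().getResultColumn();

            // Result rows are keyed "<nodeId><delim><binding set values>", so a prefix span
            // covers every stored result for the node that starts with the given prefix.
            final Span span = Span.prefix(
                    Bytes.of(nodeId + IncrementalUpdateConstants.NODEID_BS_DELIM + bindingSetPrefix));

            final BatchInformation batch = SpanBatchDeleteInformation.builder()
                    .setBatchSize(batchSize)
                    .setColumn(resultColumn)
                    .setSpan(span)
                    .build();

            // Writing the batch information to the node's row hands the work off to the
            // Fluo application, which performs the delete incrementally, batchSize rows
            // at a time.
            try (Transaction tx = fluoClient.newTransaction()) {
                BatchInformationDAO.addBatch(tx, nodeId, batch);
                tx.commit();
            }
        }
    }

BatchDeleteIT's createSpanBatches helper applies the same pattern to several nodes inside a single transaction before committing.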

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
index 414fa70..0f2d892 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
@@ -35,8 +35,8 @@ import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Span;
 import org.apache.rya.api.client.RyaClient;
 import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
-import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.DeletePcj;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
 import org.junit.Test;
 import org.openrdf.model.Statement;
 import org.openrdf.model.ValueFactory;