Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/09 21:48:49 UTC

[32/50] [abbrv] incubator-rya git commit: RYA-377 Fixed the project layout so that it builds even when the geoindexing profile is not enabled.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java
new file mode 100644
index 0000000..ba11e57
--- /dev/null
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.sp;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
+import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
+import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+/**
+ * Integration tests the methods of {@link StatementPatternProcessor}.
+ */
+public class StatementPatternProcessorIT {
+
+    @Rule
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+    @Test
+    public void singlePattern_singleStatement() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }";
+        final TopologyFactory factory = new TopologyFactory();
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Create a statement that generates an SP result.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+
+        final QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    @Test
+    public void singlePattern_manyStatements() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }";
+        final TopologyFactory factory = new TopologyFactory();
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Create some statements where some generate SP results and others do not.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoin")), "b") );
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:talksTo"), vf.createURI("urn:Alice")), "a|b") );
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "c") );
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Bob"));
+        bs.addBinding("otherPerson", vf.createURI("urn:Alice"));
+        expected.add( new VisibilityBindingSet(bs, "a|b") );
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    @Test
+    public void multiplePatterns_singleStatement() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final String query = "SELECT * WHERE { "
+                + "?person <urn:talksTo> ?otherPerson . "
+                + "?person ?action <urn:Bob>"
+                + "}";
+        final TopologyFactory factory = new TopologyFactory();
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Create a statement that generates SP results.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+
+        final QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("action", vf.createURI("urn:talksTo"));
+        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    @Test
+    public void multiplePatterns_multipleStatements() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final String query = "SELECT * WHERE { "
+                + "?person <urn:talksTo> ?otherPerson ."
+                + "?person ?action <urn:Bob>"
+                + "}";
+        final TopologyFactory factory = new TopologyFactory();
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Create some statements where some generate SP results and others do not.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Charlie")), "a|b") );
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:walksWith"), vf.createURI("urn:Bob")), "b") );
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("action", vf.createURI("urn:talksTo"));
+        bs.addBinding("otherPerson", vf.createURI("urn:Charlie"));
+        expected.add(new VisibilityBindingSet(bs, "a&(a|b)"));
+
+        bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("action", vf.createURI("urn:talksTo"));
+        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+        expected.add(new VisibilityBindingSet(bs, "a"));
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+}
\ No newline at end of file
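
For readers tracing the expected visibility "a&(a|b)" in multiplePatterns_multipleStatements above: when binding sets carrying different column visibilities are joined, the result should only be visible to a reader who satisfies both expressions. The following is a minimal, illustrative sketch of that combination rule; the class and method names are invented here, and Rya's actual join operator may parenthesize differently.

public final class VisibilityJoinSketch {

    private VisibilityJoinSketch() { }

    /**
     * Combines two Accumulo-style visibility expressions so a reader must satisfy both.
     * Compound expressions are wrapped in parentheses to preserve operator precedence.
     * (Illustrative only; not Rya's actual join code.)
     */
    public static String and(final String left, final String right) {
        if (left.isEmpty()) {
            return right;
        }
        if (right.isEmpty()) {
            return left;
        }
        final String l = (left.contains("|") || left.contains("&")) ? "(" + left + ")" : left;
        final String r = (right.contains("|") || right.contains("&")) ? "(" + right + ")" : right;
        return l + "&" + r;
    }

    public static void main(final String[] args) {
        // Mirrors the test expectation above: "a" joined with "a|b" becomes "a&(a|b)".
        System.out.println(and("a", "a|b"));
    }
}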

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka-test/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka-test/pom.xml b/extras/rya.streams/kafka-test/pom.xml
new file mode 100644
index 0000000..4a423e2
--- /dev/null
+++ b/extras/rya.streams/kafka-test/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.streams.parent</artifactId>
+        <version>3.2.12-incubating-SNAPSHOT</version>
+    </parent>
+    
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>rya.streams.kafka-test</artifactId>
+    
+    <name>Apache Rya Streams Kafka Test</name>
+    <description>
+        A common test jar containing utilities used to run Kafka-based Rya
+        Streams integration tests.
+    </description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.test.kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.streams.kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka-test/src/main/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka-test/src/main/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java b/extras/rya.streams/kafka-test/src/main/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java
new file mode 100644
index 0000000..ee25f8c
--- /dev/null
+++ b/extras/rya.streams/kafka-test/src/main/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
+
+import com.google.common.collect.Sets;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Utility functions that make it easier to test Rya Streams applications.
+ */
+@DefaultAnnotation(NonNull.class)
+public class RyaStreamsTestUtil {
+
+    /**
+     * Runs a Kafka Streams topology, loads statements into the input topic, reads the binding sets that come out of
+     * the results topic, and ensures the expected results match the read results.
+     *
+     * @param <T> The type of value that will be consumed from the results topic.
+     * @param kafka - The embedded Kafka instance that is being tested with. (not null)
+     * @param statementsTopic - The topic statements will be written to. (not null)
+     * @param resultsTopic - The topic results will be read from. (not null)
+     * @param builder - The streams topology that will be executed. (not null)
+     * @param statements - The statements that will be loaded into the topic. (not null)
+     * @param expected - The expected results. (not null)
+     * @param expectedDeserializerClass - The class of the deserializer that will be used when reading
+     *   values from the results topic. (not null)
+     * @throws Exception If any exception was thrown while running the test.
+     */
+    public static <T> void runStreamProcessingTest(
+            final KafkaTestInstanceRule kafka,
+            final String statementsTopic,
+            final String resultsTopic,
+            final TopologyBuilder builder,
+            final List<VisibilityStatement> statements,
+            final Set<T> expected,
+            final Class<? extends Deserializer<T>> expectedDeserializerClass) throws Exception {
+        requireNonNull(kafka);
+        requireNonNull(statementsTopic);
+        requireNonNull(resultsTopic);
+        requireNonNull(builder);
+        requireNonNull(statements);
+        requireNonNull(expected);
+        requireNonNull(expectedDeserializerClass);
+
+        // Explicitly create the topics that are being used.
+        kafka.createTopic(statementsTopic);
+        kafka.createTopic(resultsTopic);
+
+        // Start the streams program.
+        final Properties props = kafka.createBootstrapServerConfig();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StatementPatternProcessorIT");
+
+        final KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
+        streams.cleanUp();
+        try {
+            streams.start();
+
+            // Wait for the streams application to start. Streams only see data after their consumers are connected.
+            Thread.sleep(6000);
+
+            // Load the statements into the input topic.
+            try(Producer<String, VisibilityStatement> producer = KafkaTestUtil.makeProducer(
+                    kafka, StringSerializer.class, VisibilityStatementSerializer.class)) {
+                new KafkaLoadStatements(statementsTopic, producer).fromCollection(statements);
+            }
+
+            // Wait for the final results to appear in the output topic and verify the expected Binding Sets were found.
+            try(Consumer<String, T> consumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, expectedDeserializerClass)) {
+                // Register the topic.
+                consumer.subscribe(Arrays.asList(resultsTopic));
+
+                // Poll for the result.
+                final Set<T> results = Sets.newHashSet( KafkaTestUtil.pollForResults(500, 6, expected.size(), consumer) );
+
+                // Verify the expected binding sets were found.
+                assertEquals(expected, results);
+            }
+        } finally {
+            streams.close();
+        }
+    }
+}
\ No newline at end of file
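
The utility above calls KafkaTestUtil.pollForResults(500, 6, expected.size(), consumer), which is not part of this diff. Judging from the arguments alone, it presumably polls the consumer on a fixed interval, a bounded number of times, until the expected number of records has been read. The sketch below is only a guess at that contract under those assumptions; the real helper lives in rya.test.kafka and may differ.

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public final class PollForResultsSketch {

    private PollForResultsSketch() { }

    /**
     * Polls {@code consumer} roughly every {@code pollMs} milliseconds, at most
     * {@code maxPolls} times, stopping early once {@code targetCount} values were read.
     * (Assumed semantics; the real KafkaTestUtil helper may behave differently.)
     */
    public static <V> List<V> pollForResults(
            final int pollMs,
            final int maxPolls,
            final int targetCount,
            final Consumer<String, V> consumer) {
        final List<V> values = new ArrayList<>();
        for (int i = 0; i < maxPolls && values.size() < targetCount; i++) {
            // poll(long) blocks for up to pollMs waiting for new records.
            for (final ConsumerRecord<String, V> record : consumer.poll(pollMs)) {
                values.add(record.value());
            }
        }
        return values;
    }
}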

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/pom.xml b/extras/rya.streams/kafka/pom.xml
index 8926870..778630d 100644
--- a/extras/rya.streams/kafka/pom.xml
+++ b/extras/rya.streams/kafka/pom.xml
@@ -36,23 +36,22 @@ under the License.
     <profiles>
         <profile>
             <id>geoindexing</id>
-                <dependencies>
-                    <!-- Rya dependencies -->
-                    <dependency>
-                        <groupId>org.apache.rya</groupId>
-                        <artifactId>rya.pcj.functions.geo</artifactId>
-                        <version>${project.version}</version>
-                    </dependency>
-                    <dependency>
-                        <groupId>org.apache.rya</groupId>
-                        <artifactId>rya.geo.common</artifactId>
-                        <version>${project.version}</version>
-                    </dependency>
-                </dependencies>
+            <dependencies>
+                <!-- Rya dependencies -->
+                <dependency>
+                    <groupId>org.apache.rya</groupId>
+                    <artifactId>rya.pcj.functions.geo</artifactId>
+                    <version>${project.version}</version>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.rya</groupId>
+                    <artifactId>rya.geo.common</artifactId>
+                    <version>${project.version}</version>
+                </dependency>
+            </dependencies>
         </profile>
     </profiles>
 
-
     <dependencies>
         <!-- Rya dependencies -->
         <dependency>
@@ -106,6 +105,11 @@ under the License.
         </dependency>
         <dependency>
             <groupId>org.apache.rya</groupId>
+            <artifactId>rya.test.rdf</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
             <artifactId>rya.test.kafka</artifactId>
             <scope>test</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java
deleted file mode 100644
index b4388c3..0000000
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.streams.kafka;
-
-import static java.util.Objects.requireNonNull;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.openrdf.query.algebra.Filter;
-import org.openrdf.query.algebra.MultiProjection;
-import org.openrdf.query.algebra.Projection;
-import org.openrdf.query.algebra.StatementPattern;
-import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
-import org.openrdf.query.parser.ParsedQuery;
-import org.openrdf.query.parser.sparql.SPARQLParser;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-import edu.umd.cs.findbugs.annotations.Nullable;
-
-/**
- * A set of utility functions that are useful when writing tests RDF functions.
- */
-@DefaultAnnotation(NonNull.class)
-public final class RdfTestUtil {
-
-    private RdfTestUtil() { }
-
-    /**
-     * Fetch the {@link StatementPattern} from a SPARQL string.
-     *
-     * @param sparql - A SPARQL query that contains only a single Statement Patern. (not nul)
-     * @return The {@link StatementPattern} that was in the query, if it could be found. Otherwise {@code null}
-     * @throws Exception The statement pattern could not be found in the parsed SPARQL query.
-     */
-    public static @Nullable StatementPattern getSp(final String sparql) throws Exception {
-        requireNonNull(sparql);
-
-        final AtomicReference<StatementPattern> statementPattern = new AtomicReference<>();
-        final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null);
-        parsed.getTupleExpr().visitChildren(new QueryModelVisitorBase<Exception>() {
-            @Override
-            public void meet(final StatementPattern node) throws Exception {
-                statementPattern.set(node);
-            }
-        });
-        return statementPattern.get();
-    }
-
-    /**
-     * Get the first {@link Projection} node from a SPARQL query.
-     *
-     * @param sparql - The query that contains a single Projection node.
-     * @return The first {@link Projection} that is encountered.
-     * @throws Exception The query could not be parsed.
-     */
-    public static @Nullable Projection getProjection(final String sparql) throws Exception {
-        requireNonNull(sparql);
-
-        final AtomicReference<Projection> projection = new AtomicReference<>();
-        final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null);
-        parsed.getTupleExpr().visit(new QueryModelVisitorBase<Exception>() {
-            @Override
-            public void meet(final Projection node) throws Exception {
-                projection.set(node);
-            }
-        });
-
-        return projection.get();
-    }
-
-    /**
-     * Get the first {@link MultiProjection} node from a SPARQL query.
-     *
-     * @param sparql - The query that contains a single Projection node.
-     * @return The first {@link MultiProjection} that is encountered.
-     * @throws Exception The query could not be parsed.
-     */
-    public static @Nullable MultiProjection getMultiProjection(final String sparql) throws Exception {
-        requireNonNull(sparql);
-
-        final AtomicReference<MultiProjection> multiProjection = new AtomicReference<>();
-        final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null);
-        parsed.getTupleExpr().visit(new QueryModelVisitorBase<Exception>() {
-            @Override
-            public void meet(final MultiProjection node) throws Exception {
-                multiProjection.set(node);
-            }
-        });
-
-        return multiProjection.get();
-    }
-
-    /**
-     * Get the first {@link Filter} node from a SPARQL query.
-     *
-     * @param sparql - The query that contains a single Projection node.
-     * @return The first {@link Filter} that is encountered.
-     * @throws Exception The query could not be parsed.
-     */
-    public static @Nullable Filter getFilter(final String sparql) throws Exception {
-        requireNonNull(sparql);
-
-        final AtomicReference<Filter> filter = new AtomicReference<>();
-        final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null);
-        parsed.getTupleExpr().visit(new QueryModelVisitorBase<Exception>() {
-            @Override
-            public void meet(final Filter node) throws Exception {
-                filter.set(node);
-            }
-        });
-
-        return filter.get();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java
deleted file mode 100644
index ee25f8c..0000000
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.streams.kafka;
-
-import static java.util.Objects.requireNonNull;
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
-import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
-import org.apache.rya.test.kafka.KafkaTestInstanceRule;
-import org.apache.rya.test.kafka.KafkaTestUtil;
-
-import com.google.common.collect.Sets;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
-/**
- * Utility functions that make it easier to test Rya Streams applications.
- */
-@DefaultAnnotation(NonNull.class)
-public class RyaStreamsTestUtil {
-
-    /**
-     * Runs a Kafka Streams topology, loads statements into the input topic, read the binding sets that come out of
-     * the results topic, and ensures the expected results match the read results.
-     *
-     * @param <T> The type of value that will be consumed from the results topic.
-     * @param kafka - The embedded Kafka instance that is being tested with. (not null)
-     * @param statementsTopic - The topic statements will be written to. (not null)
-     * @param resultsTopic - The topic results will be read from. (not null)
-     * @param builder - The streams topology that will be executed. (not null)
-     * @param statements - The statements that will be loaded into the topic. (not null)
-     * @param expected - The expected results. (not null)
-     * @param expectedDeserializerClass - The class of the deserializer that will be used when reading
-     *   values from the results topic. (not null)
-     * @throws Exception If any exception was thrown while running the test.
-     */
-    public static <T> void runStreamProcessingTest(
-            final KafkaTestInstanceRule kafka,
-            final String statementsTopic,
-            final String resultsTopic,
-            final TopologyBuilder builder,
-            final List<VisibilityStatement> statements,
-            final Set<T> expected,
-            final Class<? extends Deserializer<T>> expectedDeserializerClass) throws Exception {
-        requireNonNull(kafka);
-        requireNonNull(statementsTopic);
-        requireNonNull(resultsTopic);
-        requireNonNull(builder);
-        requireNonNull(statements);
-        requireNonNull(expected);
-        requireNonNull(expectedDeserializerClass);
-
-        // Explicitly create the topics that are being used.
-        kafka.createTopic(statementsTopic);
-        kafka.createTopic(resultsTopic);
-
-        // Start the streams program.
-        final Properties props = kafka.createBootstrapServerConfig();
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StatementPatternProcessorIT");
-
-        final KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
-        streams.cleanUp();
-        try {
-            streams.start();
-
-            // Wait for the streams application to start. Streams only see data after their consumers are connected.
-            Thread.sleep(6000);
-
-            // Load the statements into the input topic.
-            try(Producer<String, VisibilityStatement> producer = KafkaTestUtil.makeProducer(
-                    kafka, StringSerializer.class, VisibilityStatementSerializer.class)) {
-                new KafkaLoadStatements(statementsTopic, producer).fromCollection(statements);
-            }
-
-            // Wait for the final results to appear in the output topic and verify the expected Binding Sets were found.
-            try(Consumer<String, T> consumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, expectedDeserializerClass)) {
-                // Register the topic.
-                consumer.subscribe(Arrays.asList(resultsTopic));
-
-                // Poll for the result.
-                final Set<T> results = Sets.newHashSet( KafkaTestUtil.pollForResults(500, 6, expected.size(), consumer) );
-
-                // Show the correct binding sets results from the job.
-                assertEquals(expected, results);
-            }
-        } finally {
-            streams.close();
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
deleted file mode 100644
index 33dc945..0000000
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.streams.kafka.processors;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.rya.api.function.projection.RandomUUIDFactory;
-import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.KafkaTopics;
-import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
-import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
-import org.apache.rya.streams.kafka.topology.TopologyFactory;
-import org.apache.rya.test.kafka.KafkaTestInstanceRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.algebra.evaluation.QueryBindingSet;
-
-/**
- * Integration tests the methods of {@link StatementPatternProcessor}.
- */
-public class StatementPatternProcessorIT {
-
-    @Rule
-    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
-
-    @Test
-    public void singlePattern_singleStatement() throws Exception {
-        // Enumerate some topics that will be re-used
-        final String ryaInstance = UUID.randomUUID().toString();
-        final UUID queryId = UUID.randomUUID();
-        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
-        // Setup a topology.
-        final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }";
-        final TopologyFactory factory = new TopologyFactory();
-        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
-        // Create a statement that generate an SP result.
-        final ValueFactory vf = new ValueFactoryImpl();
-        final List<VisibilityStatement> statements = new ArrayList<>();
-        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
-
-        // Show the correct binding set results from the job.
-        final Set<VisibilityBindingSet> expected = new HashSet<>();
-
-        final QueryBindingSet bs = new QueryBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Alice"));
-        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
-        expected.add( new VisibilityBindingSet(bs, "a") );
-
-        // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
-    }
-
-    @Test
-    public void singlePattern_manyStatements() throws Exception {
-        // Enumerate some topics that will be re-used
-        final String ryaInstance = UUID.randomUUID().toString();
-        final UUID queryId = UUID.randomUUID();
-        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
-        // Setup a topology.
-        final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }";
-        final TopologyFactory factory = new TopologyFactory();
-        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
-        // Create some statements where some generates SP results and others do not.
-        final ValueFactory vf = new ValueFactoryImpl();
-        final List<VisibilityStatement> statements = new ArrayList<>();
-        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
-        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoin")), "b") );
-        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:talksTo"), vf.createURI("urn:Alice")), "a|b") );
-        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "c") );
-
-        // Show the correct binding set results from the job.
-        final Set<VisibilityBindingSet> expected = new HashSet<>();
-
-        QueryBindingSet bs = new QueryBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Alice"));
-        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
-        expected.add( new VisibilityBindingSet(bs, "a") );
-
-        bs = new QueryBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Bob"));
-        bs.addBinding("otherPerson", vf.createURI("urn:Alice"));
-        expected.add( new VisibilityBindingSet(bs, "a|b") );
-
-        // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
-    }
-
-    @Test
-    public void multiplePatterns_singleStatement() throws Exception {
-        // Enumerate some topics that will be re-used
-        final String ryaInstance = UUID.randomUUID().toString();
-        final UUID queryId = UUID.randomUUID();
-        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
-        // Setup a topology.
-        final String query = "SELECT * WHERE { "
-                + "?person <urn:talksTo> ?otherPerson . "
-                + "?person ?action <urn:Bob>"
-                + "}";
-        final TopologyFactory factory = new TopologyFactory();
-        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
-        // Create some statements where some generates SP results and others do not.
-        final ValueFactory vf = new ValueFactoryImpl();
-        final List<VisibilityStatement> statements = new ArrayList<>();
-        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
-
-        // Show the correct binding set results from the job.
-        final Set<VisibilityBindingSet> expected = new HashSet<>();
-
-        final QueryBindingSet bs = new QueryBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Alice"));
-        bs.addBinding("action", vf.createURI("urn:talksTo"));
-        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
-        expected.add( new VisibilityBindingSet(bs, "a") );
-
-        // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
-    }
-
-    @Test
-    public void multiplePatterns_multipleStatements() throws Exception {
-        // Enumerate some topics that will be re-used
-        final String ryaInstance = UUID.randomUUID().toString();
-        final UUID queryId = UUID.randomUUID();
-        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
-        // Setup a topology.
-        final String query = "SELECT * WHERE { "
-                + "?person <urn:talksTo> ?otherPerson ."
-                + "?person ?action <urn:Bob>"
-                + "}";
-        final TopologyFactory factory = new TopologyFactory();
-        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
-        // Create some statements where some generates SP results and others do not.
-        final ValueFactory vf = new ValueFactoryImpl();
-        final List<VisibilityStatement> statements = new ArrayList<>();
-        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
-        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Charlie")), "a|b") );
-        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:walksWith"), vf.createURI("urn:Bob")), "b") );
-
-        // Show the correct binding set results from the job.
-        final Set<VisibilityBindingSet> expected = new HashSet<>();
-
-        QueryBindingSet bs = new QueryBindingSet();
-        bs = new QueryBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Alice"));
-        bs.addBinding("action", vf.createURI("urn:talksTo"));
-        bs.addBinding("otherPerson", vf.createURI("urn:Charlie"));
-        expected.add(new VisibilityBindingSet(bs, "a&(a|b)"));
-
-        bs = new QueryBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Alice"));
-        bs.addBinding("action", vf.createURI("urn:talksTo"));
-        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
-        expected.add(new VisibilityBindingSet(bs, "a"));
-
-        // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
deleted file mode 100644
index 072469a..0000000
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
+++ /dev/null
@@ -1,457 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.streams.kafka.processors.aggregation;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.rya.api.function.projection.RandomUUIDFactory;
-import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.KafkaTopics;
-import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
-import org.apache.rya.streams.kafka.processors.aggregation.AggregationProcessorSupplier.AggregationProcessor;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
-import org.apache.rya.streams.kafka.topology.TopologyFactory;
-import org.apache.rya.test.kafka.KafkaTestInstanceRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.model.vocabulary.XMLSchema;
-import org.openrdf.query.impl.MapBindingSet;
-
-/**
- * Integration tests {@link AggregationProcessor}.
- */
-public class AggregationProcessorIT {
-
-    @Rule
-    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(false);
-
-    @Test
-    public void count() throws Exception {
-        // A query that figures out how many books each person has.
-        final String sparql =
-                "SELECT ?person (count(?book) as ?bookCount) " +
-                "WHERE { " +
-                    "?person <urn:hasBook> ?book " +
-                "} GROUP BY ?person";
-
-        // Create the statements that will be input into the query..
-        final ValueFactory vf = new ValueFactoryImpl();
-        final List<VisibilityStatement> statements = new ArrayList<>();
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 1")), "a"));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 1")), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 2")), "b"));
-
-        // Make the expected results.
-        final Set<VisibilityBindingSet> expected = new HashSet<>();
-        MapBindingSet bs = new MapBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Alice"));
-        bs.addBinding("bookCount", vf.createLiteral("1", XMLSchema.INTEGER));
-        expected.add(new VisibilityBindingSet(bs, "a"));
-
-        bs = new MapBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Bob"));
-        bs.addBinding("bookCount", vf.createLiteral("1", XMLSchema.INTEGER));
-        expected.add(new VisibilityBindingSet(bs, ""));
-
-        bs = new MapBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Alice"));
-        bs.addBinding("bookCount", vf.createLiteral("2", XMLSchema.INTEGER));
-        expected.add(new VisibilityBindingSet(bs, "a&b"));
-
-        // Enumerate some topics that will be re-used
-        final String ryaInstance = UUID.randomUUID().toString();
-        final UUID queryId = UUID.randomUUID();
-        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
-        // Setup a topology.
-        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
-        // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
-    }
-
-    @Test
-    public void sum() throws Exception {
-        // A query that figures out how much food each person has.
-        final String sparql =
-                "SELECT ?person (sum(?foodCount) as ?totalFood) " +
-                "WHERE { " +
-                    "?person <urn:hasFoodType> ?food . " +
-                    "?food <urn:count> ?foodCount . " +
-                "} GROUP BY ?person";
-
-        // Create the statements that will be input into the query..
-        final ValueFactory vf = new ValueFactoryImpl();
-        final List<VisibilityStatement> statements = new ArrayList<>();
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasFoodType"), vf.createURI("urn:corn")), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasFoodType"), vf.createURI("urn:apple")), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:corn"), vf.createURI("urn:count"), vf.createLiteral(4)), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:apple"), vf.createURI("urn:count"), vf.createLiteral(3)), ""));
-
-        // Make the expected results.
-        final Set<VisibilityBindingSet> expected = new HashSet<>();
-        MapBindingSet bs = new MapBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Alice"));
-        bs.addBinding("totalFood", vf.createLiteral("4", XMLSchema.INTEGER));
-        expected.add(new VisibilityBindingSet(bs, ""));
-
-        bs = new MapBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Alice"));
-        bs.addBinding("totalFood", vf.createLiteral("7", XMLSchema.INTEGER));
-        expected.add(new VisibilityBindingSet(bs, ""));
-
-        // Enumerate some topics that will be re-used
-        final String ryaInstance = UUID.randomUUID().toString();
-        final UUID queryId = UUID.randomUUID();
-        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
-        // Setup a topology.
-        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
-        // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
-    }
-
-    @Test
-    public void average() throws Exception {
-        // A query that figures out the average age across all people.
-        final String sparql =
-                "SELECT (avg(?age) as ?avgAge) " +
-                "WHERE { " +
-                    "?person <urn:age> ?age " +
-                "}";
-
-        // Create the statements that will be input into the query..
-        final ValueFactory vf = new ValueFactoryImpl();
-        final List<VisibilityStatement> statements = new ArrayList<>();
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(3)), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(7)), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(2)), ""));
-
-        // Make the expected results.
-        final Set<VisibilityBindingSet> expected = new HashSet<>();
-        MapBindingSet bs = new MapBindingSet();
-        bs.addBinding("avgAge", vf.createLiteral("3", XMLSchema.DECIMAL));
-        expected.add(new VisibilityBindingSet(bs, ""));
-
-        bs = new MapBindingSet();
-        bs.addBinding("avgAge", vf.createLiteral("5", XMLSchema.DECIMAL));
-        expected.add(new VisibilityBindingSet(bs, ""));
-
-        bs = new MapBindingSet();
-        bs.addBinding("avgAge", vf.createLiteral("4", XMLSchema.DECIMAL));
-        expected.add(new VisibilityBindingSet(bs, ""));
-
-        // Enumerate some topics that will be re-used
-        final String ryaInstance = UUID.randomUUID().toString();
-        final UUID queryId = UUID.randomUUID();
-        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
-        // Setup a topology.
-        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
-        // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
-    }
-
-    @Test
-    public void min() throws Exception {
-        // A query that figures out what the youngest age is across all people.
-        final String sparql =
-                "SELECT (min(?age) as ?youngest) " +
-                "WHERE { " +
-                    "?person <urn:age> ?age " +
-                "}";
-
-        // Create the statements that will be input into the query..
-        final ValueFactory vf = new ValueFactoryImpl();
-        final List<VisibilityStatement> statements = new ArrayList<>();
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), ""));
-
-        // Make the expected results.
-        final Set<VisibilityBindingSet> expected = new HashSet<>();
-        MapBindingSet bs = new MapBindingSet();
-        bs.addBinding("youngest", vf.createLiteral(13));
-        expected.add(new VisibilityBindingSet(bs, ""));
-
-        bs = new MapBindingSet();
-        bs.addBinding("youngest", vf.createLiteral(7));
-        expected.add(new VisibilityBindingSet(bs, ""));
-
-        bs = new MapBindingSet();
-        bs.addBinding("youngest", vf.createLiteral(5));
-        expected.add(new VisibilityBindingSet(bs, ""));
-
-        // Enumerate some topics that will be re-used.
-        final String ryaInstance = UUID.randomUUID().toString();
-        final UUID queryId = UUID.randomUUID();
-        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
-        // Set up a topology.
-        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
-        // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
-    }
-
-    @Test
-    public void max() throws Exception {
-        // A query that figures out what the oldest age is across all people.
-        final String sparql =
-                "SELECT (max(?age) as ?oldest) " +
-                "WHERE { " +
-                    "?person <urn:age> ?age " +
-                "}";
-
-        // Create the statements that will be input into the query.
-        final ValueFactory vf = new ValueFactoryImpl();
-        final List<VisibilityStatement> statements = new ArrayList<>();
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), ""));
-
-        // Make the expected results.
-        final Set<VisibilityBindingSet> expected = new HashSet<>();
-        MapBindingSet bs = new MapBindingSet();
-        bs.addBinding("oldest", vf.createLiteral(13));
-        expected.add(new VisibilityBindingSet(bs, ""));
-
-        bs = new MapBindingSet();
-        bs.addBinding("oldest", vf.createLiteral(14));
-        expected.add(new VisibilityBindingSet(bs, ""));
-
-        bs = new MapBindingSet();
-        bs.addBinding("oldest", vf.createLiteral(25));
-        expected.add(new VisibilityBindingSet(bs, ""));
-
-        // Enumerate some topics that will be re-used.
-        final String ryaInstance = UUID.randomUUID().toString();
-        final UUID queryId = UUID.randomUUID();
-        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
-        // Set up a topology.
-        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
-        // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
-    }
-
-    @Test
-    public void multipleGroupByVars() throws Exception {
-        // A query that contains more than one group by variable.
-        final String sparql =
-                "SELECT ?business ?employee (sum(?hours) AS ?totalHours) " +
-                "WHERE {" +
-                    "?employee <urn:worksAt> ?business . " +
-                    "?business <urn:hasTimecardId> ?timecardId . " +
-                    "?employee <urn:hasTimecardId> ?timecardId . " +
-                    "?timecardId <urn:hours> ?hours . " +
-                "} GROUP BY ?business ?employee";
-
-        // Create the statements that will be input into the query.
-        final ValueFactory vf = new ValueFactoryImpl();
-        final List<VisibilityStatement> statements = new ArrayList<>();
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoint")), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard1")), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard1")), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:timecard1"), vf.createURI("urn:hours"), vf.createLiteral(40)), ""));
-
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard2")), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard2")), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:timecard2"), vf.createURI("urn:hours"), vf.createLiteral(25)), ""));
-
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoint")), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard3")), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard3")), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:timecard3"), vf.createURI("urn:hours"), vf.createLiteral(28)), ""));
-
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard4")), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard4")), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:timecard4"), vf.createURI("urn:hours"), vf.createLiteral(28)), ""));
-
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:CoffeeShop"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard5")), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard5")), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:timecard5"), vf.createURI("urn:hours"), vf.createLiteral(12)), ""));
-
-        // Make the expected results.
-        final Set<VisibilityBindingSet> expected = new HashSet<>();
-        MapBindingSet bs = new MapBindingSet();
-        bs.addBinding("business", vf.createURI("urn:TacoJoint"));
-        bs.addBinding("employee", vf.createURI("urn:Alice"));
-        bs.addBinding("totalHours", vf.createLiteral("40", XMLSchema.INTEGER));
-        expected.add(new VisibilityBindingSet(bs, ""));
-
-        bs = new MapBindingSet();
-        bs.addBinding("business", vf.createURI("urn:TacoJoint"));
-        bs.addBinding("employee", vf.createURI("urn:Alice"));
-        bs.addBinding("totalHours", vf.createLiteral("65", XMLSchema.INTEGER));
-        expected.add(new VisibilityBindingSet(bs, ""));
-
-        bs = new MapBindingSet();
-        bs.addBinding("business", vf.createURI("urn:TacoJoint"));
-        bs.addBinding("employee", vf.createURI("urn:Bob"));
-        bs.addBinding("totalHours", vf.createLiteral("28", XMLSchema.INTEGER));
-        expected.add(new VisibilityBindingSet(bs, ""));
-
-        bs = new MapBindingSet();
-        bs.addBinding("business", vf.createURI("urn:TacoJoint"));
-        bs.addBinding("employee", vf.createURI("urn:Bob"));
-        bs.addBinding("totalHours", vf.createLiteral("56", XMLSchema.INTEGER));
-        expected.add(new VisibilityBindingSet(bs, ""));
-
-        bs = new MapBindingSet();
-        bs.addBinding("business", vf.createURI("urn:CoffeeShop"));
-        bs.addBinding("employee", vf.createURI("urn:Alice"));
-        bs.addBinding("totalHours", vf.createLiteral("12", XMLSchema.INTEGER));
-        expected.add(new VisibilityBindingSet(bs, ""));
-
-        // Enumerate some topics that will be re-used.
-        final String ryaInstance = UUID.randomUUID().toString();
-        final UUID queryId = UUID.randomUUID();
-        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
-        // Set up a topology.
-        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
-        // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
-    }
-
-    @Test
-    public void multipleAggregations() throws Exception {
-        // A query that figures out what the youngest and oldest ages are across all people.
-        final String sparql =
-                "SELECT (min(?age) as ?youngest) (max(?age) as ?oldest) " +
-                "WHERE { " +
-                    "?person <urn:age> ?age " +
-                "}";
-
-        // Create the statements that will be input into the query.
-        final ValueFactory vf = new ValueFactoryImpl();
-        final List<VisibilityStatement> statements = new ArrayList<>();
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), ""));
-        statements.add(new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), ""));
-
-        // Make the expected results.
-        final Set<VisibilityBindingSet> expected = new HashSet<>();
-        MapBindingSet bs = new MapBindingSet();
-        bs.addBinding("youngest", vf.createLiteral(13));
-        bs.addBinding("oldest", vf.createLiteral(13));
-        expected.add(new VisibilityBindingSet(bs, ""));
-
-        bs = new MapBindingSet();
-        bs.addBinding("youngest", vf.createLiteral(13));
-        bs.addBinding("oldest", vf.createLiteral(14));
-        expected.add(new VisibilityBindingSet(bs, ""));
-
-        bs = new MapBindingSet();
-        bs.addBinding("youngest", vf.createLiteral(7));
-        bs.addBinding("oldest", vf.createLiteral(14));
-        expected.add(new VisibilityBindingSet(bs, ""));
-
-        bs = new MapBindingSet();
-        bs.addBinding("youngest", vf.createLiteral(5));
-        bs.addBinding("oldest", vf.createLiteral(14));
-        expected.add(new VisibilityBindingSet(bs, ""));
-
-        bs = new MapBindingSet();
-        bs.addBinding("youngest", vf.createLiteral(5));
-        bs.addBinding("oldest", vf.createLiteral(25));
-        expected.add(new VisibilityBindingSet(bs, ""));
-
-        // Enumerate some topics that will be re-used.
-        final String ryaInstance = UUID.randomUUID().toString();
-        final UUID queryId = UUID.randomUUID();
-        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
-        // Set up a topology.
-        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
-        // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
deleted file mode 100644
index aaa67ea..0000000
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.streams.kafka.processors.filter;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.rya.api.function.projection.RandomUUIDFactory;
-import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.KafkaTopics;
-import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
-import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
-import org.apache.rya.streams.kafka.topology.TopologyFactory;
-import org.apache.rya.test.kafka.KafkaTestInstanceRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.impl.MapBindingSet;
-
-/**
- * Integration tests the methods of {@link FilterProcessor}.
- */
-public class FilterProcessorIT {
-
-    @Rule
-    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
-
-    @Test
-    public void showProcessorWorks() throws Exception {
-        // Enumerate some topics that will be re-used.
-        final String ryaInstance = UUID.randomUUID().toString();
-        final UUID queryId = UUID.randomUUID();
-        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
-        // Define the SPARQL query that will be used to build the processing topology.
-        final String sparql =
-                "SELECT * " +
-                "WHERE { " +
-                    "FILTER(?age < 10)" +
-                    "?person <urn:age> ?age " +
-                "}";
-
-        // Set up a topology.
-        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
-        // Create the statements that will be input into the query.
-        final ValueFactory vf = new ValueFactoryImpl();
-        final List<VisibilityStatement> statements = new ArrayList<>();
-        statements.add(new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(11)), "a"));
-        statements.add(new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(9)), "a"));
-
-        // Make the expected results.
-        final Set<VisibilityBindingSet> expected = new HashSet<>();
-        final MapBindingSet bs = new MapBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Alice"));
-        bs.addBinding("age", vf.createLiteral(9));
-        expected.add( new VisibilityBindingSet(bs, "a") );
-
-        // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java
deleted file mode 100644
index 3ff8e8d..0000000
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.streams.kafka.processors.filter;
-
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.rya.api.function.filter.FilterEvaluator;
-import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.streams.kafka.RdfTestUtil;
-import org.apache.rya.streams.kafka.processors.ProcessorResult;
-import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
-import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor;
-import org.junit.Test;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.algebra.Filter;
-import org.openrdf.query.impl.MapBindingSet;
-
-/**
- * Unit tests the methods of {@link FilterProcessor}.
- */
-public class FilterProcessorTest {
-
-    @Test
-    public void showFilterFunctionIsCalled() throws Exception {
-        // Read the filter object from a SPARQL query.
-        final Filter filter = RdfTestUtil.getFilter(
-                "SELECT * " +
-                "WHERE { " +
-                    "FILTER(?age < 10)" +
-                    "?person <urn:age> ?age " +
-                "}");
-
-        // Create a Binding Set that will be passed into the Filter function based on the where clause.
-        final ValueFactory vf = new ValueFactoryImpl();
-        final MapBindingSet bs = new MapBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Alice"));
-        bs.addBinding("age", vf.createLiteral(9));
-        final VisibilityBindingSet inputVisBs = new VisibilityBindingSet(bs, "a");
-
-        // Mock the processor context that will be invoked.
-        final ProcessorContext context = mock(ProcessorContext.class);
-
-        // Run the test.
-        final FilterProcessor processor = new FilterProcessor(
-                FilterEvaluator.make(filter),
-                result -> ProcessorResult.make(new UnaryResult(result)));
-        processor.init(context);
-        processor.process("key", ProcessorResult.make(new UnaryResult(inputVisBs)));
-
-        // Verify the binding set was passed through.
-        verify(context, times(1)).forward(eq("key"), eq(ProcessorResult.make(new UnaryResult(inputVisBs))));
-
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
deleted file mode 100644
index c090afa..0000000
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.streams.kafka.processors.filter;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.rya.api.function.projection.RandomUUIDFactory;
-import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.indexing.GeoConstants;
-import org.apache.rya.streams.kafka.KafkaTopics;
-import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
-import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
-import org.apache.rya.streams.kafka.topology.TopologyFactory;
-import org.apache.rya.test.kafka.KafkaTestInstanceRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.openrdf.model.Resource;
-import org.openrdf.model.Statement;
-import org.openrdf.model.URI;
-import org.openrdf.model.Value;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.StatementImpl;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.algebra.evaluation.function.Function;
-import org.openrdf.query.algebra.evaluation.function.FunctionRegistry;
-import org.openrdf.query.impl.MapBindingSet;
-
-import com.vividsolutions.jts.geom.Coordinate;
-import com.vividsolutions.jts.geom.Geometry;
-import com.vividsolutions.jts.geom.GeometryFactory;
-import com.vividsolutions.jts.io.WKTWriter;
-
-/**
- * Integration tests the geo methods of {@link FilterProcessor}.
- */
-public class GeoFilterIT {
-    private static final String GEO = "http://www.opengis.net/def/function/geosparql/";
-    private static final GeometryFactory GF = new GeometryFactory();
-    private static final Geometry ZERO = GF.createPoint(new Coordinate(0, 0));
-    private static final Geometry ONE = GF.createPoint(new Coordinate(1, 1));
-
-    @Rule
-    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
-
-    @Test
-    public void showGeoFunctionsRegistered() {
-        int count = 0;
-        final Collection<Function> funcs = FunctionRegistry.getInstance().getAll();
-        for (final Function fun : funcs) {
-            if (fun.getURI().startsWith(GEO)) {
-                count++;
-            }
-        }
-
-        // 30 geo functions are expected to be registered; verify the count.
-        assertEquals(30, count);
-    }
-
-    @Test
-    public void showProcessorWorks() throws Exception {
-        // Enumerate some topics that will be re-used.
-        final String ryaInstance = UUID.randomUUID().toString();
-        final UUID queryId = UUID.randomUUID();
-        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
-        // Define the SPARQL query that will be used to build the processing topology.
-        final String sparql =
-                "PREFIX geo: <http://www.opengis.net/ont/geosparql#>\n"
-                        + "PREFIX geof: <" + GEO + ">\n"
-                        + "SELECT * \n"
-                        + "WHERE { \n"
-                        + "  <urn:event1> geo:asWKT ?point .\n"
-                        + " FILTER(geof:sfWithin(?point, \"POLYGON((-3 -2, -3 2, 1 2, 1 -2, -3 -2))\"^^geo:wktLiteral)) "
-                        + "}";
-
-        // Set up a topology.
-        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
-        // Create the statements that will be input into the query.
-        final ValueFactory vf = new ValueFactoryImpl();
-        final List<VisibilityStatement> statements = getStatements();
-
-        // Make the expected results.
-        final Set<VisibilityBindingSet> expected = new HashSet<>();
-        final MapBindingSet bs = new MapBindingSet();
-        final WKTWriter w = new WKTWriter();
-        bs.addBinding("point", vf.createLiteral(w.write(ZERO), GeoConstants.XMLSCHEMA_OGC_WKT));
-        expected.add( new VisibilityBindingSet(bs, "a") );
-
-        // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
-    }
-
-    private List<VisibilityStatement> getStatements() throws Exception {
-        final List<VisibilityStatement> statements = new ArrayList<>();
-        // Two points: ZERO lies within the filter polygon, ONE does not.
-        statements.add(new VisibilityStatement(statement(ZERO), "a"));
-        statements.add(new VisibilityStatement(statement(ONE), "a"));
-        return statements;
-    }
-
-    private static Statement statement(final Geometry geo) {
-        final ValueFactory vf = new ValueFactoryImpl();
-        final Resource subject = vf.createURI("urn:event1");
-        final URI predicate = GeoConstants.GEO_AS_WKT;
-        final WKTWriter w = new WKTWriter();
-        final Value object = vf.createLiteral(w.write(geo), GeoConstants.XMLSCHEMA_OGC_WKT);
-        return new StatementImpl(subject, predicate, object);
-    }
-}
\ No newline at end of file