Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/09 21:48:50 UTC

[33/50] [abbrv] incubator-rya git commit: RYA-377 Fixed the project layout so that it builds even when the geoindexing profile is not enabled.

RYA-377 Fixed the project layout so that it builds even when the geoindexing profile is not enabled.
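
For readers following the build change: the "geoindexing profile" is a Maven profile, and the usual way to keep the default build working without it is to list the geo-only test module inside that profile rather than in the always-built module list. The snippet below is a minimal sketch of that pattern, assuming the profile wraps the new geo module in one of the affected POMs (for example extras/rya.streams/pom.xml); the exact placement and wording in this commit may differ.

    <!-- Illustrative sketch only. The "geoindexing" profile id comes from the commit
         message above; placing it in extras/rya.streams/pom.xml is an assumption. -->
    <profiles>
        <profile>
            <id>geoindexing</id>
            <modules>
                <!-- Built only when the profile is activated (mvn ... -P geoindexing);
                     the default build never descends into this module. -->
                <module>geo</module>
            </modules>
        </profile>
    </profiles>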


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/92c85ee1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/92c85ee1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/92c85ee1

Branch: refs/heads/master
Commit: 92c85ee11030712289df48faed4710359b1b0601
Parents: 923448f
Author: kchilton2 <ke...@gmail.com>
Authored: Wed Jan 3 16:41:08 2018 -0500
Committer: caleb <ca...@parsons.com>
Committed: Tue Jan 9 15:13:01 2018 -0500

----------------------------------------------------------------------
 extras/rya.streams/geo/pom.xml                  |  71 +++
 .../kafka/processors/filter/GeoFilterIT.java    | 137 ++++++
 extras/rya.streams/integration/pom.xml          |  69 +++
 .../aggregation/AggregationProcessorIT.java     | 457 +++++++++++++++++++
 .../processors/filter/FilterProcessorIT.java    |  86 ++++
 .../processors/filter/FilterProcessorTest.java  |  75 +++
 .../processors/filter/TemporalFilterIT.java     | 231 ++++++++++
 .../kafka/processors/join/JoinProcessorIT.java  | 316 +++++++++++++
 .../projection/MultiProjectionProcessorIT.java  |  93 ++++
 .../projection/ProjectionProcessorIT.java       |  86 ++++
 .../sp/StatementPatternProcessorIT.java         | 196 ++++++++
 extras/rya.streams/kafka-test/pom.xml           |  51 +++
 .../rya/streams/kafka/RyaStreamsTestUtil.java   | 122 +++++
 extras/rya.streams/kafka/pom.xml                |  32 +-
 .../apache/rya/streams/kafka/RdfTestUtil.java   | 131 ------
 .../rya/streams/kafka/RyaStreamsTestUtil.java   | 122 -----
 .../processors/StatementPatternProcessorIT.java | 195 --------
 .../aggregation/AggregationProcessorIT.java     | 457 -------------------
 .../processors/filter/FilterProcessorIT.java    |  86 ----
 .../processors/filter/FilterProcessorTest.java  |  75 ---
 .../kafka/processors/filter/GeoFilterIT.java    | 137 ------
 .../processors/filter/TemporalFilterIT.java     | 231 ----------
 .../kafka/processors/join/JoinProcessorIT.java  | 316 -------------
 .../projection/MultiProjectionProcessorIT.java  |  92 ----
 .../projection/ProjectionProcessorIT.java       |  85 ----
 extras/rya.streams/pom.xml                      |   3 +
 pom.xml                                         |  10 +
 test/pom.xml                                    |   1 +
 test/rdf/pom.xml                                |  59 +++
 .../apache/rya/streams/kafka/RdfTestUtil.java   | 131 ++++++
 30 files changed, 2212 insertions(+), 1941 deletions(-)
----------------------------------------------------------------------
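
The file listing above shows the Kafka test code moving out of extras/rya.streams/kafka into dedicated modules (kafka-test, integration, geo) plus a new test/rdf module, which is what lets the default build succeed: only the geo module needs the geoindexing profile. A plausible, unverified module list for extras/rya.streams/pom.xml under that layout is sketched below; with it, a plain "mvn install" builds everything except geo, while "mvn install -P geoindexing" builds the geo tests as well.

    <!-- Assumed layout: module names mirror the directories in the diffstat above;
         other pre-existing Rya Streams modules are omitted for brevity. -->
    <modules>
        <module>kafka</module>
        <module>kafka-test</module>
        <module>integration</module>
        <!-- The geo module is intentionally absent here; it is only added by the
             geoindexing profile (see the sketch after the commit message). -->
    </modules>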


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/geo/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/geo/pom.xml b/extras/rya.streams/geo/pom.xml
new file mode 100644
index 0000000..2f179d0
--- /dev/null
+++ b/extras/rya.streams/geo/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.streams.parent</artifactId>
+        <version>3.2.12-incubating-SNAPSHOT</version>
+    </parent>
+    
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>rya.streams.geo-test</artifactId>
+    
+    <name>Apache Rya Streams Geo Test</name>
+    <description>
+        A module that contains geo integration tests for Rya Streams.
+    </description>
+
+    <dependencies>
+        <!-- Rya dependencies -->
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.pcj.functions.geo</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.geo.common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.test.kafka</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.streams.kafka-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java b/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
new file mode 100644
index 0000000..c090afa
--- /dev/null
+++ b/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.filter;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
+import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.Resource;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.StatementImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.algebra.evaluation.function.Function;
+import org.openrdf.query.algebra.evaluation.function.FunctionRegistry;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.vividsolutions.jts.geom.Coordinate;
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.geom.GeometryFactory;
+import com.vividsolutions.jts.io.WKTWriter;
+
+/**
+ * Integration tests the geo methods of {@link FilterProcessor}.
+ */
+public class GeoFilterIT {
+    private static final String GEO = "http://www.opengis.net/def/function/geosparql/";
+    private static final GeometryFactory GF = new GeometryFactory();
+    private static final Geometry ZERO = GF.createPoint(new Coordinate(0, 0));
+    private static final Geometry ONE = GF.createPoint(new Coordinate(1, 1));
+
+    @Rule
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+    @Test
+    public void showGeoFunctionsRegistered() {
+        int count = 0;
+        final Collection<Function> funcs = FunctionRegistry.getInstance().getAll();
+        for (final Function fun : funcs) {
+            if (fun.getURI().startsWith(GEO)) {
+                count++;
+            }
+        }
+
+        // There are 30 geo functions registered; ensure all of them were found.
+        assertEquals(30, count);
+    }
+
+    @Test
+    public void showProcessorWorks() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the RDF model objects that will be used to build the query.
+        final String sparql =
+                "PREFIX geo: <http://www.opengis.net/ont/geosparql#>\n"
+                        + "PREFIX geof: <" + GEO + ">\n"
+                        + "SELECT * \n"
+                        + "WHERE { \n"
+                        + "  <urn:event1> geo:asWKT ?point .\n"
+                        + " FILTER(geof:sfWithin(?point, \"POLYGON((-3 -2, -3 2, 1 2, 1 -2, -3 -2))\"^^geo:wktLiteral)) "
+                        + "}";
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Create the statements that will be input into the query.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = getStatements();
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        final MapBindingSet bs = new MapBindingSet();
+        final WKTWriter w = new WKTWriter();
+        bs.addBinding("point", vf.createLiteral(w.write(ZERO), GeoConstants.XMLSCHEMA_OGC_WKT));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    private List<VisibilityStatement> getStatements() throws Exception {
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        // Two points: ZERO falls within the query's filter polygon; ONE does not and is filtered out.
+        statements.add(new VisibilityStatement(statement(ZERO), "a"));
+        statements.add(new VisibilityStatement(statement(ONE), "a"));
+        return statements;
+    }
+
+    private static Statement statement(final Geometry geo) {
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Resource subject = vf.createURI("urn:event1");
+        final URI predicate = GeoConstants.GEO_AS_WKT;
+        final WKTWriter w = new WKTWriter();
+        final Value object = vf.createLiteral(w.write(geo), GeoConstants.XMLSCHEMA_OGC_WKT);
+        return new StatementImpl(subject, predicate, object);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/integration/pom.xml b/extras/rya.streams/integration/pom.xml
new file mode 100644
index 0000000..26ec9f7
--- /dev/null
+++ b/extras/rya.streams/integration/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.streams.parent</artifactId>
+        <version>3.2.12-incubating-SNAPSHOT</version>
+    </parent>
+    
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>rya.streams.integration</artifactId>
+    
+    <name>Apache Rya Streams Kafka Integration Tests</name>
+    <description>
+        A module that contains Kafka integration tests for Rya Streams.
+    </description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.streams.kafka</artifactId>
+        </dependency>
+        
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.test.rdf</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.test.kafka</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.streams.kafka-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
new file mode 100644
index 0000000..072469a
--- /dev/null
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.aggregation;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
+import org.apache.rya.streams.kafka.processors.aggregation.AggregationProcessorSupplier.AggregationProcessor;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.impl.MapBindingSet;
+
+/**
+ * Integration tests {@link AggregationProcessor}.
+ */
+public class AggregationProcessorIT {
+
+    @Rule
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(false);
+
+    @Test
+    public void count() throws Exception {
+        // A query that figures out how many books each person has.
+        final String sparql =
+                "SELECT ?person (count(?book) as ?bookCount) " +
+                "WHERE { " +
+                    "?person <urn:hasBook> ?book " +
+                "} GROUP BY ?person";
+
+        // Create the statements that will be input into the query.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 1")), "a"));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 1")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 2")), "b"));
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("bookCount", vf.createLiteral("1", XMLSchema.INTEGER));
+        expected.add(new VisibilityBindingSet(bs, "a"));
+
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Bob"));
+        bs.addBinding("bookCount", vf.createLiteral("1", XMLSchema.INTEGER));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("bookCount", vf.createLiteral("2", XMLSchema.INTEGER));
+        expected.add(new VisibilityBindingSet(bs, "a&b"));
+
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    @Test
+    public void sum() throws Exception {
+        // A query that figures out how much food each person has.
+        final String sparql =
+                "SELECT ?person (sum(?foodCount) as ?totalFood) " +
+                "WHERE { " +
+                    "?person <urn:hasFoodType> ?food . " +
+                    "?food <urn:count> ?foodCount . " +
+                "} GROUP BY ?person";
+
+        // Create the statements that will be input into the query.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasFoodType"), vf.createURI("urn:corn")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasFoodType"), vf.createURI("urn:apple")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:corn"), vf.createURI("urn:count"), vf.createLiteral(4)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:apple"), vf.createURI("urn:count"), vf.createLiteral(3)), ""));
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("totalFood", vf.createLiteral("4", XMLSchema.INTEGER));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("totalFood", vf.createLiteral("7", XMLSchema.INTEGER));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    @Test
+    public void average() throws Exception {
+        // A query that figures out the average age across all people.
+        final String sparql =
+                "SELECT (avg(?age) as ?avgAge) " +
+                "WHERE { " +
+                    "?person <urn:age> ?age " +
+                "}";
+
+        // Create the statements that will be input into the query.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(3)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(7)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(2)), ""));
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("avgAge", vf.createLiteral("3", XMLSchema.DECIMAL));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("avgAge", vf.createLiteral("5", XMLSchema.DECIMAL));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("avgAge", vf.createLiteral("4", XMLSchema.DECIMAL));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    @Test
+    public void min() throws Exception {
+        // A query that figures out what the youngest age is across all people.
+        final String sparql =
+                "SELECT (min(?age) as ?youngest) " +
+                "WHERE { " +
+                    "?person <urn:age> ?age " +
+                "}";
+
+        // Create the statements that will be input into the query.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), ""));
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("youngest", vf.createLiteral(13));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("youngest", vf.createLiteral(7));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("youngest", vf.createLiteral(5));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    @Test
+    public void max() throws Exception {
+        // A query that figures out what the oldest age is across all people.
+        final String sparql =
+                "SELECT (max(?age) as ?oldest) " +
+                "WHERE { " +
+                    "?person <urn:age> ?age " +
+                "}";
+
+        // Create the statements that will be input into the query.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), ""));
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("oldest", vf.createLiteral(13));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("oldest", vf.createLiteral(14));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("oldest", vf.createLiteral(25));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    @Test
+    public void multipleGroupByVars() throws Exception {
+        // A query that contains more than one group by variable.
+        final String sparql =
+                "SELECT ?business ?employee (sum(?hours) AS ?totalHours) " +
+                "WHERE {" +
+                    "?employee <urn:worksAt> ?business . " +
+                    "?business <urn:hasTimecardId> ?timecardId . " +
+                    "?employee <urn:hasTimecardId> ?timecardId . " +
+                    "?timecardId <urn:hours> ?hours . " +
+                "} GROUP BY ?business ?employee";
+
+        // Create the statements that will be input into the query.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoint")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard1")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard1")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:timecard1"), vf.createURI("urn:hours"), vf.createLiteral(40)), ""));
+
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard2")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard2")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:timecard2"), vf.createURI("urn:hours"), vf.createLiteral(25)), ""));
+
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoint")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard3")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard3")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:timecard3"), vf.createURI("urn:hours"), vf.createLiteral(28)), ""));
+
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard4")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard4")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:timecard4"), vf.createURI("urn:hours"), vf.createLiteral(28)), ""));
+
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:CoffeeShop"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard5")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard5")), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:timecard5"), vf.createURI("urn:hours"), vf.createLiteral(12)), ""));
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("business", vf.createURI("urn:TacoJoint"));
+        bs.addBinding("employee", vf.createURI("urn:Alice"));
+        bs.addBinding("totalHours", vf.createLiteral("40", XMLSchema.INTEGER));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("business", vf.createURI("urn:TacoJoint"));
+        bs.addBinding("employee", vf.createURI("urn:Alice"));
+        bs.addBinding("totalHours", vf.createLiteral("65", XMLSchema.INTEGER));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("business", vf.createURI("urn:TacoJoint"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("totalHours", vf.createLiteral("28", XMLSchema.INTEGER));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("business", vf.createURI("urn:TacoJoint"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("totalHours", vf.createLiteral("56", XMLSchema.INTEGER));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("business", vf.createURI("urn:CoffeeShop"));
+        bs.addBinding("employee", vf.createURI("urn:Alice"));
+        bs.addBinding("totalHours", vf.createLiteral("12", XMLSchema.INTEGER));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    @Test
+    public void multipleAggregations() throws Exception {
+        // A query that figures out what the youngest and oldest ages are across all people.
+        final String sparql =
+                "SELECT (min(?age) as ?youngest) (max(?age) as ?oldest) " +
+                "WHERE { " +
+                    "?person <urn:age> ?age " +
+                "}";
+
+        // Create the statements that will be input into the query.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), ""));
+        statements.add(new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), ""));
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("youngest", vf.createLiteral(13));
+        bs.addBinding("oldest", vf.createLiteral(13));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("youngest", vf.createLiteral(13));
+        bs.addBinding("oldest", vf.createLiteral(14));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("youngest", vf.createLiteral(7));
+        bs.addBinding("oldest", vf.createLiteral(14));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("youngest", vf.createLiteral(5));
+        bs.addBinding("oldest", vf.createLiteral(14));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        bs = new MapBindingSet();
+        bs.addBinding("youngest", vf.createLiteral(5));
+        bs.addBinding("oldest", vf.createLiteral(25));
+        expected.add(new VisibilityBindingSet(bs, ""));
+
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
new file mode 100644
index 0000000..aaa67ea
--- /dev/null
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.filter;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
+import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.impl.MapBindingSet;
+
+/**
+ * Integration tests the methods of {@link FilterProcessor}.
+ */
+public class FilterProcessorIT {
+
+    @Rule
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+    @Test
+    public void showProcessorWorks() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the RDF model objects that will be used to build the query.
+        final String sparql =
+                "SELECT * " +
+                "WHERE { " +
+                    "FILTER(?age < 10)" +
+                    "?person <urn:age> ?age " +
+                "}";
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Create the statements that will be input into the query.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add(new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(11)), "a"));
+        statements.add(new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(9)), "a"));
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        final MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("age", vf.createLiteral(9));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java
new file mode 100644
index 0000000..3ff8e8d
--- /dev/null
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.filter;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.rya.api.function.filter.FilterEvaluator;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.streams.kafka.RdfTestUtil;
+import org.apache.rya.streams.kafka.processors.ProcessorResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
+import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.impl.MapBindingSet;
+
+/**
+ * Unit tests the methods of {@link FilterProcessor}.
+ */
+public class FilterProcessorTest {
+
+    @Test
+    public void showFilterFunctionIsCalled() throws Exception {
+        // Read the filter object from a SPARQL query.
+        final Filter filter = RdfTestUtil.getFilter(
+                "SELECT * " +
+                "WHERE { " +
+                    "FILTER(?age < 10)" +
+                    "?person <urn:age> ?age " +
+                "}");
+
+        // Create a Binding Set that will be passed into the Filter function based on the where clause.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("age", vf.createLiteral(9));
+        final VisibilityBindingSet inputVisBs = new VisibilityBindingSet(bs, "a");
+
+        // Mock the processor context that will be invoked.
+        final ProcessorContext context = mock(ProcessorContext.class);
+
+        // Run the test.
+        final FilterProcessor processor = new FilterProcessor(
+                FilterEvaluator.make(filter),
+                result -> ProcessorResult.make(new UnaryResult(result)));
+        processor.init(context);
+        processor.process("key", ProcessorResult.make(new UnaryResult(inputVisBs)));
+
+        // Verify the binding set was passed through.
+        verify(context, times(1)).forward(eq("key"), eq(ProcessorResult.make(new UnaryResult(inputVisBs))));
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
new file mode 100644
index 0000000..22a883b
--- /dev/null
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.filter;
+
+import static org.junit.Assert.assertEquals;
+
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
+import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.Resource;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.StatementImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.algebra.evaluation.function.Function;
+import org.openrdf.query.algebra.evaluation.function.FunctionRegistry;
+import org.openrdf.query.impl.MapBindingSet;
+
+/**
+ * Integration tests the temporal methods of {@link FilterProcessor}.
+ */
+public class TemporalFilterIT {
+    private static final ValueFactory vf = new ValueFactoryImpl();
+    private static final String TEMPORAL = "http://rya.apache.org/ns/temporal";
+    private static final ZonedDateTime TIME = ZonedDateTime.parse("2015-12-30T12:00:00Z");
+    private static final ZonedDateTime TIME_10 = ZonedDateTime.parse("2015-12-30T12:00:10Z");
+    private static final ZonedDateTime TIME_20 = ZonedDateTime.parse("2015-12-30T12:00:20Z");
+
+    @Rule
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(false);
+
+    @Test
+    public void temporalFunctionsRegistered() {
+        int count = 0;
+        final Collection<Function> funcs = FunctionRegistry.getInstance().getAll();
+        for (final Function fun : funcs) {
+            if (fun.getURI().startsWith(TEMPORAL)) {
+                count++;
+            }
+        }
+
+        // There are 4 temporal functions registered; ensure all of them were found.
+        assertEquals(4, count);
+    }
+
+    @Test
+    public void showEqualsWorks() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the RDF model objects that will be used to build the query.
+        final String sparql =
+                "PREFIX time: <http://www.w3.org/2006/time/> \n"
+                        + "PREFIX tempf: <http://rya.apache.org/ns/temporal/>\n"
+                        + "SELECT * \n"
+                        + "WHERE { \n"
+                        + "  <urn:time> time:atTime ?date .\n"
+                        + " FILTER(tempf:equals(?date, \"" + TIME.toString() + "\")) "
+                        + "}";
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Create the statements that will be input into the query.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = getStatements();
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        final MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("date", vf.createLiteral(TIME.toString()));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    @Test
+    public void showBeforeWorks() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the RDF model objects that will be used to build the query.
+        final String sparql =
+                "PREFIX time: <http://www.w3.org/2006/time/> \n"
+                        + "PREFIX tempf: <http://rya.apache.org/ns/temporal/>\n"
+                        + "SELECT * \n"
+                        + "WHERE { \n"
+                        + "  <urn:time> time:atTime ?date .\n"
+                        + " FILTER(tempf:before(?date, \"" + TIME_10.toString() + "\")) "
+                        + "}";
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Create the statements that will be input into the query.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = getStatements();
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        final MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("date", vf.createLiteral(TIME.toString()));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    @Test
+    public void showAfterWorks() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the RDF model objects that will be used to build the query.
+        final String sparql =
+                "PREFIX time: <http://www.w3.org/2006/time/> \n"
+                        + "PREFIX tempf: <http://rya.apache.org/ns/temporal/>\n"
+                        + "SELECT * \n"
+                        + "WHERE { \n"
+                        + "  <urn:time> time:atTime ?date .\n"
+                        + " FILTER(tempf:after(?date, \"" + TIME_10.toString() + "\")) "
+                        + "}";
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Create the statements that will be input into the query.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = getStatements();
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        final MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("date", vf.createLiteral(TIME_20.toString()));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    @Test
+    public void showWithinWorks() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the RDF model objects that will be used to build the query.
+        final String sparql =
+                "PREFIX time: <http://www.w3.org/2006/time/> \n"
+                        + "PREFIX tempf: <http://rya.apache.org/ns/temporal/>\n"
+                        + "SELECT * \n"
+                        + "WHERE { \n"
+                        + "  <urn:time> time:atTime ?date .\n"
+                        + " FILTER(tempf:within(?date, \"" + TIME.toString() + "/" + TIME_20.toString() + "\")) "
+                        + "}";
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Create the statements that will be input into the query.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = getStatements();
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        final MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("date", vf.createLiteral(TIME_10.toString()));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    private List<VisibilityStatement> getStatements() throws Exception {
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add(new VisibilityStatement(statement(TIME), "a"));
+        statements.add(new VisibilityStatement(statement(TIME_10), "a"));
+        statements.add(new VisibilityStatement(statement(TIME_20), "a"));
+        return statements;
+    }
+
+    private static Statement statement(final ZonedDateTime time) {
+        final Resource subject = vf.createURI("urn:time");
+        final URI predicate = vf.createURI("http://www.w3.org/2006/time/atTime");
+        final Value object = vf.createLiteral(time.toString());
+        return new StatementImpl(subject, predicate, object);
+    }
+}
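
The FILTER clauses above rely on the temporal functions that temporalFunctionsRegistered() checks for in Sesame's FunctionRegistry. As a rough sketch of what those functions do outside of a Kafka topology, one of them could be looked up and evaluated directly. This is illustrative only: the URI assumes the http://rya.apache.org/ns/temporal/ namespace used in the queries, and the boolean result assumes the same equals semantics the test expects.

    import org.openrdf.model.Value;
    import org.openrdf.model.ValueFactory;
    import org.openrdf.model.impl.ValueFactoryImpl;
    import org.openrdf.query.algebra.evaluation.function.Function;
    import org.openrdf.query.algebra.evaluation.function.FunctionRegistry;

    public class TemporalEqualsSketch {
        public static void main(final String[] args) throws Exception {
            final ValueFactory vf = new ValueFactoryImpl();

            // Look up tempf:equals by URI. This assumes the rya.streams temporal functions
            // have already been registered, as temporalFunctionsRegistered() verifies.
            final Function equalsFn = FunctionRegistry.getInstance()
                    .get("http://rya.apache.org/ns/temporal/equals");

            // Evaluating it over two identical instants is expected to yield "true"^^xsd:boolean.
            final Value result = equalsFn.evaluate(vf,
                    vf.createLiteral("2018-01-03T00:00:00Z"),
                    vf.createLiteral("2018-01-03T00:00:00Z"));
            System.out.println(result);
        }
    }
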

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
new file mode 100644
index 0000000..bdb9be6
--- /dev/null
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.join;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.join.NaturalJoin;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
+import org.apache.rya.streams.kafka.processors.ProcessorResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
+import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier.JoinProcessor;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Integration tests the methods of {@link JoinProcessor}.
+ */
+public class JoinProcessorIT {
+
+    @Rule
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+    @Test(expected = IllegalArgumentException.class)
+    public void badAllVars() throws IllegalArgumentException {
+        new JoinProcessorSupplier(
+                "NATURAL_JOIN",
+                new NaturalJoin(),
+                Lists.newArrayList("employee"),
+                Lists.newArrayList("person", "employee", "business"),
+                result -> ProcessorResult.make( new UnaryResult(result) ));
+    }
+
+    @Test
+    public void newLeftResult() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final String query =
+                "SELECT * WHERE { " +
+                    "?person <urn:talksTo> ?employee ." +
+                    "?employee <urn:worksAt> ?business" +
+                " }";
+        final TopologyFactory factory = new TopologyFactory();
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Create some statements that generate a bunch of right SP results.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "a&b") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "b") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "b|c") );
+
+        // Add a statement that will generate a left result that joins with some of those right results.
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "c") );
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:TacoPlace"));
+        expected.add( new VisibilityBindingSet(bs, "a&b&c") );
+
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
+        expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    @Test
+    public void newRightResult() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final String query =
+                "SELECT * WHERE { " +
+                    "?person <urn:talksTo> ?employee ." +
+                    "?employee <urn:worksAt> ?business" +
+                " }";
+        final TopologyFactory factory = new TopologyFactory();
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Create a statement that generates the left SP result.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "c") );
+
+        // Add statements that generate new right SP results, some of which join with that left result.
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "a&b") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "b") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "b|c") );
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:TacoPlace"));
+        expected.add( new VisibilityBindingSet(bs, "a&b&c") );
+
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
+        expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    @Test
+    public void newResultsBothSides() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final String query =
+                "SELECT * WHERE { " +
+                    "?person <urn:talksTo> ?employee ." +
+                    "?employee <urn:worksAt> ?business" +
+                " }";
+        final TopologyFactory factory = new TopologyFactory();
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Create statements that interleave new left and right SP results.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "a&b") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "c") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "b") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "b|c") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:talksTo"), vf.createURI("urn:Charlie")), "c") );
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:TacoPlace"));
+        expected.add( new VisibilityBindingSet(bs, "a&b&c") );
+
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
+        expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
+
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Bob"));
+        bs.addBinding("employee", vf.createURI("urn:Charlie"));
+        bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
+        expected.add( new VisibilityBindingSet(bs, "a&c") );
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    @Test
+    public void manyJoins() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final String query =
+                "SELECT * WHERE { " +
+                    "?person <urn:talksTo> ?employee ." +
+                    "?employee <urn:worksAt> ?business ." +
+                    "?employee <urn:hourlyWage> ?wage ." +
+                " }";
+        final TopologyFactory factory = new TopologyFactory();
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Create statements that produce exactly one result for each of the three statement patterns.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hourlyWage"), vf.createLiteral(7.25)), "a") );
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        final MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
+        bs.addBinding("wage", vf.createLiteral(7.25));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    @Test
+    public void leftJoin() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final String query =
+                "SELECT * WHERE { " +
+                    "?person <urn:talksTo> ?employee ." +
+                    "OPTIONAL{ ?employee <urn:worksAt> ?business } " +
+                " }";
+        final TopologyFactory factory = new TopologyFactory();
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Create statements that generate one result that includes the optional value and one that does not.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "b") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:talksTo"), vf.createURI("urn:Charlie")), "c") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "d") );
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:TacoPlace"));
+        expected.add( new VisibilityBindingSet(bs, "a&b") );
+
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Bob"));
+        bs.addBinding("employee", vf.createURI("urn:Charlie"));
+        expected.add( new VisibilityBindingSet(bs, "c") );
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+}
\ No newline at end of file
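
A pattern worth noting in the expected results above: each joined binding set carries the conjunction of its parents' visibility expressions, with a compound parent such as "b|c" kept in parentheses (hence "a&b&c" and "c&(b|c)"). The standalone sketch below mirrors that combination rule for illustration only; it is not the code Rya uses to build the joined visibility.

    /** Illustration of the visibility-combination rule the expected join results encode. */
    public class VisibilityJoinSketch {

        /** Conjoin two visibility expressions, parenthesizing an operand that contains a '|'. */
        static String combine(final String left, final String right) {
            if (left.isEmpty()) { return right; }
            if (right.isEmpty()) { return left; }
            return wrap(left) + "&" + wrap(right);
        }

        private static String wrap(final String expr) {
            // "b|c" must be parenthesized so the '&' does not rebind it; "a&b" can stay as-is.
            return expr.contains("|") ? "(" + expr + ")" : expr;
        }

        public static void main(final String[] args) {
            System.out.println(combine("a&b", "c"));  // a&b&c
            System.out.println(combine("c", "b|c"));  // c&(b|c)
        }
    }
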

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
new file mode 100644
index 0000000..a560294
--- /dev/null
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.projection;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
+import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier;
+import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier.MultiProjectionProcessor;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.BNode;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.RDF;
+
+/**
+ * Integration tests the methods of {@link MultiProjectionProcessor}.
+ */
+public class MultiProjectionProcessorIT {
+
+    @Rule
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+    @Test
+    public void showProcessorWorks() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Create a topology for the Query that will be tested.
+        final String sparql =
+                "CONSTRUCT {" +
+                    "_:b a <urn:movementObservation> ; " +
+                    "<urn:location> ?location ; " +
+                    "<urn:direction> ?direction ; " +
+                "}" +
+                "WHERE {" +
+                    "?thing <urn:corner> ?location ." +
+                    "?thing <urn:compass> ?direction." +
+                "}";
+
+        final String bNodeId = UUID.randomUUID().toString();
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, () -> bNodeId);
+
+        // Create the statements that will be input into the query.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:car1"), vf.createURI("urn:compass"), vf.createURI("urn:NW")), "a") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:car1"), vf.createURI("urn:corner"), vf.createURI("urn:corner1")), "a") );
+
+        // Make the expected results.
+        final Set<VisibilityStatement> expected = new HashSet<>();
+        final BNode blankNode = vf.createBNode(bNodeId);
+
+        expected.add(new VisibilityStatement(vf.createStatement(blankNode, RDF.TYPE, vf.createURI("urn:movementObservation")), "a"));
+        expected.add(new VisibilityStatement(vf.createStatement(blankNode, vf.createURI("urn:direction"), vf.createURI("urn:NW")), "a"));
+        expected.add(new VisibilityStatement(vf.createStatement(blankNode, vf.createURI("urn:location"), vf.createURI("urn:corner1")), "a"));
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityStatementDeserializer.class);
+    }
+}
\ No newline at end of file
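
Unlike the other tests, this one hands the TopologyFactory a fixed id supplier (() -> bNodeId) instead of new RandomUUIDFactory(), so every CONSTRUCTed statement reuses one predictable blank node and the expected set can be built up front. The sketch below illustrates that design choice with a plain java.util.function.Supplier standing in for Rya's id factory interface, whose exact signature is not shown in this diff.

    import java.util.UUID;
    import java.util.function.Supplier;

    import org.openrdf.model.BNode;
    import org.openrdf.model.ValueFactory;
    import org.openrdf.model.impl.ValueFactoryImpl;

    public class DeterministicBNodeSketch {
        public static void main(final String[] args) {
            final ValueFactory vf = new ValueFactoryImpl();

            // Production-style behaviour: every call mints a fresh, random blank node id.
            final Supplier<String> randomIds = () -> UUID.randomUUID().toString();

            // Test-style behaviour: pin one id so expected and actual statements share a BNode.
            final String pinned = UUID.randomUUID().toString();
            final Supplier<String> fixedIds = () -> pinned;

            final BNode expected = vf.createBNode(fixedIds.get());
            final BNode actual = vf.createBNode(fixedIds.get());
            System.out.println(expected.equals(actual));    // true, so assertions can line up
            System.out.println(vf.createBNode(randomIds.get())
                    .equals(vf.createBNode(randomIds.get())));    // false
        }
    }
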

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
new file mode 100644
index 0000000..322bba9
--- /dev/null
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.projection;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
+import org.apache.rya.streams.kafka.processors.projection.ProjectionProcessorSupplier;
+import org.apache.rya.streams.kafka.processors.projection.ProjectionProcessorSupplier.ProjectionProcessor;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Integration tests the methods of {@link ProjectionProcessor}.
+ */
+public class ProjectionProcessorIT {
+
+    @Rule
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+    @Test
+    public void showProcessorWorks() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Create a topology for the Query that will be tested.
+        final String sparql =
+                "SELECT (?person AS ?p) ?otherPerson " +
+                "WHERE { " +
+                    "?person <urn:talksTo> ?otherPerson . " +
+                "}";
+
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Load some data into the input topic.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+
+        // Show the correct binding set results from the job.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+
+        final MapBindingSet expectedBs = new MapBindingSet();
+        expectedBs.addBinding("p", vf.createURI("urn:Alice"));
+        expectedBs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+        expected.add(new VisibilityBindingSet(expectedBs, "a"));
+
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, Sets.newHashSet(expected), VisibilityBindingSetDeserializer.class);
+    }
+}
\ No newline at end of file
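
The SELECT expression in this test both projects and renames: ?person is surfaced as ?p while ?otherPerson keeps its name, which is exactly what the expected binding set asserts. As a standalone illustration of that renaming step (not the ProjectionProcessor implementation itself), the transformation looks like this:

    import org.openrdf.model.ValueFactory;
    import org.openrdf.model.impl.ValueFactoryImpl;
    import org.openrdf.query.impl.MapBindingSet;

    public class ProjectionRenameSketch {
        public static void main(final String[] args) {
            final ValueFactory vf = new ValueFactoryImpl();

            // The binding set the statement pattern produces.
            final MapBindingSet source = new MapBindingSet();
            source.addBinding("person", vf.createURI("urn:Alice"));
            source.addBinding("otherPerson", vf.createURI("urn:Bob"));

            // Apply "SELECT (?person AS ?p) ?otherPerson": copy the kept variable, rename the aliased one.
            final MapBindingSet projected = new MapBindingSet();
            projected.addBinding("p", source.getValue("person"));
            projected.addBinding("otherPerson", source.getValue("otherPerson"));

            System.out.println(projected);  // [p=urn:Alice;otherPerson=urn:Bob]
        }
    }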