You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/09 21:48:21 UTC

[04/50] [abbrv] incubator-rya git commit: RYA-377 Implement the JoinProcessor for KafkaStreams.

RYA-377 Implement the JoinProcessor for KafkaStreams.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/3471cb7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/3471cb7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/3471cb7e

Branch: refs/heads/master
Commit: 3471cb7eedaadde5352c584cfa4135a4dbd66520
Parents: c4966ff
Author: kchilton2 <ke...@gmail.com>
Authored: Fri Nov 10 13:44:25 2017 -0500
Committer: caleb <ca...@parsons.com>
Committed: Tue Jan 9 15:13:00 2018 -0500

----------------------------------------------------------------------
 extras/rya.streams/kafka/pom.xml                |   5 +-
 .../StatementPatternProcessorSupplier.java      |   5 +
 .../processors/join/CloseableIterator.java      |  32 ++
 .../processors/join/JoinProcessorSupplier.java  | 191 +++++++
 .../kafka/processors/join/JoinStateStore.java   |  11 +-
 .../processors/join/KeyValueJoinStateStore.java | 259 ++++++++++
 .../apache/rya/streams/kafka/KafkaTestUtil.java |  82 +++
 .../processors/StatementPatternProcessorIT.java | 240 +++++++--
 .../kafka/processors/join/JoinProcessorIT.java  | 507 +++++++++++++++++++
 .../kafka/src/test/resources/log4j.properties   |  29 ++
 10 files changed, 1296 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/pom.xml b/extras/rya.streams/kafka/pom.xml
index 33cc985..d3f6891 100644
--- a/extras/rya.streams/kafka/pom.xml
+++ b/extras/rya.streams/kafka/pom.xml
@@ -57,11 +57,12 @@ under the License.
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
         </dependency>
-       <dependency>
+        <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-streams</artifactId>
         </dependency>
  
+        <!-- Misc. dependencies -->
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
@@ -79,4 +80,4 @@ under the License.
             <scope>test</scope>
         </dependency>
     </dependencies>
-</project>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java
index 6991783..386fe98 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java
@@ -30,6 +30,8 @@ import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.api.model.VisibilityStatement;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.algebra.StatementPattern;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -70,6 +72,8 @@ public class StatementPatternProcessorSupplier implements ProcessorSupplier<Stri
     @DefaultAnnotation(NonNull.class)
     public static final class StatementPatternProcessor implements Processor<String, VisibilityStatement> {
 
+        private static final Logger log = LoggerFactory.getLogger(StatementPatternProcessor.class);
+
         private final StatementPatternMatcher spMatcher;
         private final ProcessorResultFactory resultFactory;
 
@@ -104,6 +108,7 @@ public class StatementPatternProcessorSupplier implements ProcessorSupplier<Stri
 
                 // Wrap the binding set as a result and forward it to the downstream processor.
                 final ProcessorResult resultValue = resultFactory.make(visBs);
+                log.debug("\nOUTPUT:\n{}", visBs);
                 context.forward(key, resultValue);
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java
new file mode 100644
index 0000000..9ea927d
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.join;
+
+import java.util.Iterator;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * An {@link Iterator} that is also {@link AutoCloseable}.
+ *
+ * @param <T> - The type of elements that will be iterated over.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable { }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java
new file mode 100644
index 0000000..9ed2363
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.join;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.rya.api.function.join.IterativeJoin;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.streams.kafka.processors.ProcessorResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side;
+import org.apache.rya.streams.kafka.processors.ProcessorResultFactory;
+import org.apache.rya.streams.kafka.processors.RyaStreamsProcessor;
+import org.apache.rya.streams.kafka.processors.RyaStreamsProcessorSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Supplies {@link JoinProcessor} instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class JoinProcessorSupplier extends RyaStreamsProcessorSupplier {
+
+    private final String stateStoreName;
+    private final IterativeJoin join;
+    private final List<String> joinVars;
+    private final List<String> allVars;
+
+    /**
+     * Constructs an instance of {@link JoinProcessorSupplier}.
+     *
+     * @param stateStoreName - The name of the state store the processor will use. (not null)
+     * @param join - The join function the supplied processor will use. (not null)
+     * @param joinVars - The variables that the supplied processor will join over. (not null)
+     * @param allVars - An ordered list of all the variables that may appear in resulting Binding Sets.
+     *   This list must lead with the same variables and order as {@code joinVars}. (not null)
+     * @param resultFactory - The factory that the supplied processors will use to create results. (not null)
+     * @throws IllegalArgumentException Thrown if {@code allVars} does not start with {@code joinVars}.
+     */
+    public JoinProcessorSupplier(
+            final String stateStoreName,
+            final IterativeJoin join,
+            final List<String> joinVars,
+            final List<String> allVars,
+            final ProcessorResultFactory resultFactory) throws IllegalArgumentException {
+        super(resultFactory);
+        this.stateStoreName = requireNonNull(stateStoreName);
+        this.join = requireNonNull(join);
+        this.joinVars = requireNonNull(joinVars);
+        this.allVars = requireNonNull(allVars);
+
+        if(!allVars.subList(0, joinVars.size()).equals(joinVars)) {
+            throw new IllegalArgumentException("All vars must be lead by the join vars, but it did not. " +
+                    "Join Vars: " + joinVars + ", All Vars: " + allVars);
+        }
+    }
+
+    @Override
+    public Processor<Object, ProcessorResult> get() {
+        return new JoinProcessor(stateStoreName, join, joinVars, allVars, super.getResultFactory());
+    }
+
+    /**
+     * Joins {@link VisibilityBindingSet}s against all binding sets that were emitted on the other side. This function
+     * does not have an age off policy, so it will match everything that could have ever possibly matched, however this
+     * may become prohibitive for joins that match a large volume of binding sets since this will indefinitely grow
+     * within the state store.
+     */
+    @DefaultAnnotation(NonNull.class)
+    public static class JoinProcessor extends RyaStreamsProcessor {
+
+        private static final Logger log = LoggerFactory.getLogger(JoinProcessor.class);
+
+        private final String stateStoreName;
+        private final IterativeJoin join;
+        private final List<String> joinVars;
+        private final List<String> allVars;
+        private final ProcessorResultFactory resultFactory;
+
+        private ProcessorContext context;
+        private JoinStateStore joinStateStore;
+
+        /**
+         * Constructs an instance of {@link JoinProcessor}.
+         *
+         * @param stateStoreName - The name of the state store the processor will use. (not null)
+         * @param join - The join function that the processor will use. (not null)
+         * @param joinVars - The variables that the processor will join over. (not null)
+         * @param allVars - An ordered list of all the variables that may appear in resulting Binding Sets.
+         *   This list must lead with the same variables and order as {@code joinVars}. (not null)
+         * @param resultFactory - The factory that will format this processor's final results
+         *   for the downstream processor. (not null)
+         */
+        public JoinProcessor(
+                final String stateStoreName,
+                final IterativeJoin join,
+                final List<String> joinVars,
+                final List<String> allVars,
+                final ProcessorResultFactory resultFactory) {
+            super(resultFactory);
+            this.stateStoreName = requireNonNull(stateStoreName);
+            this.join = requireNonNull(join);
+            this.joinVars = requireNonNull(joinVars);
+            this.allVars = requireNonNull(allVars);
+            this.resultFactory = requireNonNull(resultFactory);
+
+            if(!allVars.subList(0, joinVars.size()).equals(joinVars)) {
+                throw new IllegalArgumentException("All vars must be lead by the join vars, but it did not. " +
+                        "Join Vars: " + joinVars + ", All Vars: " + allVars);
+            }
+        }
+
+        @Override
+        public void init(final ProcessorContext context) {
+            // Hold onto the context so that we can forward results.
+            this.context = context;
+
+            // Get a reference to the state store that keeps track of what can be joined with.
+            final KeyValueStore<String, VisibilityBindingSet> stateStore =
+                    (KeyValueStore<String, VisibilityBindingSet>) context.getStateStore( stateStoreName );
+            joinStateStore = new KeyValueJoinStateStore( stateStore, joinVars, allVars );
+        }
+
+        @Override
+        public void process(final Object key, final ProcessorResult value) {
+            // Log the key/value that have been encountered.
+            log.debug("\nINPUT:\nSide: {}\nBinding Set: {}", value.getBinary().getSide(), value.getBinary().getResult());
+
+            // Must be a binary result.
+            final BinaryResult binary = value.getBinary();
+
+            // Store the new result in the state store so that future joins may include it.
+            joinStateStore.store(binary);
+
+            // Fetch the binding sets that the emitted value joins with.
+            try(final CloseableIterator<VisibilityBindingSet> otherSide = joinStateStore.getJoinedValues(binary)) {
+                // Create an iterator that performs the join operation.
+                final Iterator<VisibilityBindingSet> joinResults = binary.getSide() == Side.LEFT ?
+                        join.newLeftResult(binary.getResult(), otherSide) :
+                        join.newRightResult(otherSide, binary.getResult());
+
+                // Format each join result and forward it to the downstream processor.
+                while(joinResults.hasNext()) {
+                    final VisibilityBindingSet joinResult = joinResults.next();
+                    final ProcessorResult resultValue = resultFactory.make(joinResult);
+                    log.debug("\nOUTPUT:\n{}", joinResult);
+                    context.forward(key, resultValue);
+                }
+            } catch (final Exception e) {
+                final String msg = "Problem encountered while iterating over the other side's values within the state store.";
+                log.error(msg, e);
+                throw new RuntimeException(msg, e);
+            }
+        }
+
+        @Override
+        public void punctuate(final long timestamp) {
+            // Nothing to do.
+        }
+
+        @Override
+        public void close() {
+            // Nothing to do.
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java
index e53ec68..17a6ebb 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java
@@ -18,9 +18,6 @@
  */
 package org.apache.rya.streams.kafka.processors.join;
 
-import java.util.Iterator;
-import java.util.List;
-
 import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
 
@@ -37,17 +34,15 @@ public interface JoinStateStore {
     /**
      * Store a {@link VisibilityBindingSet} based on the side it was emitted from.
      *
-     * @param joinVars - An ordered list of the variables that are being joined over. (not null)
      * @param result - The result whose value will be stored. (not null)
      */
-    public void store(List<String> joinVars, BinaryResult result);
+    public void store(BinaryResult result);
 
     /**
      * Get the previously stored {@link VisibilityBindingSet}s that join with the provided result.
      *
-     * @param joinVars - An ordered list of the variables to join over. (not null)
-     * @param result - Defines the values that will be used to join. (not null)
+     * @param result - The value that will be joined with. (not null)
      * @return The {@link VisibilityBinidngSet}s that join with {@code result}.
      */
-    public Iterator<VisibilityBindingSet> getJoinedValues(List<String> joinVars, BinaryResult result);
+    public CloseableIterator<VisibilityBindingSet> getJoinedValues(BinaryResult result);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
new file mode 100644
index 0000000..d73b40e
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.join;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side;
+import org.openrdf.query.impl.MapBindingSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link KeyValueStore} implementation of {@link JoinStateStore}.
+ * <p>
+ * This is a key/value store, so we need to store the {@link VisibilityBindingSet}s using keys that allow us to fetch
+ * all binding sets that join from a specific side. We use the following pattern to accomplish this:
+ * <pre>
+ * [side],[joinVar1 value], [joinVar2 value], ..., [joinVarN value]
+ * </pre>
+ * This will group all binding sets that have been emitted from a specific side and who have the same join variables
+ * next to each other within the store. This isn't enough information to fetch that group, though. We must provide a
+ * start and end key to bound the range that is fetched back. To accomplish this, we place a start of range marker
+ * as the first key for all unique [side]/[join values] groups, and an end of range marker as the last key for each
+ * of those groups.
+ * <p>
+ * The rows follow this pattern:
+ * <pre>
+ * [side],[joinVar1 value], [joinVar2 value], ..., [joinVarN value]0x00
+ * [side],[joinVar1 value], [joinVar2 value], ..., [joinVarN value],[remainingBindingValues]
+ * [side],[joinVar1 value], [joinVar2 value], ..., [joinVarN value]0xFF
+ * </pre>
+ * <p>
+ * When an iterator over the results is returned, it skips over the start and end of range markers.
+ */
+@DefaultAnnotation(NonNull.class)
+public class KeyValueJoinStateStore implements JoinStateStore {
+
+    private static final Logger log = LoggerFactory.getLogger(KeyValueJoinStateStore.class);
+
+    /**
+     * This is the minimum value of a UTF-8 character.
+     */
+    private static final String START_RANGE_SUFFIX = new String(new byte[] { 0x00 } );
+
+    /**
+     * Intended as the largest sortable suffix; note that byte 0xFF is not valid UTF-8 and is decoded with the platform default charset.
+     */
+    private static final String END_RANGE_SUFFIX = new String(new byte[] { (byte) 0XFF } );
+
+    /**
+     * A default empty value that is stored for a start of range or end of range marker.
+     */
+    private static final VisibilityBindingSet RANGE_MARKER_VALUE = new VisibilityBindingSet(new MapBindingSet(), "");
+
+    private final KeyValueStore<String, VisibilityBindingSet> store;
+    private final List<String> joinVars;
+    private final List<String> allVars;
+
+    /**
+     * Constructs an instance of {@link KeyValueJoinStateStore}.
+     *
+     * @param store - The state store that will be used. (not null)
+     * @param joinVars - The variables that are used to build grouping keys. (not null)
+     * @param allVars - The variables that are used to build full value keys. (not null)
+     * @throws IllegalArgumentException Thrown if {@code allVars} does not start with {@code joinVars}.
+     */
+    public KeyValueJoinStateStore(
+            final KeyValueStore<String, VisibilityBindingSet> store,
+            final List<String> joinVars,
+            final List<String> allVars) throws IllegalArgumentException {
+        this.store = requireNonNull(store);
+        this.joinVars = requireNonNull(joinVars);
+        this.allVars = requireNonNull(allVars);
+
+        for(int i = 0; i < joinVars.size(); i++) {
+            if(!joinVars.get(i).equals(allVars.get(i))) {
+                throw new IllegalArgumentException("All vars must be lead by the join vars, but it did not. " +
+                        "Join Vars: " + joinVars + ", All Vars: " + allVars);
+            }
+        }
+    }
+
+    @Override
+    public void store(final BinaryResult result) {
+        requireNonNull(result);
+
+        // The join key prefix is an ordered list of values from the binding set that match the join variables.
+        // This is a prefix for every row that holds values for a specific set of join variable values.
+        final Side side = result.getSide();
+        final VisibilityBindingSet bs = result.getResult();
+        final String joinKeyPrefix = makeCommaDelimitedValues(side, joinVars, bs);
+
+        final List<KeyValue<String, VisibilityBindingSet>> values = new ArrayList<>();
+
+        // For each join variable set, we need a start key for scanning,
+        final String startKey = joinKeyPrefix + START_RANGE_SUFFIX;
+        values.add( new KeyValue<>(startKey, RANGE_MARKER_VALUE) );
+
+        // The actual value that was emitted as a result.
+        final String valueKey = makeCommaDelimitedValues(side, allVars, bs);
+        values.add( new KeyValue<>(valueKey, bs) );
+
+        // And the end key for scanning.
+        final String endKey = joinKeyPrefix + END_RANGE_SUFFIX;
+        values.add( new KeyValue<>(endKey, RANGE_MARKER_VALUE) );
+
+        // Write the pairs to the store.
+        log.debug("\nStoring the following values: {}\n", values);
+        store.putAll( values );
+    }
+
+    @Override
+    public CloseableIterator<VisibilityBindingSet> getJoinedValues(final BinaryResult result) {
+        requireNonNull(result);
+
+        // Get an iterator over the values that start with the join variables for the other side.
+        final Side otherSide = result.getSide() == Side.LEFT ? Side.RIGHT : Side.LEFT;
+        final VisibilityBindingSet bs = result.getResult();
+        final String joinKeyPrefix = makeCommaDelimitedValues(otherSide, joinVars, bs);
+
+        final String startKey = joinKeyPrefix + START_RANGE_SUFFIX;
+        final String endKey = joinKeyPrefix + END_RANGE_SUFFIX;
+        final KeyValueIterator<String, VisibilityBindingSet> rangeIt = store.range(startKey, endKey);
+
+        // Return a CloseableIterator over the range's value fields, skipping the start and end entry.
+        return new CloseableIterator<VisibilityBindingSet>() {
+
+            private Optional<VisibilityBindingSet> next = null;
+
+            @Override
+            public boolean hasNext() {
+                // If the iterator has not been initialized yet, read a value in.
+                if(next == null) {
+                    next = readNext();
+                }
+
+                // Return true if there is a next value, otherwise false.
+                return next.isPresent();
+            }
+
+            @Override
+            public VisibilityBindingSet next() {
+                // If the iterator has not been initialized yet, read a value in.
+                if(next == null) {
+                    next = readNext();
+                }
+
+                // It's illegal to call next() when there is no next value.
+                if(!next.isPresent()) {
+                    throw new IllegalStateException("May not invoke next() when there is nothing left in the Iterator.");
+                }
+
+                // Update and return the next value.
+                final VisibilityBindingSet ret = next.get();
+                log.debug("\nReturning: {}", ret);
+                next = readNext();
+                return ret;
+            }
+
+            private Optional<VisibilityBindingSet> readNext() {
+                // Check to see if there's anything left in the iterator.
+                if(!rangeIt.hasNext()) {
+                    return Optional.empty();
+                }
+
+                // Read a candidate key/value pair from the iterator.
+                KeyValue<String, VisibilityBindingSet> candidate = rangeIt.next();
+
+                // If we are initializing, then the first thing we must read is a start of range marker.
+                if(next == null) {
+                    if(!candidate.key.endsWith(START_RANGE_SUFFIX)) {
+                        throw new IllegalStateException("The first key encountered must be a start of range key.");
+                    }
+                    log.debug("Read the start of range markers.\n");
+
+                    // Read a new candidate to skip this one.
+                    if(!rangeIt.hasNext()) {
+                        throw new IllegalStateException("There must be another entry after the start of range key.");
+                    }
+                    candidate = rangeIt.next();
+                }
+
+                // If that value is an end of range key, then we are finished. Otherwise, return it.
+                else if(candidate.key.endsWith(END_RANGE_SUFFIX)) {
+                    log.debug("Read the end of range marker.\n");
+
+                    // If there are more messages, that's a problem.
+                    if(rangeIt.hasNext()) {
+                        throw new IllegalStateException("The end of range marker must be the last key in the iterator.");
+                    }
+
+                    return Optional.empty();
+                }
+
+                // Otherwise we found a new value.
+                return Optional.of( candidate.value );
+            }
+
+            @Override
+            public void close() throws Exception {
+                rangeIt.close();
+            }
+        };
+    }
+
+    /**
+     * A utility function that helps construct the keys used by {@link KeyValueJoinStateStore}.
+     *
+     * @param side - The side value for the key. (not null)
+     * @param vars - Which variables within the binding set to use for the key's values. (not null)
+     * @param bindingSet - The binding set the key is being constructed from. (not null)
+     * @return A comma delimited list of the binding values, leading with the side.
+     */
+    private static String makeCommaDelimitedValues(final Side side, final List<String> vars, final VisibilityBindingSet bindingSet) {
+        requireNonNull(side);
+        requireNonNull(vars);
+        requireNonNull(bindingSet);
+
+        // Make an ordered list of the binding set variables.
+        final List<String> values = new ArrayList<>();
+        values.add(side.toString());
+        for(final String var : vars) {
+            values.add( bindingSet.hasBinding(var) ? bindingSet.getBinding(var).getValue().toString() : "" );
+        }
+
+        // Return a comma delimited list of those values.
+        return Joiner.on(",").join(values);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
index bff4fdb..0a1a8a4 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
@@ -19,10 +19,13 @@
 package org.apache.rya.streams.kafka;
 
 import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
 
 import org.apache.kafka.clients.consumer.Consumer;
@@ -34,8 +37,20 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 
+import com.google.common.collect.Sets;
+
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
 
@@ -124,4 +139,71 @@ public final class KafkaTestUtil {
 
         return values;
     }
+
+    /**
+     * Runs a Kafka Streams topology, loads statements into the input topic, reads the binding sets that come out of
+     * the results topic, and ensures the expected results match the read results.
+     *
+     * @param kafka - The embedded kafka instance that is being tested with. (not null)
+     * @param statementsTopic - The topic statements will be written to. (not null)
+     * @param resultsTopic - The topic results will be read from. (not null)
+     * @param builder - The streams topology that will be executed. (not null)
+     * @param startupMs - How long to wait, in milliseconds, for the topology to start before writing the statements.
+     * @param statements - The statements that will be loaded into the topic. (not null)
+     * @param expected - The expected results. (not null)
+     * @throws Exception If any exception was thrown while running the test.
+     */
+    public static void runStreamProcessingTest(
+            final KafkaTestInstanceRule kafka,
+            final String statementsTopic,
+            final String resultsTopic,
+            final TopologyBuilder builder,
+            final int startupMs,
+            final List<VisibilityStatement> statements,
+            final Set<VisibilityBindingSet> expected) throws Exception {
+        requireNonNull(kafka);
+        requireNonNull(statementsTopic);
+        requireNonNull(resultsTopic);
+        requireNonNull(builder);
+        requireNonNull(statements);
+        requireNonNull(expected);
+
+        // Explicitly create the topics that are being used.
+        kafka.createTopic(statementsTopic);
+        kafka.createTopic(resultsTopic);
+
+        // Start the streams program. Every caller of this helper runs under the same fixed application ID.
+        final Properties props = kafka.createBootstrapServerConfig();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StatementPatternProcessorIT");
+
+        final KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
+        streams.cleanUp();
+        try {
+            streams.start();
+
+            // Wait for the streams application to start. Streams only see data after their consumers are connected.
+            Thread.sleep(startupMs);
+
+            // Load the statements into the input topic.
+            try(Producer<String, VisibilityStatement> producer = KafkaTestUtil.makeProducer(
+                    kafka, StringSerializer.class, VisibilityStatementSerializer.class)) {
+                new KafkaLoadStatements(statementsTopic, producer).fromCollection(statements);
+            }
+
+            // Wait for the final results to appear in the output topic and verify the expected Binding Sets were found.
+            try(Consumer<String, VisibilityBindingSet> consumer = KafkaTestUtil.fromStartConsumer(
+                    kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class)) {
+                // Register the topic.
+                consumer.subscribe(Arrays.asList(resultsTopic));
+
+                // Poll for the results, collecting them into a Set so the comparison ignores arrival order.
+                final Set<VisibilityBindingSet> results = Sets.newHashSet( KafkaTestUtil.pollForResults(500, 6, expected.size(), consumer) );
+
+                // Verify the job produced exactly the expected binding sets.
+                assertEquals(expected, results);
+            }
+        } finally {
+            streams.close();
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
index 1b58b42..371fd0b 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
@@ -18,34 +18,25 @@
  */
 package org.apache.rya.streams.kafka.processors;
 
-import static org.junit.Assert.assertEquals;
-
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
 
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.api.model.VisibilityStatement;
 import org.apache.rya.streams.kafka.KafkaTestUtil;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.RdfTestUtil;
-import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
 import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
 import org.apache.rya.streams.kafka.processors.RyaStreamsSinkFormatterSupplier.RyaStreamsSinkFormatter;
 import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
 import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
 import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
-import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 import org.junit.Rule;
 import org.junit.Test;
@@ -63,7 +54,50 @@ public class StatementPatternProcessorIT {
     public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
 
     @Test
-    public void statementPatternMatches() throws Exception {
+    public void singlePattern_singleStatement() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the StatementPattern object that will be evaluated.
+        final StatementPattern sp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }");
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        // The topic that Statements are written to is used as a source.
+        builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
+
+        // Add a processor that handles the first statement pattern.
+        builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
+
+        // Add a processor that formats the VisibilityBindingSet for output.
+        builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, "SP1");
+
+        // Add a sink that writes the data out to a new Kafka topic.
+        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        // Create a statement that generates an SP result.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+
+        // Show the correct binding set results from the job.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+
+        final QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        // Run the test.
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+    }
+
+    @Test
+    public void singlePattern_manyStatements() throws Exception {
         // Enumerate some topics that will be re-used
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
@@ -88,48 +122,144 @@ public class StatementPatternProcessorIT {
         // Add a sink that writes the data out to a new Kafka topic.
         builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
 
-        // Start the streams program.
-        final Properties props = kafka.createBootstrapServerConfig();
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StatementPatternProcessorIT");
-
-        final KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
-        streams.cleanUp();
-        try {
-            streams.start();
-
-            // Wait for the streams application to start. Streams only see data after their consumers are connected.
-            Thread.sleep(2000);
-
-            // Load some data into the input topic.
-            final ValueFactory vf = new ValueFactoryImpl();
-            final List<VisibilityStatement> statements = new ArrayList<>();
-            statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
-
-            try(Producer<String, VisibilityStatement> producer = KafkaTestUtil.makeProducer(
-                    kafka, StringSerializer.class, VisibilityStatementSerializer.class)) {
-                new KafkaLoadStatements(statementsTopic, producer).fromCollection(statements);
-            }
-
-            // Wait for the final results to appear in the output topic and verify the expected Binding Set was found.
-            try(Consumer<String, VisibilityBindingSet> consumer = KafkaTestUtil.fromStartConsumer(
-                    kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class)) {
-                // Register the topic.
-                consumer.subscribe(Arrays.asList(resultsTopic));
-
-                // Poll for the result.
-                final List<VisibilityBindingSet> results = KafkaTestUtil.pollForResults(500, 6, 1, consumer);
-
-                // Show the correct binding set results from the job.
-                final QueryBindingSet bs = new QueryBindingSet();
-                bs.addBinding("person", vf.createURI("urn:Alice"));
-                bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
-                final VisibilityBindingSet expected = new VisibilityBindingSet(bs, "a");
-
-                final VisibilityBindingSet result = results.iterator().next();
-                assertEquals(expected, result);
-            }
-        } finally {
-            streams.close();
-        }
+        // Create some statements where some generate SP results and others do not.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoin")), "b") );
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:talksTo"), vf.createURI("urn:Alice")), "a|b") );
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "c") );
+
+        // Show the correct binding set results from the job.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Bob"));
+        bs.addBinding("otherPerson", vf.createURI("urn:Alice"));
+        expected.add( new VisibilityBindingSet(bs, "a|b") );
+
+        // Run the test.
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+    }
+
+    @Test
+    public void multiplePatterns_singleStatement() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the StatementPattern object that will be evaluated.
+        final StatementPattern sp1 = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }");
+        final StatementPattern sp2 = RdfTestUtil.getSp("SELECT * WHERE { ?person ?action <urn:Bob> }");
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        // The topic that Statements are written to is used as a source.
+        builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
+
+        // Add a processor that handles the first statement pattern.
+        builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp1, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
+
+        // Add a processor that handles the second statement pattern.
+        builder.addProcessor("SP2", new StatementPatternProcessorSupplier(sp2, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
+
+        // Add a processor that formats the VisibilityBindingSet for output.
+        builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, "SP1", "SP2");
+
+        // Add a sink that writes the data out to a new Kafka topic.
+        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        // Create a single statement that generates a result for both of the SPs.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+
+        // Show the correct binding set results from the job.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("action", vf.createURI("urn:talksTo"));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        // Run the test.
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+    }
+
+    @Test
+    public void multiplePatterns_multipleStatements() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the StatementPattern object that will be evaluated.
+        final StatementPattern sp1 = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }");
+        final StatementPattern sp2 = RdfTestUtil.getSp("SELECT * WHERE { ?person ?action <urn:Bob> }");
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        // The topic that Statements are written to is used as a source.
+        builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
+
+        // Add a processor that handles the first statement pattern.
+        builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp1, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
+
+        // Add a processor that handles the second statement pattern.
+        builder.addProcessor("SP2", new StatementPatternProcessorSupplier(sp2, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
+
+        // Add a processor that formats the VisibilityBindingSet for output.
+        builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, "SP1", "SP2");
+
+        // Add a sink that writes the data out to a new Kafka topic.
+        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        // Create some statements that each generate a result for one or both of the SPs.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Charlie")), "a|b") );
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:walksWith"), vf.createURI("urn:Bob")), "b") );
+
+        // Show the correct binding set results from the job.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("action", vf.createURI("urn:talksTo"));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("otherPerson", vf.createURI("urn:Charlie"));
+        expected.add( new VisibilityBindingSet(bs, "a|b") );
+
+        bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Charlie"));
+        bs.addBinding("action", vf.createURI("urn:walksWith"));
+        expected.add( new VisibilityBindingSet(bs, "b") );
+
+        // Run the test.
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
new file mode 100644
index 0000000..14a559f
--- /dev/null
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.join;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.rya.api.function.join.LeftOuterJoin;
+import org.apache.rya.api.function.join.NaturalJoin;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTestUtil;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RdfTestUtil;
+import org.apache.rya.streams.kafka.processors.ProcessorResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
+import org.apache.rya.streams.kafka.processors.RyaStreamsSinkFormatterSupplier.RyaStreamsSinkFormatter;
+import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
+import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier.JoinProcessor;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerde;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Integration tests the methods of {@link JoinProcessor}.
+ */
+public class JoinProcessorIT {
+
+    @Rule
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+    @Test(expected = IllegalArgumentException.class)
+    public void badAllVars() throws IllegalArgumentException {
+        new JoinProcessorSupplier(
+                "NATURAL_JOIN",
+                new NaturalJoin(),
+                Lists.newArrayList("employee"),
+                Lists.newArrayList("person", "employee", "business"),
+                result -> ProcessorResult.make( new UnaryResult(result) ));
+    }
+
+    @Test
+    public void newLeftResult() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the StatementPatterns that will be evaluated.
+        final StatementPattern leftSp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }");
+        final StatementPattern rightSp = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }");
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        // The topic that Statements are written to is used as a source.
+        builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
+
+        // Add a processor that handles the first statement pattern.
+        builder.addProcessor("LEFT_SP", new StatementPatternProcessorSupplier(leftSp,
+                result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS");
+
+        // Add a processor that handles the second statement pattern.
+        builder.addProcessor("RIGHT_SP", new StatementPatternProcessorSupplier(rightSp,
+                result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS");
+
+        // Add a processor that handles a natural join over the SPs.
+        builder.addProcessor("NATURAL_JOIN", new JoinProcessorSupplier(
+                "NATURAL_JOIN",
+                new NaturalJoin(),
+                Lists.newArrayList("employee"),
+                Lists.newArrayList("employee", "person", "business"),
+                result -> ProcessorResult.make( new UnaryResult(result) )), "LEFT_SP", "RIGHT_SP");
+
+        // Add a state store for the join processor.
+        final StateStoreSupplier joinStoreSupplier =
+                Stores.create( "NATURAL_JOIN" )
+                  .withStringKeys()
+                  .withValues(new VisibilityBindingSetSerde())
+                  .inMemory()
+                  .build();
+        builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN");
+
+        // Add a processor that formats the VisibilityBindingSet for output.
+        builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, "NATURAL_JOIN");
+
+        // Add a sink that writes the data out to a new Kafka topic.
+        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        // Create some statements that generate a bunch of right SP results.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "a&b") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "b") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "b|c") );
+
+        // Add a statement that will generate a left result that joins with some of those right results.
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "c") );
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:TacoPlace"));
+        expected.add( new VisibilityBindingSet(bs, "a&b&c") );
+
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
+        expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
+
+        // Run the test.
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+    }
+
+    @Test
+    public void newRightResult() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the StatementPatterns that will be evaluated.
+        final StatementPattern leftSp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }");
+        final StatementPattern rightSp = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }");
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        // The topic that Statements are written to is used as a source.
+        builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
+
+        // Add a processor that handles the first statement pattern.
+        builder.addProcessor("LEFT_SP", new StatementPatternProcessorSupplier(leftSp,
+                result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS");
+
+        // Add a processor that handles the second statement pattern.
+        builder.addProcessor("RIGHT_SP", new StatementPatternProcessorSupplier(rightSp,
+                result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS");
+
+        // Add a processor that handles a natural join over the SPs.
+        builder.addProcessor("NATURAL_JOIN", new JoinProcessorSupplier(
+                "NATURAL_JOIN",
+                new NaturalJoin(),
+                Lists.newArrayList("employee"),
+                Lists.newArrayList("employee", "person", "business"),
+                result -> ProcessorResult.make( new UnaryResult(result) )), "LEFT_SP", "RIGHT_SP");
+
+        // Add a state store for the join processor.
+        final StateStoreSupplier joinStoreSupplier =
+                Stores.create( "NATURAL_JOIN" )
+                  .withStringKeys()
+                  .withValues(new VisibilityBindingSetSerde())
+                  .inMemory()
+                  .build();
+        builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN");
+
+        // Add a processor that formats the VisibilityBindingSet for output.
+        builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, "NATURAL_JOIN");
+
+        // Add a sink that writes the data out to a new Kafka topic.
+        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        // Create some statements that generate a bunch of right SP results.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "a&b") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "b") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "b|c") );
+
+        // Add a statement that will generate a left result that joins with some of those right results.
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "c") );
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:TacoPlace"));
+        expected.add( new VisibilityBindingSet(bs, "a&b&c") );
+
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
+        expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
+
+        // Run the test.
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+    }
+
+    /**
+     * Builds a topology that natural-joins {@code ?person <urn:talksTo> ?employee} with
+     * {@code ?employee <urn:worksAt> ?business}, then hands it to
+     * {@link KafkaTestUtil#runStreamProcessingTest} together with a list of statements in which
+     * matches for the two patterns are interleaved and the binding sets the join is expected to emit.
+     */
+    @Test
+    public void newResultsBothSides() throws Exception {
+        // Derive the topic names from randomly generated ids.
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // The two patterns being joined; ?employee is the variable they have in common.
+        final StatementPattern talksToSp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }");
+        final StatementPattern worksAtSp = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }");
+
+        final TopologyBuilder topology = new TopologyBuilder();
+
+        // Source: the topic that Statements are written to.
+        topology.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
+
+        // Matches of the talksTo pattern are tagged as the left side of the join.
+        topology.addProcessor(
+                "LEFT_SP",
+                new StatementPatternProcessorSupplier(
+                        talksToSp,
+                        match -> ProcessorResult.make( new BinaryResult(Side.LEFT, match) )),
+                "STATEMENTS");
+
+        // Matches of the worksAt pattern are tagged as the right side of the join.
+        topology.addProcessor(
+                "RIGHT_SP",
+                new StatementPatternProcessorSupplier(
+                        worksAtSp,
+                        match -> ProcessorResult.make( new BinaryResult(Side.RIGHT, match) )),
+                "STATEMENTS");
+
+        // Natural join of the two patterns on ?employee.
+        topology.addProcessor(
+                "NATURAL_JOIN",
+                new JoinProcessorSupplier(
+                        "NATURAL_JOIN",
+                        new NaturalJoin(),
+                        Lists.newArrayList("employee"),
+                        Lists.newArrayList("employee", "person", "business"),
+                        joined -> ProcessorResult.make( new UnaryResult(joined) )),
+                "LEFT_SP",
+                "RIGHT_SP");
+
+        // In-memory state store attached to the join processor.
+        final StateStoreSupplier joinStore = Stores.create( "NATURAL_JOIN" )
+                .withStringKeys()
+                .withValues(new VisibilityBindingSetSerde())
+                .inMemory()
+                .build();
+        topology.addStateStore(joinStore, "NATURAL_JOIN");
+
+        // Format the join's output, then write it to the results topic.
+        topology.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, "NATURAL_JOIN");
+        topology.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        // Input statements: worksAt (right side) and talksTo (left side) statements are interleaved.
+        final ValueFactory values = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = Lists.newArrayList(
+                new VisibilityStatement(
+                        values.createStatement(values.createURI("urn:Bob"), values.createURI("urn:worksAt"), values.createURI("urn:TacoPlace")), "a&b"),
+                new VisibilityStatement(
+                        values.createStatement(values.createURI("urn:Alice"), values.createURI("urn:talksTo"), values.createURI("urn:Bob")), "c"),
+                new VisibilityStatement(
+                        values.createStatement(values.createURI("urn:Charlie"), values.createURI("urn:worksAt"), values.createURI("urn:BurgerJoint")), "a"),
+                new VisibilityStatement(
+                        values.createStatement(values.createURI("urn:Eve"), values.createURI("urn:worksAt"), values.createURI("urn:CoffeeShop")), "b"),
+                new VisibilityStatement(
+                        values.createStatement(values.createURI("urn:Bob"), values.createURI("urn:worksAt"), values.createURI("urn:BurgerJoint")), "b|c"),
+                new VisibilityStatement(
+                        values.createStatement(values.createURI("urn:Bob"), values.createURI("urn:talksTo"), values.createURI("urn:Charlie")), "c"));
+
+        // Expected results: one binding set per talksTo/worksAt pair that agrees on ?employee.
+        final MapBindingSet aliceBobTacoPlace = new MapBindingSet();
+        aliceBobTacoPlace.addBinding("person", values.createURI("urn:Alice"));
+        aliceBobTacoPlace.addBinding("employee", values.createURI("urn:Bob"));
+        aliceBobTacoPlace.addBinding("business", values.createURI("urn:TacoPlace"));
+
+        final MapBindingSet aliceBobBurgerJoint = new MapBindingSet();
+        aliceBobBurgerJoint.addBinding("person", values.createURI("urn:Alice"));
+        aliceBobBurgerJoint.addBinding("employee", values.createURI("urn:Bob"));
+        aliceBobBurgerJoint.addBinding("business", values.createURI("urn:BurgerJoint"));
+
+        final MapBindingSet bobCharlieBurgerJoint = new MapBindingSet();
+        bobCharlieBurgerJoint.addBinding("person", values.createURI("urn:Bob"));
+        bobCharlieBurgerJoint.addBinding("employee", values.createURI("urn:Charlie"));
+        bobCharlieBurgerJoint.addBinding("business", values.createURI("urn:BurgerJoint"));
+
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        expected.add( new VisibilityBindingSet(aliceBobTacoPlace, "a&b&c") );
+        expected.add( new VisibilityBindingSet(aliceBobBurgerJoint, "c&(b|c)") );
+        expected.add( new VisibilityBindingSet(bobCharlieBurgerJoint, "a&c") );
+
+        // Run the test.
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, topology, 2000, statements, expected);
+    }
+
+    /**
+     * Chains two natural joins: JOIN1 joins {@code ?person <urn:talksTo> ?employee} with
+     * {@code ?employee <urn:worksAt> ?business}, and JOIN2 joins JOIN1's output with
+     * {@code ?employee <urn:hourlyWage> ?wage}. One statement per pattern is supplied, and a single
+     * binding set holding all four variables is expected.
+     */
+    @Test
+    public void manyJoins() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the StatementPatterns that will be evaluated.
+        final StatementPattern sp1 = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }");
+        final StatementPattern sp2 = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }");
+        final StatementPattern sp3 = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:hourlyWage> ?wage }");
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        // The topic that Statements are written to is used as a source.
+        builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
+
+        // Add a processor that handles the first statement pattern. Its results are JOIN1's left side.
+        builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp1,
+                result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS");
+
+        // Add a processor that handles the second statement pattern. Its results are JOIN1's right side.
+        builder.addProcessor("SP2", new StatementPatternProcessorSupplier(sp2,
+                result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS");
+
+        // Add a processor that handles a natural join over SPs 1 and 2.
+        // Its output is tagged as the left side because it feeds JOIN2.
+        builder.addProcessor("JOIN1", new JoinProcessorSupplier(
+                "JOIN1",
+                new NaturalJoin(),
+                Lists.newArrayList("employee"),
+                Lists.newArrayList("employee", "person", "business"),
+                result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "SP1", "SP2");
+
+        // Add a processor that handles the third statement pattern. Its results are JOIN2's right side.
+        builder.addProcessor("SP3", new StatementPatternProcessorSupplier(sp3,
+                result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS");
+
+        // Add a processor that handles a natural join over JOIN1 and SP3.
+        // JOIN2's results carry ?person through from JOIN1 (see the expected result below), so it is
+        // listed with the other variables; the join variable stays first.
+        // NOTE(review): the second list is presumed to enumerate every variable of the join's results,
+        // as it does for JOIN1 - confirm against JoinProcessorSupplier.
+        builder.addProcessor("JOIN2", new JoinProcessorSupplier(
+                "JOIN2",
+                new NaturalJoin(),
+                Lists.newArrayList("employee"),
+                Lists.newArrayList("employee", "person", "business", "wage"),
+                result -> ProcessorResult.make( new UnaryResult(result) )), "JOIN1", "SP3");
+
+        // Setup the join state suppliers. Each join processor gets its own in-memory store.
+        final StateStoreSupplier join1StoreSupplier =
+                Stores.create( "JOIN1" )
+                  .withStringKeys()
+                  .withValues(new VisibilityBindingSetSerde())
+                  .inMemory()
+                  .build();
+        builder.addStateStore(join1StoreSupplier, "JOIN1");
+
+        final StateStoreSupplier join2StoreSupplier =
+                Stores.create( "JOIN2" )
+                  .withStringKeys()
+                  .withValues(new VisibilityBindingSetSerde())
+                  .inMemory()
+                  .build();
+        builder.addStateStore(join2StoreSupplier, "JOIN2");
+
+        // Add a processor that formats the VisibilityBindingSet for output.
+        builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, "JOIN2");
+
+        // Add a sink that writes the data out to a new Kafka topic.
+        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        // Create one statement for each of the three statement patterns.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hourlyWage"), vf.createLiteral(7.25)), "a") );
+
+        // Make the expected results: a single binding set that holds all four variables.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        final MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
+        bs.addBinding("wage", vf.createLiteral(7.25));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        // Run the test.
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 3000, statements, expected);
+    }
+
+    /**
+     * Builds a topology that left-outer-joins the required pattern
+     * {@code ?person <urn:talksTo> ?employee} with the optional pattern
+     * {@code ?employee <urn:worksAt> ?business}. The expected results include binding sets
+     * that carry the optional ?business value as well as ones that do not.
+     */
+    @Test
+    public void leftJoin() throws Exception {
+        // Derive the topic names from randomly generated ids.
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // The required (left) and optional (right) patterns; ?employee is the variable they share.
+        final StatementPattern talksToSp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }");
+        final StatementPattern worksAtSp = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }");
+
+        final TopologyBuilder topology = new TopologyBuilder();
+
+        // Source: the topic that Statements are written to.
+        topology.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
+
+        // Matches of the required pattern are tagged as the left side of the join.
+        topology.addProcessor(
+                "REQUIRED_SP",
+                new StatementPatternProcessorSupplier(
+                        talksToSp,
+                        match -> ProcessorResult.make( new BinaryResult(Side.LEFT, match) )),
+                "STATEMENTS");
+
+        // Matches of the optional pattern are tagged as the right side of the join.
+        topology.addProcessor(
+                "OPTIONAL_SP",
+                new StatementPatternProcessorSupplier(
+                        worksAtSp,
+                        match -> ProcessorResult.make( new BinaryResult(Side.RIGHT, match) )),
+                "STATEMENTS");
+
+        // Left outer join of the two patterns on ?employee.
+        topology.addProcessor(
+                "LEFT_JOIN",
+                new JoinProcessorSupplier(
+                        "LEFT_JOIN",
+                        new LeftOuterJoin(),
+                        Lists.newArrayList("employee"),
+                        Lists.newArrayList("employee", "person", "business"),
+                        joined -> ProcessorResult.make( new UnaryResult(joined) )),
+                "REQUIRED_SP",
+                "OPTIONAL_SP");
+
+        // In-memory state store attached to the join processor.
+        final StateStoreSupplier joinStore = Stores.create( "LEFT_JOIN" )
+                .withStringKeys()
+                .withValues(new VisibilityBindingSetSerde())
+                .inMemory()
+                .build();
+        topology.addStateStore(joinStore, "LEFT_JOIN");
+
+        // Format the join's output, then write it to the results topic.
+        topology.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, "LEFT_JOIN");
+        topology.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        // Input statements: two talksTo statements and two worksAt statements, of which only
+        // urn:Bob appears both as someone who is talked to and as someone who works somewhere.
+        final ValueFactory values = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = Lists.newArrayList(
+                new VisibilityStatement(
+                        values.createStatement(values.createURI("urn:Alice"), values.createURI("urn:talksTo"), values.createURI("urn:Bob")), "a"),
+                new VisibilityStatement(
+                        values.createStatement(values.createURI("urn:Bob"), values.createURI("urn:worksAt"), values.createURI("urn:TacoPlace")), "b"),
+                new VisibilityStatement(
+                        values.createStatement(values.createURI("urn:Bob"), values.createURI("urn:talksTo"), values.createURI("urn:Charlie")), "c"),
+                new VisibilityStatement(
+                        values.createStatement(values.createURI("urn:David"), values.createURI("urn:worksAt"), values.createURI("urn:BurgerJoint")), "d"));
+
+        // Expected results: Alice/Bob both without and with the optional ?business value, and
+        // Bob/Charlie without it.
+        final MapBindingSet aliceBob = new MapBindingSet();
+        aliceBob.addBinding("person", values.createURI("urn:Alice"));
+        aliceBob.addBinding("employee", values.createURI("urn:Bob"));
+
+        final MapBindingSet aliceBobTacoPlace = new MapBindingSet();
+        aliceBobTacoPlace.addBinding("person", values.createURI("urn:Alice"));
+        aliceBobTacoPlace.addBinding("employee", values.createURI("urn:Bob"));
+        aliceBobTacoPlace.addBinding("business", values.createURI("urn:TacoPlace"));
+
+        final MapBindingSet bobCharlie = new MapBindingSet();
+        bobCharlie.addBinding("person", values.createURI("urn:Bob"));
+        bobCharlie.addBinding("employee", values.createURI("urn:Charlie"));
+
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        expected.add( new VisibilityBindingSet(aliceBob, "a") );
+        expected.add( new VisibilityBindingSet(aliceBobTacoPlace, "a&b") );
+        expected.add( new VisibilityBindingSet(bobCharlie, "c") );
+
+        // Run the test.
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, topology, 2000, statements, expected);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/resources/log4j.properties b/extras/rya.streams/kafka/src/test/resources/log4j.properties
new file mode 100644
index 0000000..05e7be5
--- /dev/null
+++ b/extras/rya.streams/kafka/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Root logger option
+log4j.rootLogger=ERROR, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
+
+log4j.logger.org.apache.rya.streams.kafka.processors=debug