You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/09 21:48:57 UTC

[40/50] [abbrv] incubator-rya git commit: RYA-377 Added Construct query support to Rya Streams.

RYA-377 Added Construct query support to Rya Streams.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/da63fd12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/da63fd12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/da63fd12

Branch: refs/heads/master
Commit: da63fd125e16779df3536b4172fa66e36561e4ff
Parents: 538393f
Author: kchilton2 <ke...@gmail.com>
Authored: Tue Nov 21 18:49:13 2017 -0500
Committer: caleb <ca...@parsons.com>
Committed: Tue Jan 9 15:13:01 2018 -0500

----------------------------------------------------------------------
 .../projection/MultiProjectionEvaluator.java    |   4 +-
 .../projection/ProjectionEvaluator.java         |   5 +-
 .../kafka/topology/TopologyBuilderFactory.java  |  40 ++-
 .../streams/kafka/topology/TopologyFactory.java | 178 +++++++++++---
 .../apache/rya/streams/kafka/KafkaTestUtil.java |  18 +-
 .../processors/StatementPatternProcessorIT.java |  18 +-
 .../kafka/processors/join/JoinProcessorIT.java  | 243 ++++---------------
 .../projection/MultiProjectionProcessorIT.java  |  87 +------
 .../projection/ProjectionProcessorIT.java       |  42 +---
 .../kafka/topology/TopologyFactoryTest.java     |  32 ++-
 10 files changed, 280 insertions(+), 387 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/MultiProjectionEvaluator.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/MultiProjectionEvaluator.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/MultiProjectionEvaluator.java
index e2b7046..0e9093d 100644
--- a/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/MultiProjectionEvaluator.java
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/MultiProjectionEvaluator.java
@@ -57,9 +57,9 @@ public class MultiProjectionEvaluator {
     /**
      * Constructs an instance of {@link MultiProjection}.
      *
-     * @param projections - The {@link ProjectionEvaluators} that handle each projection within the multi. (not null)
+     * @param projections - The {@link ProjectionEvaluators} that handle each projection within the MultiProjection. (not null)
      * @param blankNodeSourceNames - If there are blank nodes in the projection, this is a set of their names
-     *   so that they may be re-label to have the same node IDs. (not null)
+     *   so that they may be re-labeled to have the same node IDs. (not null)
      * @param bNodeIdFactory - Creates the IDs for Blank Nodes. (not null)
      */
     public MultiProjectionEvaluator(

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/ProjectionEvaluator.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/ProjectionEvaluator.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/ProjectionEvaluator.java
index a0b59c1..4b37448 100644
--- a/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/ProjectionEvaluator.java
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/ProjectionEvaluator.java
@@ -179,7 +179,10 @@ public class ProjectionEvaluator {
                 }
             }
 
-            result.addBinding(elem.getTargetName(), value);
+            // Only add the value if there is one. There may not be one if a binding is optional.
+            if(value != null) {
+                result.addBinding(elem.getTargetName(), value);
+            }
         }
 
         return new VisibilityBindingSet(result, bs.getVisibility());

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java
index 9e9dd92..666cbb0 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java
@@ -19,8 +19,8 @@
 package org.apache.rya.streams.kafka.topology;
 
 import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.BNodeIdFactory;
 import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.algebra.TupleExpr;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -32,17 +32,33 @@ import edu.umd.cs.findbugs.annotations.NonNull;
 public interface TopologyBuilderFactory {
 
     /**
-     * Builds a {@link TopologyBuilder} based on the provided sparql query where
-     * each {@link TupleExpr} in the parsed query is a processor in the
-     * topology.
+     * Builds a {@link TopologyBuilder} based on the provided SPARQL query that
+     * pulls from {@code statementsTopic} for input and writes the query's results
+     * to {@code resultsTopic}.
      *
      * @param sparqlQuery - The SPARQL query to build a topology for. (not null)
-     * @param statementTopic - The topic for the source to read from. (not null)
-     * @param statementTopic - The topic for the sink to write to. (not null)
-     * @return - The created {@link TopologyBuilder}.
-     * @throws MalformedQueryException - The provided query is not a valid
-     *         SPARQL query.
+     * @param statementsTopic - The topic for the source to read from. (not null)
+     * @param resultsTopic - The topic for the sink to write to. (not null)
+     * @param bNodeIdFactory - A factory that generates Blank Node IDs if any are required. (not null)
+     * @return The created {@link TopologyBuilder}.
+     * @throws MalformedQueryException - The provided query is not a valid SPARQL query.
+     * @throws TopologyBuilderException - A problem occurred while constructing the topology.
      */
-    public TopologyBuilder build(final String sparqlQuery, final String statementTopic, final String resultTopic)
-            throws Exception;
-}
+    public TopologyBuilder build(
+            final String sparqlQuery,
+            final String statementsTopic,
+            final String resultsTopic,
+            final BNodeIdFactory bNodeIdFactory) throws MalformedQueryException, TopologyBuilderException;
+
+    /**
+     * An Exception thrown when a problem occurs while constructing the processor
+     * topology in the {@link TopologyFactory}.
+     */
+    public static class TopologyBuilderException extends Exception {
+        private static final long serialVersionUID = 1L;
+
+        public TopologyBuilderException(final String message, final Throwable cause) {
+            super(message, cause);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
index 782a58b..08f3625 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
@@ -30,6 +30,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.processor.Processor;
@@ -40,6 +41,9 @@ import org.apache.kafka.streams.state.Stores;
 import org.apache.rya.api.function.join.IterativeJoin;
 import org.apache.rya.api.function.join.LeftOuterJoin;
 import org.apache.rya.api.function.join.NaturalJoin;
+import org.apache.rya.api.function.projection.BNodeIdFactory;
+import org.apache.rya.api.function.projection.MultiProjectionEvaluator;
+import org.apache.rya.api.function.projection.ProjectionEvaluator;
 import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.streams.kafka.processors.ProcessorResult;
 import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
@@ -48,17 +52,22 @@ import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
 import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
 import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier;
 import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier;
+import org.apache.rya.streams.kafka.processors.output.StatementOutputFormatterSupplier;
+import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier;
 import org.apache.rya.streams.kafka.processors.projection.ProjectionProcessorSupplier;
 import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerde;
 import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
 import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
 import org.openrdf.query.MalformedQueryException;
 import org.openrdf.query.algebra.BinaryTupleOperator;
 import org.openrdf.query.algebra.Extension;
 import org.openrdf.query.algebra.Join;
 import org.openrdf.query.algebra.LeftJoin;
+import org.openrdf.query.algebra.MultiProjection;
 import org.openrdf.query.algebra.Projection;
 import org.openrdf.query.algebra.QueryModelNode;
+import org.openrdf.query.algebra.Reduced;
 import org.openrdf.query.algebra.StatementPattern;
 import org.openrdf.query.algebra.TupleExpr;
 import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
@@ -85,34 +94,28 @@ public class TopologyFactory implements TopologyBuilderFactory {
 
     private List<ProcessorEntry> processorEntryList;
 
-    /**
-     * Builds a {@link TopologyBuilder} based on the provided sparql query.
-     *
-     * @param sparqlQuery - The SPARQL query to build a topology for. (not null)
-     * @param statementTopic - The topic for the source to read from. (not null)
-     * @param resultTopic - The topic for the sink to write to. (not null)
-     * @return - The created {@link TopologyBuilder}.
-     * @throws MalformedQueryException - The provided query is not a valid SPARQL query.
-     * @throws TopologyBuilderException - A problem occurred while constructing the topology.
-     */
     @Override
-    public TopologyBuilder build(final String sparqlQuery, final String statementTopic, final String resultTopic)
+    public TopologyBuilder build(
+            final String sparqlQuery,
+            final String statementsTopic,
+            final String resultsTopic,
+            final BNodeIdFactory bNodeIdFactory)
             throws MalformedQueryException, TopologyBuilderException {
         requireNonNull(sparqlQuery);
-        requireNonNull(statementTopic);
-        requireNonNull(resultTopic);
+        requireNonNull(statementsTopic);
+        requireNonNull(resultsTopic);
 
         final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparqlQuery, null);
         final TopologyBuilder builder = new TopologyBuilder();
 
         final TupleExpr expr = parsedQuery.getTupleExpr();
-        final QueryVisitor visitor = new QueryVisitor();
+        final QueryVisitor visitor = new QueryVisitor(bNodeIdFactory);
         expr.visit(visitor);
 
         processorEntryList = visitor.getProcessorEntryList();
         final Map<TupleExpr, String> idMap = visitor.getIDs();
         // add source node
-        builder.addSource(SOURCE, new StringDeserializer(), new VisibilityStatementDeserializer(), statementTopic);
+        builder.addSource(SOURCE, new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
 
         // processing the processor entry list in reverse order means we go from leaf
         // nodes -> parent nodes.
@@ -146,11 +149,12 @@ public class TopologyFactory implements TopologyBuilderFactory {
             }
         }
 
-        // convert processing results to visibility binding sets
-        builder.addProcessor("OUTPUT_FORMATTER", new BindingSetOutputFormatterSupplier(), entry.getID());
+        // Add a formatter that converts the ProcessorResults into the output format.
+        final SinkEntry<?,?> sinkEntry = visitor.getSinkEntry();
+        builder.addProcessor("OUTPUT_FORMATTER", sinkEntry.getFormatterSupplier(), entry.getID());
 
-        // add sink
-        builder.addSink(SINK, resultTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "OUTPUT_FORMATTER");
+        // Add the sink.
+        builder.addSink(SINK, resultsTopic, sinkEntry.getKeySerializer(), sinkEntry.getValueSerializer(), "OUTPUT_FORMATTER");
 
         return builder;
     }
@@ -264,16 +268,82 @@ public class TopologyFactory implements TopologyBuilderFactory {
     }
 
     /**
+     * Information about how key/value pairs need to be written to the sink.
+     *
+     * @param <K> - The type of Key that the sink uses.
+     * @param <V> - The type of Value that the sink uses.
+     */
+    private final static class SinkEntry<K, V> {
+
+        private final ProcessorSupplier<Object, ProcessorResult> formatterSupplier;
+        private final Serializer<K> keySerializer;
+        private final Serializer<V> valueSerializer;
+
+        /**
+         * Constructs an instance of {@link SinkEntry}.
+         *
+         * @param formatterSupplier - Formats {@link ProcessorResult}s for output to the sink. (not null)
+         * @param keySerializer - Serializes keys that are used to write to the sink. (not null)
+         * @param valueSerializer - Serializes values that are used to write to the sink. (not null)
+         */
+        public SinkEntry(
+                final ProcessorSupplier<Object, ProcessorResult> formatterSupplier,
+                final Serializer<K> keySerializer,
+                final Serializer<V> valueSerializer) {
+            this.keySerializer = requireNonNull(keySerializer);
+            this.valueSerializer = requireNonNull(valueSerializer);
+            this.formatterSupplier = requireNonNull(formatterSupplier);
+        }
+
+        /**
+         * @return Formats {@link ProcessorResult}s for output to the sink.
+         */
+        public ProcessorSupplier<Object, ProcessorResult> getFormatterSupplier() {
+            return formatterSupplier;
+        }
+
+        /**
+         * @return Serializes keys that are used to write to the sink.
+         */
+        public Serializer<K> getKeySerializer() {
+            return keySerializer;
+        }
+
+        /**
+         * @return Serializes values that are used to write to the sink.
+         */
+        public Serializer<V> getValueSerializer() {
+            return valueSerializer;
+        }
+    }
+
+    /**
      * Visits each node in a {@link TupleExpr} and creates a
      * {@link ProcessorSupplier} and meta information needed for creating a
      * {@link TopologyBuilder}.
      */
     final static class QueryVisitor extends QueryModelVisitorBase<TopologyBuilderException> {
-        // Each node needs a ProcessorEntry to be a processor node in the
-        // TopologyBuilder.
+        // Each node needs a ProcessorEntry to be a processor node in the TopologyBuilder.
         private final List<ProcessorEntry> entries = new ArrayList<>();
         private final Map<TupleExpr, String> idMap = new HashMap<>();
 
+        // Default to a Binding Set outputting sink entry.
+        private SinkEntry<?, ?> sinkEntry = new SinkEntry<>(
+                new BindingSetOutputFormatterSupplier(),
+                new StringSerializer(),
+                new VisibilityBindingSetSerializer());
+
+        private final BNodeIdFactory bNodeIdFactory;
+
+        /**
+         * Constructs an instance of {@link QueryVisitor}.
+         *
+         * @param bNodeIdFactory - Builds Blank Node IDs for the query's results. (not null)
+         */
+        public QueryVisitor(final BNodeIdFactory bNodeIdFactory) {
+            this.bNodeIdFactory = requireNonNull(bNodeIdFactory);
+        }
+
         /**
          * @return The {@link ProcessorEntry}s used to create a Topology.
          */
@@ -288,6 +358,23 @@ public class TopologyFactory implements TopologyBuilderFactory {
             return idMap;
         }
 
+        /**
+         * @return Information about how values are to be output by the topology to the results sink.
+         */
+        public SinkEntry<?, ?> getSinkEntry() {
+            return sinkEntry;
+        }
+
+        @Override
+        public void meet(final Reduced node) throws TopologyBuilderException {
+            // This indicates we're outputting VisibilityStatements.
+            sinkEntry = new SinkEntry<>(
+                    new StatementOutputFormatterSupplier(),
+                    new StringSerializer(),
+                    new VisibilityStatementSerializer());
+            super.meet(node);
+        }
+
         @Override
         public void meet(final StatementPattern node) throws TopologyBuilderException {
             // topology parent for Statement Patterns will always be a source
@@ -303,14 +390,39 @@ public class TopologyFactory implements TopologyBuilderFactory {
         public void meet(final Projection node) throws TopologyBuilderException {
             final String id = PROJECTION_PREFIX + UUID.randomUUID();
             final Optional<Side> side = getSide(node);
-            TupleExpr arg = node.getArg();
+
             // If the arg is an Extension, there are rebindings that need to be
             // ignored since they do not have a processor node.
-            if (arg instanceof Extension) {
-                arg = ((Extension) arg).getArg();
+            TupleExpr downstreamNode = node.getArg();
+            if (downstreamNode instanceof Extension) {
+                downstreamNode = ((Extension) downstreamNode).getArg();
+            }
+
+            final ProjectionProcessorSupplier supplier = new ProjectionProcessorSupplier(
+                    ProjectionEvaluator.make(node),
+                    result -> getResult(side, result));
+
+            entries.add(new ProcessorEntry(node, id, side, supplier, Lists.newArrayList(downstreamNode)));
+            idMap.put(node, id);
+            super.meet(node);
+        }
+
+        @Override
+        public void meet(final MultiProjection node) throws TopologyBuilderException {
+            final String id = PROJECTION_PREFIX + UUID.randomUUID();
+            final Optional<Side> side = getSide(node);
+
+            final MultiProjectionProcessorSupplier supplier = new MultiProjectionProcessorSupplier(
+                    MultiProjectionEvaluator.make(node, bNodeIdFactory),
+                    result -> getResult(side, result));
+
+            // If the arg is an Extension, then this node's grandchild is the next processing node.
+            TupleExpr downstreamNode = node.getArg();
+            if (downstreamNode instanceof Extension) {
+                downstreamNode = ((Extension) downstreamNode).getArg();
             }
-            final ProjectionProcessorSupplier supplier = new ProjectionProcessorSupplier(node.getProjectionElemList(), result -> getResult(side, result));
-            entries.add(new ProcessorEntry(node, id, side, supplier, Lists.newArrayList(arg)));
+
+            entries.add(new ProcessorEntry(node, id, side, supplier, Lists.newArrayList(downstreamNode)));
             idMap.put(node, id);
             super.meet(node);
         }
@@ -399,16 +511,4 @@ public class TopologyFactory implements TopologyBuilderFactory {
             }
         }
     }
-
-    /**
-     * An Exception thrown when a problem occurs when constructing the processor
-     * topology in the {@link TopologyFactory}.
-     */
-    public class TopologyBuilderException extends Exception {
-        private static final long serialVersionUID = 1L;
-
-        public TopologyBuilderException(final String message, final Throwable cause) {
-            super(message, cause);
-        }
-    }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
index 0a1a8a4..8898284 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
@@ -42,10 +42,8 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.api.model.VisibilityStatement;
 import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
 import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 
@@ -144,29 +142,34 @@ public final class KafkaTestUtil {
      * Runs a Kafka Streams topology, loads statements into the input topic, read the binding sets that come out of
      * the results topic, and ensures the expected results match the read results.
      *
-     * @param kafka - The embedded kafka instance that is being tested with. (not null)
+     * @param <T> The type of value that will be consumed from the results topic.
+     * @param kafka - The embedded Kafka instance that is being tested with. (not null)
      * @param statementsTopic - The topic statements will be written to. (not null)
      * @param resultsTopic - The topic results will be read from. (not null)
      * @param builder - The streams topology that will be executed. (not null)
      * @param startupMs - How long to wait for the topology to start before writing the statements.
      * @param statements - The statements that will be loaded into the topic. (not null)
      * @param expected - The expected results. (not null)
+     * @param expectedDeserializerClass - The class of the deserializer that will be used when reading
+     *   values from the results topic. (not null)
      * @throws Exception If any exception was thrown while running the test.
      */
-    public static void runStreamProcessingTest(
+    public static <T> void runStreamProcessingTest(
             final KafkaTestInstanceRule kafka,
             final String statementsTopic,
             final String resultsTopic,
             final TopologyBuilder builder,
             final int startupMs,
             final List<VisibilityStatement> statements,
-            final Set<VisibilityBindingSet> expected) throws Exception {
+            final Set<T> expected,
+            final Class<? extends Deserializer<T>> expectedDeserializerClass) throws Exception {
         requireNonNull(kafka);
         requireNonNull(statementsTopic);
         requireNonNull(resultsTopic);
         requireNonNull(builder);
         requireNonNull(statements);
         requireNonNull(expected);
+        requireNonNull(expectedDeserializerClass);
 
         // Explicitly create the topics that are being used.
         kafka.createTopic(statementsTopic);
@@ -191,13 +194,12 @@ public final class KafkaTestUtil {
             }
 
             // Wait for the final results to appear in the output topic and verify the expected Binding Sets were found.
-            try(Consumer<String, VisibilityBindingSet> consumer = KafkaTestUtil.fromStartConsumer(
-                    kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class)) {
+            try(Consumer<String, T> consumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, expectedDeserializerClass)) {
                 // Register the topic.
                 consumer.subscribe(Arrays.asList(resultsTopic));
 
                 // Poll for the result.
-                final Set<VisibilityBindingSet> results = Sets.newHashSet( KafkaTestUtil.pollForResults(500, 6, expected.size(), consumer) );
+                final Set<T> results = Sets.newHashSet( KafkaTestUtil.pollForResults(500, 6, expected.size(), consumer) );
 
                 // Show the correct binding sets results from the job.
                 assertEquals(expected, results);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
index e55ec2e..3e0e64d 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
@@ -25,11 +25,13 @@ import java.util.Set;
 import java.util.UUID;
 
 import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
 import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.api.model.VisibilityStatement;
 import org.apache.rya.streams.kafka.KafkaTestUtil;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
 import org.apache.rya.streams.kafka.topology.TopologyFactory;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 import org.junit.Rule;
@@ -57,7 +59,7 @@ public class StatementPatternProcessorIT {
         // Setup a topology.
         final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }";
         final TopologyFactory factory = new TopologyFactory();
-        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic);
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
 
         // Create a statement that generate an SP result.
         final ValueFactory vf = new ValueFactoryImpl();
@@ -73,7 +75,7 @@ public class StatementPatternProcessorIT {
         expected.add( new VisibilityBindingSet(bs, "a") );
 
         // Run the test.
-        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -87,7 +89,7 @@ public class StatementPatternProcessorIT {
         // Setup a topology.
         final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }";
         final TopologyFactory factory = new TopologyFactory();
-        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic);
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
 
         // Create some statements where some generates SP results and others do not.
         final ValueFactory vf = new ValueFactoryImpl();
@@ -111,7 +113,7 @@ public class StatementPatternProcessorIT {
         expected.add( new VisibilityBindingSet(bs, "a|b") );
 
         // Run the test.
-        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -128,7 +130,7 @@ public class StatementPatternProcessorIT {
                 + "?person ?action <urn:Bob>"
                 + "}";
         final TopologyFactory factory = new TopologyFactory();
-        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic);
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
 
         // Create some statements where some generates SP results and others do not.
         final ValueFactory vf = new ValueFactoryImpl();
@@ -145,7 +147,7 @@ public class StatementPatternProcessorIT {
         expected.add( new VisibilityBindingSet(bs, "a") );
 
         // Run the test.
-        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -162,7 +164,7 @@ public class StatementPatternProcessorIT {
                 + "?person ?action <urn:Bob>"
                 + "}";
         final TopologyFactory factory = new TopologyFactory();
-        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic);
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
 
         // Create some statements where some generates SP results and others do not.
         final ValueFactory vf = new ValueFactoryImpl();
@@ -188,6 +190,6 @@ public class StatementPatternProcessorIT {
         expected.add(new VisibilityBindingSet(bs, "a"));
 
         // Run the test.
-        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
index dbad15c..b137a9a 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
@@ -24,35 +24,23 @@ import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.rya.api.function.join.LeftOuterJoin;
 import org.apache.rya.api.function.join.NaturalJoin;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
 import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.api.model.VisibilityStatement;
 import org.apache.rya.streams.kafka.KafkaTestUtil;
 import org.apache.rya.streams.kafka.KafkaTopics;
-import org.apache.rya.streams.kafka.RdfTestUtil;
 import org.apache.rya.streams.kafka.processors.ProcessorResult;
-import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
-import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side;
 import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
-import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
 import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier.JoinProcessor;
-import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier.BindingSetOutputFormatter;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerde;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
-import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
 import org.apache.rya.streams.kafka.topology.TopologyFactory;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.openrdf.model.ValueFactory;
 import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.algebra.StatementPattern;
 import org.openrdf.query.impl.MapBindingSet;
 
 import com.google.common.collect.Lists;
@@ -84,12 +72,13 @@ public class JoinProcessorIT {
         final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
 
         // Setup a topology.
-        final String query = "SELECT * WHERE { "
-                + "?person <urn:talksTo> ?employee ."
-                + "?employee <urn:worksAt> ?business"
-                + " }";
+        final String query =
+                "SELECT * WHERE { " +
+                    "?person <urn:talksTo> ?employee ." +
+                    "?employee <urn:worksAt> ?business" +
+                " }";
         final TopologyFactory factory = new TopologyFactory();
-        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic);
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
 
         // Create some statements that generate a bunch of right SP results.
         final ValueFactory vf = new ValueFactoryImpl();
@@ -122,7 +111,7 @@ public class JoinProcessorIT {
         expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
 
         // Run the test.
-        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -133,46 +122,14 @@ public class JoinProcessorIT {
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
         final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
 
-        // Get the StatementPatterns that will be evaluated.
-        final StatementPattern leftSp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }");
-        final StatementPattern rightSp = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }");
-
         // Setup a topology.
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        // The topic that Statements are written to is used as a source.
-        builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
-
-        // Add a processor that handles the first statement pattern.
-        builder.addProcessor("LEFT_SP", new StatementPatternProcessorSupplier(leftSp,
-                result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS");
-
-        // Add a processor that handles the second statement pattern.
-        builder.addProcessor("RIGHT_SP", new StatementPatternProcessorSupplier(rightSp,
-                result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS");
-
-        // Add a processor that handles a natrual join over the SPs.
-        builder.addProcessor("NATURAL_JOIN", new JoinProcessorSupplier(
-                "NATURAL_JOIN",
-                new NaturalJoin(),
-                Lists.newArrayList("employee"),
-                Lists.newArrayList("employee", "person", "business"),
-                result -> ProcessorResult.make( new UnaryResult(result) )), "LEFT_SP", "RIGHT_SP");
-
-        // Add a state store for the join processor.
-        final StateStoreSupplier joinStoreSupplier =
-                Stores.create( "NATURAL_JOIN" )
-                .withStringKeys()
-                .withValues(new VisibilityBindingSetSerde())
-                .inMemory()
-                .build();
-        builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN");
-
-        // Add a processor that formats the VisibilityBindingSet for output.
-        builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "NATURAL_JOIN");
-
-        // Add a sink that writes the data out to a new Kafka topic.
-        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+        final String query =
+                "SELECT * WHERE { " +
+                    "?person <urn:talksTo> ?employee ." +
+                    "?employee <urn:worksAt> ?business" +
+                " }";
+        final TopologyFactory factory = new TopologyFactory();
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
 
         // Create some statements that generate a bunch of right SP results.
         final ValueFactory vf = new ValueFactoryImpl();
@@ -205,7 +162,7 @@ public class JoinProcessorIT {
         expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
 
         // Run the test.
-        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -216,46 +173,14 @@ public class JoinProcessorIT {
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
         final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
 
-        // Get the StatementPatterns that will be evaluated.
-        final StatementPattern leftSp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }");
-        final StatementPattern rightSp = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }");
-
         // Setup a topology.
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        // The topic that Statements are written to is used as a source.
-        builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
-
-        // Add a processor that handles the first statement pattern.
-        builder.addProcessor("LEFT_SP", new StatementPatternProcessorSupplier(leftSp,
-                result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS");
-
-        // Add a processor that handles the second statement pattern.
-        builder.addProcessor("RIGHT_SP", new StatementPatternProcessorSupplier(rightSp,
-                result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS");
-
-        // Add a processor that handles a natrual join over the SPs.
-        builder.addProcessor("NATURAL_JOIN", new JoinProcessorSupplier(
-                "NATURAL_JOIN",
-                new NaturalJoin(),
-                Lists.newArrayList("employee"),
-                Lists.newArrayList("employee", "person", "business"),
-                result -> ProcessorResult.make( new UnaryResult(result) )), "LEFT_SP", "RIGHT_SP");
-
-        // Add a state store for the join processor.
-        final StateStoreSupplier joinStoreSupplier =
-                Stores.create( "NATURAL_JOIN" )
-                .withStringKeys()
-                .withValues(new VisibilityBindingSetSerde())
-                .inMemory()
-                .build();
-        builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN");
-
-        // Add a processor that formats the VisibilityBindingSet for output.
-        builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "NATURAL_JOIN");
-
-        // Add a sink that writes the data out to a new Kafka topic.
-        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+        final String query =
+                "SELECT * WHERE { " +
+                    "?person <urn:talksTo> ?employee ." +
+                    "?employee <urn:worksAt> ?business" +
+                " }";
+        final TopologyFactory factory = new TopologyFactory();
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
 
         // Create some statements that generate a bunch of right SP results.
         final ValueFactory vf = new ValueFactoryImpl();
@@ -294,7 +219,7 @@ public class JoinProcessorIT {
         expected.add( new VisibilityBindingSet(bs, "a&c") );
 
         // Run the test.
-        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -305,67 +230,15 @@ public class JoinProcessorIT {
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
         final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
 
-        // Get the StatementPatterns that will be evaluated.
-        final StatementPattern sp1 = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }");
-        final StatementPattern sp2 = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }");
-        final StatementPattern sp3 = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:hourlyWage> ?wage }");
-
         // Setup a topology.
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        // The topic that Statements are written to is used as a source.
-        builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
-
-        // Add a processor that handles the first statement pattern.
-        builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp1,
-                result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS");
-
-        // Add a processor that handles the second statement pattern.
-        builder.addProcessor("SP2", new StatementPatternProcessorSupplier(sp2,
-                result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS");
-
-        // Add a processor that handles a natural join over SPs 1 and 2.
-        builder.addProcessor("JOIN1", new JoinProcessorSupplier(
-                "JOIN1",
-                new NaturalJoin(),
-                Lists.newArrayList("employee"),
-                Lists.newArrayList("employee", "person", "business"),
-                result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "SP1", "SP2");
-
-        // Add a processor that handles the third statement pattern.
-        builder.addProcessor("SP3", new StatementPatternProcessorSupplier(sp3,
-                result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS");
-
-        // Add a processor that handles a natural join over JOIN1 and SP3.
-        builder.addProcessor("JOIN2", new JoinProcessorSupplier(
-                "JOIN2",
-                new NaturalJoin(),
-                Lists.newArrayList("employee"),
-                Lists.newArrayList("employee", "business", "wage"),
-                result -> ProcessorResult.make( new UnaryResult(result) )), "JOIN1", "SP3");
-
-        // Setup the join state suppliers.
-        final StateStoreSupplier join1StoreSupplier =
-                Stores.create( "JOIN1" )
-                .withStringKeys()
-                .withValues(new VisibilityBindingSetSerde())
-                .inMemory()
-                .build();
-        builder.addStateStore(join1StoreSupplier, "JOIN1");
-
-        final StateStoreSupplier join2StoreSupplier =
-                Stores.create( "JOIN2" )
-                .withStringKeys()
-                .withValues(new VisibilityBindingSetSerde())
-                .inMemory()
-                .build();
-        builder.addStateStore(join2StoreSupplier, "JOIN2");
-
-        // Add a processor that formats the VisibilityBindingSet for output.
-        builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "JOIN2");
-
-        // Add a sink that writes the data out to a new Kafka topic.
-        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+        final String query =
+                "SELECT * WHERE { " +
+                    "?person <urn:talksTo> ?employee ." +
+                    "?employee <urn:worksAt> ?business ." +
+                    "?employee <urn:hourlyWage> ?wage ." +
+                " }";
+        final TopologyFactory factory = new TopologyFactory();
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
 
         // Create some statements that generate a bunch of right SP results.
         final ValueFactory vf = new ValueFactoryImpl();
@@ -387,7 +260,7 @@ public class JoinProcessorIT {
         expected.add( new VisibilityBindingSet(bs, "a") );
 
         // Run the test.
-        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 3000, statements, expected);
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 3000, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -398,46 +271,14 @@ public class JoinProcessorIT {
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
         final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
 
-        // Get the StatementPatterns that will be evaluated.
-        final StatementPattern requiredSp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }");
-        final StatementPattern optionalSp = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }");
-
         // Setup a topology.
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        // The topic that Statements are written to is used as a source.
-        builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
-
-        // Add a processor that handles the first statement pattern.
-        builder.addProcessor("REQUIRED_SP", new StatementPatternProcessorSupplier(requiredSp,
-                result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS");
-
-        // Add a processor that handles the second statement pattern.
-        builder.addProcessor("OPTIONAL_SP", new StatementPatternProcessorSupplier(optionalSp,
-                result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS");
-
-        // Add a processor that handles a natrual join over the SPs.
-        builder.addProcessor("LEFT_JOIN", new JoinProcessorSupplier(
-                "LEFT_JOIN",
-                new LeftOuterJoin(),
-                Lists.newArrayList("employee"),
-                Lists.newArrayList("employee", "person", "business"),
-                result -> ProcessorResult.make( new UnaryResult(result) )), "REQUIRED_SP", "OPTIONAL_SP");
-
-        // Add a state store for the join processor.
-        final StateStoreSupplier joinStoreSupplier =
-                Stores.create( "LEFT_JOIN" )
-                .withStringKeys()
-                .withValues(new VisibilityBindingSetSerde())
-                .inMemory()
-                .build();
-        builder.addStateStore(joinStoreSupplier, "LEFT_JOIN");
-
-        // Add a processor that formats the VisibilityBindingSet for output.
-        builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "LEFT_JOIN");
-
-        // Add a sink that writes the data out to a new Kafka topic.
-        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+        final String query =
+                "SELECT * WHERE { " +
+                    "?person <urn:talksTo> ?employee ." +
+                    "OPTIONAL{ ?employee <urn:worksAt> ?business } " +
+                " }";
+        final TopologyFactory factory = new TopologyFactory();
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
 
         // Create some statements that generate a result that includes the optional value as well as one that does not.
         final ValueFactory vf = new ValueFactoryImpl();
@@ -470,6 +311,6 @@ public class JoinProcessorIT {
         expected.add( new VisibilityBindingSet(bs, "c") );
 
         // Run the test.
-        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
index ee0e55b..d71577b 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
@@ -24,29 +24,13 @@ import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.rya.api.function.join.NaturalJoin;
-import org.apache.rya.api.function.projection.MultiProjectionEvaluator;
-import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.api.model.VisibilityStatement;
 import org.apache.rya.streams.kafka.KafkaTestUtil;
 import org.apache.rya.streams.kafka.KafkaTopics;
-import org.apache.rya.streams.kafka.RdfTestUtil;
-import org.apache.rya.streams.kafka.processors.ProcessorResult;
-import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
-import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side;
-import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
-import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
-import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier;
-import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier.BindingSetOutputFormatter;
 import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier.MultiProjectionProcessor;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerde;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
 import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 import org.junit.Rule;
 import org.junit.Test;
@@ -54,11 +38,6 @@ import org.openrdf.model.BNode;
 import org.openrdf.model.ValueFactory;
 import org.openrdf.model.impl.ValueFactoryImpl;
 import org.openrdf.model.vocabulary.RDF;
-import org.openrdf.query.algebra.MultiProjection;
-import org.openrdf.query.algebra.StatementPattern;
-import org.openrdf.query.impl.MapBindingSet;
-
-import com.google.common.collect.Lists;
 
 /**
  * Integration tests the methods of {@link MultiProjectionProcessor}.
@@ -76,10 +55,8 @@ public class MultiProjectionProcessorIT {
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
         final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
 
-        // Get the RDF model objects that will be used to build the query.
-        final StatementPattern sp1 = RdfTestUtil.getSp("SELECT * WHERE { ?thing <urn:corner> ?location . }");
-        final StatementPattern sp2 = RdfTestUtil.getSp("SELECT * WHERE { ?thing <urn:compass> ?direction . }");
-        final MultiProjection multiProjection = RdfTestUtil.getMultiProjection(
+        // Create a topology for the Query that will be tested.
+        final String sparql =
                 "CONSTRUCT {" +
                     "_:b a <urn:movementObservation> ; " +
                     "<urn:location> ?location ; " +
@@ -88,38 +65,10 @@ public class MultiProjectionProcessorIT {
                 "WHERE {" +
                     "?thing <urn:corner> ?location ." +
                     "?thing <urn:compass> ?direction." +
-                "}");
-
-        // Setup a topology.
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
-        builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp1,
-                result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS");
-        builder.addProcessor("SP2", new StatementPatternProcessorSupplier(sp2,
-                result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS");
-
-        builder.addProcessor("NATURAL_JOIN", new JoinProcessorSupplier(
-                "NATURAL_JOIN",
-                new NaturalJoin(),
-                Lists.newArrayList("thing"),
-                Lists.newArrayList("thing", "location", "direction"),
-                result -> ProcessorResult.make( new UnaryResult(result) )), "SP1", "SP2");
-
-        final StateStoreSupplier joinStoreSupplier =
-                Stores.create( "NATURAL_JOIN" )
-                  .withStringKeys()
-                  .withValues(new VisibilityBindingSetSerde())
-                  .inMemory()
-                  .build();
-        builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN");
+                "}";
 
-        final String blankNodeId = UUID.randomUUID().toString();
-        builder.addProcessor("MULTIPROJECTION", new MultiProjectionProcessorSupplier(
-                MultiProjectionEvaluator.make(multiProjection, () -> blankNodeId),
-                result -> ProcessorResult.make(new UnaryResult(result))), "NATURAL_JOIN");
-
-        builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "MULTIPROJECTION");
-        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+        final String bNodeId = UUID.randomUUID().toString();
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, () -> bNodeId);
 
         // Create the statements that will be input into the query.
         final ValueFactory vf = new ValueFactoryImpl();
@@ -130,26 +79,14 @@ public class MultiProjectionProcessorIT {
                 vf.createStatement(vf.createURI("urn:car1"), vf.createURI("urn:corner"), vf.createURI("urn:corner1")), "a") );
 
         // Make the expected results.
-        final Set<VisibilityBindingSet> expected = new HashSet<>();
-        final BNode blankNode = vf.createBNode(blankNodeId);
-
-        MapBindingSet expectedBs = new MapBindingSet();
-        expectedBs.addBinding("subject", blankNode);
-        expectedBs.addBinding("predicate", RDF.TYPE);
-        expectedBs.addBinding("object", vf.createURI("urn:movementObservation"));
-
-        expectedBs = new MapBindingSet();
-        expectedBs.addBinding("subject", blankNode);
-        expectedBs.addBinding("predicate", vf.createURI("urn:direction"));
-        expectedBs.addBinding("object", vf.createURI("urn:NW"));
-
+        final Set<VisibilityStatement> expected = new HashSet<>();
+        final BNode blankNode = vf.createBNode(bNodeId);
 
-        expectedBs = new MapBindingSet();
-        expectedBs.addBinding("subject", blankNode);
-        expectedBs.addBinding("predicate", vf.createURI("urn:location"));
-        expectedBs.addBinding("object", vf.createURI("urn:corner1"));
+        expected.add(new VisibilityStatement(vf.createStatement(blankNode, RDF.TYPE, vf.createURI("urn:movementObservation")), "a"));
+        expected.add(new VisibilityStatement(vf.createStatement(blankNode, vf.createURI("urn:direction"), vf.createURI("urn:NW")), "a"));
+        expected.add(new VisibilityStatement(vf.createStatement(blankNode, vf.createURI("urn:location"), vf.createURI("urn:corner1")), "a"));
 
         // Run the test.
-        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityStatementDeserializer.class);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
index 99e2451..bc5f115 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
@@ -24,30 +24,20 @@ import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.rya.api.function.projection.ProjectionEvaluator;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
 import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.api.model.VisibilityStatement;
 import org.apache.rya.streams.kafka.KafkaTestUtil;
 import org.apache.rya.streams.kafka.KafkaTopics;
-import org.apache.rya.streams.kafka.RdfTestUtil;
-import org.apache.rya.streams.kafka.processors.ProcessorResult;
-import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
-import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
 import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
-import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier.BindingSetOutputFormatter;
-import org.apache.rya.streams.kafka.processors.projection.ProjectionProcessorSupplier;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
-import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.openrdf.model.ValueFactory;
 import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.algebra.Projection;
-import org.openrdf.query.algebra.StatementPattern;
 import org.openrdf.query.impl.MapBindingSet;
 
 import com.google.common.collect.Sets;
@@ -68,34 +58,14 @@ public class ProjectionProcessorIT {
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
         final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
 
-        // Get the RDF model objects that will be used to build the query.
+        // Create a topology for the Query that will be tested.
         final String sparql =
                 "SELECT (?person AS ?p) ?otherPerson " +
                 "WHERE { " +
                     "?person <urn:talksTo> ?otherPerson . " +
                 "}";
-        final Projection projection = RdfTestUtil.getProjection(sparql);
-        final StatementPattern sp = RdfTestUtil.getSp(sparql);
 
-        // Setup a topology.
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        // The topic that Statements are written to is used as a source.
-        builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
-
-        // Add a processor that handles the first statement pattern.
-        builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
-
-        // Add a processor that handles the projection.
-        builder.addProcessor("P1", new ProjectionProcessorSupplier(
-                ProjectionEvaluator.make(projection),
-                result -> ProcessorResult.make(new UnaryResult(result))), "SP1");
-
-        // Add a processor that formats the VisibilityBindingSet for output.
-        builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "P1");
-
-        // Add a sink that writes the data out to a new Kafka topic.
-        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
 
         // Load some data into the input topic.
         final ValueFactory vf = new ValueFactoryImpl();
@@ -110,6 +80,6 @@ public class ProjectionProcessorIT {
         expectedBs.addBinding("otherPerson", vf.createURI("urn:Bob"));
         expected.add(new VisibilityBindingSet(expectedBs, "a"));
 
-        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, Sets.newHashSet(expected));
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, Sets.newHashSet(expected), VisibilityBindingSetDeserializer.class);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java
index eda4c89..31462ec 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.rya.streams.kafka.topology;
 
 import static org.junit.Assert.assertEquals;
@@ -5,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.List;
 
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
 import org.apache.rya.streams.kafka.topology.TopologyFactory.ProcessorEntry;
 import org.junit.Before;
 import org.junit.Test;
@@ -15,6 +34,9 @@ import org.openrdf.query.algebra.Projection;
 import org.openrdf.query.algebra.StatementPattern;
 import org.openrdf.query.algebra.Var;
 
+/**
+ * Unit tests the methods of {@link TopologyFactory}.
+ */
 public class TopologyFactoryTest {
     private static TopologyFactory FACTORY;
 
@@ -40,7 +62,7 @@ public class TopologyFactoryTest {
                 + "?person <urn:talksTo> ?otherPerson . "
                 + "}";
 
-        FACTORY.build(query, "source", "sink");
+        FACTORY.build(query, "source", "sink", new RandomUUIDFactory());
         final List<ProcessorEntry> entries = FACTORY.getProcessorEntry();
 
         assertTrue(entries.get(0).getNode() instanceof Projection);
@@ -57,7 +79,7 @@ public class TopologyFactoryTest {
                 + "?otherPerson <urn:talksTo> ?dog . "
                 + "}";
 
-        FACTORY.build(query, "source", "sink");
+        FACTORY.build(query, "source", "sink", new RandomUUIDFactory());
         final List<ProcessorEntry> entries = FACTORY.getProcessorEntry();
 
         assertTrue(entries.get(0).getNode() instanceof Projection);
@@ -76,7 +98,7 @@ public class TopologyFactoryTest {
                 + "?dog <urn:chews> ?toy . "
                 + "}";
 
-        FACTORY.build(query, "source", "sink");
+        FACTORY.build(query, "source", "sink", new RandomUUIDFactory());
         final List<ProcessorEntry> entries = FACTORY.getProcessorEntry();
 
         assertTrue(entries.get(0).getNode() instanceof Projection);
@@ -96,11 +118,11 @@ public class TopologyFactoryTest {
                 + "?person <urn:talksTo> ?otherPerson . "
             + "}";
 
-        FACTORY.build(query, "source", "sink");
+        FACTORY.build(query, "source", "sink", new RandomUUIDFactory());
         final List<ProcessorEntry> entries = FACTORY.getProcessorEntry();
 
         assertTrue(entries.get(0).getNode() instanceof Projection);
         final StatementPattern expected = new StatementPattern(new Var("person"), TALKS_TO, new Var("otherPerson"));
         assertEquals(expected, entries.get(1).getNode());
     }
-}
+}
\ No newline at end of file