You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/09 21:48:29 UTC

[12/50] [abbrv] incubator-rya git commit: RYA-377 TopologyBuilder

RYA-377 TopologyBuilder

A factory for turning a TupleExpr parsed from
a SPARQL query into the TopologyBuilder objects used
by Rya Streams


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/83d09f42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/83d09f42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/83d09f42

Branch: refs/heads/master
Commit: 83d09f42c98f16150aab69c42465157b55c28a14
Parents: b8b0a12
Author: Andrew Smith <sm...@gmail.com>
Authored: Tue Nov 14 16:28:43 2017 -0500
Committer: caleb <ca...@parsons.com>
Committed: Tue Jan 9 15:13:00 2018 -0500

----------------------------------------------------------------------
 .../streams/kafka/topology/TopologyFactory.java | 414 +++++++++++++++++++
 .../processors/StatementPatternProcessorIT.java | 124 ++----
 .../kafka/processors/join/JoinProcessorIT.java  |  86 ++--
 .../kafka/topology/TopologyFactoryTest.java     | 106 +++++
 4 files changed, 573 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/83d09f42/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
new file mode 100644
index 0000000..782a58b
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.topology;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.rya.api.function.join.IterativeJoin;
+import org.apache.rya.api.function.join.LeftOuterJoin;
+import org.apache.rya.api.function.join.NaturalJoin;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.streams.kafka.processors.ProcessorResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
+import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
+import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier;
+import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier;
+import org.apache.rya.streams.kafka.processors.projection.ProjectionProcessorSupplier;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerde;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.BinaryTupleOperator;
+import org.openrdf.query.algebra.Extension;
+import org.openrdf.query.algebra.Join;
+import org.openrdf.query.algebra.LeftJoin;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.QueryModelNode;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Factory for building {@link TopologyBuilder}s from a SPARQL query.
+ */
+@DefaultAnnotation(NonNull.class)
+public class TopologyFactory implements TopologyBuilderFactory {
+    private static final String SOURCE = "SOURCE";
+    private static final String STATEMENT_PATTERN_PREFIX = "SP_";
+    private static final String JOIN_PREFIX = "JOIN_";
+    private static final String PROJECTION_PREFIX = "PROJECTION_";
+    private static final String SINK = "SINK";
+
+    private List<ProcessorEntry> processorEntryList;
+
+    /**
+     * Builds a {@link TopologyBuilder} based on the provided sparql query.
+     *
+     * @param sparqlQuery - The SPARQL query to build a topology for. (not null)
+     * @param statementTopic - The topic for the source to read from. (not null)
+     * @param resultTopic - The topic for the sink to write to. (not null)
+     * @return - The created {@link TopologyBuilder}.
+     * @throws MalformedQueryException - The provided query is not a valid SPARQL query.
+     * @throws TopologyBuilderException - A problem occurred while constructing the topology.
+     */
+    @Override
+    public TopologyBuilder build(final String sparqlQuery, final String statementTopic, final String resultTopic)
+            throws MalformedQueryException, TopologyBuilderException {
+        requireNonNull(sparqlQuery);
+        requireNonNull(statementTopic);
+        requireNonNull(resultTopic);
+
+        final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparqlQuery, null);
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        final TupleExpr expr = parsedQuery.getTupleExpr();
+        final QueryVisitor visitor = new QueryVisitor();
+        expr.visit(visitor);
+
+        processorEntryList = visitor.getProcessorEntryList();
+        final Map<TupleExpr, String> idMap = visitor.getIDs();
+        // add source node
+        builder.addSource(SOURCE, new StringDeserializer(), new VisibilityStatementDeserializer(), statementTopic);
+
+        // processing the processor entry list in reverse order means we go from leaf
+        // nodes -> parent nodes.
+        // So, when the parent processing nodes get added, the upstream
+        // processing node will already exist.
+
+        ProcessorEntry entry = null;
+        for (int ii = processorEntryList.size() - 1; ii >= 0; ii--) {
+            entry = processorEntryList.get(ii);
+            //statement patterns need to be connected to the Source.
+            if(entry.getNode() instanceof StatementPattern) {
+                builder.addProcessor(entry.getID(), entry.getSupplier(), SOURCE);
+            } else {
+                final List<TupleExpr> parents = entry.getUpstreamNodes();
+                final String[] parentIDs = new String[parents.size()];
+                for (int id = 0; id < parents.size(); id++) {
+                    parentIDs[id] = idMap.get(parents.get(id));
+                }
+                builder.addProcessor(entry.getID(), entry.getSupplier(), parentIDs);
+            }
+
+            if (entry.getNode() instanceof Join || entry.getNode() instanceof LeftJoin) {
+                // Add a state store for the join processor.
+                final StateStoreSupplier joinStoreSupplier =
+                        Stores.create( entry.getID() )
+                        .withStringKeys()
+                        .withValues(new VisibilityBindingSetSerde())
+                        .persistent()
+                        .build();
+                builder.addStateStore(joinStoreSupplier, entry.getID());
+            }
+        }
+
+        // convert processing results to visibility binding sets
+        builder.addProcessor("OUTPUT_FORMATTER", new BindingSetOutputFormatterSupplier(), entry.getID());
+
+        // add sink
+        builder.addSink(SINK, resultTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "OUTPUT_FORMATTER");
+
+        return builder;
+    }
+
    /**
     * Exposed for testing only.
     *
     * @return The {@link ProcessorEntry}s created by the most recent call to
     *         {@link #build(String, String, String)}, or {@code null} if
     *         {@code build} has not been invoked yet.
     */
    @VisibleForTesting
    public List<ProcessorEntry> getProcessorEntry() {
        return processorEntryList;
    }
+
+    /**
+     * An entry to be added as a Processing node in kafka streams'
+     * TopologyBuilder.
+     */
+    final static class ProcessorEntry {
+        private final TupleExpr node;
+        private final String id;
+        private final Optional<Side> downstreamSide;
+        private final ProcessorSupplier<?, ?> supplier;
+        private final List<TupleExpr> upstreamNodes;
+
+        /**
+         * Creates a new {@link ProcessorEntry}.
+         *
+         * @param node - The RDF node to be added as a processor. (not null)
+         * @param id - The id for the {@link TupleExpr} node. (not null)
+         * @param downstreamSide - Which side the current node is on from its downstream processor. (not null)
+         * @param supplier - Supplies the {@link Processor} for this node. (not null)
+         * @param upstreamNodes - The RDF nodes that will become upstream processing nodes. (not null)
+         */
+        public ProcessorEntry(final TupleExpr node, final String id, final Optional<Side> downstreamSide, final ProcessorSupplier<?, ?> supplier, final List<TupleExpr> upstreamNodes) {
+            this.node = requireNonNull(node);
+            this.id = requireNonNull(id);
+            this.downstreamSide = requireNonNull(downstreamSide);
+            this.supplier = requireNonNull(supplier);
+            this.upstreamNodes = requireNonNull(upstreamNodes);
+        }
+
+        /**
+         * @return - The RDF node to added as a processor.
+         */
+        public TupleExpr getNode() {
+            return node;
+        }
+
+        /**
+         * @return - The side the node is on from its downstream processor.
+         */
+        public Optional<Side> getDownstreamSide() {
+            return downstreamSide;
+        }
+
+        /**
+         * @return - The upstream parents to this node. These parent nodes must
+         *         result in a {@link ProcessorEntry}
+         */
+        public List<TupleExpr> getUpstreamNodes() {
+            return upstreamNodes;
+        }
+
+        /**
+         * @return - The processor id of the node.
+         */
+        public String getID() {
+            return id;
+        }
+
+        /**
+         * @return - The {@link ProcessorSupplier} used to supply the
+         *         {@link Processor} for this node.
+         */
+        public ProcessorSupplier<?, ?> getSupplier() {
+            return supplier;
+        }
+
+        @Override
+        public boolean equals(final Object other) {
+            if (!(other instanceof ProcessorEntry)) {
+                return false;
+            }
+            final ProcessorEntry o = (ProcessorEntry) other;
+            return Objects.equals(node, o.node) &&
+                    Objects.equals(id, o.id) &&
+                    Objects.equals(downstreamSide, o.downstreamSide) &&
+                    Objects.equals(supplier, o.supplier) &&
+                    Objects.equals(upstreamNodes, o.upstreamNodes);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(node, downstreamSide, upstreamNodes, id, supplier);
+        }
+
+        @Override
+        public String toString() {
+            final StringBuilder sb = new StringBuilder();
+            sb.append("ID: " + id + "\n");
+            if (downstreamSide.isPresent()) {
+                sb.append("***********************************\n");
+                sb.append("SIDE: " + downstreamSide.get() + "\n");
+            }
+            sb.append("***********************************\n");
+            sb.append("PARENTS: ");
+            for (final TupleExpr expr : upstreamNodes) {
+                sb.append(expr.toString() + ",");
+            }
+            sb.append("\n***********************************\n");
+            sb.append("NODE: " + node.toString());
+            sb.append("\n");
+            return sb.toString();
+        }
+    }
+
+    /**
+     * Visits each node in a {@link TupleExpr} and creates a
+     * {@link ProcessorSupplier} and meta information needed for creating a
+     * {@link TopologyBuilder}.
+     */
+    final static class QueryVisitor extends QueryModelVisitorBase<TopologyBuilderException> {
+        // Each node needs a ProcessorEntry to be a processor node in the
+        // TopologyBuilder.
+        private final List<ProcessorEntry> entries = new ArrayList<>();
+        private final Map<TupleExpr, String> idMap = new HashMap<>();
+
+        /**
+         * @return The {@link ProcessorEntry}s used to create a Topology.
+         */
+        public List<ProcessorEntry> getProcessorEntryList() {
+            return entries;
+        }
+
+        /**
+         * @return The IDs created for each {@link TupleExpr} node in the query that resulted in a {@link ProcessorEntry}.
+         */
+        public Map<TupleExpr, String> getIDs() {
+            return idMap;
+        }
+
+        @Override
+        public void meet(final StatementPattern node) throws TopologyBuilderException {
+            // topology parent for Statement Patterns will always be a source
+            final String id = STATEMENT_PATTERN_PREFIX + UUID.randomUUID();
+            final Optional<Side> side = getSide(node);
+            final StatementPatternProcessorSupplier supplier = new StatementPatternProcessorSupplier(node, result -> getResult(side, result));
+            entries.add(new ProcessorEntry(node, id, side, supplier, Lists.newArrayList()));
+            idMap.put(node, id);
+            super.meet(node);
+        }
+
+        @Override
+        public void meet(final Projection node) throws TopologyBuilderException {
+            final String id = PROJECTION_PREFIX + UUID.randomUUID();
+            final Optional<Side> side = getSide(node);
+            TupleExpr arg = node.getArg();
+            // If the arg is an Extension, there are rebindings that need to be
+            // ignored since they do not have a processor node.
+            if (arg instanceof Extension) {
+                arg = ((Extension) arg).getArg();
+            }
+            final ProjectionProcessorSupplier supplier = new ProjectionProcessorSupplier(node.getProjectionElemList(), result -> getResult(side, result));
+            entries.add(new ProcessorEntry(node, id, side, supplier, Lists.newArrayList(arg)));
+            idMap.put(node, id);
+            super.meet(node);
+        }
+
+        @Override
+        public void meet(final Join node) throws TopologyBuilderException {
+            final String id = JOIN_PREFIX + UUID.randomUUID();
+            meetJoin(id, new NaturalJoin(), node);
+            super.meet(node);
+        }
+
+        @Override
+        public void meet(final LeftJoin node) throws TopologyBuilderException {
+            final String id = JOIN_PREFIX + UUID.randomUUID();
+            meetJoin(id, new LeftOuterJoin(), node);
+            super.meet(node);
+        }
+
+        /**
+         * Gets the {@link Side} the current node in the visitor is on relative to the provided node.
+         * @param node - The node used to determine the side of the current visitor node.
+         * @return The {@link Side} the current node is on.
+         */
+        private Optional<Side> getSide(final QueryModelNode node) {
+            // if query parent is a binary operator, need to determine if its left or right.
+            if (node.getParentNode() instanceof BinaryTupleOperator) {
+                final BinaryTupleOperator binary = (BinaryTupleOperator) node.getParentNode();
+                if (node.equals(binary.getLeftArg())) {
+                    return Optional.of(Side.LEFT);
+                } else {
+                    return Optional.of(Side.RIGHT);
+                }
+            } else {
+                return Optional.empty();
+            }
+        }
+
+        /**
+         * Creates a join entry based on a provided {@link IterativeJoin} and the Join's
+         * {@link BinaryTupleOperator}.
+         *
+         * @param id - The ID of the join.
+         * @param joinFunction - The {@link IterativeJoin} function to perform during processing.
+         * @param node - The {@link BinaryTupleOperator} used to create the process.
+         */
+        private void meetJoin(final String id, final IterativeJoin joinFunction, final BinaryTupleOperator node) {
+            final Set<String> leftArgs = node.getLeftArg().getBindingNames();
+            final Set<String> rightArgs = node.getRightArg().getBindingNames();
+            final List<String> joinVars = Lists.newArrayList(Sets.intersection(leftArgs, rightArgs));
+
+            leftArgs.removeAll(joinVars);
+            rightArgs.removeAll(joinVars);
+
+            final List<String> otherVars = new ArrayList<>();
+            otherVars.addAll(leftArgs);
+            otherVars.addAll(rightArgs);
+
+            // the join variables need to be sorted so that when compared to all
+            // the variables, the start of the all variable list is congruent to
+            // the join var list.
+            joinVars.sort(Comparator.naturalOrder());
+            otherVars.sort(Comparator.naturalOrder());
+
+            final List<String> allVars = new ArrayList<>();
+            allVars.addAll(joinVars);
+            allVars.addAll(otherVars);
+
+            final Optional<Side> side = getSide(node);
+            final JoinProcessorSupplier supplier = new JoinProcessorSupplier(id, joinFunction, joinVars, allVars, result -> getResult(side, result));
+            entries.add(new ProcessorEntry(node, id, side, supplier, Lists.newArrayList(node.getLeftArg(), node.getRightArg())));
+            idMap.put(node, id);
+        }
+
+        /**
+         * Creates a {@link ProcessorResult} based on a side and result.
+         *
+         * @param side - If one is present, a {@link BinaryResult} is created.
+         * @param result - The result to wrap in a {@link ProcessorResult}.
+         * @return The {@link ProcessorResult} used by the {@link Processor}.
+         */
+        private ProcessorResult getResult(final Optional<Side> side, final VisibilityBindingSet result) {
+            if (side.isPresent()) {
+                return ProcessorResult.make(new BinaryResult(side.get(), result));
+            } else {
+                return ProcessorResult.make(new UnaryResult(result));
+            }
+        }
+    }
+
+    /**
+     * An Exception thrown when a problem occurs when constructing the processor
+     * topology in the {@link TopologyFactory}.
+     */
+    public class TopologyBuilderException extends Exception {
+        private static final long serialVersionUID = 1L;
+
+        public TopologyBuilderException(final String message, final Throwable cause) {
+            super(message, cause);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/83d09f42/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
index 0b2ff60..e55ec2e 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
@@ -24,25 +24,18 @@ import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.api.model.VisibilityStatement;
 import org.apache.rya.streams.kafka.KafkaTestUtil;
 import org.apache.rya.streams.kafka.KafkaTopics;
-import org.apache.rya.streams.kafka.RdfTestUtil;
-import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
 import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
-import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier.BindingSetOutputFormatter;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
-import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.openrdf.model.ValueFactory;
 import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.algebra.StatementPattern;
 import org.openrdf.query.algebra.evaluation.QueryBindingSet;
 
 /**
@@ -61,23 +54,10 @@ public class StatementPatternProcessorIT {
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
         final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
 
-        // Get the StatementPattern object that will be evaluated.
-        final StatementPattern sp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }");
-
         // Setup a topology.
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        // The topic that Statements are written to is used as a source.
-        builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
-
-        // Add a processor that handles the first statement pattern.
-        builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
-
-        // Add a processor that formats the VisibilityBindingSet for output.
-        builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "SP1");
-
-        // Add a sink that writes the data out to a new Kafka topic.
-        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+        final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }";
+        final TopologyFactory factory = new TopologyFactory();
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic);
 
         // Create a statement that generate an SP result.
         final ValueFactory vf = new ValueFactoryImpl();
@@ -104,23 +84,10 @@ public class StatementPatternProcessorIT {
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
         final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
 
-        // Get the StatementPattern object that will be evaluated.
-        final StatementPattern sp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }");
-
         // Setup a topology.
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        // The topic that Statements are written to is used as a source.
-        builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
-
-        // Add a processor that handles the first statement pattern.
-        builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
-
-        // Add a processor that formats the VisibilityBindingSet for output.
-        builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "SP1");
-
-        // Add a sink that writes the data out to a new Kafka topic.
-        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+        final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }";
+        final TopologyFactory factory = new TopologyFactory();
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic);
 
         // Create some statements where some generates SP results and others do not.
         final ValueFactory vf = new ValueFactoryImpl();
@@ -155,27 +122,13 @@ public class StatementPatternProcessorIT {
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
         final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
 
-        // Get the StatementPattern object that will be evaluated.
-        final StatementPattern sp1 = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }");
-        final StatementPattern sp2 = RdfTestUtil.getSp("SELECT * WHERE { ?person ?action <urn:Bob> }");
-
         // Setup a topology.
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        // The topic that Statements are written to is used as a source.
-        builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
-
-        // Add a processor that handles the first statement pattern.
-        builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp1, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
-
-        // Add a processor that handles the second statement pattern.
-        builder.addProcessor("SP2", new StatementPatternProcessorSupplier(sp2, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
-
-        // Add a processor that formats the VisibilityBindingSet for output.
-        builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "SP1", "SP2");
-
-        // Add a sink that writes the data out to a new Kafka topic.
-        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+        final String query = "SELECT * WHERE { "
+                + "?person <urn:talksTo> ?otherPerson . "
+                + "?person ?action <urn:Bob>"
+                + "}";
+        final TopologyFactory factory = new TopologyFactory();
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic);
 
         // Create some statements where some generates SP results and others do not.
         final ValueFactory vf = new ValueFactoryImpl();
@@ -185,14 +138,10 @@ public class StatementPatternProcessorIT {
         // Show the correct binding set results from the job.
         final Set<VisibilityBindingSet> expected = new HashSet<>();
 
-        QueryBindingSet bs = new QueryBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Alice"));
-        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
-        expected.add( new VisibilityBindingSet(bs, "a") );
-
-        bs = new QueryBindingSet();
+        final QueryBindingSet bs = new QueryBindingSet();
         bs.addBinding("person", vf.createURI("urn:Alice"));
         bs.addBinding("action", vf.createURI("urn:talksTo"));
+        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
         expected.add( new VisibilityBindingSet(bs, "a") );
 
         // Run the test.
@@ -207,27 +156,13 @@ public class StatementPatternProcessorIT {
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
         final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
 
-        // Get the StatementPattern object that will be evaluated.
-        final StatementPattern sp1 = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }");
-        final StatementPattern sp2 = RdfTestUtil.getSp("SELECT * WHERE { ?person ?action <urn:Bob> }");
-
         // Setup a topology.
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        // The topic that Statements are written to is used as a source.
-        builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
-
-        // Add a processor that handles the first statement pattern.
-        builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp1, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
-
-        // Add a processor that handles the second statement pattern.
-        builder.addProcessor("SP2", new StatementPatternProcessorSupplier(sp2, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
-
-        // Add a processor that formats the VisibilityBindingSet for output.
-        builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "SP1", "SP2");
-
-        // Add a sink that writes the data out to a new Kafka topic.
-        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+        final String query = "SELECT * WHERE { "
+                + "?person <urn:talksTo> ?otherPerson ."
+                + "?person ?action <urn:Bob>"
+                + "}";
+        final TopologyFactory factory = new TopologyFactory();
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic);
 
         // Create some statements where some generates SP results and others do not.
         final ValueFactory vf = new ValueFactoryImpl();
@@ -240,24 +175,17 @@ public class StatementPatternProcessorIT {
         final Set<VisibilityBindingSet> expected = new HashSet<>();
 
         QueryBindingSet bs = new QueryBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Alice"));
-        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
-        expected.add( new VisibilityBindingSet(bs, "a") );
-
         bs = new QueryBindingSet();
         bs.addBinding("person", vf.createURI("urn:Alice"));
         bs.addBinding("action", vf.createURI("urn:talksTo"));
-        expected.add( new VisibilityBindingSet(bs, "a") );
-
-        bs = new QueryBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Alice"));
         bs.addBinding("otherPerson", vf.createURI("urn:Charlie"));
-        expected.add( new VisibilityBindingSet(bs, "a|b") );
+        expected.add(new VisibilityBindingSet(bs, "a&(a|b)"));
 
         bs = new QueryBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Charlie"));
-        bs.addBinding("action", vf.createURI("urn:walksWith"));
-        expected.add( new VisibilityBindingSet(bs, "b") );
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("action", vf.createURI("urn:talksTo"));
+        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+        expected.add(new VisibilityBindingSet(bs, "a"));
 
         // Run the test.
         KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/83d09f42/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
index 7051efa..dbad15c 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
@@ -46,6 +46,7 @@ import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterS
 import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerde;
 import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
 import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 import org.junit.Rule;
 import org.junit.Test;
@@ -82,46 +83,13 @@ public class JoinProcessorIT {
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
         final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
 
-        // Get the StatementPatterns that will be evaluated.
-        final StatementPattern leftSp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }");
-        final StatementPattern rightSp = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }");
-
         // Setup a topology.
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        // The topic that Statements are written to is used as a source.
-        builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
-
-        // Add a processor that handles the first statement pattern.
-        builder.addProcessor("LEFT_SP", new StatementPatternProcessorSupplier(leftSp,
-                result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS");
-
-        // Add a processor that handles the second statement pattern.
-        builder.addProcessor("RIGHT_SP", new StatementPatternProcessorSupplier(rightSp,
-                result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS");
-
-        // Add a processor that handles a natrual join over the SPs.
-        builder.addProcessor("NATURAL_JOIN", new JoinProcessorSupplier(
-                "NATURAL_JOIN",
-                new NaturalJoin(),
-                Lists.newArrayList("employee"),
-                Lists.newArrayList("employee", "person", "business"),
-                result -> ProcessorResult.make( new UnaryResult(result) )), "LEFT_SP", "RIGHT_SP");
-
-        // Add a state store for the join processor.
-        final StateStoreSupplier joinStoreSupplier =
-                Stores.create( "NATURAL_JOIN" )
-                  .withStringKeys()
-                  .withValues(new VisibilityBindingSetSerde())
-                  .inMemory()
-                  .build();
-        builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN");
-
-        // Add a processor that formats the VisibilityBindingSet for output.
-        builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "NATURAL_JOIN");
-
-        // Add a sink that writes the data out to a new Kafka topic.
-        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+        final String query = "SELECT * WHERE { "
+                + "?person <urn:talksTo> ?employee ."
+                + "?employee <urn:worksAt> ?business"
+                + " }";
+        final TopologyFactory factory = new TopologyFactory();
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic);
 
         // Create some statements that generate a bunch of right SP results.
         final ValueFactory vf = new ValueFactoryImpl();
@@ -194,10 +162,10 @@ public class JoinProcessorIT {
         // Add a state store for the join processor.
         final StateStoreSupplier joinStoreSupplier =
                 Stores.create( "NATURAL_JOIN" )
-                  .withStringKeys()
-                  .withValues(new VisibilityBindingSetSerde())
-                  .inMemory()
-                  .build();
+                .withStringKeys()
+                .withValues(new VisibilityBindingSetSerde())
+                .inMemory()
+                .build();
         builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN");
 
         // Add a processor that formats the VisibilityBindingSet for output.
@@ -277,10 +245,10 @@ public class JoinProcessorIT {
         // Add a state store for the join processor.
         final StateStoreSupplier joinStoreSupplier =
                 Stores.create( "NATURAL_JOIN" )
-                  .withStringKeys()
-                  .withValues(new VisibilityBindingSetSerde())
-                  .inMemory()
-                  .build();
+                .withStringKeys()
+                .withValues(new VisibilityBindingSetSerde())
+                .inMemory()
+                .build();
         builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN");
 
         // Add a processor that formats the VisibilityBindingSet for output.
@@ -379,18 +347,18 @@ public class JoinProcessorIT {
         // Setup the join state suppliers.
         final StateStoreSupplier join1StoreSupplier =
                 Stores.create( "JOIN1" )
-                  .withStringKeys()
-                  .withValues(new VisibilityBindingSetSerde())
-                  .inMemory()
-                  .build();
+                .withStringKeys()
+                .withValues(new VisibilityBindingSetSerde())
+                .inMemory()
+                .build();
         builder.addStateStore(join1StoreSupplier, "JOIN1");
 
         final StateStoreSupplier join2StoreSupplier =
                 Stores.create( "JOIN2" )
-                  .withStringKeys()
-                  .withValues(new VisibilityBindingSetSerde())
-                  .inMemory()
-                  .build();
+                .withStringKeys()
+                .withValues(new VisibilityBindingSetSerde())
+                .inMemory()
+                .build();
         builder.addStateStore(join2StoreSupplier, "JOIN2");
 
         // Add a processor that formats the VisibilityBindingSet for output.
@@ -459,10 +427,10 @@ public class JoinProcessorIT {
         // Add a state store for the join processor.
         final StateStoreSupplier joinStoreSupplier =
                 Stores.create( "LEFT_JOIN" )
-                  .withStringKeys()
-                  .withValues(new VisibilityBindingSetSerde())
-                  .inMemory()
-                  .build();
+                .withStringKeys()
+                .withValues(new VisibilityBindingSetSerde())
+                .inMemory()
+                .build();
         builder.addStateStore(joinStoreSupplier, "LEFT_JOIN");
 
         // Add a processor that formats the VisibilityBindingSet for output.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/83d09f42/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java
new file mode 100644
index 0000000..eda4c89
--- /dev/null
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java
@@ -0,0 +1,106 @@
+package org.apache.rya.streams.kafka.topology;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.rya.streams.kafka.topology.TopologyFactory.ProcessorEntry;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.algebra.Join;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.Var;
+
+public class TopologyFactoryTest {
+    private static TopologyFactory FACTORY;
+
+    private static final ValueFactory VF = ValueFactoryImpl.getInstance();
+    private static final Var TALKS_TO = new Var("-const-urn:talksTo", VF.createURI("urn:talksTo"));
+    private static final Var CHEWS = new Var("-const-urn:chews", VF.createURI("urn:chews"));
+
+    static {
+        TALKS_TO.setAnonymous(true);
+        TALKS_TO.setConstant(true);
+        CHEWS.setAnonymous(true);
+        CHEWS.setConstant(true);
+    }
+
+    @Before
+    public void setup() {
+        FACTORY = new TopologyFactory();
+    }
+
+    @Test
+    public void projectionStatementPattern() throws Exception {
+        final String query = "SELECT * WHERE { "
+                + "?person <urn:talksTo> ?otherPerson . "
+                + "}";
+
+        FACTORY.build(query, "source", "sink");
+        final List<ProcessorEntry> entries = FACTORY.getProcessorEntry();
+
+        assertTrue(entries.get(0).getNode() instanceof Projection);
+        assertTrue(entries.get(1).getNode() instanceof StatementPattern);
+
+        final StatementPattern expected = new StatementPattern(new Var("person"), TALKS_TO, new Var("otherPerson"));
+        assertEquals(expected, entries.get(1).getNode());
+    }
+
+    @Test
+    public void projectionJoinStatementPattern() throws Exception {
+        final String query = "SELECT * WHERE { "
+                + "?person <urn:talksTo> ?otherPerson . "
+                + "?otherPerson <urn:talksTo> ?dog . "
+                + "}";
+
+        FACTORY.build(query, "source", "sink");
+        final List<ProcessorEntry> entries = FACTORY.getProcessorEntry();
+
+        assertTrue(entries.get(0).getNode() instanceof Projection);
+        assertTrue(entries.get(1).getNode() instanceof Join);
+        StatementPattern expected = new StatementPattern(new Var("person"), TALKS_TO, new Var("otherPerson"));
+        assertEquals(expected, entries.get(2).getNode());
+        expected = new StatementPattern(new Var("otherPerson"), TALKS_TO, new Var("dog"));
+        assertEquals(expected, entries.get(3).getNode());
+    }
+
+    @Test
+    public void projectionJoinJoinStatementPattern() throws Exception {
+        final String query = "SELECT * WHERE { "
+                + "?person <urn:talksTo> ?otherPerson . "
+                + "?otherPerson <urn:talksTo> ?dog . "
+                + "?dog <urn:chews> ?toy . "
+                + "}";
+
+        FACTORY.build(query, "source", "sink");
+        final List<ProcessorEntry> entries = FACTORY.getProcessorEntry();
+
+        assertTrue(entries.get(0).getNode() instanceof Projection);
+        assertTrue(entries.get(1).getNode() instanceof Join);
+        assertTrue(entries.get(2).getNode() instanceof Join);
+        StatementPattern expected = new StatementPattern(new Var("person"), TALKS_TO, new Var("otherPerson"));
+        assertEquals(expected, entries.get(3).getNode());
+        expected = new StatementPattern(new Var("otherPerson"), TALKS_TO, new Var("dog"));
+        assertEquals(expected, entries.get(4).getNode());
+        expected = new StatementPattern(new Var("dog"), CHEWS, new Var("toy"));
+        assertEquals(expected, entries.get(5).getNode());
+    }
+
+    @Test
+    public void projectionStatementPattern_rebind() throws Exception {
+        final String query = "CONSTRUCT { ?person <urn:mightKnow> ?otherPerson } WHERE { "
+                + "?person <urn:talksTo> ?otherPerson . "
+            + "}";
+
+        FACTORY.build(query, "source", "sink");
+        final List<ProcessorEntry> entries = FACTORY.getProcessorEntry();
+
+        assertTrue(entries.get(0).getNode() instanceof Projection);
+        final StatementPattern expected = new StatementPattern(new Var("person"), TALKS_TO, new Var("otherPerson"));
+        assertEquals(expected, entries.get(1).getNode());
+    }
+}