You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/09 21:48:29 UTC
[12/50] [abbrv] incubator-rya git commit: RYA-377 TopologyBuilder
RYA-377 TopologyBuilder
A factory for turning TupleExpr parsed from
SPARQL into TopologyBuilder objects used
by Rya Streams
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/83d09f42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/83d09f42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/83d09f42
Branch: refs/heads/master
Commit: 83d09f42c98f16150aab69c42465157b55c28a14
Parents: b8b0a12
Author: Andrew Smith <sm...@gmail.com>
Authored: Tue Nov 14 16:28:43 2017 -0500
Committer: caleb <ca...@parsons.com>
Committed: Tue Jan 9 15:13:00 2018 -0500
----------------------------------------------------------------------
.../streams/kafka/topology/TopologyFactory.java | 414 +++++++++++++++++++
.../processors/StatementPatternProcessorIT.java | 124 ++----
.../kafka/processors/join/JoinProcessorIT.java | 86 ++--
.../kafka/topology/TopologyFactoryTest.java | 106 +++++
4 files changed, 573 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/83d09f42/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
new file mode 100644
index 0000000..782a58b
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.topology;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.rya.api.function.join.IterativeJoin;
+import org.apache.rya.api.function.join.LeftOuterJoin;
+import org.apache.rya.api.function.join.NaturalJoin;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.streams.kafka.processors.ProcessorResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
+import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
+import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier;
+import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier;
+import org.apache.rya.streams.kafka.processors.projection.ProjectionProcessorSupplier;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerde;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.BinaryTupleOperator;
+import org.openrdf.query.algebra.Extension;
+import org.openrdf.query.algebra.Join;
+import org.openrdf.query.algebra.LeftJoin;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.QueryModelNode;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Factory for building {@link TopologyBuilder}s from a SPARQL query.
+ */
+@DefaultAnnotation(NonNull.class)
+public class TopologyFactory implements TopologyBuilderFactory {
+ private static final String SOURCE = "SOURCE";
+ private static final String STATEMENT_PATTERN_PREFIX = "SP_";
+ private static final String JOIN_PREFIX = "JOIN_";
+ private static final String PROJECTION_PREFIX = "PROJECTION_";
+ private static final String SINK = "SINK";
+
+ private List<ProcessorEntry> processorEntryList;
+
+ /**
+ * Builds a {@link TopologyBuilder} based on the provided sparql query.
+ *
+ * @param sparqlQuery - The SPARQL query to build a topology for. (not null)
+ * @param statementTopic - The topic for the source to read from. (not null)
+ * @param resultTopic - The topic for the sink to write to. (not null)
+ * @return - The created {@link TopologyBuilder}.
+ * @throws MalformedQueryException - The provided query is not a valid SPARQL query.
+ * @throws TopologyBuilderException - A problem occurred while constructing the topology.
+ */
+ @Override
+ public TopologyBuilder build(final String sparqlQuery, final String statementTopic, final String resultTopic)
+ throws MalformedQueryException, TopologyBuilderException {
+ requireNonNull(sparqlQuery);
+ requireNonNull(statementTopic);
+ requireNonNull(resultTopic);
+
+ final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparqlQuery, null);
+ final TopologyBuilder builder = new TopologyBuilder();
+
+ final TupleExpr expr = parsedQuery.getTupleExpr();
+ final QueryVisitor visitor = new QueryVisitor();
+ expr.visit(visitor);
+
+ processorEntryList = visitor.getProcessorEntryList();
+ final Map<TupleExpr, String> idMap = visitor.getIDs();
+ // add source node
+ builder.addSource(SOURCE, new StringDeserializer(), new VisibilityStatementDeserializer(), statementTopic);
+
+ // processing the processor entry list in reverse order means we go from leaf
+ // nodes -> parent nodes.
+ // So, when the parent processing nodes get added, the upstream
+ // processing node will already exist.
+
+ ProcessorEntry entry = null;
+ for (int ii = processorEntryList.size() - 1; ii >= 0; ii--) {
+ entry = processorEntryList.get(ii);
+ //statement patterns need to be connected to the Source.
+ if(entry.getNode() instanceof StatementPattern) {
+ builder.addProcessor(entry.getID(), entry.getSupplier(), SOURCE);
+ } else {
+ final List<TupleExpr> parents = entry.getUpstreamNodes();
+ final String[] parentIDs = new String[parents.size()];
+ for (int id = 0; id < parents.size(); id++) {
+ parentIDs[id] = idMap.get(parents.get(id));
+ }
+ builder.addProcessor(entry.getID(), entry.getSupplier(), parentIDs);
+ }
+
+ if (entry.getNode() instanceof Join || entry.getNode() instanceof LeftJoin) {
+ // Add a state store for the join processor.
+ final StateStoreSupplier joinStoreSupplier =
+ Stores.create( entry.getID() )
+ .withStringKeys()
+ .withValues(new VisibilityBindingSetSerde())
+ .persistent()
+ .build();
+ builder.addStateStore(joinStoreSupplier, entry.getID());
+ }
+ }
+
+ // convert processing results to visibility binding sets
+ builder.addProcessor("OUTPUT_FORMATTER", new BindingSetOutputFormatterSupplier(), entry.getID());
+
+ // add sink
+ builder.addSink(SINK, resultTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "OUTPUT_FORMATTER");
+
+ return builder;
+ }
+
+ @VisibleForTesting
+ public List<ProcessorEntry> getProcessorEntry() {
+ return processorEntryList;
+ }
+
+ /**
+ * An entry to be added as a Processing node in kafka streams'
+ * TopologyBuilder.
+ */
+ final static class ProcessorEntry {
+ private final TupleExpr node;
+ private final String id;
+ private final Optional<Side> downstreamSide;
+ private final ProcessorSupplier<?, ?> supplier;
+ private final List<TupleExpr> upstreamNodes;
+
+ /**
+ * Creates a new {@link ProcessorEntry}.
+ *
+ * @param node - The RDF node to be added as a processor. (not null)
+ * @param id - The id for the {@link TupleExpr} node. (not null)
+ * @param downstreamSide - Which side the current node is on from its downstream processor. (not null)
+ * @param supplier - Supplies the {@link Processor} for this node. (not null)
+ * @param upstreamNodes - The RDF nodes that will become upstream processing nodes. (not null)
+ */
+ public ProcessorEntry(final TupleExpr node, final String id, final Optional<Side> downstreamSide, final ProcessorSupplier<?, ?> supplier, final List<TupleExpr> upstreamNodes) {
+ this.node = requireNonNull(node);
+ this.id = requireNonNull(id);
+ this.downstreamSide = requireNonNull(downstreamSide);
+ this.supplier = requireNonNull(supplier);
+ this.upstreamNodes = requireNonNull(upstreamNodes);
+ }
+
+ /**
+ * @return - The RDF node to added as a processor.
+ */
+ public TupleExpr getNode() {
+ return node;
+ }
+
+ /**
+ * @return - The side the node is on from its downstream processor.
+ */
+ public Optional<Side> getDownstreamSide() {
+ return downstreamSide;
+ }
+
+ /**
+ * @return - The upstream parents to this node. These parent nodes must
+ * result in a {@link ProcessorEntry}
+ */
+ public List<TupleExpr> getUpstreamNodes() {
+ return upstreamNodes;
+ }
+
+ /**
+ * @return - The processor id of the node.
+ */
+ public String getID() {
+ return id;
+ }
+
+ /**
+ * @return - The {@link ProcessorSupplier} used to supply the
+ * {@link Processor} for this node.
+ */
+ public ProcessorSupplier<?, ?> getSupplier() {
+ return supplier;
+ }
+
+ @Override
+ public boolean equals(final Object other) {
+ if (!(other instanceof ProcessorEntry)) {
+ return false;
+ }
+ final ProcessorEntry o = (ProcessorEntry) other;
+ return Objects.equals(node, o.node) &&
+ Objects.equals(id, o.id) &&
+ Objects.equals(downstreamSide, o.downstreamSide) &&
+ Objects.equals(supplier, o.supplier) &&
+ Objects.equals(upstreamNodes, o.upstreamNodes);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(node, downstreamSide, upstreamNodes, id, supplier);
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("ID: " + id + "\n");
+ if (downstreamSide.isPresent()) {
+ sb.append("***********************************\n");
+ sb.append("SIDE: " + downstreamSide.get() + "\n");
+ }
+ sb.append("***********************************\n");
+ sb.append("PARENTS: ");
+ for (final TupleExpr expr : upstreamNodes) {
+ sb.append(expr.toString() + ",");
+ }
+ sb.append("\n***********************************\n");
+ sb.append("NODE: " + node.toString());
+ sb.append("\n");
+ return sb.toString();
+ }
+ }
+
+ /**
+ * Visits each node in a {@link TupleExpr} and creates a
+ * {@link ProcessorSupplier} and meta information needed for creating a
+ * {@link TopologyBuilder}.
+ */
+ final static class QueryVisitor extends QueryModelVisitorBase<TopologyBuilderException> {
+ // Each node needs a ProcessorEntry to be a processor node in the
+ // TopologyBuilder.
+ private final List<ProcessorEntry> entries = new ArrayList<>();
+ private final Map<TupleExpr, String> idMap = new HashMap<>();
+
+ /**
+ * @return The {@link ProcessorEntry}s used to create a Topology.
+ */
+ public List<ProcessorEntry> getProcessorEntryList() {
+ return entries;
+ }
+
+ /**
+ * @return The IDs created for each {@link TupleExpr} node in the query that resulted in a {@link ProcessorEntry}.
+ */
+ public Map<TupleExpr, String> getIDs() {
+ return idMap;
+ }
+
+ @Override
+ public void meet(final StatementPattern node) throws TopologyBuilderException {
+ // topology parent for Statement Patterns will always be a source
+ final String id = STATEMENT_PATTERN_PREFIX + UUID.randomUUID();
+ final Optional<Side> side = getSide(node);
+ final StatementPatternProcessorSupplier supplier = new StatementPatternProcessorSupplier(node, result -> getResult(side, result));
+ entries.add(new ProcessorEntry(node, id, side, supplier, Lists.newArrayList()));
+ idMap.put(node, id);
+ super.meet(node);
+ }
+
+ @Override
+ public void meet(final Projection node) throws TopologyBuilderException {
+ final String id = PROJECTION_PREFIX + UUID.randomUUID();
+ final Optional<Side> side = getSide(node);
+ TupleExpr arg = node.getArg();
+ // If the arg is an Extension, there are rebindings that need to be
+ // ignored since they do not have a processor node.
+ if (arg instanceof Extension) {
+ arg = ((Extension) arg).getArg();
+ }
+ final ProjectionProcessorSupplier supplier = new ProjectionProcessorSupplier(node.getProjectionElemList(), result -> getResult(side, result));
+ entries.add(new ProcessorEntry(node, id, side, supplier, Lists.newArrayList(arg)));
+ idMap.put(node, id);
+ super.meet(node);
+ }
+
+ @Override
+ public void meet(final Join node) throws TopologyBuilderException {
+ final String id = JOIN_PREFIX + UUID.randomUUID();
+ meetJoin(id, new NaturalJoin(), node);
+ super.meet(node);
+ }
+
+ @Override
+ public void meet(final LeftJoin node) throws TopologyBuilderException {
+ final String id = JOIN_PREFIX + UUID.randomUUID();
+ meetJoin(id, new LeftOuterJoin(), node);
+ super.meet(node);
+ }
+
+ /**
+ * Gets the {@link Side} the current node in the visitor is on relative to the provided node.
+ * @param node - The node used to determine the side of the current visitor node.
+ * @return The {@link Side} the current node is on.
+ */
+ private Optional<Side> getSide(final QueryModelNode node) {
+ // if query parent is a binary operator, need to determine if its left or right.
+ if (node.getParentNode() instanceof BinaryTupleOperator) {
+ final BinaryTupleOperator binary = (BinaryTupleOperator) node.getParentNode();
+ if (node.equals(binary.getLeftArg())) {
+ return Optional.of(Side.LEFT);
+ } else {
+ return Optional.of(Side.RIGHT);
+ }
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * Creates a join entry based on a provided {@link IterativeJoin} and the Join's
+ * {@link BinaryTupleOperator}.
+ *
+ * @param id - The ID of the join.
+ * @param joinFunction - The {@link IterativeJoin} function to perform during processing.
+ * @param node - The {@link BinaryTupleOperator} used to create the process.
+ */
+ private void meetJoin(final String id, final IterativeJoin joinFunction, final BinaryTupleOperator node) {
+ final Set<String> leftArgs = node.getLeftArg().getBindingNames();
+ final Set<String> rightArgs = node.getRightArg().getBindingNames();
+ final List<String> joinVars = Lists.newArrayList(Sets.intersection(leftArgs, rightArgs));
+
+ leftArgs.removeAll(joinVars);
+ rightArgs.removeAll(joinVars);
+
+ final List<String> otherVars = new ArrayList<>();
+ otherVars.addAll(leftArgs);
+ otherVars.addAll(rightArgs);
+
+ // the join variables need to be sorted so that when compared to all
+ // the variables, the start of the all variable list is congruent to
+ // the join var list.
+ joinVars.sort(Comparator.naturalOrder());
+ otherVars.sort(Comparator.naturalOrder());
+
+ final List<String> allVars = new ArrayList<>();
+ allVars.addAll(joinVars);
+ allVars.addAll(otherVars);
+
+ final Optional<Side> side = getSide(node);
+ final JoinProcessorSupplier supplier = new JoinProcessorSupplier(id, joinFunction, joinVars, allVars, result -> getResult(side, result));
+ entries.add(new ProcessorEntry(node, id, side, supplier, Lists.newArrayList(node.getLeftArg(), node.getRightArg())));
+ idMap.put(node, id);
+ }
+
+ /**
+ * Creates a {@link ProcessorResult} based on a side and result.
+ *
+ * @param side - If one is present, a {@link BinaryResult} is created.
+ * @param result - The result to wrap in a {@link ProcessorResult}.
+ * @return The {@link ProcessorResult} used by the {@link Processor}.
+ */
+ private ProcessorResult getResult(final Optional<Side> side, final VisibilityBindingSet result) {
+ if (side.isPresent()) {
+ return ProcessorResult.make(new BinaryResult(side.get(), result));
+ } else {
+ return ProcessorResult.make(new UnaryResult(result));
+ }
+ }
+ }
+
+ /**
+ * An Exception thrown when a problem occurs when constructing the processor
+ * topology in the {@link TopologyFactory}.
+ */
+ public class TopologyBuilderException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public TopologyBuilderException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/83d09f42/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
index 0b2ff60..e55ec2e 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
@@ -24,25 +24,18 @@ import java.util.List;
import java.util.Set;
import java.util.UUID;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.api.model.VisibilityStatement;
import org.apache.rya.streams.kafka.KafkaTestUtil;
import org.apache.rya.streams.kafka.KafkaTopics;
-import org.apache.rya.streams.kafka.RdfTestUtil;
-import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
-import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier.BindingSetOutputFormatter;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
-import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
import org.junit.Rule;
import org.junit.Test;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.algebra.StatementPattern;
import org.openrdf.query.algebra.evaluation.QueryBindingSet;
/**
@@ -61,23 +54,10 @@ public class StatementPatternProcessorIT {
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
- // Get the StatementPattern object that will be evaluated.
- final StatementPattern sp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }");
-
// Setup a topology.
- final TopologyBuilder builder = new TopologyBuilder();
-
- // The topic that Statements are written to is used as a source.
- builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
-
- // Add a processor that handles the first statement pattern.
- builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
-
- // Add a processor that formats the VisibilityBindingSet for output.
- builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "SP1");
-
- // Add a sink that writes the data out to a new Kafka topic.
- builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+ final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }";
+ final TopologyFactory factory = new TopologyFactory();
+ final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic);
// Create a statement that generates an SP result.
final ValueFactory vf = new ValueFactoryImpl();
@@ -104,23 +84,10 @@ public class StatementPatternProcessorIT {
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
- // Get the StatementPattern object that will be evaluated.
- final StatementPattern sp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }");
-
// Setup a topology.
- final TopologyBuilder builder = new TopologyBuilder();
-
- // The topic that Statements are written to is used as a source.
- builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
-
- // Add a processor that handles the first statement pattern.
- builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
-
- // Add a processor that formats the VisibilityBindingSet for output.
- builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "SP1");
-
- // Add a sink that writes the data out to a new Kafka topic.
- builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+ final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }";
+ final TopologyFactory factory = new TopologyFactory();
+ final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic);
// Create some statements where some generate SP results and others do not.
final ValueFactory vf = new ValueFactoryImpl();
@@ -155,27 +122,13 @@ public class StatementPatternProcessorIT {
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
- // Get the StatementPattern object that will be evaluated.
- final StatementPattern sp1 = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }");
- final StatementPattern sp2 = RdfTestUtil.getSp("SELECT * WHERE { ?person ?action <urn:Bob> }");
-
// Setup a topology.
- final TopologyBuilder builder = new TopologyBuilder();
-
- // The topic that Statements are written to is used as a source.
- builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
-
- // Add a processor that handles the first statement pattern.
- builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp1, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
-
- // Add a processor that handles the second statement pattern.
- builder.addProcessor("SP2", new StatementPatternProcessorSupplier(sp2, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
-
- // Add a processor that formats the VisibilityBindingSet for output.
- builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "SP1", "SP2");
-
- // Add a sink that writes the data out to a new Kafka topic.
- builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+ final String query = "SELECT * WHERE { "
+ + "?person <urn:talksTo> ?otherPerson . "
+ + "?person ?action <urn:Bob>"
+ + "}";
+ final TopologyFactory factory = new TopologyFactory();
+ final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic);
// Create some statements where some generate SP results and others do not.
final ValueFactory vf = new ValueFactoryImpl();
@@ -185,14 +138,10 @@ public class StatementPatternProcessorIT {
// Show the correct binding set results from the job.
final Set<VisibilityBindingSet> expected = new HashSet<>();
- QueryBindingSet bs = new QueryBindingSet();
- bs.addBinding("person", vf.createURI("urn:Alice"));
- bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
- expected.add( new VisibilityBindingSet(bs, "a") );
-
- bs = new QueryBindingSet();
+ final QueryBindingSet bs = new QueryBindingSet();
bs.addBinding("person", vf.createURI("urn:Alice"));
bs.addBinding("action", vf.createURI("urn:talksTo"));
+ bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
expected.add( new VisibilityBindingSet(bs, "a") );
// Run the test.
@@ -207,27 +156,13 @@ public class StatementPatternProcessorIT {
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
- // Get the StatementPattern object that will be evaluated.
- final StatementPattern sp1 = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }");
- final StatementPattern sp2 = RdfTestUtil.getSp("SELECT * WHERE { ?person ?action <urn:Bob> }");
-
// Setup a topology.
- final TopologyBuilder builder = new TopologyBuilder();
-
- // The topic that Statements are written to is used as a source.
- builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
-
- // Add a processor that handles the first statement pattern.
- builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp1, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
-
- // Add a processor that handles the second statement pattern.
- builder.addProcessor("SP2", new StatementPatternProcessorSupplier(sp2, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
-
- // Add a processor that formats the VisibilityBindingSet for output.
- builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "SP1", "SP2");
-
- // Add a sink that writes the data out to a new Kafka topic.
- builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+ final String query = "SELECT * WHERE { "
+ + "?person <urn:talksTo> ?otherPerson ."
+ + "?person ?action <urn:Bob>"
+ + "}";
+ final TopologyFactory factory = new TopologyFactory();
+ final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic);
// Create some statements where some generate SP results and others do not.
final ValueFactory vf = new ValueFactoryImpl();
@@ -240,24 +175,17 @@ public class StatementPatternProcessorIT {
final Set<VisibilityBindingSet> expected = new HashSet<>();
QueryBindingSet bs = new QueryBindingSet();
- bs.addBinding("person", vf.createURI("urn:Alice"));
- bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
- expected.add( new VisibilityBindingSet(bs, "a") );
-
bs = new QueryBindingSet();
bs.addBinding("person", vf.createURI("urn:Alice"));
bs.addBinding("action", vf.createURI("urn:talksTo"));
- expected.add( new VisibilityBindingSet(bs, "a") );
-
- bs = new QueryBindingSet();
- bs.addBinding("person", vf.createURI("urn:Alice"));
bs.addBinding("otherPerson", vf.createURI("urn:Charlie"));
- expected.add( new VisibilityBindingSet(bs, "a|b") );
+ expected.add(new VisibilityBindingSet(bs, "a&(a|b)"));
bs = new QueryBindingSet();
- bs.addBinding("person", vf.createURI("urn:Charlie"));
- bs.addBinding("action", vf.createURI("urn:walksWith"));
- expected.add( new VisibilityBindingSet(bs, "b") );
+ bs.addBinding("person", vf.createURI("urn:Alice"));
+ bs.addBinding("action", vf.createURI("urn:talksTo"));
+ bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+ expected.add(new VisibilityBindingSet(bs, "a"));
// Run the test.
KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/83d09f42/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
index 7051efa..dbad15c 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
@@ -46,6 +46,7 @@ import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterS
import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerde;
import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
import org.junit.Rule;
import org.junit.Test;
@@ -82,46 +83,13 @@ public class JoinProcessorIT {
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
- // Get the StatementPatterns that will be evaluated.
- final StatementPattern leftSp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }");
- final StatementPattern rightSp = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }");
-
// Setup a topology.
- final TopologyBuilder builder = new TopologyBuilder();
-
- // The topic that Statements are written to is used as a source.
- builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
-
- // Add a processor that handles the first statement pattern.
- builder.addProcessor("LEFT_SP", new StatementPatternProcessorSupplier(leftSp,
- result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS");
-
- // Add a processor that handles the second statement pattern.
- builder.addProcessor("RIGHT_SP", new StatementPatternProcessorSupplier(rightSp,
- result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS");
-
- // Add a processor that handles a natrual join over the SPs.
- builder.addProcessor("NATURAL_JOIN", new JoinProcessorSupplier(
- "NATURAL_JOIN",
- new NaturalJoin(),
- Lists.newArrayList("employee"),
- Lists.newArrayList("employee", "person", "business"),
- result -> ProcessorResult.make( new UnaryResult(result) )), "LEFT_SP", "RIGHT_SP");
-
- // Add a state store for the join processor.
- final StateStoreSupplier joinStoreSupplier =
- Stores.create( "NATURAL_JOIN" )
- .withStringKeys()
- .withValues(new VisibilityBindingSetSerde())
- .inMemory()
- .build();
- builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN");
-
- // Add a processor that formats the VisibilityBindingSet for output.
- builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "NATURAL_JOIN");
-
- // Add a sink that writes the data out to a new Kafka topic.
- builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+ final String query = "SELECT * WHERE { "
+ + "?person <urn:talksTo> ?employee ."
+ + "?employee <urn:worksAt> ?business"
+ + " }";
+ final TopologyFactory factory = new TopologyFactory();
+ final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic);
// Create some statements that generate a bunch of right SP results.
final ValueFactory vf = new ValueFactoryImpl();
@@ -194,10 +162,10 @@ public class JoinProcessorIT {
// Add a state store for the join processor.
final StateStoreSupplier joinStoreSupplier =
Stores.create( "NATURAL_JOIN" )
- .withStringKeys()
- .withValues(new VisibilityBindingSetSerde())
- .inMemory()
- .build();
+ .withStringKeys()
+ .withValues(new VisibilityBindingSetSerde())
+ .inMemory()
+ .build();
builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN");
// Add a processor that formats the VisibilityBindingSet for output.
@@ -277,10 +245,10 @@ public class JoinProcessorIT {
// Add a state store for the join processor.
final StateStoreSupplier joinStoreSupplier =
Stores.create( "NATURAL_JOIN" )
- .withStringKeys()
- .withValues(new VisibilityBindingSetSerde())
- .inMemory()
- .build();
+ .withStringKeys()
+ .withValues(new VisibilityBindingSetSerde())
+ .inMemory()
+ .build();
builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN");
// Add a processor that formats the VisibilityBindingSet for output.
@@ -379,18 +347,18 @@ public class JoinProcessorIT {
// Setup the join state suppliers.
final StateStoreSupplier join1StoreSupplier =
Stores.create( "JOIN1" )
- .withStringKeys()
- .withValues(new VisibilityBindingSetSerde())
- .inMemory()
- .build();
+ .withStringKeys()
+ .withValues(new VisibilityBindingSetSerde())
+ .inMemory()
+ .build();
builder.addStateStore(join1StoreSupplier, "JOIN1");
final StateStoreSupplier join2StoreSupplier =
Stores.create( "JOIN2" )
- .withStringKeys()
- .withValues(new VisibilityBindingSetSerde())
- .inMemory()
- .build();
+ .withStringKeys()
+ .withValues(new VisibilityBindingSetSerde())
+ .inMemory()
+ .build();
builder.addStateStore(join2StoreSupplier, "JOIN2");
// Add a processor that formats the VisibilityBindingSet for output.
@@ -459,10 +427,10 @@ public class JoinProcessorIT {
// Add a state store for the join processor.
final StateStoreSupplier joinStoreSupplier =
Stores.create( "LEFT_JOIN" )
- .withStringKeys()
- .withValues(new VisibilityBindingSetSerde())
- .inMemory()
- .build();
+ .withStringKeys()
+ .withValues(new VisibilityBindingSetSerde())
+ .inMemory()
+ .build();
builder.addStateStore(joinStoreSupplier, "LEFT_JOIN");
// Add a processor that formats the VisibilityBindingSet for output.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/83d09f42/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java
new file mode 100644
index 0000000..eda4c89
--- /dev/null
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java
@@ -0,0 +1,106 @@
+package org.apache.rya.streams.kafka.topology;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.rya.streams.kafka.topology.TopologyFactory.ProcessorEntry;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.algebra.Join;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.Var;
+
+public class TopologyFactoryTest {
+ private static TopologyFactory FACTORY;
+
+ private static final ValueFactory VF = ValueFactoryImpl.getInstance();
+ private static final Var TALKS_TO = new Var("-const-urn:talksTo", VF.createURI("urn:talksTo"));
+ private static final Var CHEWS = new Var("-const-urn:chews", VF.createURI("urn:chews"));
+
+ static {
+ TALKS_TO.setAnonymous(true);
+ TALKS_TO.setConstant(true);
+ CHEWS.setAnonymous(true);
+ CHEWS.setConstant(true);
+ }
+
+ @Before
+ public void setup() {
+ FACTORY = new TopologyFactory();
+ }
+
+ @Test
+ public void projectionStatementPattern() throws Exception {
+ final String query = "SELECT * WHERE { "
+ + "?person <urn:talksTo> ?otherPerson . "
+ + "}";
+
+ FACTORY.build(query, "source", "sink");
+ final List<ProcessorEntry> entries = FACTORY.getProcessorEntry();
+
+ assertTrue(entries.get(0).getNode() instanceof Projection);
+ assertTrue(entries.get(1).getNode() instanceof StatementPattern);
+
+ final StatementPattern expected = new StatementPattern(new Var("person"), TALKS_TO, new Var("otherPerson"));
+ assertEquals(expected, entries.get(1).getNode());
+ }
+
+ @Test
+ public void projectionJoinStatementPattern() throws Exception {
+ final String query = "SELECT * WHERE { "
+ + "?person <urn:talksTo> ?otherPerson . "
+ + "?otherPerson <urn:talksTo> ?dog . "
+ + "}";
+
+ FACTORY.build(query, "source", "sink");
+ final List<ProcessorEntry> entries = FACTORY.getProcessorEntry();
+
+ assertTrue(entries.get(0).getNode() instanceof Projection);
+ assertTrue(entries.get(1).getNode() instanceof Join);
+ StatementPattern expected = new StatementPattern(new Var("person"), TALKS_TO, new Var("otherPerson"));
+ assertEquals(expected, entries.get(2).getNode());
+ expected = new StatementPattern(new Var("otherPerson"), TALKS_TO, new Var("dog"));
+ assertEquals(expected, entries.get(3).getNode());
+ }
+
+ @Test
+ public void projectionJoinJoinStatementPattern() throws Exception {
+ final String query = "SELECT * WHERE { "
+ + "?person <urn:talksTo> ?otherPerson . "
+ + "?otherPerson <urn:talksTo> ?dog . "
+ + "?dog <urn:chews> ?toy . "
+ + "}";
+
+ FACTORY.build(query, "source", "sink");
+ final List<ProcessorEntry> entries = FACTORY.getProcessorEntry();
+
+ assertTrue(entries.get(0).getNode() instanceof Projection);
+ assertTrue(entries.get(1).getNode() instanceof Join);
+ assertTrue(entries.get(2).getNode() instanceof Join);
+ StatementPattern expected = new StatementPattern(new Var("person"), TALKS_TO, new Var("otherPerson"));
+ assertEquals(expected, entries.get(3).getNode());
+ expected = new StatementPattern(new Var("otherPerson"), TALKS_TO, new Var("dog"));
+ assertEquals(expected, entries.get(4).getNode());
+ expected = new StatementPattern(new Var("dog"), CHEWS, new Var("toy"));
+ assertEquals(expected, entries.get(5).getNode());
+ }
+
+ @Test
+ public void projectionStatementPattern_rebind() throws Exception {
+ final String query = "CONSTRUCT { ?person <urn:mightKnow> ?otherPerson } WHERE { "
+ + "?person <urn:talksTo> ?otherPerson . "
+ + "}";
+
+ FACTORY.build(query, "source", "sink");
+ final List<ProcessorEntry> entries = FACTORY.getProcessorEntry();
+
+ assertTrue(entries.get(0).getNode() instanceof Projection);
+ final StatementPattern expected = new StatementPattern(new Var("person"), TALKS_TO, new Var("otherPerson"));
+ assertEquals(expected, entries.get(1).getNode());
+ }
+}