You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/09 21:48:57 UTC
[40/50] [abbrv] incubator-rya git commit: RYA-377 Added Construct
query support to Rya Streams.
RYA-377 Added Construct query support to Rya Streams.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/da63fd12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/da63fd12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/da63fd12
Branch: refs/heads/master
Commit: da63fd125e16779df3536b4172fa66e36561e4ff
Parents: 538393f
Author: kchilton2 <ke...@gmail.com>
Authored: Tue Nov 21 18:49:13 2017 -0500
Committer: caleb <ca...@parsons.com>
Committed: Tue Jan 9 15:13:01 2018 -0500
----------------------------------------------------------------------
.../projection/MultiProjectionEvaluator.java | 4 +-
.../projection/ProjectionEvaluator.java | 5 +-
.../kafka/topology/TopologyBuilderFactory.java | 40 ++-
.../streams/kafka/topology/TopologyFactory.java | 178 +++++++++++---
.../apache/rya/streams/kafka/KafkaTestUtil.java | 18 +-
.../processors/StatementPatternProcessorIT.java | 18 +-
.../kafka/processors/join/JoinProcessorIT.java | 243 ++++---------------
.../projection/MultiProjectionProcessorIT.java | 87 +------
.../projection/ProjectionProcessorIT.java | 42 +---
.../kafka/topology/TopologyFactoryTest.java | 32 ++-
10 files changed, 280 insertions(+), 387 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/MultiProjectionEvaluator.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/MultiProjectionEvaluator.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/MultiProjectionEvaluator.java
index e2b7046..0e9093d 100644
--- a/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/MultiProjectionEvaluator.java
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/MultiProjectionEvaluator.java
@@ -57,9 +57,9 @@ public class MultiProjectionEvaluator {
/**
* Constructs an instance of {@link MultiProjection}.
*
- * @param projections - The {@link ProjectionEvaluators} that handle each projection within the multi. (not null)
+ * @param projections - The {@link ProjectionEvaluators} that handle each projection within the MultiProjection. (not null)
* @param blankNodeSourceNames - If there are blank nodes in the projection, this is a set of their names
- * so that they may be re-label to have the same node IDs. (not null)
+ * so that they may be re-labeled to have the same node IDs. (not null)
* @param bNodeIdFactory - Creates the IDs for Blank Nodes. (not null)
*/
public MultiProjectionEvaluator(
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/ProjectionEvaluator.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/ProjectionEvaluator.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/ProjectionEvaluator.java
index a0b59c1..4b37448 100644
--- a/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/ProjectionEvaluator.java
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/ProjectionEvaluator.java
@@ -179,7 +179,10 @@ public class ProjectionEvaluator {
}
}
- result.addBinding(elem.getTargetName(), value);
+ // Only add the value if there is one. There may not be one if a binding is optional.
+ if(value != null) {
+ result.addBinding(elem.getTargetName(), value);
+ }
}
return new VisibilityBindingSet(result, bs.getVisibility());
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java
index 9e9dd92..666cbb0 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java
@@ -19,8 +19,8 @@
package org.apache.rya.streams.kafka.topology;
import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.BNodeIdFactory;
import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.algebra.TupleExpr;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -32,17 +32,33 @@ import edu.umd.cs.findbugs.annotations.NonNull;
public interface TopologyBuilderFactory {
/**
- * Builds a {@link TopologyBuilder} based on the provided sparql query where
- * each {@link TupleExpr} in the parsed query is a processor in the
- * topology.
+ * Builds a {@link TopologyBuilder} based on the provided SPARQL query that
+ * pulls from {@code statementsTopic} for input and writes the query's results
+ * to {@code resultsTopic}.
*
* @param sparqlQuery - The SPARQL query to build a topology for. (not null)
- * @param statementTopic - The topic for the source to read from. (not null)
- * @param statementTopic - The topic for the sink to write to. (not null)
- * @return - The created {@link TopologyBuilder}.
- * @throws MalformedQueryException - The provided query is not a valid
- * SPARQL query.
+ * @param statementsTopic - The topic for the source to read from. (not null)
+ * @param resultsTopic - The topic for the sink to write to. (not null)
+ * @param bNodeIdFactory - A factory that generates Blank Node IDs if any are required. (not null)
+ * @return The created {@link TopologyBuilder}.
+ * @throws MalformedQueryException - The provided query is not a valid SPARQL query.
+ * @throws TopologyBuilderException - A problem occurred while constructing the topology.
*/
- public TopologyBuilder build(final String sparqlQuery, final String statementTopic, final String resultTopic)
- throws Exception;
-}
+ public TopologyBuilder build(
+ final String sparqlQuery,
+ final String statementsTopic,
+ final String resultsTopic,
+ final BNodeIdFactory bNodeIdFactory) throws MalformedQueryException, TopologyBuilderException;
+
+ /**
+ * An Exception thrown when a problem occurs when constructing the processor
+ * topology in the {@link TopologyFactory}.
+ */
+ public static class TopologyBuilderException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public TopologyBuilderException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
index 782a58b..08f3625 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java
@@ -30,6 +30,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.processor.Processor;
@@ -40,6 +41,9 @@ import org.apache.kafka.streams.state.Stores;
import org.apache.rya.api.function.join.IterativeJoin;
import org.apache.rya.api.function.join.LeftOuterJoin;
import org.apache.rya.api.function.join.NaturalJoin;
+import org.apache.rya.api.function.projection.BNodeIdFactory;
+import org.apache.rya.api.function.projection.MultiProjectionEvaluator;
+import org.apache.rya.api.function.projection.ProjectionEvaluator;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.streams.kafka.processors.ProcessorResult;
import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
@@ -48,17 +52,22 @@ import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier;
import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier;
+import org.apache.rya.streams.kafka.processors.output.StatementOutputFormatterSupplier;
+import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier;
import org.apache.rya.streams.kafka.processors.projection.ProjectionProcessorSupplier;
import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerde;
import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
import org.openrdf.query.MalformedQueryException;
import org.openrdf.query.algebra.BinaryTupleOperator;
import org.openrdf.query.algebra.Extension;
import org.openrdf.query.algebra.Join;
import org.openrdf.query.algebra.LeftJoin;
+import org.openrdf.query.algebra.MultiProjection;
import org.openrdf.query.algebra.Projection;
import org.openrdf.query.algebra.QueryModelNode;
+import org.openrdf.query.algebra.Reduced;
import org.openrdf.query.algebra.StatementPattern;
import org.openrdf.query.algebra.TupleExpr;
import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
@@ -85,34 +94,28 @@ public class TopologyFactory implements TopologyBuilderFactory {
private List<ProcessorEntry> processorEntryList;
- /**
- * Builds a {@link TopologyBuilder} based on the provided sparql query.
- *
- * @param sparqlQuery - The SPARQL query to build a topology for. (not null)
- * @param statementTopic - The topic for the source to read from. (not null)
- * @param resultTopic - The topic for the sink to write to. (not null)
- * @return - The created {@link TopologyBuilder}.
- * @throws MalformedQueryException - The provided query is not a valid SPARQL query.
- * @throws TopologyBuilderException - A problem occurred while constructing the topology.
- */
@Override
- public TopologyBuilder build(final String sparqlQuery, final String statementTopic, final String resultTopic)
+ public TopologyBuilder build(
+ final String sparqlQuery,
+ final String statementsTopic,
+ final String resultsTopic,
+ final BNodeIdFactory bNodeIdFactory)
throws MalformedQueryException, TopologyBuilderException {
requireNonNull(sparqlQuery);
- requireNonNull(statementTopic);
- requireNonNull(resultTopic);
+ requireNonNull(statementsTopic);
+ requireNonNull(resultsTopic);
final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparqlQuery, null);
final TopologyBuilder builder = new TopologyBuilder();
final TupleExpr expr = parsedQuery.getTupleExpr();
- final QueryVisitor visitor = new QueryVisitor();
+ final QueryVisitor visitor = new QueryVisitor(bNodeIdFactory);
expr.visit(visitor);
processorEntryList = visitor.getProcessorEntryList();
final Map<TupleExpr, String> idMap = visitor.getIDs();
// add source node
- builder.addSource(SOURCE, new StringDeserializer(), new VisibilityStatementDeserializer(), statementTopic);
+ builder.addSource(SOURCE, new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
// processing the processor entry list in reverse order means we go from leaf
// nodes -> parent nodes.
@@ -146,11 +149,12 @@ public class TopologyFactory implements TopologyBuilderFactory {
}
}
- // convert processing results to visibility binding sets
- builder.addProcessor("OUTPUT_FORMATTER", new BindingSetOutputFormatterSupplier(), entry.getID());
+ // Add a formatter that converts the ProcessorResults into the output format.
+ final SinkEntry<?,?> sinkEntry = visitor.getSinkEntry();
+ builder.addProcessor("OUTPUT_FORMATTER", sinkEntry.getFormatterSupplier(), entry.getID());
- // add sink
- builder.addSink(SINK, resultTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "OUTPUT_FORMATTER");
+ // Add the sink.
+ builder.addSink(SINK, resultsTopic, sinkEntry.getKeySerializer(), sinkEntry.getValueSerializer(), "OUTPUT_FORMATTER");
return builder;
}
@@ -264,16 +268,82 @@ public class TopologyFactory implements TopologyBuilderFactory {
}
/**
+ * Information about how key/value pairs need to be written to the sink.
+ *
+ * @param <K> - The type of Key that the sink uses.
+ * @param <V> - The type of Value that the sink uses.
+ */
+ private final static class SinkEntry<K, V> {
+
+ private final ProcessorSupplier<Object, ProcessorResult> formatterSupplier;
+ private final Serializer<K> keySerializer;
+ private final Serializer<V> valueSerializer;
+
+ /**
+ * Constructs an instance of {@link SinkEntry}.
+ *
+ * @param formatterSupplier - Formats {@link ProcessingResult}s for output to the sink. (not null)
+ * @param keySerializer - Serializes keys that are used to write to the sink. (not null)
+ * @param valueSerializer - Serializes values that are used to write to the sink. (not null)
+ */
+ public SinkEntry(
+ final ProcessorSupplier<Object, ProcessorResult> formatterSupplier,
+ final Serializer<K> keySerializer,
+ final Serializer<V> valueSerializer) {
+ this.keySerializer = requireNonNull(keySerializer);
+ this.valueSerializer = requireNonNull(valueSerializer);
+ this.formatterSupplier = requireNonNull(formatterSupplier);
+ }
+
+ /**
+ * @return Formats {@link ProcessingResult}s for output to the sink.
+ */
+ public ProcessorSupplier<Object, ProcessorResult> getFormatterSupplier() {
+ return formatterSupplier;
+ }
+
+ /**
+ * @return Serializes keys that are used to write to the sink.
+ */
+ public Serializer<K> getKeySerializer() {
+ return keySerializer;
+ }
+
+ /**
+ * @return Serializes values that are used to write to the sink.
+ */
+ public Serializer<V> getValueSerializer() {
+ return valueSerializer;
+ }
+ }
+
+ /**
* Visits each node in a {@link TupleExpr} and creates a
* {@link ProcessorSupplier} and meta information needed for creating a
* {@link TopologyBuilder}.
*/
final static class QueryVisitor extends QueryModelVisitorBase<TopologyBuilderException> {
- // Each node needs a ProcessorEntry to be a processor node in the
- // TopologyBuilder.
+ // Each node needs a ProcessorEntry to be a processor node in the TopologyBuilder.
private final List<ProcessorEntry> entries = new ArrayList<>();
private final Map<TupleExpr, String> idMap = new HashMap<>();
+ // Default to a Binding Set outputting sink entry.
+ private SinkEntry<?, ?> sinkEntry = new SinkEntry<>(
+ new BindingSetOutputFormatterSupplier(),
+ new StringSerializer(),
+ new VisibilityBindingSetSerializer());
+
+ private final BNodeIdFactory bNodeIdFactory;
+
+ /**
+ * Constructs an instance of {@link QueryVisitor}.
+ *
+ * @param bNodeIdFactory - Builds Blank Node IDs for the query's results. (not null)
+ */
+ public QueryVisitor(final BNodeIdFactory bNodeIdFactory) {
+ this.bNodeIdFactory = requireNonNull(bNodeIdFactory);
+ }
+
/**
* @return The {@link ProcessorEntry}s used to create a Topology.
*/
@@ -288,6 +358,23 @@ public class TopologyFactory implements TopologyBuilderFactory {
return idMap;
}
+ /**
+ * @return Information about how values are to be output by the topology to the results sink.
+ */
+ public SinkEntry<?, ?> getSinkEntry() {
+ return sinkEntry;
+ }
+
+ @Override
+ public void meet(final Reduced node) throws TopologyBuilderException {
+ // This indicates we're outputting VisibilityStatements.
+ sinkEntry = new SinkEntry<>(
+ new StatementOutputFormatterSupplier(),
+ new StringSerializer(),
+ new VisibilityStatementSerializer());
+ super.meet(node);
+ }
+
@Override
public void meet(final StatementPattern node) throws TopologyBuilderException {
// topology parent for Statement Patterns will always be a source
@@ -303,14 +390,39 @@ public class TopologyFactory implements TopologyBuilderFactory {
public void meet(final Projection node) throws TopologyBuilderException {
final String id = PROJECTION_PREFIX + UUID.randomUUID();
final Optional<Side> side = getSide(node);
- TupleExpr arg = node.getArg();
+
// If the arg is an Extension, there are rebindings that need to be
// ignored since they do not have a processor node.
- if (arg instanceof Extension) {
- arg = ((Extension) arg).getArg();
+ TupleExpr downstreamNode = node.getArg();
+ if (downstreamNode instanceof Extension) {
+ downstreamNode = ((Extension) downstreamNode).getArg();
+ }
+
+ final ProjectionProcessorSupplier supplier = new ProjectionProcessorSupplier(
+ ProjectionEvaluator.make(node),
+ result -> getResult(side, result));
+
+ entries.add(new ProcessorEntry(node, id, side, supplier, Lists.newArrayList(downstreamNode)));
+ idMap.put(node, id);
+ super.meet(node);
+ }
+
+ @Override
+ public void meet(final MultiProjection node) throws TopologyBuilderException {
+ final String id = PROJECTION_PREFIX + UUID.randomUUID();
+ final Optional<Side> side = getSide(node);
+
+ final MultiProjectionProcessorSupplier supplier = new MultiProjectionProcessorSupplier(
+ MultiProjectionEvaluator.make(node, bNodeIdFactory),
+ result -> getResult(side, result));
+
+ // If the arg is an Extension, then this node's grandchild is the next processing node.
+ TupleExpr downstreamNode = node.getArg();
+ if (downstreamNode instanceof Extension) {
+ downstreamNode = ((Extension) downstreamNode).getArg();
}
- final ProjectionProcessorSupplier supplier = new ProjectionProcessorSupplier(node.getProjectionElemList(), result -> getResult(side, result));
- entries.add(new ProcessorEntry(node, id, side, supplier, Lists.newArrayList(arg)));
+
+ entries.add(new ProcessorEntry(node, id, side, supplier, Lists.newArrayList(downstreamNode)));
idMap.put(node, id);
super.meet(node);
}
@@ -399,16 +511,4 @@ public class TopologyFactory implements TopologyBuilderFactory {
}
}
}
-
- /**
- * An Exception thrown when a problem occurs when constructing the processor
- * topology in the {@link TopologyFactory}.
- */
- public class TopologyBuilderException extends Exception {
- private static final long serialVersionUID = 1L;
-
- public TopologyBuilderException(final String message, final Throwable cause) {
- super(message, cause);
- }
- }
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
index 0a1a8a4..8898284 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
@@ -42,10 +42,8 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.api.model.VisibilityStatement;
import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
@@ -144,29 +142,34 @@ public final class KafkaTestUtil {
* Runs a Kafka Streams topology, loads statements into the input topic, read the binding sets that come out of
* the results topic, and ensures the expected results match the read results.
*
- * @param kafka - The embedded kafka instance that is being tested with. (not null)
+ * @param <T> The type of value that will be consumed from the results topic.
+ * @param kafka - The embedded Kafka instance that is being tested with. (not null)
* @param statementsTopic - The topic statements will be written to. (not null)
* @param resultsTopic - The topic results will be read from. (not null)
* @param builder - The streams topology that will be executed. (not null)
* @param startupMs - How long to wait for the topology to start before writing the statements.
* @param statements - The statements that will be loaded into the topic. (not null)
* @param expected - The expected results. (not null)
+ * @param expectedDeserializerClass - The class of the deserializer that will be used when reading
+ * values from the results topic. (not null)
* @throws Exception If any exception was thrown while running the test.
*/
- public static void runStreamProcessingTest(
+ public static <T> void runStreamProcessingTest(
final KafkaTestInstanceRule kafka,
final String statementsTopic,
final String resultsTopic,
final TopologyBuilder builder,
final int startupMs,
final List<VisibilityStatement> statements,
- final Set<VisibilityBindingSet> expected) throws Exception {
+ final Set<T> expected,
+ final Class<? extends Deserializer<T>> expectedDeserializerClass) throws Exception {
requireNonNull(kafka);
requireNonNull(statementsTopic);
requireNonNull(resultsTopic);
requireNonNull(builder);
requireNonNull(statements);
requireNonNull(expected);
+ requireNonNull(expectedDeserializerClass);
// Explicitly create the topics that are being used.
kafka.createTopic(statementsTopic);
@@ -191,13 +194,12 @@ public final class KafkaTestUtil {
}
// Wait for the final results to appear in the output topic and verify the expected Binding Sets were found.
- try(Consumer<String, VisibilityBindingSet> consumer = KafkaTestUtil.fromStartConsumer(
- kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class)) {
+ try(Consumer<String, T> consumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, expectedDeserializerClass)) {
// Register the topic.
consumer.subscribe(Arrays.asList(resultsTopic));
// Poll for the result.
- final Set<VisibilityBindingSet> results = Sets.newHashSet( KafkaTestUtil.pollForResults(500, 6, expected.size(), consumer) );
+ final Set<T> results = Sets.newHashSet( KafkaTestUtil.pollForResults(500, 6, expected.size(), consumer) );
// Show the correct binding sets results from the job.
assertEquals(expected, results);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
index e55ec2e..3e0e64d 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
@@ -25,11 +25,13 @@ import java.util.Set;
import java.util.UUID;
import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.api.model.VisibilityStatement;
import org.apache.rya.streams.kafka.KafkaTestUtil;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
import org.apache.rya.streams.kafka.topology.TopologyFactory;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
import org.junit.Rule;
@@ -57,7 +59,7 @@ public class StatementPatternProcessorIT {
// Setup a topology.
final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }";
final TopologyFactory factory = new TopologyFactory();
- final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic);
+ final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
// Create a statement that generate an SP result.
final ValueFactory vf = new ValueFactoryImpl();
@@ -73,7 +75,7 @@ public class StatementPatternProcessorIT {
expected.add( new VisibilityBindingSet(bs, "a") );
// Run the test.
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+ KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -87,7 +89,7 @@ public class StatementPatternProcessorIT {
// Setup a topology.
final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }";
final TopologyFactory factory = new TopologyFactory();
- final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic);
+ final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
// Create some statements where some generates SP results and others do not.
final ValueFactory vf = new ValueFactoryImpl();
@@ -111,7 +113,7 @@ public class StatementPatternProcessorIT {
expected.add( new VisibilityBindingSet(bs, "a|b") );
// Run the test.
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+ KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -128,7 +130,7 @@ public class StatementPatternProcessorIT {
+ "?person ?action <urn:Bob>"
+ "}";
final TopologyFactory factory = new TopologyFactory();
- final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic);
+ final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
// Create some statements where some generates SP results and others do not.
final ValueFactory vf = new ValueFactoryImpl();
@@ -145,7 +147,7 @@ public class StatementPatternProcessorIT {
expected.add( new VisibilityBindingSet(bs, "a") );
// Run the test.
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+ KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -162,7 +164,7 @@ public class StatementPatternProcessorIT {
+ "?person ?action <urn:Bob>"
+ "}";
final TopologyFactory factory = new TopologyFactory();
- final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic);
+ final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
// Create some statements where some generates SP results and others do not.
final ValueFactory vf = new ValueFactoryImpl();
@@ -188,6 +190,6 @@ public class StatementPatternProcessorIT {
expected.add(new VisibilityBindingSet(bs, "a"));
// Run the test.
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+ KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
index dbad15c..b137a9a 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
@@ -24,35 +24,23 @@ import java.util.List;
import java.util.Set;
import java.util.UUID;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.rya.api.function.join.LeftOuterJoin;
import org.apache.rya.api.function.join.NaturalJoin;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.api.model.VisibilityStatement;
import org.apache.rya.streams.kafka.KafkaTestUtil;
import org.apache.rya.streams.kafka.KafkaTopics;
-import org.apache.rya.streams.kafka.RdfTestUtil;
import org.apache.rya.streams.kafka.processors.ProcessorResult;
-import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
-import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side;
import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
-import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier.JoinProcessor;
-import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier.BindingSetOutputFormatter;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerde;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
-import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
import org.apache.rya.streams.kafka.topology.TopologyFactory;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
import org.junit.Rule;
import org.junit.Test;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.algebra.StatementPattern;
import org.openrdf.query.impl.MapBindingSet;
import com.google.common.collect.Lists;
@@ -84,12 +72,13 @@ public class JoinProcessorIT {
final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
// Setup a topology.
- final String query = "SELECT * WHERE { "
- + "?person <urn:talksTo> ?employee ."
- + "?employee <urn:worksAt> ?business"
- + " }";
+ final String query =
+ "SELECT * WHERE { " +
+ "?person <urn:talksTo> ?employee ." +
+ "?employee <urn:worksAt> ?business" +
+ " }";
final TopologyFactory factory = new TopologyFactory();
- final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic);
+ final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
// Create some statements that generate a bunch of right SP results.
final ValueFactory vf = new ValueFactoryImpl();
@@ -122,7 +111,7 @@ public class JoinProcessorIT {
expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
// Run the test.
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+ KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -133,46 +122,14 @@ public class JoinProcessorIT {
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
- // Get the StatementPatterns that will be evaluated.
- final StatementPattern leftSp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }");
- final StatementPattern rightSp = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }");
-
// Setup a topology.
- final TopologyBuilder builder = new TopologyBuilder();
-
- // The topic that Statements are written to is used as a source.
- builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
-
- // Add a processor that handles the first statement pattern.
- builder.addProcessor("LEFT_SP", new StatementPatternProcessorSupplier(leftSp,
- result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS");
-
- // Add a processor that handles the second statement pattern.
- builder.addProcessor("RIGHT_SP", new StatementPatternProcessorSupplier(rightSp,
- result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS");
-
- // Add a processor that handles a natrual join over the SPs.
- builder.addProcessor("NATURAL_JOIN", new JoinProcessorSupplier(
- "NATURAL_JOIN",
- new NaturalJoin(),
- Lists.newArrayList("employee"),
- Lists.newArrayList("employee", "person", "business"),
- result -> ProcessorResult.make( new UnaryResult(result) )), "LEFT_SP", "RIGHT_SP");
-
- // Add a state store for the join processor.
- final StateStoreSupplier joinStoreSupplier =
- Stores.create( "NATURAL_JOIN" )
- .withStringKeys()
- .withValues(new VisibilityBindingSetSerde())
- .inMemory()
- .build();
- builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN");
-
- // Add a processor that formats the VisibilityBindingSet for output.
- builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "NATURAL_JOIN");
-
- // Add a sink that writes the data out to a new Kafka topic.
- builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+ final String query =
+ "SELECT * WHERE { " +
+ "?person <urn:talksTo> ?employee ." +
+ "?employee <urn:worksAt> ?business" +
+ " }";
+ final TopologyFactory factory = new TopologyFactory();
+ final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
// Create some statements that generate a bunch of right SP results.
final ValueFactory vf = new ValueFactoryImpl();
@@ -205,7 +162,7 @@ public class JoinProcessorIT {
expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
// Run the test.
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+ KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -216,46 +173,14 @@ public class JoinProcessorIT {
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
- // Get the StatementPatterns that will be evaluated.
- final StatementPattern leftSp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }");
- final StatementPattern rightSp = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }");
-
// Setup a topology.
- final TopologyBuilder builder = new TopologyBuilder();
-
- // The topic that Statements are written to is used as a source.
- builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
-
- // Add a processor that handles the first statement pattern.
- builder.addProcessor("LEFT_SP", new StatementPatternProcessorSupplier(leftSp,
- result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS");
-
- // Add a processor that handles the second statement pattern.
- builder.addProcessor("RIGHT_SP", new StatementPatternProcessorSupplier(rightSp,
- result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS");
-
- // Add a processor that handles a natrual join over the SPs.
- builder.addProcessor("NATURAL_JOIN", new JoinProcessorSupplier(
- "NATURAL_JOIN",
- new NaturalJoin(),
- Lists.newArrayList("employee"),
- Lists.newArrayList("employee", "person", "business"),
- result -> ProcessorResult.make( new UnaryResult(result) )), "LEFT_SP", "RIGHT_SP");
-
- // Add a state store for the join processor.
- final StateStoreSupplier joinStoreSupplier =
- Stores.create( "NATURAL_JOIN" )
- .withStringKeys()
- .withValues(new VisibilityBindingSetSerde())
- .inMemory()
- .build();
- builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN");
-
- // Add a processor that formats the VisibilityBindingSet for output.
- builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "NATURAL_JOIN");
-
- // Add a sink that writes the data out to a new Kafka topic.
- builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+ final String query =
+ "SELECT * WHERE { " +
+ "?person <urn:talksTo> ?employee ." +
+ "?employee <urn:worksAt> ?business" +
+ " }";
+ final TopologyFactory factory = new TopologyFactory();
+ final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
// Create some statements that generate a bunch of right SP results.
final ValueFactory vf = new ValueFactoryImpl();
@@ -294,7 +219,7 @@ public class JoinProcessorIT {
expected.add( new VisibilityBindingSet(bs, "a&c") );
// Run the test.
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+ KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -305,67 +230,15 @@ public class JoinProcessorIT {
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
- // Get the StatementPatterns that will be evaluated.
- final StatementPattern sp1 = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }");
- final StatementPattern sp2 = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }");
- final StatementPattern sp3 = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:hourlyWage> ?wage }");
-
// Setup a topology.
- final TopologyBuilder builder = new TopologyBuilder();
-
- // The topic that Statements are written to is used as a source.
- builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
-
- // Add a processor that handles the first statement pattern.
- builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp1,
- result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS");
-
- // Add a processor that handles the second statement pattern.
- builder.addProcessor("SP2", new StatementPatternProcessorSupplier(sp2,
- result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS");
-
- // Add a processor that handles a natural join over SPs 1 and 2.
- builder.addProcessor("JOIN1", new JoinProcessorSupplier(
- "JOIN1",
- new NaturalJoin(),
- Lists.newArrayList("employee"),
- Lists.newArrayList("employee", "person", "business"),
- result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "SP1", "SP2");
-
- // Add a processor that handles the third statement pattern.
- builder.addProcessor("SP3", new StatementPatternProcessorSupplier(sp3,
- result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS");
-
- // Add a processor that handles a natural join over JOIN1 and SP3.
- builder.addProcessor("JOIN2", new JoinProcessorSupplier(
- "JOIN2",
- new NaturalJoin(),
- Lists.newArrayList("employee"),
- Lists.newArrayList("employee", "business", "wage"),
- result -> ProcessorResult.make( new UnaryResult(result) )), "JOIN1", "SP3");
-
- // Setup the join state suppliers.
- final StateStoreSupplier join1StoreSupplier =
- Stores.create( "JOIN1" )
- .withStringKeys()
- .withValues(new VisibilityBindingSetSerde())
- .inMemory()
- .build();
- builder.addStateStore(join1StoreSupplier, "JOIN1");
-
- final StateStoreSupplier join2StoreSupplier =
- Stores.create( "JOIN2" )
- .withStringKeys()
- .withValues(new VisibilityBindingSetSerde())
- .inMemory()
- .build();
- builder.addStateStore(join2StoreSupplier, "JOIN2");
-
- // Add a processor that formats the VisibilityBindingSet for output.
- builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "JOIN2");
-
- // Add a sink that writes the data out to a new Kafka topic.
- builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+ final String query =
+ "SELECT * WHERE { " +
+ "?person <urn:talksTo> ?employee ." +
+ "?employee <urn:worksAt> ?business ." +
+ "?employee <urn:hourlyWage> ?wage ." +
+ " }";
+ final TopologyFactory factory = new TopologyFactory();
+ final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
// Create some statements that generate a bunch of right SP results.
final ValueFactory vf = new ValueFactoryImpl();
@@ -387,7 +260,7 @@ public class JoinProcessorIT {
expected.add( new VisibilityBindingSet(bs, "a") );
// Run the test.
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 3000, statements, expected);
+ KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 3000, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -398,46 +271,14 @@ public class JoinProcessorIT {
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
- // Get the StatementPatterns that will be evaluated.
- final StatementPattern requiredSp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }");
- final StatementPattern optionalSp = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }");
-
// Setup a topology.
- final TopologyBuilder builder = new TopologyBuilder();
-
- // The topic that Statements are written to is used as a source.
- builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
-
- // Add a processor that handles the first statement pattern.
- builder.addProcessor("REQUIRED_SP", new StatementPatternProcessorSupplier(requiredSp,
- result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS");
-
- // Add a processor that handles the second statement pattern.
- builder.addProcessor("OPTIONAL_SP", new StatementPatternProcessorSupplier(optionalSp,
- result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS");
-
- // Add a processor that handles a natrual join over the SPs.
- builder.addProcessor("LEFT_JOIN", new JoinProcessorSupplier(
- "LEFT_JOIN",
- new LeftOuterJoin(),
- Lists.newArrayList("employee"),
- Lists.newArrayList("employee", "person", "business"),
- result -> ProcessorResult.make( new UnaryResult(result) )), "REQUIRED_SP", "OPTIONAL_SP");
-
- // Add a state store for the join processor.
- final StateStoreSupplier joinStoreSupplier =
- Stores.create( "LEFT_JOIN" )
- .withStringKeys()
- .withValues(new VisibilityBindingSetSerde())
- .inMemory()
- .build();
- builder.addStateStore(joinStoreSupplier, "LEFT_JOIN");
-
- // Add a processor that formats the VisibilityBindingSet for output.
- builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "LEFT_JOIN");
-
- // Add a sink that writes the data out to a new Kafka topic.
- builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+ final String query =
+ "SELECT * WHERE { " +
+ "?person <urn:talksTo> ?employee ." +
+ "OPTIONAL{ ?employee <urn:worksAt> ?business } " +
+ " }";
+ final TopologyFactory factory = new TopologyFactory();
+ final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
// Create some statements that generate a result that includes the optional value as well as one that does not.
final ValueFactory vf = new ValueFactoryImpl();
@@ -470,6 +311,6 @@ public class JoinProcessorIT {
expected.add( new VisibilityBindingSet(bs, "c") );
// Run the test.
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+ KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
index ee0e55b..d71577b 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
@@ -24,29 +24,13 @@ import java.util.List;
import java.util.Set;
import java.util.UUID;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.rya.api.function.join.NaturalJoin;
-import org.apache.rya.api.function.projection.MultiProjectionEvaluator;
-import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.api.model.VisibilityStatement;
import org.apache.rya.streams.kafka.KafkaTestUtil;
import org.apache.rya.streams.kafka.KafkaTopics;
-import org.apache.rya.streams.kafka.RdfTestUtil;
-import org.apache.rya.streams.kafka.processors.ProcessorResult;
-import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
-import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side;
-import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
-import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
-import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier;
-import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier.BindingSetOutputFormatter;
import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier.MultiProjectionProcessor;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerde;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
import org.junit.Rule;
import org.junit.Test;
@@ -54,11 +38,6 @@ import org.openrdf.model.BNode;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.ValueFactoryImpl;
import org.openrdf.model.vocabulary.RDF;
-import org.openrdf.query.algebra.MultiProjection;
-import org.openrdf.query.algebra.StatementPattern;
-import org.openrdf.query.impl.MapBindingSet;
-
-import com.google.common.collect.Lists;
/**
* Integration tests the methods of {@link MultiProjectionProcessor}.
@@ -76,10 +55,8 @@ public class MultiProjectionProcessorIT {
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
- // Get the RDF model objects that will be used to build the query.
- final StatementPattern sp1 = RdfTestUtil.getSp("SELECT * WHERE { ?thing <urn:corner> ?location . }");
- final StatementPattern sp2 = RdfTestUtil.getSp("SELECT * WHERE { ?thing <urn:compass> ?direction . }");
- final MultiProjection multiProjection = RdfTestUtil.getMultiProjection(
+ // Create a topology for the Query that will be tested.
+ final String sparql =
"CONSTRUCT {" +
"_:b a <urn:movementObservation> ; " +
"<urn:location> ?location ; " +
@@ -88,38 +65,10 @@ public class MultiProjectionProcessorIT {
"WHERE {" +
"?thing <urn:corner> ?location ." +
"?thing <urn:compass> ?direction." +
- "}");
-
- // Setup a topology.
- final TopologyBuilder builder = new TopologyBuilder();
- builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
- builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp1,
- result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS");
- builder.addProcessor("SP2", new StatementPatternProcessorSupplier(sp2,
- result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS");
-
- builder.addProcessor("NATURAL_JOIN", new JoinProcessorSupplier(
- "NATURAL_JOIN",
- new NaturalJoin(),
- Lists.newArrayList("thing"),
- Lists.newArrayList("thing", "location", "direction"),
- result -> ProcessorResult.make( new UnaryResult(result) )), "SP1", "SP2");
-
- final StateStoreSupplier joinStoreSupplier =
- Stores.create( "NATURAL_JOIN" )
- .withStringKeys()
- .withValues(new VisibilityBindingSetSerde())
- .inMemory()
- .build();
- builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN");
+ "}";
- final String blankNodeId = UUID.randomUUID().toString();
- builder.addProcessor("MULTIPROJECTION", new MultiProjectionProcessorSupplier(
- MultiProjectionEvaluator.make(multiProjection, () -> blankNodeId),
- result -> ProcessorResult.make(new UnaryResult(result))), "NATURAL_JOIN");
-
- builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "MULTIPROJECTION");
- builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+ final String bNodeId = UUID.randomUUID().toString();
+ final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, () -> bNodeId);
// Create the statements that will be input into the query.
final ValueFactory vf = new ValueFactoryImpl();
@@ -130,26 +79,14 @@ public class MultiProjectionProcessorIT {
vf.createStatement(vf.createURI("urn:car1"), vf.createURI("urn:corner"), vf.createURI("urn:corner1")), "a") );
// Make the expected results.
- final Set<VisibilityBindingSet> expected = new HashSet<>();
- final BNode blankNode = vf.createBNode(blankNodeId);
-
- MapBindingSet expectedBs = new MapBindingSet();
- expectedBs.addBinding("subject", blankNode);
- expectedBs.addBinding("predicate", RDF.TYPE);
- expectedBs.addBinding("object", vf.createURI("urn:movementObservation"));
-
- expectedBs = new MapBindingSet();
- expectedBs.addBinding("subject", blankNode);
- expectedBs.addBinding("predicate", vf.createURI("urn:direction"));
- expectedBs.addBinding("object", vf.createURI("urn:NW"));
-
+ final Set<VisibilityStatement> expected = new HashSet<>();
+ final BNode blankNode = vf.createBNode(bNodeId);
- expectedBs = new MapBindingSet();
- expectedBs.addBinding("subject", blankNode);
- expectedBs.addBinding("predicate", vf.createURI("urn:location"));
- expectedBs.addBinding("object", vf.createURI("urn:corner1"));
+ expected.add(new VisibilityStatement(vf.createStatement(blankNode, RDF.TYPE, vf.createURI("urn:movementObservation")), "a"));
+ expected.add(new VisibilityStatement(vf.createStatement(blankNode, vf.createURI("urn:direction"), vf.createURI("urn:NW")), "a"));
+ expected.add(new VisibilityStatement(vf.createStatement(blankNode, vf.createURI("urn:location"), vf.createURI("urn:corner1")), "a"));
// Run the test.
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+ KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityStatementDeserializer.class);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
index 99e2451..bc5f115 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
@@ -24,30 +24,20 @@ import java.util.List;
import java.util.Set;
import java.util.UUID;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.rya.api.function.projection.ProjectionEvaluator;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.api.model.VisibilityStatement;
import org.apache.rya.streams.kafka.KafkaTestUtil;
import org.apache.rya.streams.kafka.KafkaTopics;
-import org.apache.rya.streams.kafka.RdfTestUtil;
-import org.apache.rya.streams.kafka.processors.ProcessorResult;
-import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
-import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
-import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier.BindingSetOutputFormatter;
-import org.apache.rya.streams.kafka.processors.projection.ProjectionProcessorSupplier;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
-import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
import org.junit.Rule;
import org.junit.Test;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.algebra.Projection;
-import org.openrdf.query.algebra.StatementPattern;
import org.openrdf.query.impl.MapBindingSet;
import com.google.common.collect.Sets;
@@ -68,34 +58,14 @@ public class ProjectionProcessorIT {
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
- // Get the RDF model objects that will be used to build the query.
+ // Create a topology for the Query that will be tested.
final String sparql =
"SELECT (?person AS ?p) ?otherPerson " +
"WHERE { " +
"?person <urn:talksTo> ?otherPerson . " +
"}";
- final Projection projection = RdfTestUtil.getProjection(sparql);
- final StatementPattern sp = RdfTestUtil.getSp(sparql);
- // Setup a topology.
- final TopologyBuilder builder = new TopologyBuilder();
-
- // The topic that Statements are written to is used as a source.
- builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
-
- // Add a processor that handles the first statement pattern.
- builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
-
- // Add a processor that handles the projection.
- builder.addProcessor("P1", new ProjectionProcessorSupplier(
- ProjectionEvaluator.make(projection),
- result -> ProcessorResult.make(new UnaryResult(result))), "SP1");
-
- // Add a processor that formats the VisibilityBindingSet for output.
- builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "P1");
-
- // Add a sink that writes the data out to a new Kafka topic.
- builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+ final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
// Load some data into the input topic.
final ValueFactory vf = new ValueFactoryImpl();
@@ -110,6 +80,6 @@ public class ProjectionProcessorIT {
expectedBs.addBinding("otherPerson", vf.createURI("urn:Bob"));
expected.add(new VisibilityBindingSet(expectedBs, "a"));
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, Sets.newHashSet(expected));
+ KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, Sets.newHashSet(expected), VisibilityBindingSetDeserializer.class);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java
index eda4c89..31462ec 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.rya.streams.kafka.topology;
import static org.junit.Assert.assertEquals;
@@ -5,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import java.util.List;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
import org.apache.rya.streams.kafka.topology.TopologyFactory.ProcessorEntry;
import org.junit.Before;
import org.junit.Test;
@@ -15,6 +34,9 @@ import org.openrdf.query.algebra.Projection;
import org.openrdf.query.algebra.StatementPattern;
import org.openrdf.query.algebra.Var;
+/**
+ * Unit tests the methods of {@link TopologyFactory}.
+ */
public class TopologyFactoryTest {
private static TopologyFactory FACTORY;
@@ -40,7 +62,7 @@ public class TopologyFactoryTest {
+ "?person <urn:talksTo> ?otherPerson . "
+ "}";
- FACTORY.build(query, "source", "sink");
+ FACTORY.build(query, "source", "sink", new RandomUUIDFactory());
final List<ProcessorEntry> entries = FACTORY.getProcessorEntry();
assertTrue(entries.get(0).getNode() instanceof Projection);
@@ -57,7 +79,7 @@ public class TopologyFactoryTest {
+ "?otherPerson <urn:talksTo> ?dog . "
+ "}";
- FACTORY.build(query, "source", "sink");
+ FACTORY.build(query, "source", "sink", new RandomUUIDFactory());
final List<ProcessorEntry> entries = FACTORY.getProcessorEntry();
assertTrue(entries.get(0).getNode() instanceof Projection);
@@ -76,7 +98,7 @@ public class TopologyFactoryTest {
+ "?dog <urn:chews> ?toy . "
+ "}";
- FACTORY.build(query, "source", "sink");
+ FACTORY.build(query, "source", "sink", new RandomUUIDFactory());
final List<ProcessorEntry> entries = FACTORY.getProcessorEntry();
assertTrue(entries.get(0).getNode() instanceof Projection);
@@ -96,11 +118,11 @@ public class TopologyFactoryTest {
+ "?person <urn:talksTo> ?otherPerson . "
+ "}";
- FACTORY.build(query, "source", "sink");
+ FACTORY.build(query, "source", "sink", new RandomUUIDFactory());
final List<ProcessorEntry> entries = FACTORY.getProcessorEntry();
assertTrue(entries.get(0).getNode() instanceof Projection);
final StatementPattern expected = new StatementPattern(new Var("person"), TALKS_TO, new Var("otherPerson"));
assertEquals(expected, entries.get(1).getNode());
}
-}
+}
\ No newline at end of file