You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/09 21:48:50 UTC
[33/50] [abbrv] incubator-rya git commit: RYA-377 Fixed the project
layout so that it builds even when the geoindexing profile is not enabled.
RYA-377 Fixed the project layout so that it builds even when the geoindexing profile is not enabled.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/92c85ee1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/92c85ee1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/92c85ee1
Branch: refs/heads/master
Commit: 92c85ee11030712289df48faed4710359b1b0601
Parents: 923448f
Author: kchilton2 <ke...@gmail.com>
Authored: Wed Jan 3 16:41:08 2018 -0500
Committer: caleb <ca...@parsons.com>
Committed: Tue Jan 9 15:13:01 2018 -0500
----------------------------------------------------------------------
extras/rya.streams/geo/pom.xml | 71 +++
.../kafka/processors/filter/GeoFilterIT.java | 137 ++++++
extras/rya.streams/integration/pom.xml | 69 +++
.../aggregation/AggregationProcessorIT.java | 457 +++++++++++++++++++
.../processors/filter/FilterProcessorIT.java | 86 ++++
.../processors/filter/FilterProcessorTest.java | 75 +++
.../processors/filter/TemporalFilterIT.java | 231 ++++++++++
.../kafka/processors/join/JoinProcessorIT.java | 316 +++++++++++++
.../projection/MultiProjectionProcessorIT.java | 93 ++++
.../projection/ProjectionProcessorIT.java | 86 ++++
.../sp/StatementPatternProcessorIT.java | 196 ++++++++
extras/rya.streams/kafka-test/pom.xml | 51 +++
.../rya/streams/kafka/RyaStreamsTestUtil.java | 122 +++++
extras/rya.streams/kafka/pom.xml | 32 +-
.../apache/rya/streams/kafka/RdfTestUtil.java | 131 ------
.../rya/streams/kafka/RyaStreamsTestUtil.java | 122 -----
.../processors/StatementPatternProcessorIT.java | 195 --------
.../aggregation/AggregationProcessorIT.java | 457 -------------------
.../processors/filter/FilterProcessorIT.java | 86 ----
.../processors/filter/FilterProcessorTest.java | 75 ---
.../kafka/processors/filter/GeoFilterIT.java | 137 ------
.../processors/filter/TemporalFilterIT.java | 231 ----------
.../kafka/processors/join/JoinProcessorIT.java | 316 -------------
.../projection/MultiProjectionProcessorIT.java | 92 ----
.../projection/ProjectionProcessorIT.java | 85 ----
extras/rya.streams/pom.xml | 3 +
pom.xml | 10 +
test/pom.xml | 1 +
test/rdf/pom.xml | 59 +++
.../apache/rya/streams/kafka/RdfTestUtil.java | 131 ++++++
30 files changed, 2212 insertions(+), 1941 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/geo/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/geo/pom.xml b/extras/rya.streams/geo/pom.xml
new file mode 100644
index 0000000..2f179d0
--- /dev/null
+++ b/extras/rya.streams/geo/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.streams.parent</artifactId>
+ <version>3.2.12-incubating-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>rya.streams.geo-test</artifactId>
+
+ <name>Apache Rya Streams Geo Test</name>
+ <description>
+ A module that contains Geo integration tests with Rya Streams.
+ </description>
+
+ <dependencies>
+ <!-- Rya dependencies -->
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.pcj.functions.geo</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.geo.common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.test.kafka</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.streams.kafka-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java b/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
new file mode 100644
index 0000000..c090afa
--- /dev/null
+++ b/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.filter;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
+import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.Resource;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.StatementImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.algebra.evaluation.function.Function;
+import org.openrdf.query.algebra.evaluation.function.FunctionRegistry;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.vividsolutions.jts.geom.Coordinate;
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.geom.GeometryFactory;
+import com.vividsolutions.jts.io.WKTWriter;
+
+/**
+ * Integration tests for the geo filter functions used with {@link FilterProcessor}.
+ */
+public class GeoFilterIT {
+ private static final String GEO = "http://www.opengis.net/def/function/geosparql/";
+ private static final GeometryFactory GF = new GeometryFactory();
+ private static final Geometry ZERO = GF.createPoint(new Coordinate(0, 0));
+ private static final Geometry ONE = GF.createPoint(new Coordinate(1, 1));
+
+ @Rule
+ public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+ @Test
+ public void showGeoFunctionsRegistered() {
+ int count = 0;
+ final Collection<Function> funcs = FunctionRegistry.getInstance().getAll();
+ for (final Function fun : funcs) {
+ if (fun.getURI().startsWith(GEO)) {
+ count++;
+ }
+ }
+
+ // Exactly 30 registered functions are expected to have a URI that starts with the GEO namespace.
+ assertEquals(30, count);
+ }
+
+ @Test
+ public void showProcessorWorks() throws Exception {
+ // Enumerate the topics that are used throughout the test.
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // The SPARQL query: keep the WKT point of <urn:event1> only when geof:sfWithin accepts it for the polygon.
+ final String sparql =
+ "PREFIX geo: <http://www.opengis.net/ont/geosparql#>\n"
+ + "PREFIX geof: <" + GEO + ">\n"
+ + "SELECT * \n"
+ + "WHERE { \n"
+ + " <urn:event1> geo:asWKT ?point .\n"
+ + " FILTER(geof:sfWithin(?point, \"POLYGON((-3 -2, -3 2, 1 2, 1 -2, -3 -2))\"^^geo:wktLiteral)) "
+ + "}";
+
+ // Set up a topology for the query.
+ final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Create the statements that will be input into the query.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = getStatements();
+
+ // Make the expected results: only the (0, 0) point; (1, 1) sits on the polygon's x = 1 edge, presumably why it is excluded.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+ final MapBindingSet bs = new MapBindingSet();
+ final WKTWriter w = new WKTWriter();
+ bs.addBinding("point", vf.createLiteral(w.write(ZERO), GeoConstants.XMLSCHEMA_OGC_WKT));
+ expected.add( new VisibilityBindingSet(bs, "a") );
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+
+ private List<VisibilityStatement> getStatements() throws Exception {
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ // Two point geometries, (0, 0) and (1, 1), each given the visibility "a".
+ statements.add(new VisibilityStatement(statement(ZERO), "a"));
+ statements.add(new VisibilityStatement(statement(ONE), "a"));
+ return statements;
+ }
+
+ private static Statement statement(final Geometry geo) {
+ final ValueFactory vf = new ValueFactoryImpl();
+ final Resource subject = vf.createURI("urn:event1");
+ final URI predicate = GeoConstants.GEO_AS_WKT;
+ final WKTWriter w = new WKTWriter();
+ final Value object = vf.createLiteral(w.write(geo), GeoConstants.XMLSCHEMA_OGC_WKT);
+ return new StatementImpl(subject, predicate, object);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/integration/pom.xml b/extras/rya.streams/integration/pom.xml
new file mode 100644
index 0000000..26ec9f7
--- /dev/null
+++ b/extras/rya.streams/integration/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.streams.parent</artifactId>
+ <version>3.2.12-incubating-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>rya.streams.integration</artifactId>
+
+ <name>Apache Rya Streams Kafka Integration Tests</name>
+ <description>
+ A module that contains Kafka Integration tests for Rya Streams.
+ </description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.streams.kafka</artifactId>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.test.rdf</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.test.kafka</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.streams.kafka-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
new file mode 100644
index 0000000..072469a
--- /dev/null
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.aggregation;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
+import org.apache.rya.streams.kafka.processors.aggregation.AggregationProcessorSupplier.AggregationProcessor;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.impl.MapBindingSet;
+
+/**
+ * Integration tests {@link AggregationProcessor}.
+ */
+public class AggregationProcessorIT {
+
+ @Rule
+ public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(false);
+
+ @Test
+ public void count() throws Exception {
+ // A query that figures out how many books each person has.
+ final String sparql =
+ "SELECT ?person (count(?book) as ?bookCount) " +
+ "WHERE { " +
+ "?person <urn:hasBook> ?book " +
+ "} GROUP BY ?person";
+
+ // Create the statements that will be input into the query.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 1")), "a"));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 1")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 2")), "b"));
+
+ // Make the expected results.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Alice"));
+ bs.addBinding("bookCount", vf.createLiteral("1", XMLSchema.INTEGER));
+ expected.add(new VisibilityBindingSet(bs, "a"));
+
+ bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Bob"));
+ bs.addBinding("bookCount", vf.createLiteral("1", XMLSchema.INTEGER));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Alice"));
+ bs.addBinding("bookCount", vf.createLiteral("2", XMLSchema.INTEGER));
+ expected.add(new VisibilityBindingSet(bs, "a&b"));
+
+ // Enumerate some topics that will be re-used
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Setup a topology.
+ final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+
+ @Test
+ public void sum() throws Exception {
+ // A query that figures out how much food each person has.
+ final String sparql =
+ "SELECT ?person (sum(?foodCount) as ?totalFood) " +
+ "WHERE { " +
+ "?person <urn:hasFoodType> ?food . " +
+ "?food <urn:count> ?foodCount . " +
+ "} GROUP BY ?person";
+
+ // Create the statements that will be input into the query.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasFoodType"), vf.createURI("urn:corn")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasFoodType"), vf.createURI("urn:apple")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:corn"), vf.createURI("urn:count"), vf.createLiteral(4)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:apple"), vf.createURI("urn:count"), vf.createLiteral(3)), ""));
+
+ // Make the expected results.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Alice"));
+ bs.addBinding("totalFood", vf.createLiteral("4", XMLSchema.INTEGER));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Alice"));
+ bs.addBinding("totalFood", vf.createLiteral("7", XMLSchema.INTEGER));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ // Enumerate some topics that will be re-used
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Setup a topology.
+ final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+
+ @Test
+ public void average() throws Exception {
+ // A query that figures out the average age across all people.
+ final String sparql =
+ "SELECT (avg(?age) as ?avgAge) " +
+ "WHERE { " +
+ "?person <urn:age> ?age " +
+ "}";
+
+ // Create the statements that will be input into the query.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(3)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(7)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(2)), ""));
+
+ // Make the expected results.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("avgAge", vf.createLiteral("3", XMLSchema.DECIMAL));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("avgAge", vf.createLiteral("5", XMLSchema.DECIMAL));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("avgAge", vf.createLiteral("4", XMLSchema.DECIMAL));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ // Enumerate some topics that will be re-used
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Setup a topology.
+ final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+
+ @Test
+ public void min() throws Exception {
+ // A query that figures out what the youngest age is across all people.
+ final String sparql =
+ "SELECT (min(?age) as ?youngest) " +
+ "WHERE { " +
+ "?person <urn:age> ?age " +
+ "}";
+
+ // Create the statements that will be input into the query.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), ""));
+
+ // Make the expected results.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("youngest", vf.createLiteral(13));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("youngest", vf.createLiteral(7));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("youngest", vf.createLiteral(5));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ // Enumerate some topics that will be re-used
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Setup a topology.
+ final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+
+ @Test
+ public void max() throws Exception {
+ // A query that figures out what the oldest age is across all people.
+ final String sparql =
+ "SELECT (max(?age) as ?oldest) " +
+ "WHERE { " +
+ "?person <urn:age> ?age " +
+ "}";
+
+ // Create the statements that will be input into the query.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), ""));
+
+ // Make the expected results.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("oldest", vf.createLiteral(13));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("oldest", vf.createLiteral(14));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("oldest", vf.createLiteral(25));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ // Enumerate some topics that will be re-used
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Setup a topology.
+ final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+
+ @Test
+ public void multipleGroupByVars() throws Exception {
+ // A query that contains more than one group by variable.
+ final String sparql =
+ "SELECT ?business ?employee (sum(?hours) AS ?totalHours) " +
+ "WHERE {" +
+ "?employee <urn:worksAt> ?business . " +
+ "?business <urn:hasTimecardId> ?timecardId . " +
+ "?employee <urn:hasTimecardId> ?timecardId . " +
+ "?timecardId <urn:hours> ?hours . " +
+ "} GROUP BY ?business ?employee";
+
+ // Create the statements that will be input into the query.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoint")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard1")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard1")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:timecard1"), vf.createURI("urn:hours"), vf.createLiteral(40)), ""));
+
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard2")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard2")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:timecard2"), vf.createURI("urn:hours"), vf.createLiteral(25)), ""));
+
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoint")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard3")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard3")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:timecard3"), vf.createURI("urn:hours"), vf.createLiteral(28)), ""));
+
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard4")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard4")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:timecard4"), vf.createURI("urn:hours"), vf.createLiteral(28)), ""));
+
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:CoffeeShop"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard5")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard5")), ""));
+ statements.add(new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:timecard5"), vf.createURI("urn:hours"), vf.createLiteral(12)), ""));
+
+ // Make the expected results.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("business", vf.createURI("urn:TacoJoint"));
+ bs.addBinding("employee", vf.createURI("urn:Alice"));
+ bs.addBinding("totalHours", vf.createLiteral("40", XMLSchema.INTEGER));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("business", vf.createURI("urn:TacoJoint"));
+ bs.addBinding("employee", vf.createURI("urn:Alice"));
+ bs.addBinding("totalHours", vf.createLiteral("65", XMLSchema.INTEGER));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("business", vf.createURI("urn:TacoJoint"));
+ bs.addBinding("employee", vf.createURI("urn:Bob"));
+ bs.addBinding("totalHours", vf.createLiteral("28", XMLSchema.INTEGER));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("business", vf.createURI("urn:TacoJoint"));
+ bs.addBinding("employee", vf.createURI("urn:Bob"));
+ bs.addBinding("totalHours", vf.createLiteral("56", XMLSchema.INTEGER));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ bs = new MapBindingSet();
+ bs.addBinding("business", vf.createURI("urn:CoffeeShop"));
+ bs.addBinding("employee", vf.createURI("urn:Alice"));
+ bs.addBinding("totalHours", vf.createLiteral("12", XMLSchema.INTEGER));
+ expected.add(new VisibilityBindingSet(bs, ""));
+
+ // Enumerate some topics that will be re-used
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Setup a topology.
+ final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+
+    /**
+     * Runs a query that contains two aggregations (min and max) over the same
+     * variable and checks the running (youngest, oldest) pair that is expected
+     * after each of the five input statements.
+     *
+     * @throws Exception if the stream processing test run fails.
+     */
+    @Test
+    public void multipleAggregations() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // A query that figures out what the youngest and oldest ages are across all people.
+        final String sparql =
+                "SELECT (min(?age) as ?youngest) (max(?age) as ?oldest) " +
+                "WHERE { " +
+                    "?person <urn:age> ?age " +
+                "}";
+
+        // The people and their ages, in the order the statements are fed to the query.
+        final String[] people = { "urn:Alice", "urn:Bob", "urn:Charlie", "urn:David", "urn:Eve" };
+        final int[] ages = { 13, 14, 7, 5, 25 };
+
+        // Create the statements that will be input into the query.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        for (int i = 0; i < people.length; i++) {
+            statements.add(new VisibilityStatement(
+                    vf.createStatement(vf.createURI(people[i]), vf.createURI("urn:age"), vf.createLiteral(ages[i])), ""));
+        }
+
+        // The {youngest, oldest} pair that is expected after each input statement.
+        final int[][] runningMinMax = { {13, 13}, {13, 14}, {7, 14}, {5, 14}, {5, 25} };
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        for (final int[] minMax : runningMinMax) {
+            final MapBindingSet bs = new MapBindingSet();
+            bs.addBinding("youngest", vf.createLiteral(minMax[0]));
+            bs.addBinding("oldest", vf.createLiteral(minMax[1]));
+            expected.add(new VisibilityBindingSet(bs, ""));
+        }
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
new file mode 100644
index 0000000..aaa67ea
--- /dev/null
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.filter;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
+import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.impl.MapBindingSet;
+
+/**
+ * Integration test for {@link FilterProcessor}: a SPARQL query containing a
+ * FILTER is turned into a topology by {@link TopologyFactory} and run against
+ * the Kafka instance provided by {@link KafkaTestInstanceRule}.
+ */
+public class FilterProcessorIT {
+
+    @Rule
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+    /**
+     * Feeds two age statements into a query that filters on {@code ?age < 10}
+     * and expects only the binding set for the age of 9 to be emitted.
+     *
+     * @throws Exception if the stream processing test run fails.
+     */
+    @Test
+    public void showProcessorWorks() throws Exception {
+        // The query whose FILTER only lets ages below 10 through.
+        final String sparql =
+                "SELECT * " +
+                "WHERE { " +
+                    "FILTER(?age < 10)" +
+                    "?person <urn:age> ?age " +
+                "}";
+
+        // Topics that are unique to this test run.
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Input: Bob is 11, Alice is 9.
+        final ValueFactory valueFactory = new ValueFactoryImpl();
+        final List<VisibilityStatement> input = new ArrayList<>();
+        input.add(new VisibilityStatement(
+                valueFactory.createStatement(valueFactory.createURI("urn:Bob"), valueFactory.createURI("urn:age"), valueFactory.createLiteral(11)), "a"));
+        input.add(new VisibilityStatement(
+                valueFactory.createStatement(valueFactory.createURI("urn:Alice"), valueFactory.createURI("urn:age"), valueFactory.createLiteral(9)), "a"));
+
+        // Expected output: only Alice's binding set, carrying the input visibility.
+        final MapBindingSet aliceResult = new MapBindingSet();
+        aliceResult.addBinding("person", valueFactory.createURI("urn:Alice"));
+        aliceResult.addBinding("age", valueFactory.createLiteral(9));
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        expected.add(new VisibilityBindingSet(aliceResult, "a"));
+
+        // Build the topology for the query.
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, input, expected, VisibilityBindingSetDeserializer.class);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java
new file mode 100644
index 0000000..3ff8e8d
--- /dev/null
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.filter;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.rya.api.function.filter.FilterEvaluator;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.streams.kafka.RdfTestUtil;
+import org.apache.rya.streams.kafka.processors.ProcessorResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
+import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.impl.MapBindingSet;
+
+/**
+ * Unit test for {@link FilterProcessor} that uses a mocked
+ * {@link ProcessorContext} instead of a running Kafka instance.
+ */
+public class FilterProcessorTest {
+
+    /**
+     * Processes a binding set whose age is 9 through a processor built from a
+     * {@code FILTER(?age < 10)} query and verifies that exactly one equal
+     * result is forwarded to the context under the same key.
+     *
+     * @throws Exception if the filter cannot be read from the query or processing fails.
+     */
+    @Test
+    public void showFilterFunctionIsCalled() throws Exception {
+        // The binding set handed to the processor.
+        final ValueFactory valueFactory = new ValueFactoryImpl();
+        final MapBindingSet bindings = new MapBindingSet();
+        bindings.addBinding("person", valueFactory.createURI("urn:Alice"));
+        bindings.addBinding("age", valueFactory.createLiteral(9));
+        final VisibilityBindingSet inputVisBs = new VisibilityBindingSet(bindings, "a");
+
+        // Read the filter object from a SPARQL query.
+        final Filter filter = RdfTestUtil.getFilter(
+                "SELECT * " +
+                "WHERE { " +
+                    "FILTER(?age < 10)" +
+                    "?person <urn:age> ?age " +
+                "}");
+
+        // Mock the processor context that will be invoked.
+        final ProcessorContext context = mock(ProcessorContext.class);
+
+        // The value that is processed and, as a separate instance, the value expected downstream.
+        final ProcessorResult processed = ProcessorResult.make(new UnaryResult(inputVisBs));
+        final ProcessorResult forwarded = ProcessorResult.make(new UnaryResult(inputVisBs));
+
+        // Run the test.
+        final FilterProcessor processor = new FilterProcessor(
+                FilterEvaluator.make(filter),
+                result -> ProcessorResult.make(new UnaryResult(result)));
+        processor.init(context);
+        processor.process("key", processed);
+
+        // Verify the binding set was passed through.
+        verify(context, times(1)).forward(eq("key"), eq(forwarded));
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
new file mode 100644
index 0000000..22a883b
--- /dev/null
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.filter;
+
+import static org.junit.Assert.assertEquals;
+
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
+import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.Resource;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.StatementImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.algebra.evaluation.function.Function;
+import org.openrdf.query.algebra.evaluation.function.FunctionRegistry;
+import org.openrdf.query.impl.MapBindingSet;
+
+/**
+ * Integration tests the temporal methods of {@link FilterProcessor}.
+ */
+public class TemporalFilterIT {
+ private static final ValueFactory vf = new ValueFactoryImpl();
+ private static final String TEMPORAL = "http://rya.apache.org/ns/temporal";
+ private static final ZonedDateTime TIME = ZonedDateTime.parse("2015-12-30T12:00:00Z");
+ private static final ZonedDateTime TIME_10 = ZonedDateTime.parse("2015-12-30T12:00:10Z");
+ private static final ZonedDateTime TIME_20 = ZonedDateTime.parse("2015-12-30T12:00:20Z");
+
+ @Rule
+ public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(false);
+
+ @Test
+ public void temporalFunctionsRegistered() {
+ int count = 0;
+ final Collection<Function> funcs = FunctionRegistry.getInstance().getAll();
+ for (final Function fun : funcs) {
+ if (fun.getURI().startsWith(TEMPORAL)) {
+ count++;
+ }
+ }
+
+ // There are 4 temporal functions registered, ensure that there are 4.
+ assertEquals(4, count);
+ }
+
+ @Test
+ public void showEqualsWorks() throws Exception {
+ // Enumerate some topics that will be re-used
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Get the RDF model objects that will be used to build the query.
+ final String sparql =
+ "PREFIX time: <http://www.w3.org/2006/time/> \n"
+ + "PREFIX tempf: <http://rya.apache.org/ns/temporal/>\n"
+ + "SELECT * \n"
+ + "WHERE { \n"
+ + " <urn:time> time:atTime ?date .\n"
+ + " FILTER(tempf:equals(?date, \"" + TIME.toString() + "\")) "
+ + "}";
+ // Setup a topology.
+ final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Create the statements that will be input into the query.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = getStatements();
+
+ // Make the expected results.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+ final MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("date", vf.createLiteral(TIME.toString()));
+ expected.add( new VisibilityBindingSet(bs, "a") );
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+
+ @Test
+ public void showBeforeWorks() throws Exception {
+ // Enumerate some topics that will be re-used
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Get the RDF model objects that will be used to build the query.
+ final String sparql =
+ "PREFIX time: <http://www.w3.org/2006/time/> \n"
+ + "PREFIX tempf: <http://rya.apache.org/ns/temporal/>\n"
+ + "SELECT * \n"
+ + "WHERE { \n"
+ + " <urn:time> time:atTime ?date .\n"
+ + " FILTER(tempf:before(?date, \"" + TIME_10.toString() + "\")) "
+ + "}";
+ // Setup a topology.
+ final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Create the statements that will be input into the query.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = getStatements();
+
+ // Make the expected results.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+ final MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("date", vf.createLiteral(TIME.toString()));
+ expected.add( new VisibilityBindingSet(bs, "a") );
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+
+ @Test
+ public void showAfterWorks() throws Exception {
+ // Enumerate some topics that will be re-used
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Get the RDF model objects that will be used to build the query.
+ final String sparql =
+ "PREFIX time: <http://www.w3.org/2006/time/> \n"
+ + "PREFIX tempf: <http://rya.apache.org/ns/temporal/>\n"
+ + "SELECT * \n"
+ + "WHERE { \n"
+ + " <urn:time> time:atTime ?date .\n"
+ + " FILTER(tempf:after(?date, \"" + TIME_10.toString() + "\")) "
+ + "}";
+ // Setup a topology.
+ final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Create the statements that will be input into the query.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = getStatements();
+
+ // Make the expected results.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+ final MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("date", vf.createLiteral(TIME_20.toString()));
+ expected.add( new VisibilityBindingSet(bs, "a") );
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+
+ @Test
+ public void showWithinWorks() throws Exception {
+ // Enumerate some topics that will be re-used
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Get the RDF model objects that will be used to build the query.
+ final String sparql =
+ "PREFIX time: <http://www.w3.org/2006/time/> \n"
+ + "PREFIX tempf: <http://rya.apache.org/ns/temporal/>\n"
+ + "SELECT * \n"
+ + "WHERE { \n"
+ + " <urn:time> time:atTime ?date .\n"
+ + " FILTER(tempf:within(?date, \"" + TIME.toString() + "/" + TIME_20.toString() + "\")) "
+ + "}";
+ // Setup a topology.
+ final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Create the statements that will be input into the query.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = getStatements();
+
+ // Make the expected results.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+ final MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("date", vf.createLiteral(TIME_10.toString()));
+ expected.add( new VisibilityBindingSet(bs, "a") );
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+
+ private List<VisibilityStatement> getStatements() throws Exception {
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add(new VisibilityStatement(statement(TIME), "a"));
+ statements.add(new VisibilityStatement(statement(TIME_10), "a"));
+ statements.add(new VisibilityStatement(statement(TIME_20), "a"));
+ return statements;
+ }
+
+ private static Statement statement(final ZonedDateTime time) {
+ final Resource subject = vf.createURI("urn:time");
+ final URI predicate = vf.createURI("http://www.w3.org/2006/time/atTime");
+ final Value object = vf.createLiteral(time.toString());
+ return new StatementImpl(subject, predicate, object);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
new file mode 100644
index 0000000..bdb9be6
--- /dev/null
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.join;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.join.NaturalJoin;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
+import org.apache.rya.streams.kafka.processors.ProcessorResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
+import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier.JoinProcessor;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Integration tests the methods of {@link JoinProcessor}.
+ */
+public class JoinProcessorIT {
+
+ @Rule
+ public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+    /**
+     * Constructing a {@link JoinProcessorSupplier} with these variable lists is
+     * expected to be rejected with an {@link IllegalArgumentException}.
+     * NOTE(review): presumably because the all-variables list does not start
+     * with the join variable - confirm against JoinProcessorSupplier.
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void badAllVars() throws IllegalArgumentException {
+        final List<String> joinVars = Lists.newArrayList("employee");
+        final List<String> allVars = Lists.newArrayList("person", "employee", "business");
+
+        new JoinProcessorSupplier(
+                "NATURAL_JOIN",
+                new NaturalJoin(),
+                joinVars,
+                allVars,
+                result -> ProcessorResult.make( new UnaryResult(result) ));
+    }
+
+    /**
+     * Loads four {@code urn:worksAt} statements followed by one
+     * {@code urn:talksTo} statement, so the left side of the join arrives last,
+     * and checks the two joined binding sets together with their combined
+     * visibilities.
+     *
+     * @throws Exception if the stream processing test run fails.
+     */
+    @Test
+    public void newLeftResult() throws Exception {
+        // The join query: the left pattern is talksTo, the right pattern is worksAt.
+        final String query =
+                "SELECT * WHERE { " +
+                    "?person <urn:talksTo> ?employee ." +
+                    "?employee <urn:worksAt> ?business" +
+                " }";
+
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyFactory().build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        final ValueFactory values = new ValueFactoryImpl();
+        final List<VisibilityStatement> input = new ArrayList<>();
+
+        // Right side first: a bunch of worksAt statements.
+        input.add( new VisibilityStatement(
+                values.createStatement(values.createURI("urn:Bob"), values.createURI("urn:worksAt"), values.createURI("urn:TacoPlace")), "a&b") );
+        input.add( new VisibilityStatement(
+                values.createStatement(values.createURI("urn:Charlie"), values.createURI("urn:worksAt"), values.createURI("urn:BurgerJoint")), "a") );
+        input.add( new VisibilityStatement(
+                values.createStatement(values.createURI("urn:Eve"), values.createURI("urn:worksAt"), values.createURI("urn:CoffeeShop")), "b") );
+        input.add( new VisibilityStatement(
+                values.createStatement(values.createURI("urn:Bob"), values.createURI("urn:worksAt"), values.createURI("urn:BurgerJoint")), "b|c") );
+
+        // Then the left side: a talksTo statement that joins with Bob's two worksAt statements.
+        input.add( new VisibilityStatement(
+                values.createStatement(values.createURI("urn:Alice"), values.createURI("urn:talksTo"), values.createURI("urn:Bob")), "c") );
+
+        // Expected: Alice talks to Bob, who works at the taco place ...
+        final MapBindingSet tacoPlace = new MapBindingSet();
+        tacoPlace.addBinding("person", values.createURI("urn:Alice"));
+        tacoPlace.addBinding("employee", values.createURI("urn:Bob"));
+        tacoPlace.addBinding("business", values.createURI("urn:TacoPlace"));
+
+        // ... and at the burger joint.
+        final MapBindingSet burgerJoint = new MapBindingSet();
+        burgerJoint.addBinding("person", values.createURI("urn:Alice"));
+        burgerJoint.addBinding("employee", values.createURI("urn:Bob"));
+        burgerJoint.addBinding("business", values.createURI("urn:BurgerJoint"));
+
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        expected.add( new VisibilityBindingSet(tacoPlace, "a&b&c") );
+        expected.add( new VisibilityBindingSet(burgerJoint, "c&(b|c)") );
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, input, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+    /**
+     * Loads the {@code urn:talksTo} statement first and the {@code urn:worksAt}
+     * statements afterwards, so the right side of the join arrives after the
+     * left side, and checks the two joined binding sets together with their
+     * combined visibilities.
+     * <p>
+     * The input order is the reverse of {@code newLeftResult()}; the expected
+     * results are the same.
+     * NOTE(review): assumes the joined visibility strings do not depend on the
+     * arrival order of the two sides - confirm by running the test.
+     *
+     * @throws Exception if the stream processing test run fails.
+     */
+    @Test
+    public void newRightResult() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Setup a topology.
+        final String query =
+                "SELECT * WHERE { " +
+                    "?person <urn:talksTo> ?employee ." +
+                    "?employee <urn:worksAt> ?business" +
+                " }";
+        final TopologyFactory factory = new TopologyFactory();
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Create a statement that generates a left SP result before any right result exists.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "c") );
+
+        // Add statements that generate right results, some of which join with that left result.
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "a&b") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "b") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "b|c") );
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:TacoPlace"));
+        expected.add( new VisibilityBindingSet(bs, "a&b&c") );
+
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("employee", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
+        expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
+
+        // Run the test.
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+    }
+
+ @Test
+ public void newResultsBothSides() throws Exception {
+ // Derive the statements and results topic names from a random Rya instance name and query ID.
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Setup a topology for a query whose two statement patterns join on ?employee.
+ final String query =
+ "SELECT * WHERE { " +
+ "?person <urn:talksTo> ?employee ." +
+ "?employee <urn:worksAt> ?business" +
+ " }";
+ final TopologyFactory factory = new TopologyFactory();
+ final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Create statements that interleave left (talksTo) and right (worksAt) SP results.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add( new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "a&b") );
+ statements.add( new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "c") );
+ statements.add( new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") );
+ statements.add( new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "b") );
+ statements.add( new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "b|c") );
+ statements.add( new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:talksTo"), vf.createURI("urn:Charlie")), "c") );
+
+ // Make the expected results; each visibility is the '&' of the two joined statements' visibilities.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Alice"));
+ bs.addBinding("employee", vf.createURI("urn:Bob"));
+ bs.addBinding("business", vf.createURI("urn:TacoPlace"));
+ expected.add( new VisibilityBindingSet(bs, "a&b&c") );
+
+ bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Alice"));
+ bs.addBinding("employee", vf.createURI("urn:Bob"));
+ bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
+ expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
+
+ bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Bob"));
+ bs.addBinding("employee", vf.createURI("urn:Charlie"));
+ bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
+ expected.add( new VisibilityBindingSet(bs, "a&c") );
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+
+ @Test
+ public void manyJoins() throws Exception {
+ // Derive the statements and results topic names from a random Rya instance name and query ID.
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Setup a topology for a query that joins three statement patterns.
+ final String query =
+ "SELECT * WHERE { " +
+ "?person <urn:talksTo> ?employee ." +
+ "?employee <urn:worksAt> ?business ." +
+ "?employee <urn:hourlyWage> ?wage ." +
+ " }";
+ final TopologyFactory factory = new TopologyFactory();
+ final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Create one statement for each of the three statement patterns, all with visibility "a".
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add( new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+ statements.add( new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") );
+ statements.add( new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hourlyWage"), vf.createLiteral(7.25)), "a") );
+
+ // Make the expected result: a single binding set with all four variables bound.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+ final MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Alice"));
+ bs.addBinding("employee", vf.createURI("urn:Bob"));
+ bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
+ bs.addBinding("wage", vf.createLiteral(7.25));
+ expected.add( new VisibilityBindingSet(bs, "a") );
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+
+ @Test
+ public void leftJoin() throws Exception {
+ // Derive the statements and results topic names from a random Rya instance name and query ID.
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Setup a topology for a query whose second statement pattern is OPTIONAL.
+ final String query =
+ "SELECT * WHERE { " +
+ "?person <urn:talksTo> ?employee ." +
+ "OPTIONAL{ ?employee <urn:worksAt> ?business } " +
+ " }";
+ final TopologyFactory factory = new TopologyFactory();
+ final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Create statements that yield results with and without the optional ?business; nobody talks to urn:David.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add( new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+ statements.add( new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "b") );
+ statements.add( new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:talksTo"), vf.createURI("urn:Charlie")), "c") );
+ statements.add( new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "d") );
+
+ // Make the expected results. Alice/Bob is expected both without and with the optional ?business binding.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Alice"));
+ bs.addBinding("employee", vf.createURI("urn:Bob"));
+ expected.add( new VisibilityBindingSet(bs, "a") );
+
+ bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Alice"));
+ bs.addBinding("employee", vf.createURI("urn:Bob"));
+ bs.addBinding("business", vf.createURI("urn:TacoPlace"));
+ expected.add( new VisibilityBindingSet(bs, "a&b") );
+
+ bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Bob"));
+ bs.addBinding("employee", vf.createURI("urn:Charlie"));
+ expected.add( new VisibilityBindingSet(bs, "c") );
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
new file mode 100644
index 0000000..a560294
--- /dev/null
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.projection;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
+import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier;
+import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier.MultiProjectionProcessor;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.BNode;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.RDF;
+
+/**
+ * Integration tests the methods of {@link MultiProjectionProcessor}.
+ */
+public class MultiProjectionProcessorIT {
+
+ @Rule
+ public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+ @Test
+ public void showProcessorWorks() throws Exception {
+ // Derive the statements and results topic names from a random Rya instance name and query ID.
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Create a topology for the CONSTRUCT query that will be tested.
+ final String sparql =
+ "CONSTRUCT {" +
+ "_:b a <urn:movementObservation> ; " +
+ "<urn:location> ?location ; " +
+ "<urn:direction> ?direction ; " +
+ "}" +
+ "WHERE {" +
+ "?thing <urn:corner> ?location ." +
+ "?thing <urn:compass> ?direction." +
+ "}";
+
+ final String bNodeId = UUID.randomUUID().toString();
+ final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, () -> bNodeId);
+
+ // Create the statements that will be input into the query.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add( new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:car1"), vf.createURI("urn:compass"), vf.createURI("urn:NW")), "a") );
+ statements.add( new VisibilityStatement(
+ vf.createStatement(vf.createURI("urn:car1"), vf.createURI("urn:corner"), vf.createURI("urn:corner1")), "a") );
+
+ // Make the expected results: three statements about a blank node created from the fixed bNodeId.
+ final Set<VisibilityStatement> expected = new HashSet<>();
+ final BNode blankNode = vf.createBNode(bNodeId);
+
+ expected.add(new VisibilityStatement(vf.createStatement(blankNode, RDF.TYPE, vf.createURI("urn:movementObservation")), "a"));
+ expected.add(new VisibilityStatement(vf.createStatement(blankNode, vf.createURI("urn:direction"), vf.createURI("urn:NW")), "a"));
+ expected.add(new VisibilityStatement(vf.createStatement(blankNode, vf.createURI("urn:location"), vf.createURI("urn:corner1")), "a"));
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityStatementDeserializer.class);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
new file mode 100644
index 0000000..322bba9
--- /dev/null
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.projection;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
+import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
+import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Integration tests a SELECT projection through a topology built by {@link TopologyFactory}.
+ */
+public class ProjectionProcessorIT {
+
+ @Rule
+ public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+ @Test
+ public void showProcessorWorks() throws Exception {
+ // Derive the statements and results topic names from a random Rya instance name and query ID.
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Create a topology for the query that will be tested; it projects ?person as ?p.
+ final String sparql =
+ "SELECT (?person AS ?p) ?otherPerson " +
+ "WHERE { " +
+ "?person <urn:talksTo> ?otherPerson . " +
+ "}";
+
+ final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Create the statement that will be input into the query.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+
+ // Make the expected result: "p" holds the ?person value and "otherPerson" keeps its name.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+
+ final MapBindingSet expectedBs = new MapBindingSet();
+ expectedBs.addBinding("p", vf.createURI("urn:Alice"));
+ expectedBs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+ expected.add(new VisibilityBindingSet(expectedBs, "a"));
+
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, Sets.newHashSet(expected), VisibilityBindingSetDeserializer.class);
+ }
+}
\ No newline at end of file