You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/09 21:48:49 UTC
[32/50] [abbrv] incubator-rya git commit: RYA-377 Fixed the project
layout so that it builds even when the geoindexing profile is not enabled.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java
new file mode 100644
index 0000000..ba11e57
--- /dev/null
+++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/sp/StatementPatternProcessorIT.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.sp;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
+import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
+import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+/**
+ * Integration tests the methods of {@link StatementPatternProcessor}.
+ */
+public class StatementPatternProcessorIT {
+
+ @Rule
+ public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+ @Test
+ public void singlePattern_singleStatement() throws Exception {
+ // Enumerate some topics that will be re-used
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Setup a topology.
+ final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }";
+ final TopologyFactory factory = new TopologyFactory();
+ final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Create a statement that generates an SP result.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+
+ // Show the correct binding set results from the job.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+
+ final QueryBindingSet bs = new QueryBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Alice"));
+ bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+ expected.add( new VisibilityBindingSet(bs, "a") );
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+
+ @Test
+ public void singlePattern_manyStatements() throws Exception {
+ // Enumerate some topics that will be re-used
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Setup a topology.
+ final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }";
+ final TopologyFactory factory = new TopologyFactory();
+ final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Create some statements where some generate SP results and others do not.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+ statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoin")), "b") );
+ statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:talksTo"), vf.createURI("urn:Alice")), "a|b") );
+ statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "c") );
+
+ // Show the correct binding set results from the job.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+
+ QueryBindingSet bs = new QueryBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Alice"));
+ bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+ expected.add( new VisibilityBindingSet(bs, "a") );
+
+ bs = new QueryBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Bob"));
+ bs.addBinding("otherPerson", vf.createURI("urn:Alice"));
+ expected.add( new VisibilityBindingSet(bs, "a|b") );
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+
+ @Test
+ public void multiplePatterns_singleStatement() throws Exception {
+ // Enumerate some topics that will be re-used
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Setup a topology.
+ final String query = "SELECT * WHERE { "
+ + "?person <urn:talksTo> ?otherPerson . "
+ + "?person ?action <urn:Bob>"
+ + "}";
+ final TopologyFactory factory = new TopologyFactory();
+ final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Create a single statement that matches both statement patterns of the query.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+
+ // Show the correct binding set results from the job.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+
+ final QueryBindingSet bs = new QueryBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Alice"));
+ bs.addBinding("action", vf.createURI("urn:talksTo"));
+ bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+ expected.add( new VisibilityBindingSet(bs, "a") );
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+
+ @Test
+ public void multiplePatterns_multipleStatements() throws Exception {
+ // Enumerate some topics that will be re-used
+ final String ryaInstance = UUID.randomUUID().toString();
+ final UUID queryId = UUID.randomUUID();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // Setup a topology.
+ final String query = "SELECT * WHERE { "
+ + "?person <urn:talksTo> ?otherPerson ."
+ + "?person ?action <urn:Bob>"
+ + "}";
+ final TopologyFactory factory = new TopologyFactory();
+ final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+ // Create some statements where some generate SP results and others do not.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+ statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Charlie")), "a|b") );
+ statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:walksWith"), vf.createURI("urn:Bob")), "b") );
+
+ // Show the correct binding set results from the job.
+ final Set<VisibilityBindingSet> expected = new HashSet<>();
+
+ QueryBindingSet bs = new QueryBindingSet();
+ bs = new QueryBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Alice"));
+ bs.addBinding("action", vf.createURI("urn:talksTo"));
+ bs.addBinding("otherPerson", vf.createURI("urn:Charlie"));
+ expected.add(new VisibilityBindingSet(bs, "a&(a|b)"));
+
+ bs = new QueryBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Alice"));
+ bs.addBinding("action", vf.createURI("urn:talksTo"));
+ bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+ expected.add(new VisibilityBindingSet(bs, "a"));
+
+ // Run the test.
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka-test/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka-test/pom.xml b/extras/rya.streams/kafka-test/pom.xml
new file mode 100644
index 0000000..4a423e2
--- /dev/null
+++ b/extras/rya.streams/kafka-test/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.streams.parent</artifactId>
+ <version>3.2.12-incubating-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>rya.streams.kafka-test</artifactId>
+
+ <name>Apache Rya Streams Kafka Test</name>
+ <description>
+ A common test jar containing utilities used to run Kafka based Rya
+ Streams integration tests.
+ </description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.test.kafka</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.streams.kafka</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka-test/src/main/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka-test/src/main/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java b/extras/rya.streams/kafka-test/src/main/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java
new file mode 100644
index 0000000..ee25f8c
--- /dev/null
+++ b/extras/rya.streams/kafka-test/src/main/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
+
+import com.google.common.collect.Sets;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Utility functions that make it easier to test Rya Streams applications.
+ */
+@DefaultAnnotation(NonNull.class)
+public class RyaStreamsTestUtil {
+
+ /**
+ * Runs a Kafka Streams topology, loads statements into the input topic, reads the binding sets that come out of
+ * the results topic, and ensures the expected results match the read results.
+ *
+ * @param <T> The type of value that will be consumed from the results topic.
+ * @param kafka - The embedded Kafka instance that is being tested with. (not null)
+ * @param statementsTopic - The topic statements will be written to. (not null)
+ * @param resultsTopic - The topic results will be read from. (not null)
+ * @param builder - The streams topology that will be executed. (not null)
+ * @param statements - The statements that will be loaded into the topic. (not null)
+ * @param expected - The expected results. (not null)
+ * @param expectedDeserializerClass - The class of the deserializer that will be used when reading
+ * values from the results topic. (not null)
+ * @throws Exception If any exception was thrown while running the test.
+ */
+ public static <T> void runStreamProcessingTest(
+ final KafkaTestInstanceRule kafka,
+ final String statementsTopic,
+ final String resultsTopic,
+ final TopologyBuilder builder,
+ final List<VisibilityStatement> statements,
+ final Set<T> expected,
+ final Class<? extends Deserializer<T>> expectedDeserializerClass) throws Exception {
+ requireNonNull(kafka);
+ requireNonNull(statementsTopic);
+ requireNonNull(resultsTopic);
+ requireNonNull(builder);
+ requireNonNull(statements);
+ requireNonNull(expected);
+ requireNonNull(expectedDeserializerClass);
+
+ // Explicitly create the topics that are being used.
+ kafka.createTopic(statementsTopic);
+ kafka.createTopic(resultsTopic);
+
+ // Start the streams program.
+ final Properties props = kafka.createBootstrapServerConfig();
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StatementPatternProcessorIT");
+
+ final KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
+ streams.cleanUp();
+ try {
+ streams.start();
+
+ // Wait for the streams application to start. Streams only see data after their consumers are connected.
+ Thread.sleep(6000);
+
+ // Load the statements into the input topic.
+ try(Producer<String, VisibilityStatement> producer = KafkaTestUtil.makeProducer(
+ kafka, StringSerializer.class, VisibilityStatementSerializer.class)) {
+ new KafkaLoadStatements(statementsTopic, producer).fromCollection(statements);
+ }
+
+ // Wait for the final results to appear in the output topic and verify the expected Binding Sets were found.
+ try(Consumer<String, T> consumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, expectedDeserializerClass)) {
+ // Register the topic.
+ consumer.subscribe(Arrays.asList(resultsTopic));
+
+ // Poll for the result.
+ final Set<T> results = Sets.newHashSet( KafkaTestUtil.pollForResults(500, 6, expected.size(), consumer) );
+
+ // Show the correct binding sets results from the job.
+ assertEquals(expected, results);
+ }
+ } finally {
+ streams.close();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/pom.xml b/extras/rya.streams/kafka/pom.xml
index 8926870..778630d 100644
--- a/extras/rya.streams/kafka/pom.xml
+++ b/extras/rya.streams/kafka/pom.xml
@@ -36,23 +36,22 @@ under the License.
<profiles>
<profile>
<id>geoindexing</id>
- <dependencies>
- <!-- Rya dependencies -->
- <dependency>
- <groupId>org.apache.rya</groupId>
- <artifactId>rya.pcj.functions.geo</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.rya</groupId>
- <artifactId>rya.geo.common</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
+ <dependencies>
+ <!-- Rya dependencies -->
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.pcj.functions.geo</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.geo.common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
</profile>
</profiles>
-
<dependencies>
<!-- Rya dependencies -->
<dependency>
@@ -106,6 +105,11 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.rya</groupId>
+ <artifactId>rya.test.rdf</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
<artifactId>rya.test.kafka</artifactId>
<scope>test</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java
deleted file mode 100644
index b4388c3..0000000
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.streams.kafka;
-
-import static java.util.Objects.requireNonNull;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.openrdf.query.algebra.Filter;
-import org.openrdf.query.algebra.MultiProjection;
-import org.openrdf.query.algebra.Projection;
-import org.openrdf.query.algebra.StatementPattern;
-import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
-import org.openrdf.query.parser.ParsedQuery;
-import org.openrdf.query.parser.sparql.SPARQLParser;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-import edu.umd.cs.findbugs.annotations.Nullable;
-
-/**
- * A set of utility functions that are useful when writing tests RDF functions.
- */
-@DefaultAnnotation(NonNull.class)
-public final class RdfTestUtil {
-
- private RdfTestUtil() { }
-
- /**
- * Fetch the {@link StatementPattern} from a SPARQL string.
- *
- * @param sparql - A SPARQL query that contains only a single Statement Patern. (not nul)
- * @return The {@link StatementPattern} that was in the query, if it could be found. Otherwise {@code null}
- * @throws Exception The statement pattern could not be found in the parsed SPARQL query.
- */
- public static @Nullable StatementPattern getSp(final String sparql) throws Exception {
- requireNonNull(sparql);
-
- final AtomicReference<StatementPattern> statementPattern = new AtomicReference<>();
- final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null);
- parsed.getTupleExpr().visitChildren(new QueryModelVisitorBase<Exception>() {
- @Override
- public void meet(final StatementPattern node) throws Exception {
- statementPattern.set(node);
- }
- });
- return statementPattern.get();
- }
-
- /**
- * Get the first {@link Projection} node from a SPARQL query.
- *
- * @param sparql - The query that contains a single Projection node.
- * @return The first {@link Projection} that is encountered.
- * @throws Exception The query could not be parsed.
- */
- public static @Nullable Projection getProjection(final String sparql) throws Exception {
- requireNonNull(sparql);
-
- final AtomicReference<Projection> projection = new AtomicReference<>();
- final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null);
- parsed.getTupleExpr().visit(new QueryModelVisitorBase<Exception>() {
- @Override
- public void meet(final Projection node) throws Exception {
- projection.set(node);
- }
- });
-
- return projection.get();
- }
-
- /**
- * Get the first {@link MultiProjection} node from a SPARQL query.
- *
- * @param sparql - The query that contains a single Projection node.
- * @return The first {@link MultiProjection} that is encountered.
- * @throws Exception The query could not be parsed.
- */
- public static @Nullable MultiProjection getMultiProjection(final String sparql) throws Exception {
- requireNonNull(sparql);
-
- final AtomicReference<MultiProjection> multiProjection = new AtomicReference<>();
- final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null);
- parsed.getTupleExpr().visit(new QueryModelVisitorBase<Exception>() {
- @Override
- public void meet(final MultiProjection node) throws Exception {
- multiProjection.set(node);
- }
- });
-
- return multiProjection.get();
- }
-
- /**
- * Get the first {@link Filter} node from a SPARQL query.
- *
- * @param sparql - The query that contains a single Projection node.
- * @return The first {@link Filter} that is encountered.
- * @throws Exception The query could not be parsed.
- */
- public static @Nullable Filter getFilter(final String sparql) throws Exception {
- requireNonNull(sparql);
-
- final AtomicReference<Filter> filter = new AtomicReference<>();
- final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null);
- parsed.getTupleExpr().visit(new QueryModelVisitorBase<Exception>() {
- @Override
- public void meet(final Filter node) throws Exception {
- filter.set(node);
- }
- });
-
- return filter.get();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java
deleted file mode 100644
index ee25f8c..0000000
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.streams.kafka;
-
-import static java.util.Objects.requireNonNull;
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
-import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
-import org.apache.rya.test.kafka.KafkaTestInstanceRule;
-import org.apache.rya.test.kafka.KafkaTestUtil;
-
-import com.google.common.collect.Sets;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
-/**
- * Utility functions that make it easier to test Rya Streams applications.
- */
-@DefaultAnnotation(NonNull.class)
-public class RyaStreamsTestUtil {
-
- /**
- * Runs a Kafka Streams topology, loads statements into the input topic, read the binding sets that come out of
- * the results topic, and ensures the expected results match the read results.
- *
- * @param <T> The type of value that will be consumed from the results topic.
- * @param kafka - The embedded Kafka instance that is being tested with. (not null)
- * @param statementsTopic - The topic statements will be written to. (not null)
- * @param resultsTopic - The topic results will be read from. (not null)
- * @param builder - The streams topology that will be executed. (not null)
- * @param statements - The statements that will be loaded into the topic. (not null)
- * @param expected - The expected results. (not null)
- * @param expectedDeserializerClass - The class of the deserializer that will be used when reading
- * values from the results topic. (not null)
- * @throws Exception If any exception was thrown while running the test.
- */
- public static <T> void runStreamProcessingTest(
- final KafkaTestInstanceRule kafka,
- final String statementsTopic,
- final String resultsTopic,
- final TopologyBuilder builder,
- final List<VisibilityStatement> statements,
- final Set<T> expected,
- final Class<? extends Deserializer<T>> expectedDeserializerClass) throws Exception {
- requireNonNull(kafka);
- requireNonNull(statementsTopic);
- requireNonNull(resultsTopic);
- requireNonNull(builder);
- requireNonNull(statements);
- requireNonNull(expected);
- requireNonNull(expectedDeserializerClass);
-
- // Explicitly create the topics that are being used.
- kafka.createTopic(statementsTopic);
- kafka.createTopic(resultsTopic);
-
- // Start the streams program.
- final Properties props = kafka.createBootstrapServerConfig();
- props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StatementPatternProcessorIT");
-
- final KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
- streams.cleanUp();
- try {
- streams.start();
-
- // Wait for the streams application to start. Streams only see data after their consumers are connected.
- Thread.sleep(6000);
-
- // Load the statements into the input topic.
- try(Producer<String, VisibilityStatement> producer = KafkaTestUtil.makeProducer(
- kafka, StringSerializer.class, VisibilityStatementSerializer.class)) {
- new KafkaLoadStatements(statementsTopic, producer).fromCollection(statements);
- }
-
- // Wait for the final results to appear in the output topic and verify the expected Binding Sets were found.
- try(Consumer<String, T> consumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, expectedDeserializerClass)) {
- // Register the topic.
- consumer.subscribe(Arrays.asList(resultsTopic));
-
- // Poll for the result.
- final Set<T> results = Sets.newHashSet( KafkaTestUtil.pollForResults(500, 6, expected.size(), consumer) );
-
- // Show the correct binding sets results from the job.
- assertEquals(expected, results);
- }
- } finally {
- streams.close();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
deleted file mode 100644
index 33dc945..0000000
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.streams.kafka.processors;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.rya.api.function.projection.RandomUUIDFactory;
-import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.KafkaTopics;
-import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
-import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
-import org.apache.rya.streams.kafka.topology.TopologyFactory;
-import org.apache.rya.test.kafka.KafkaTestInstanceRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.algebra.evaluation.QueryBindingSet;
-
-/**
- * Integration tests the methods of {@link StatementPatternProcessor}.
- */
-public class StatementPatternProcessorIT {
-
- @Rule
- public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
-
- @Test
- public void singlePattern_singleStatement() throws Exception {
- // Enumerate some topics that will be re-used
- final String ryaInstance = UUID.randomUUID().toString();
- final UUID queryId = UUID.randomUUID();
- final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
- // Setup a topology.
- final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }";
- final TopologyFactory factory = new TopologyFactory();
- final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
- // Create a statement that generate an SP result.
- final ValueFactory vf = new ValueFactoryImpl();
- final List<VisibilityStatement> statements = new ArrayList<>();
- statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
-
- // Show the correct binding set results from the job.
- final Set<VisibilityBindingSet> expected = new HashSet<>();
-
- final QueryBindingSet bs = new QueryBindingSet();
- bs.addBinding("person", vf.createURI("urn:Alice"));
- bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
- expected.add( new VisibilityBindingSet(bs, "a") );
-
- // Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
- }
-
- @Test
- public void singlePattern_manyStatements() throws Exception {
- // Enumerate some topics that will be re-used
- final String ryaInstance = UUID.randomUUID().toString();
- final UUID queryId = UUID.randomUUID();
- final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
- // Setup a topology.
- final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }";
- final TopologyFactory factory = new TopologyFactory();
- final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
- // Create some statements where some generates SP results and others do not.
- final ValueFactory vf = new ValueFactoryImpl();
- final List<VisibilityStatement> statements = new ArrayList<>();
- statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
- statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoin")), "b") );
- statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:talksTo"), vf.createURI("urn:Alice")), "a|b") );
- statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "c") );
-
- // Show the correct binding set results from the job.
- final Set<VisibilityBindingSet> expected = new HashSet<>();
-
- QueryBindingSet bs = new QueryBindingSet();
- bs.addBinding("person", vf.createURI("urn:Alice"));
- bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
- expected.add( new VisibilityBindingSet(bs, "a") );
-
- bs = new QueryBindingSet();
- bs.addBinding("person", vf.createURI("urn:Bob"));
- bs.addBinding("otherPerson", vf.createURI("urn:Alice"));
- expected.add( new VisibilityBindingSet(bs, "a|b") );
-
- // Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
- }
-
- @Test
- public void multiplePatterns_singleStatement() throws Exception {
- // Enumerate some topics that will be re-used
- final String ryaInstance = UUID.randomUUID().toString();
- final UUID queryId = UUID.randomUUID();
- final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
- // Setup a topology.
- final String query = "SELECT * WHERE { "
- + "?person <urn:talksTo> ?otherPerson . "
- + "?person ?action <urn:Bob>"
- + "}";
- final TopologyFactory factory = new TopologyFactory();
- final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
- // Create some statements where some generates SP results and others do not.
- final ValueFactory vf = new ValueFactoryImpl();
- final List<VisibilityStatement> statements = new ArrayList<>();
- statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
-
- // Show the correct binding set results from the job.
- final Set<VisibilityBindingSet> expected = new HashSet<>();
-
- final QueryBindingSet bs = new QueryBindingSet();
- bs.addBinding("person", vf.createURI("urn:Alice"));
- bs.addBinding("action", vf.createURI("urn:talksTo"));
- bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
- expected.add( new VisibilityBindingSet(bs, "a") );
-
- // Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
- }
-
- @Test
- public void multiplePatterns_multipleStatements() throws Exception {
- // Enumerate some topics that will be re-used
- final String ryaInstance = UUID.randomUUID().toString();
- final UUID queryId = UUID.randomUUID();
- final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
- // Setup a topology.
- final String query = "SELECT * WHERE { "
- + "?person <urn:talksTo> ?otherPerson ."
- + "?person ?action <urn:Bob>"
- + "}";
- final TopologyFactory factory = new TopologyFactory();
- final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
- // Create some statements where some generates SP results and others do not.
- final ValueFactory vf = new ValueFactoryImpl();
- final List<VisibilityStatement> statements = new ArrayList<>();
- statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
- statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Charlie")), "a|b") );
- statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:walksWith"), vf.createURI("urn:Bob")), "b") );
-
- // Show the correct binding set results from the job.
- final Set<VisibilityBindingSet> expected = new HashSet<>();
-
- QueryBindingSet bs = new QueryBindingSet();
- bs = new QueryBindingSet();
- bs.addBinding("person", vf.createURI("urn:Alice"));
- bs.addBinding("action", vf.createURI("urn:talksTo"));
- bs.addBinding("otherPerson", vf.createURI("urn:Charlie"));
- expected.add(new VisibilityBindingSet(bs, "a&(a|b)"));
-
- bs = new QueryBindingSet();
- bs.addBinding("person", vf.createURI("urn:Alice"));
- bs.addBinding("action", vf.createURI("urn:talksTo"));
- bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
- expected.add(new VisibilityBindingSet(bs, "a"));
-
- // Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
deleted file mode 100644
index 072469a..0000000
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java
+++ /dev/null
@@ -1,457 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.streams.kafka.processors.aggregation;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.rya.api.function.projection.RandomUUIDFactory;
-import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.KafkaTopics;
-import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
-import org.apache.rya.streams.kafka.processors.aggregation.AggregationProcessorSupplier.AggregationProcessor;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
-import org.apache.rya.streams.kafka.topology.TopologyFactory;
-import org.apache.rya.test.kafka.KafkaTestInstanceRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.model.vocabulary.XMLSchema;
-import org.openrdf.query.impl.MapBindingSet;
-
-/**
- * Integration tests {@link AggregationProcessor}.
- */
-public class AggregationProcessorIT {
-
- @Rule
- public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(false);
-
- @Test
- public void count() throws Exception {
- // A query that figures out how many books each person has.
- final String sparql =
- "SELECT ?person (count(?book) as ?bookCount) " +
- "WHERE { " +
- "?person <urn:hasBook> ?book " +
- "} GROUP BY ?person";
-
- // Create the statements that will be input into the query..
- final ValueFactory vf = new ValueFactoryImpl();
- final List<VisibilityStatement> statements = new ArrayList<>();
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 1")), "a"));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 1")), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 2")), "b"));
-
- // Make the expected results.
- final Set<VisibilityBindingSet> expected = new HashSet<>();
- MapBindingSet bs = new MapBindingSet();
- bs.addBinding("person", vf.createURI("urn:Alice"));
- bs.addBinding("bookCount", vf.createLiteral("1", XMLSchema.INTEGER));
- expected.add(new VisibilityBindingSet(bs, "a"));
-
- bs = new MapBindingSet();
- bs.addBinding("person", vf.createURI("urn:Bob"));
- bs.addBinding("bookCount", vf.createLiteral("1", XMLSchema.INTEGER));
- expected.add(new VisibilityBindingSet(bs, ""));
-
- bs = new MapBindingSet();
- bs.addBinding("person", vf.createURI("urn:Alice"));
- bs.addBinding("bookCount", vf.createLiteral("2", XMLSchema.INTEGER));
- expected.add(new VisibilityBindingSet(bs, "a&b"));
-
- // Enumerate some topics that will be re-used
- final String ryaInstance = UUID.randomUUID().toString();
- final UUID queryId = UUID.randomUUID();
- final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
- // Setup a topology.
- final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
- // Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
- }
-
- @Test
- public void sum() throws Exception {
- // A query that figures out how much food each person has.
- final String sparql =
- "SELECT ?person (sum(?foodCount) as ?totalFood) " +
- "WHERE { " +
- "?person <urn:hasFoodType> ?food . " +
- "?food <urn:count> ?foodCount . " +
- "} GROUP BY ?person";
-
- // Create the statements that will be input into the query..
- final ValueFactory vf = new ValueFactoryImpl();
- final List<VisibilityStatement> statements = new ArrayList<>();
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasFoodType"), vf.createURI("urn:corn")), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasFoodType"), vf.createURI("urn:apple")), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:corn"), vf.createURI("urn:count"), vf.createLiteral(4)), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:apple"), vf.createURI("urn:count"), vf.createLiteral(3)), ""));
-
- // Make the expected results.
- final Set<VisibilityBindingSet> expected = new HashSet<>();
- MapBindingSet bs = new MapBindingSet();
- bs.addBinding("person", vf.createURI("urn:Alice"));
- bs.addBinding("totalFood", vf.createLiteral("4", XMLSchema.INTEGER));
- expected.add(new VisibilityBindingSet(bs, ""));
-
- bs = new MapBindingSet();
- bs.addBinding("person", vf.createURI("urn:Alice"));
- bs.addBinding("totalFood", vf.createLiteral("7", XMLSchema.INTEGER));
- expected.add(new VisibilityBindingSet(bs, ""));
-
- // Enumerate some topics that will be re-used
- final String ryaInstance = UUID.randomUUID().toString();
- final UUID queryId = UUID.randomUUID();
- final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
- // Setup a topology.
- final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
- // Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
- }
-
- @Test
- public void average() throws Exception {
- // A query that figures out the average age across all people.
- final String sparql =
- "SELECT (avg(?age) as ?avgAge) " +
- "WHERE { " +
- "?person <urn:age> ?age " +
- "}";
-
- // Create the statements that will be input into the query..
- final ValueFactory vf = new ValueFactoryImpl();
- final List<VisibilityStatement> statements = new ArrayList<>();
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(3)), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(7)), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(2)), ""));
-
- // Make the expected results.
- final Set<VisibilityBindingSet> expected = new HashSet<>();
- MapBindingSet bs = new MapBindingSet();
- bs.addBinding("avgAge", vf.createLiteral("3", XMLSchema.DECIMAL));
- expected.add(new VisibilityBindingSet(bs, ""));
-
- bs = new MapBindingSet();
- bs.addBinding("avgAge", vf.createLiteral("5", XMLSchema.DECIMAL));
- expected.add(new VisibilityBindingSet(bs, ""));
-
- bs = new MapBindingSet();
- bs.addBinding("avgAge", vf.createLiteral("4", XMLSchema.DECIMAL));
- expected.add(new VisibilityBindingSet(bs, ""));
-
- // Enumerate some topics that will be re-used
- final String ryaInstance = UUID.randomUUID().toString();
- final UUID queryId = UUID.randomUUID();
- final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
- // Setup a topology.
- final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
- // Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
- }
-
- @Test
- public void min() throws Exception {
- // A query that figures out what the youngest age is across all people.
- final String sparql =
- "SELECT (min(?age) as ?youngest) " +
- "WHERE { " +
- "?person <urn:age> ?age " +
- "}";
-
- // Create the statements that will be input into the query..
- final ValueFactory vf = new ValueFactoryImpl();
- final List<VisibilityStatement> statements = new ArrayList<>();
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), ""));
-
- // Make the expected results.
- final Set<VisibilityBindingSet> expected = new HashSet<>();
- MapBindingSet bs = new MapBindingSet();
- bs.addBinding("youngest", vf.createLiteral(13));
- expected.add(new VisibilityBindingSet(bs, ""));
-
- bs = new MapBindingSet();
- bs.addBinding("youngest", vf.createLiteral(7));
- expected.add(new VisibilityBindingSet(bs, ""));
-
- bs = new MapBindingSet();
- bs.addBinding("youngest", vf.createLiteral(5));
- expected.add(new VisibilityBindingSet(bs, ""));
-
- // Enumerate some topics that will be re-used
- final String ryaInstance = UUID.randomUUID().toString();
- final UUID queryId = UUID.randomUUID();
- final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
- // Setup a topology.
- final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
- // Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
- }
-
- @Test
- public void max() throws Exception {
- // A query that figures out what the oldest age is across all people.
- final String sparql =
- "SELECT (max(?age) as ?oldest) " +
- "WHERE { " +
- "?person <urn:age> ?age " +
- "}";
-
- // Create the statements that will be input into the query..
- final ValueFactory vf = new ValueFactoryImpl();
- final List<VisibilityStatement> statements = new ArrayList<>();
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), ""));
-
- // Make the expected results.
- final Set<VisibilityBindingSet> expected = new HashSet<>();
- MapBindingSet bs = new MapBindingSet();
- bs.addBinding("oldest", vf.createLiteral(13));
- expected.add(new VisibilityBindingSet(bs, ""));
-
- bs = new MapBindingSet();
- bs.addBinding("oldest", vf.createLiteral(14));
- expected.add(new VisibilityBindingSet(bs, ""));
-
- bs = new MapBindingSet();
- bs.addBinding("oldest", vf.createLiteral(25));
- expected.add(new VisibilityBindingSet(bs, ""));
-
- // Enumerate some topics that will be re-used
- final String ryaInstance = UUID.randomUUID().toString();
- final UUID queryId = UUID.randomUUID();
- final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
- // Setup a topology.
- final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
- // Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
- }
-
- @Test
- public void multipleGroupByVars() throws Exception {
- // A query that contains more than one group by variable.
- final String sparql =
- "SELECT ?business ?employee (sum(?hours) AS ?totalHours) " +
- "WHERE {" +
- "?employee <urn:worksAt> ?business . " +
- "?business <urn:hasTimecardId> ?timecardId . " +
- "?employee <urn:hasTimecardId> ?timecardId . " +
- "?timecardId <urn:hours> ?hours . " +
- "} GROUP BY ?business ?employee";
-
- // Create the statements that will be input into the query.
- final ValueFactory vf = new ValueFactoryImpl();
- final List<VisibilityStatement> statements = new ArrayList<>();
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoint")), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard1")), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard1")), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:timecard1"), vf.createURI("urn:hours"), vf.createLiteral(40)), ""));
-
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard2")), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard2")), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:timecard2"), vf.createURI("urn:hours"), vf.createLiteral(25)), ""));
-
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoint")), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard3")), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard3")), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:timecard3"), vf.createURI("urn:hours"), vf.createLiteral(28)), ""));
-
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard4")), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard4")), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:timecard4"), vf.createURI("urn:hours"), vf.createLiteral(28)), ""));
-
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:CoffeeShop"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard5")), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard5")), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:timecard5"), vf.createURI("urn:hours"), vf.createLiteral(12)), ""));
-
- // Make the expected results.
- final Set<VisibilityBindingSet> expected = new HashSet<>();
- MapBindingSet bs = new MapBindingSet();
- bs.addBinding("business", vf.createURI("urn:TacoJoint"));
- bs.addBinding("employee", vf.createURI("urn:Alice"));
- bs.addBinding("totalHours", vf.createLiteral("40", XMLSchema.INTEGER));
- expected.add(new VisibilityBindingSet(bs, ""));
-
- bs = new MapBindingSet();
- bs.addBinding("business", vf.createURI("urn:TacoJoint"));
- bs.addBinding("employee", vf.createURI("urn:Alice"));
- bs.addBinding("totalHours", vf.createLiteral("65", XMLSchema.INTEGER));
- expected.add(new VisibilityBindingSet(bs, ""));
-
- bs = new MapBindingSet();
- bs.addBinding("business", vf.createURI("urn:TacoJoint"));
- bs.addBinding("employee", vf.createURI("urn:Bob"));
- bs.addBinding("totalHours", vf.createLiteral("28", XMLSchema.INTEGER));
- expected.add(new VisibilityBindingSet(bs, ""));
-
- bs = new MapBindingSet();
- bs.addBinding("business", vf.createURI("urn:TacoJoint"));
- bs.addBinding("employee", vf.createURI("urn:Bob"));
- bs.addBinding("totalHours", vf.createLiteral("56", XMLSchema.INTEGER));
- expected.add(new VisibilityBindingSet(bs, ""));
-
- bs = new MapBindingSet();
- bs.addBinding("business", vf.createURI("urn:CoffeeShop"));
- bs.addBinding("employee", vf.createURI("urn:Alice"));
- bs.addBinding("totalHours", vf.createLiteral("12", XMLSchema.INTEGER));
- expected.add(new VisibilityBindingSet(bs, ""));
-
- // Enumerate some topics that will be re-used
- final String ryaInstance = UUID.randomUUID().toString();
- final UUID queryId = UUID.randomUUID();
- final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
- // Setup a topology.
- final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
- // Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
- }
-
- @Test
- public void multipleAggregations() throws Exception {
- // A query that figures out what the youngest and oldest ages are across all people.
- final String sparql =
- "SELECT (min(?age) as ?youngest) (max(?age) as ?oldest) " +
- "WHERE { " +
- "?person <urn:age> ?age " +
- "}";
-
- // Create the statements that will be input into the query..
- final ValueFactory vf = new ValueFactoryImpl();
- final List<VisibilityStatement> statements = new ArrayList<>();
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), ""));
- statements.add(new VisibilityStatement(
- vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), ""));
-
- // Make the expected results.
- final Set<VisibilityBindingSet> expected = new HashSet<>();
- MapBindingSet bs = new MapBindingSet();
- bs.addBinding("youngest", vf.createLiteral(13));
- bs.addBinding("oldest", vf.createLiteral(13));
- expected.add(new VisibilityBindingSet(bs, ""));
-
- bs = new MapBindingSet();
- bs.addBinding("youngest", vf.createLiteral(13));
- bs.addBinding("oldest", vf.createLiteral(14));
- expected.add(new VisibilityBindingSet(bs, ""));
-
- bs = new MapBindingSet();
- bs.addBinding("youngest", vf.createLiteral(7));
- bs.addBinding("oldest", vf.createLiteral(14));
- expected.add(new VisibilityBindingSet(bs, ""));
-
- bs = new MapBindingSet();
- bs.addBinding("youngest", vf.createLiteral(5));
- bs.addBinding("oldest", vf.createLiteral(14));
- expected.add(new VisibilityBindingSet(bs, ""));
-
- bs = new MapBindingSet();
- bs.addBinding("youngest", vf.createLiteral(5));
- bs.addBinding("oldest", vf.createLiteral(25));
- expected.add(new VisibilityBindingSet(bs, ""));
-
- // Enumerate some topics that will be re-used
- final String ryaInstance = UUID.randomUUID().toString();
- final UUID queryId = UUID.randomUUID();
- final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
- // Setup a topology.
- final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
- // Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
deleted file mode 100644
index aaa67ea..0000000
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.streams.kafka.processors.filter;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.rya.api.function.projection.RandomUUIDFactory;
-import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.KafkaTopics;
-import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
-import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
-import org.apache.rya.streams.kafka.topology.TopologyFactory;
-import org.apache.rya.test.kafka.KafkaTestInstanceRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.impl.MapBindingSet;
-
-/**
- * Integration tests the methods of {@link FilterProcessor}.
- */
-public class FilterProcessorIT {
-
- @Rule
- public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
-
- @Test
- public void showProcessorWorks() throws Exception {
- // Enumerate some topics that will be re-used
- final String ryaInstance = UUID.randomUUID().toString();
- final UUID queryId = UUID.randomUUID();
- final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
- // Get the RDF model objects that will be used to build the query.
- final String sparql =
- "SELECT * " +
- "WHERE { " +
- "FILTER(?age < 10)" +
- "?person <urn:age> ?age " +
- "}";
-
- // Setup a topology.
- final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
- // Create the statements that will be input into the query.
- final ValueFactory vf = new ValueFactoryImpl();
- final List<VisibilityStatement> statements = new ArrayList<>();
- statements.add(new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(11)), "a"));
- statements.add(new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(9)), "a"));
-
- // Make the expected results.
- final Set<VisibilityBindingSet> expected = new HashSet<>();
- final MapBindingSet bs = new MapBindingSet();
- bs.addBinding("person", vf.createURI("urn:Alice"));
- bs.addBinding("age", vf.createLiteral(9));
- expected.add( new VisibilityBindingSet(bs, "a") );
-
- // Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java
deleted file mode 100644
index 3ff8e8d..0000000
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.streams.kafka.processors.filter;
-
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.rya.api.function.filter.FilterEvaluator;
-import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.streams.kafka.RdfTestUtil;
-import org.apache.rya.streams.kafka.processors.ProcessorResult;
-import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
-import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor;
-import org.junit.Test;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.algebra.Filter;
-import org.openrdf.query.impl.MapBindingSet;
-
-/**
- * Unit tests the methods of {@link FilterProcessor}.
- */
-public class FilterProcessorTest {
-
- @Test
- public void showFilterFunctionIsCalled() throws Exception {
- // Read the filter object from a SPARQL query.
- final Filter filter = RdfTestUtil.getFilter(
- "SELECT * " +
- "WHERE { " +
- "FILTER(?age < 10)" +
- "?person <urn:age> ?age " +
- "}");
-
- // Create a Binding Set that will be passed into the Filter function based on the where clause.
- final ValueFactory vf = new ValueFactoryImpl();
- final MapBindingSet bs = new MapBindingSet();
- bs.addBinding("person", vf.createURI("urn:Alice"));
- bs.addBinding("age", vf.createLiteral(9));
- final VisibilityBindingSet inputVisBs = new VisibilityBindingSet(bs, "a");
-
- // Mock the processor context that will be invoked.
- final ProcessorContext context = mock(ProcessorContext.class);
-
- // Run the test.
- final FilterProcessor processor = new FilterProcessor(
- FilterEvaluator.make(filter),
- result -> ProcessorResult.make(new UnaryResult(result)));
- processor.init(context);
- processor.process("key", ProcessorResult.make(new UnaryResult(inputVisBs)));
-
- // Verify the binding set was passed through.
- verify(context, times(1)).forward(eq("key"), eq(ProcessorResult.make(new UnaryResult(inputVisBs))));
-
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
deleted file mode 100644
index c090afa..0000000
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.streams.kafka.processors.filter;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.rya.api.function.projection.RandomUUIDFactory;
-import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.indexing.GeoConstants;
-import org.apache.rya.streams.kafka.KafkaTopics;
-import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
-import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
-import org.apache.rya.streams.kafka.topology.TopologyFactory;
-import org.apache.rya.test.kafka.KafkaTestInstanceRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.openrdf.model.Resource;
-import org.openrdf.model.Statement;
-import org.openrdf.model.URI;
-import org.openrdf.model.Value;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.StatementImpl;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.algebra.evaluation.function.Function;
-import org.openrdf.query.algebra.evaluation.function.FunctionRegistry;
-import org.openrdf.query.impl.MapBindingSet;
-
-import com.vividsolutions.jts.geom.Coordinate;
-import com.vividsolutions.jts.geom.Geometry;
-import com.vividsolutions.jts.geom.GeometryFactory;
-import com.vividsolutions.jts.io.WKTWriter;
-
-/**
- * Integration tests the geo methods of {@link FilterProcessor}.
- */
-public class GeoFilterIT {
- private static final String GEO = "http://www.opengis.net/def/function/geosparql/";
- private static final GeometryFactory GF = new GeometryFactory();
- private static final Geometry ZERO = GF.createPoint(new Coordinate(0, 0));
- private static final Geometry ONE = GF.createPoint(new Coordinate(1, 1));
-
- @Rule
- public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
-
- @Test
- public void showGeoFunctionsRegistered() {
- int count = 0;
- final Collection<Function> funcs = FunctionRegistry.getInstance().getAll();
- for (final Function fun : funcs) {
- if (fun.getURI().startsWith(GEO)) {
- count++;
- }
- }
-
- // There are 30 geo functions registered, ensure that there are 30.
- assertEquals(30, count);
- }
-
- @Test
- public void showProcessorWorks() throws Exception {
- // Enumerate some topics that will be re-used
- final String ryaInstance = UUID.randomUUID().toString();
- final UUID queryId = UUID.randomUUID();
- final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
- final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
- // Get the RDF model objects that will be used to build the query.
- final String sparql =
- "PREFIX geo: <http://www.opengis.net/ont/geosparql#>\n"
- + "PREFIX geof: <" + GEO + ">\n"
- + "SELECT * \n"
- + "WHERE { \n"
- + " <urn:event1> geo:asWKT ?point .\n"
- + " FILTER(geof:sfWithin(?point, \"POLYGON((-3 -2, -3 2, 1 2, 1 -2, -3 -2))\"^^geo:wktLiteral)) "
- + "}";
-
- // Setup a topology.
- final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
- // Create the statements that will be input into the query.
- final ValueFactory vf = new ValueFactoryImpl();
- final List<VisibilityStatement> statements = getStatements();
-
- // Make the expected results.
- final Set<VisibilityBindingSet> expected = new HashSet<>();
- final MapBindingSet bs = new MapBindingSet();
- final WKTWriter w = new WKTWriter();
- bs.addBinding("point", vf.createLiteral(w.write(ZERO), GeoConstants.XMLSCHEMA_OGC_WKT));
- expected.add( new VisibilityBindingSet(bs, "a") );
-
- // Run the test.
- RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
- }
-
- private List<VisibilityStatement> getStatements() throws Exception {
- final List<VisibilityStatement> statements = new ArrayList<>();
- // geo 2x2 points
- statements.add(new VisibilityStatement(statement(ZERO), "a"));
- statements.add(new VisibilityStatement(statement(ONE), "a"));
- return statements;
- }
-
- private static Statement statement(final Geometry geo) {
- final ValueFactory vf = new ValueFactoryImpl();
- final Resource subject = vf.createURI("urn:event1");
- final URI predicate = GeoConstants.GEO_AS_WKT;
- final WKTWriter w = new WKTWriter();
- final Value object = vf.createLiteral(w.write(geo), GeoConstants.XMLSCHEMA_OGC_WKT);
- return new StatementImpl(subject, predicate, object);
- }
-}
\ No newline at end of file