You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/09 21:49:00 UTC
[43/50] [abbrv] incubator-rya git commit: RYA-377 Implement a command
for running a Rya Streams query out of the command line client.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
deleted file mode 100644
index 8898284..0000000
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.streams.kafka;
-
-import static java.util.Objects.requireNonNull;
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
-import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
-import org.apache.rya.test.kafka.KafkaTestInstanceRule;
-
-import com.google.common.collect.Sets;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
-/**
- * A set of utility functions that are useful when writing tests against a Kafka instance.
- */
-@DefaultAnnotation(NonNull.class)
-public final class KafkaTestUtil {
-
- private KafkaTestUtil() { }
-
- /**
- * Create a {@link Producer} that is able to write to a topic that is hosted within an embedded instance of Kafka.
- *
- * @param kafka - The Kafka rule used to connect to the embedded Kafkfa instance. (not null)
- * @param keySerializerClass - Serializes the keys. (not null)
- * @param valueSerializerClass - Serializes the values. (not null)
- * @return A {@link Producer} that can be used to write records to a topic.
- */
- public static <K, V> Producer<K, V> makeProducer(
- final KafkaTestInstanceRule kafka,
- final Class<? extends Serializer<K>> keySerializerClass,
- final Class<? extends Serializer<V>> valueSerializerClass) {
- requireNonNull(kafka);
- requireNonNull(keySerializerClass);
- requireNonNull(valueSerializerClass);
-
- final Properties props = kafka.createBootstrapServerConfig();
- props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName());
- props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName());
- return new KafkaProducer<>(props);
- }
-
- /**
- * Create a {@link Consumer} that has a unique group ID and reads everything from a topic that is hosted within an
- * embedded instance of Kafka starting at the earliest point by default.
- *
- * @param kafka - The Kafka rule used to connect to the embedded Kafkfa instance. (not null)
- * @param keyDeserializerClass - Deserializes the keys. (not null)
- * @param valueDeserializerClass - Deserializes the values. (not null)
- * @return A {@link Consumer} that can be used to read records from a topic.
- */
- public static <K, V> Consumer<K, V> fromStartConsumer(
- final KafkaTestInstanceRule kafka,
- final Class<? extends Deserializer<K>> keyDeserializerClass,
- final Class<? extends Deserializer<V>> valueDeserializerClass) {
- requireNonNull(kafka);
- requireNonNull(keyDeserializerClass);
- requireNonNull(valueDeserializerClass);
-
- final Properties props = kafka.createBootstrapServerConfig();
- props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
- props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass.getName());
- props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass.getName());
- return new KafkaConsumer<>(props);
- }
-
- /**
- * Polls a {@link Consumer> until it has either polled too many times without hitting the target number
- * of results, or it hits the target number of results.
- *
- * @param pollMs - How long each poll could take.
- * @param pollIterations - The maximum number of polls that will be attempted.
- * @param targetSize - The number of results to read before stopping.
- * @param consumer - The consumer that will be polled.
- * @return The results that were read frmo the consumer.
- * @throws Exception If the poll failed.
- */
- public static <K, V> List<V> pollForResults(
- final int pollMs,
- final int pollIterations,
- final int targetSize,
- final Consumer<K, V> consumer) throws Exception {
- requireNonNull(consumer);
-
- final List<V> values = new ArrayList<>();
-
- int i = 0;
- while(values.size() < targetSize && i < pollIterations) {
- for(final ConsumerRecord<K, V> record : consumer.poll(pollMs)) {
- values.add( record.value() );
- }
- i++;
- }
-
- return values;
- }
-
- /**
- * Runs a Kafka Streams topology, loads statements into the input topic, read the binding sets that come out of
- * the results topic, and ensures the expected results match the read results.
- *
- * @param <T> The type of value that will be consumed from the results topic.
- * @param kafka - The embedded Kafka instance that is being tested with. (not null)
- * @param statementsTopic - The topic statements will be written to. (not null)
- * @param resultsTopic - The topic results will be read from. (not null)
- * @param builder - The streams topology that will be executed. (not null)
- * @param startupMs - How long to wait for the topology to start before writing the statements.
- * @param statements - The statements that will be loaded into the topic. (not null)
- * @param expected - The expected results. (not null)
- * @param expectedDeserializerClass - The class of the deserializer that will be used when reading
- * values from the results topic. (not null)
- * @throws Exception If any exception was thrown while running the test.
- */
- public static <T> void runStreamProcessingTest(
- final KafkaTestInstanceRule kafka,
- final String statementsTopic,
- final String resultsTopic,
- final TopologyBuilder builder,
- final int startupMs,
- final List<VisibilityStatement> statements,
- final Set<T> expected,
- final Class<? extends Deserializer<T>> expectedDeserializerClass) throws Exception {
- requireNonNull(kafka);
- requireNonNull(statementsTopic);
- requireNonNull(resultsTopic);
- requireNonNull(builder);
- requireNonNull(statements);
- requireNonNull(expected);
- requireNonNull(expectedDeserializerClass);
-
- // Explicitly create the topics that are being used.
- kafka.createTopic(statementsTopic);
- kafka.createTopic(resultsTopic);
-
- // Start the streams program.
- final Properties props = kafka.createBootstrapServerConfig();
- props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StatementPatternProcessorIT");
-
- final KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
- streams.cleanUp();
- try {
- streams.start();
-
- // Wait for the streams application to start. Streams only see data after their consumers are connected.
- Thread.sleep(startupMs);
-
- // Load the statements into the input topic.
- try(Producer<String, VisibilityStatement> producer = KafkaTestUtil.makeProducer(
- kafka, StringSerializer.class, VisibilityStatementSerializer.class)) {
- new KafkaLoadStatements(statementsTopic, producer).fromCollection(statements);
- }
-
- // Wait for the final results to appear in the output topic and verify the expected Binding Sets were found.
- try(Consumer<String, T> consumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, expectedDeserializerClass)) {
- // Register the topic.
- consumer.subscribe(Arrays.asList(resultsTopic));
-
- // Poll for the result.
- final Set<T> results = Sets.newHashSet( KafkaTestUtil.pollForResults(500, 6, expected.size(), consumer) );
-
- // Show the correct binding sets results from the job.
- assertEquals(expected, results);
- }
- } finally {
- streams.close();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java
new file mode 100644
index 0000000..b7e2be2
--- /dev/null
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
+
+import com.google.common.collect.Sets;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Utility functions that make it easier to test Rya Streams applications.
+ */
+@DefaultAnnotation(NonNull.class)
+public class RyaStreamsTestUtil {
+
+ /**
+ * Runs a Kafka Streams topology, loads statements into the input topic, read the binding sets that come out of
+ * the results topic, and ensures the expected results match the read results.
+ *
+ * @param <T> The type of value that will be consumed from the results topic.
+ * @param kafka - The embedded Kafka instance that is being tested with. (not null)
+ * @param statementsTopic - The topic statements will be written to. (not null)
+ * @param resultsTopic - The topic results will be read from. (not null)
+ * @param builder - The streams topology that will be executed. (not null)
+ * @param startupMs - How long to wait for the topology to start before writing the statements.
+ * @param statements - The statements that will be loaded into the topic. (not null)
+ * @param expected - The expected results. (not null)
+ * @param expectedDeserializerClass - The class of the deserializer that will be used when reading
+ * values from the results topic. (not null)
+ * @throws Exception If any exception was thrown while running the test.
+ */
+ public static <T> void runStreamProcessingTest(
+ final KafkaTestInstanceRule kafka,
+ final String statementsTopic,
+ final String resultsTopic,
+ final TopologyBuilder builder,
+ final int startupMs,
+ final List<VisibilityStatement> statements,
+ final Set<T> expected,
+ final Class<? extends Deserializer<T>> expectedDeserializerClass) throws Exception {
+ requireNonNull(kafka);
+ requireNonNull(statementsTopic);
+ requireNonNull(resultsTopic);
+ requireNonNull(builder);
+ requireNonNull(statements);
+ requireNonNull(expected);
+ requireNonNull(expectedDeserializerClass);
+
+ // Explicitly create the topics that are being used.
+ kafka.createTopic(statementsTopic);
+ kafka.createTopic(resultsTopic);
+
+ // Start the streams program.
+ final Properties props = kafka.createBootstrapServerConfig();
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StatementPatternProcessorIT");
+
+ final KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
+ streams.cleanUp();
+ try {
+ streams.start();
+
+ // Wait for the streams application to start. Streams only see data after their consumers are connected.
+ Thread.sleep(startupMs);
+
+ // Load the statements into the input topic.
+ try(Producer<String, VisibilityStatement> producer = KafkaTestUtil.makeProducer(
+ kafka, StringSerializer.class, VisibilityStatementSerializer.class)) {
+ new KafkaLoadStatements(statementsTopic, producer).fromCollection(statements);
+ }
+
+ // Wait for the final results to appear in the output topic and verify the expected Binding Sets were found.
+ try(Consumer<String, T> consumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, expectedDeserializerClass)) {
+ // Register the topic.
+ consumer.subscribe(Arrays.asList(resultsTopic));
+
+ // Poll for the result.
+ final Set<T> results = Sets.newHashSet( KafkaTestUtil.pollForResults(500, 6, expected.size(), consumer) );
+
+ // Show the correct binding sets results from the job.
+ assertEquals(expected, results);
+ }
+ } finally {
+ streams.close();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
index 67889e9..c740ba2 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
@@ -31,10 +31,10 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.streams.api.entity.QueryResultStream;
import org.apache.rya.streams.api.interactor.GetQueryResultStream;
-import org.apache.rya.streams.kafka.KafkaTestUtil;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
import org.junit.Rule;
import org.junit.Test;
import org.openrdf.model.ValueFactory;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java
index b48addd..7bfa560 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java
@@ -31,11 +31,10 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.KafkaTestUtil;
import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
-import org.apache.rya.test.kafka.KafkaITBase;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
import org.junit.Rule;
import org.junit.Test;
import org.openrdf.model.ValueFactory;
@@ -45,7 +44,7 @@ import org.openrdf.rio.UnsupportedRDFormatException;
/**
* Integration tests the {@link KafkaLoadStatements} command
*/
-public class KafkaLoadStatementsIT extends KafkaITBase {
+public class KafkaLoadStatementsIT {
private static final Path TURTLE_FILE = Paths.get("src/test/resources/statements.ttl");
private static final Path INVALID = Paths.get("src/test/resources/invalid.INVALID");
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
new file mode 100644
index 0000000..33b3a92
--- /dev/null
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.interactor;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+import org.apache.rya.streams.api.interactor.LoadStatements;
+import org.apache.rya.streams.api.interactor.RunQuery;
+import org.apache.rya.streams.api.queries.InMemoryQueryChangeLog;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Integration tests the methods of {@link KafkaRunQuery}.
+ */
+public class KafkaRunQueryIT {
+
+ @Rule
+ public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+ private Producer<String, VisibilityStatement> producer;
+ private Consumer<String, VisibilityBindingSet> consumer;
+
+ @Before
+ public void setup() {
+ producer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class);
+ consumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class);
+ }
+
+ @After
+ public void cleanup() {
+ producer.close();
+ consumer.close();
+ }
+
+ @Test
+ public void runQuery() throws Exception {
+ // Setup some constant that will be used through the test.
+ final String ryaInstance = UUID.randomUUID().toString();
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+
+ // This query is completely in memory, so it doesn't need to be closed.
+ final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() );
+
+ // Add the query to the query repository.
+ final StreamsQuery sQuery = queries.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }");
+ final UUID queryId = sQuery.getQueryId();
+ final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+ // The thread that will run the tested interactor.
+ final Thread testThread = new Thread() {
+ @Override
+ public void run() {
+ final RunQuery runQuery = new KafkaRunQuery(
+ kafka.getKafkaHostname(),
+ kafka.getKafkaPort(),
+ statementsTopic,
+ resultsTopic,
+ queries,
+ new TopologyFactory());
+ try {
+ runQuery.run(queryId);
+ } catch (final RyaStreamsException e) {
+ // Do nothing. Test will still fail because the expected results will be missing.
+ }
+ }
+ };
+
+ // Create the topics.
+ kafka.createTopic(statementsTopic);
+ kafka.createTopic(resultsTopic);
+
+ // Create the statements that will be loaded.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add(new VisibilityStatement(vf.createStatement(
+ vf.createURI("urn:Alice"),
+ vf.createURI("urn:worksAt"),
+ vf.createURI("urn:BurgerJoint")), "a"));
+ statements.add(new VisibilityStatement(vf.createStatement(
+ vf.createURI("urn:Bob"),
+ vf.createURI("urn:worksAt"),
+ vf.createURI("urn:TacoShop")), "a"));
+ statements.add(new VisibilityStatement(vf.createStatement(
+ vf.createURI("urn:Charlie"),
+ vf.createURI("urn:worksAt"),
+ vf.createURI("urn:TacoShop")), "a"));
+
+ // Create the expected results.
+ final List<VisibilityBindingSet> expected = new ArrayList<>();
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Alice"));
+ bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
+ expected.add(new VisibilityBindingSet(bs, "a"));
+ bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Bob"));
+ bs.addBinding("business", vf.createURI("urn:TacoShop"));
+ expected.add(new VisibilityBindingSet(bs, "a"));
+ bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Charlie"));
+ bs.addBinding("business", vf.createURI("urn:TacoShop"));
+ expected.add(new VisibilityBindingSet(bs, "a"));
+
+ // Execute the test. This will result in a set of results that were read from the results topic.
+ final List<VisibilityBindingSet> results;
+ try {
+ // Wait for the program to start.
+ testThread.start();
+ Thread.sleep(2000);
+
+ // Write some statements to the program.
+ final LoadStatements loadStatements = new KafkaLoadStatements(statementsTopic, producer);
+ loadStatements.fromCollection(statements);
+
+ // Read the output of the streams program.
+ consumer.subscribe( Lists.newArrayList(resultsTopic) );
+ results = KafkaTestUtil.pollForResults(500, 6, 3, consumer);
+ } finally {
+ // Tear down the test.
+ testThread.interrupt();
+ testThread.join(3000);
+ }
+
+ // Show the read results matched the expected ones.
+ assertEquals(expected, results);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
index 3e0e64d..80b6e42 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
@@ -28,8 +28,8 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.rya.api.function.projection.RandomUUIDFactory;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.KafkaTestUtil;
import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
import org.apache.rya.streams.kafka.topology.TopologyFactory;
@@ -75,7 +75,7 @@ public class StatementPatternProcessorIT {
expected.add( new VisibilityBindingSet(bs, "a") );
// Run the test.
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -113,7 +113,7 @@ public class StatementPatternProcessorIT {
expected.add( new VisibilityBindingSet(bs, "a|b") );
// Run the test.
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -147,7 +147,7 @@ public class StatementPatternProcessorIT {
expected.add( new VisibilityBindingSet(bs, "a") );
// Run the test.
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -190,6 +190,6 @@ public class StatementPatternProcessorIT {
expected.add(new VisibilityBindingSet(bs, "a"));
// Run the test.
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
index 0348dcd..fb5305f 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
@@ -28,8 +28,8 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.rya.api.function.projection.RandomUUIDFactory;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.KafkaTestUtil;
import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor;
import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
import org.apache.rya.streams.kafka.topology.TopologyFactory;
@@ -81,6 +81,6 @@ public class FilterProcessorIT {
expected.add( new VisibilityBindingSet(bs, "a") );
// Run the test.
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
index b137a9a..51bb0ae 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
@@ -29,8 +29,8 @@ import org.apache.rya.api.function.join.NaturalJoin;
import org.apache.rya.api.function.projection.RandomUUIDFactory;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.KafkaTestUtil;
import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
import org.apache.rya.streams.kafka.processors.ProcessorResult;
import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier.JoinProcessor;
@@ -111,7 +111,7 @@ public class JoinProcessorIT {
expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
// Run the test.
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -162,7 +162,7 @@ public class JoinProcessorIT {
expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
// Run the test.
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -219,7 +219,7 @@ public class JoinProcessorIT {
expected.add( new VisibilityBindingSet(bs, "a&c") );
// Run the test.
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -260,7 +260,7 @@ public class JoinProcessorIT {
expected.add( new VisibilityBindingSet(bs, "a") );
// Run the test.
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 3000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 3000, statements, expected, VisibilityBindingSetDeserializer.class);
}
@Test
@@ -311,6 +311,6 @@ public class JoinProcessorIT {
expected.add( new VisibilityBindingSet(bs, "c") );
// Run the test.
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
index d71577b..c96919c 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
@@ -26,8 +26,8 @@ import java.util.UUID;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.KafkaTestUtil;
import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier.MultiProjectionProcessor;
import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
import org.apache.rya.streams.kafka.topology.TopologyFactory;
@@ -87,6 +87,6 @@ public class MultiProjectionProcessorIT {
expected.add(new VisibilityStatement(vf.createStatement(blankNode, vf.createURI("urn:location"), vf.createURI("urn:corner1")), "a"));
// Run the test.
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityStatementDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityStatementDeserializer.class);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
index bc5f115..63c2cc7 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
@@ -28,8 +28,8 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.rya.api.function.projection.RandomUUIDFactory;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.KafkaTestUtil;
import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
import org.apache.rya.streams.kafka.topology.TopologyFactory;
@@ -80,6 +80,6 @@ public class ProjectionProcessorIT {
expectedBs.addBinding("otherPerson", vf.createURI("urn:Bob"));
expected.add(new VisibilityBindingSet(expectedBs, "a"));
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, Sets.newHashSet(expected), VisibilityBindingSetDeserializer.class);
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, Sets.newHashSet(expected), VisibilityBindingSetDeserializer.class);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
index ff2b59b..04c81ed 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
@@ -33,11 +33,11 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.streams.api.queries.ChangeLogEntry;
import org.apache.rya.streams.api.queries.QueryChange;
import org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException;
-import org.apache.rya.streams.kafka.KafkaTestUtil;
import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
import org.apache.rya.test.kafka.KafkaITBase;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java
index f9129ff..70cba1c 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java
@@ -29,8 +29,8 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.streams.kafka.KafkaTestUtil;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
import org.junit.Rule;
import org.junit.Test;
import org.openrdf.model.ValueFactory;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java
index b85eb0c..9e85f52 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java
@@ -29,8 +29,8 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.KafkaTestUtil;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
import org.junit.Rule;
import org.junit.Test;
import org.openrdf.model.ValueFactory;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/test/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/test/kafka/pom.xml b/test/kafka/pom.xml
index 44773a7..8492629 100644
--- a/test/kafka/pom.xml
+++ b/test/kafka/pom.xml
@@ -70,6 +70,11 @@ under the License.
</exclusion>
</exclusions>
</dependency>
+
+ <dependency>
+ <groupId>com.github.stephenc.findbugs</groupId>
+ <artifactId>findbugs-annotations</artifactId>
+ </dependency>
<!-- Testing dependencies. -->
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java
----------------------------------------------------------------------
diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java
index f76fa2b..252c288 100644
--- a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java
+++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java
@@ -105,6 +105,13 @@ public class KafkaTestInstanceRule extends ExternalResource {
}
/**
+ * @return The hostnames of the Zookeeper servers.
+ */
+ public String getZookeeperServers() {
+ return kafkaInstance.getZookeeperConnect();
+ }
+
+ /**
* @return The hostname of the Kafka Broker.
*/
public String getKafkaHostname() {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestUtil.java
----------------------------------------------------------------------
diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestUtil.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestUtil.java
new file mode 100644
index 0000000..4b41f1a
--- /dev/null
+++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestUtil.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.test.kafka;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A set of utility functions that are useful when writing tests against a Kafka instance.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class KafkaTestUtil {
+
+ private KafkaTestUtil() { }
+
+ /**
+ * Create a {@link Producer} that is able to write to a topic that is hosted within an embedded instance of Kafka.
+ *
+ * @param kafka - The Kafka rule used to connect to the embedded Kafkfa instance. (not null)
+ * @param keySerializerClass - Serializes the keys. (not null)
+ * @param valueSerializerClass - Serializes the values. (not null)
+ * @return A {@link Producer} that can be used to write records to a topic.
+ */
+ public static <K, V> Producer<K, V> makeProducer(
+ final KafkaTestInstanceRule kafka,
+ final Class<? extends Serializer<K>> keySerializerClass,
+ final Class<? extends Serializer<V>> valueSerializerClass) {
+ requireNonNull(kafka);
+ requireNonNull(keySerializerClass);
+ requireNonNull(valueSerializerClass);
+
+ final Properties props = kafka.createBootstrapServerConfig();
+ props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName());
+ props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName());
+ return new KafkaProducer<>(props);
+ }
+
+ /**
+ * Create a {@link Consumer} that has a unique group ID and reads everything from a topic that is hosted within an
+ * embedded instance of Kafka starting at the earliest point by default.
+ *
+ * @param kafka - The Kafka rule used to connect to the embedded Kafkfa instance. (not null)
+ * @param keyDeserializerClass - Deserializes the keys. (not null)
+ * @param valueDeserializerClass - Deserializes the values. (not null)
+ * @return A {@link Consumer} that can be used to read records from a topic.
+ */
+ public static <K, V> Consumer<K, V> fromStartConsumer(
+ final KafkaTestInstanceRule kafka,
+ final Class<? extends Deserializer<K>> keyDeserializerClass,
+ final Class<? extends Deserializer<V>> valueDeserializerClass) {
+ requireNonNull(kafka);
+ requireNonNull(keyDeserializerClass);
+ requireNonNull(valueDeserializerClass);
+
+ final Properties props = kafka.createBootstrapServerConfig();
+ props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+ props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass.getName());
+ props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass.getName());
+ return new KafkaConsumer<>(props);
+ }
+
+ /**
+ * Polls a {@link Consumer> until it has either polled too many times without hitting the target number
+ * of results, or it hits the target number of results.
+ *
+ * @param pollMs - How long each poll could take.
+ * @param pollIterations - The maximum number of polls that will be attempted.
+ * @param targetSize - The number of results to read before stopping.
+ * @param consumer - The consumer that will be polled.
+ * @return The results that were read frmo the consumer.
+ * @throws Exception If the poll failed.
+ */
+ public static <K, V> List<V> pollForResults(
+ final int pollMs,
+ final int pollIterations,
+ final int targetSize,
+ final Consumer<K, V> consumer) throws Exception {
+ requireNonNull(consumer);
+
+ final List<V> values = new ArrayList<>();
+
+ int i = 0;
+ while(values.size() < targetSize && i < pollIterations) {
+ for(final ConsumerRecord<K, V> record : consumer.poll(pollMs)) {
+ values.add( record.value() );
+ }
+ i++;
+ }
+
+ return values;
+ }
+}
\ No newline at end of file