You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/09 21:49:00 UTC

[43/50] [abbrv] incubator-rya git commit: RYA-377 Implement a command for running a Rya Streams query out of the command line client.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
deleted file mode 100644
index 8898284..0000000
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.streams.kafka;
-
-import static java.util.Objects.requireNonNull;
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
-import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
-import org.apache.rya.test.kafka.KafkaTestInstanceRule;
-
-import com.google.common.collect.Sets;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
-/**
- * A set of utility functions that are useful when writing tests against a Kafka instance.
- */
-@DefaultAnnotation(NonNull.class)
-public final class KafkaTestUtil {
-
-    private KafkaTestUtil() { }
-
-    /**
-     * Create a {@link Producer} that is able to write to a topic that is hosted within an embedded instance of Kafka.
-     *
-     * @param kafka - The Kafka rule used to connect to the embedded Kafkfa instance. (not null)
-     * @param keySerializerClass - Serializes the keys. (not null)
-     * @param valueSerializerClass - Serializes the values. (not null)
-     * @return A {@link Producer} that can be used to write records to a topic.
-     */
-    public static <K, V> Producer<K, V> makeProducer(
-            final KafkaTestInstanceRule kafka,
-            final Class<? extends Serializer<K>> keySerializerClass,
-            final Class<? extends Serializer<V>> valueSerializerClass) {
-        requireNonNull(kafka);
-        requireNonNull(keySerializerClass);
-        requireNonNull(valueSerializerClass);
-
-        final Properties props = kafka.createBootstrapServerConfig();
-        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName());
-        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName());
-        return new KafkaProducer<>(props);
-    }
-
-    /**
-     * Create a {@link Consumer} that has a unique group ID and reads everything from a topic that is hosted within an
-     * embedded instance of Kafka starting at the earliest point by default.
-     *
-     * @param kafka - The Kafka rule used to connect to the embedded Kafkfa instance. (not null)
-     * @param keyDeserializerClass - Deserializes the keys. (not null)
-     * @param valueDeserializerClass - Deserializes the values. (not null)
-     * @return A {@link Consumer} that can be used to read records from a topic.
-     */
-    public static <K, V> Consumer<K, V> fromStartConsumer(
-            final KafkaTestInstanceRule kafka,
-            final Class<? extends Deserializer<K>> keyDeserializerClass,
-            final Class<? extends Deserializer<V>> valueDeserializerClass) {
-        requireNonNull(kafka);
-        requireNonNull(keyDeserializerClass);
-        requireNonNull(valueDeserializerClass);
-
-        final Properties props = kafka.createBootstrapServerConfig();
-        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
-        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass.getName());
-        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass.getName());
-        return new KafkaConsumer<>(props);
-    }
-
-    /**
-     * Polls a {@link Consumer> until it has either polled too many times without hitting the target number
-     * of results, or it hits the target number of results.
-     *
-     * @param pollMs - How long each poll could take.
-     * @param pollIterations - The maximum number of polls that will be attempted.
-     * @param targetSize - The number of results to read before stopping.
-     * @param consumer - The consumer that will be polled.
-     * @return The results that were read frmo the consumer.
-     * @throws Exception If the poll failed.
-     */
-    public static <K, V> List<V> pollForResults(
-            final int pollMs,
-            final int pollIterations,
-            final int targetSize,
-            final Consumer<K, V> consumer) throws Exception {
-        requireNonNull(consumer);
-
-        final List<V> values = new ArrayList<>();
-
-        int i = 0;
-        while(values.size() < targetSize && i < pollIterations) {
-            for(final ConsumerRecord<K, V> record : consumer.poll(pollMs)) {
-                values.add( record.value() );
-            }
-            i++;
-        }
-
-        return values;
-    }
-
-    /**
-     * Runs a Kafka Streams topology, loads statements into the input topic, read the binding sets that come out of
-     * the results topic, and ensures the expected results match the read results.
-     *
-     * @param <T> The type of value that will be consumed from the results topic.
-     * @param kafka - The embedded Kafka instance that is being tested with. (not null)
-     * @param statementsTopic - The topic statements will be written to. (not null)
-     * @param resultsTopic - The topic results will be read from. (not null)
-     * @param builder - The streams topology that will be executed. (not null)
-     * @param startupMs - How long to wait for the topology to start before writing the statements.
-     * @param statements - The statements that will be loaded into the topic. (not null)
-     * @param expected - The expected results. (not null)
-     * @param expectedDeserializerClass - The class of the deserializer that will be used when reading
-     *   values from the results topic. (not null)
-     * @throws Exception If any exception was thrown while running the test.
-     */
-    public static <T> void runStreamProcessingTest(
-            final KafkaTestInstanceRule kafka,
-            final String statementsTopic,
-            final String resultsTopic,
-            final TopologyBuilder builder,
-            final int startupMs,
-            final List<VisibilityStatement> statements,
-            final Set<T> expected,
-            final Class<? extends Deserializer<T>> expectedDeserializerClass) throws Exception {
-        requireNonNull(kafka);
-        requireNonNull(statementsTopic);
-        requireNonNull(resultsTopic);
-        requireNonNull(builder);
-        requireNonNull(statements);
-        requireNonNull(expected);
-        requireNonNull(expectedDeserializerClass);
-
-        // Explicitly create the topics that are being used.
-        kafka.createTopic(statementsTopic);
-        kafka.createTopic(resultsTopic);
-
-        // Start the streams program.
-        final Properties props = kafka.createBootstrapServerConfig();
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StatementPatternProcessorIT");
-
-        final KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
-        streams.cleanUp();
-        try {
-            streams.start();
-
-            // Wait for the streams application to start. Streams only see data after their consumers are connected.
-            Thread.sleep(startupMs);
-
-            // Load the statements into the input topic.
-            try(Producer<String, VisibilityStatement> producer = KafkaTestUtil.makeProducer(
-                    kafka, StringSerializer.class, VisibilityStatementSerializer.class)) {
-                new KafkaLoadStatements(statementsTopic, producer).fromCollection(statements);
-            }
-
-            // Wait for the final results to appear in the output topic and verify the expected Binding Sets were found.
-            try(Consumer<String, T> consumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, expectedDeserializerClass)) {
-                // Register the topic.
-                consumer.subscribe(Arrays.asList(resultsTopic));
-
-                // Poll for the result.
-                final Set<T> results = Sets.newHashSet( KafkaTestUtil.pollForResults(500, 6, expected.size(), consumer) );
-
-                // Show the correct binding sets results from the job.
-                assertEquals(expected, results);
-            }
-        } finally {
-            streams.close();
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java
new file mode 100644
index 0000000..b7e2be2
--- /dev/null
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
+
+import com.google.common.collect.Sets;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Utility functions that make it easier to test Rya Streams applications.
+ */
+@DefaultAnnotation(NonNull.class)
+public class RyaStreamsTestUtil {
+
+    /**
+     * Runs a Kafka Streams topology, loads statements into the input topic, read the binding sets that come out of
+     * the results topic, and ensures the expected results match the read results.
+     *
+     * @param <T> The type of value that will be consumed from the results topic.
+     * @param kafka - The embedded Kafka instance that is being tested with. (not null)
+     * @param statementsTopic - The topic statements will be written to. (not null)
+     * @param resultsTopic - The topic results will be read from. (not null)
+     * @param builder - The streams topology that will be executed. (not null)
+     * @param startupMs - How long to wait for the topology to start before writing the statements.
+     * @param statements - The statements that will be loaded into the topic. (not null)
+     * @param expected - The expected results. (not null)
+     * @param expectedDeserializerClass - The class of the deserializer that will be used when reading
+     *   values from the results topic. (not null)
+     * @throws Exception If any exception was thrown while running the test.
+     */
+    public static <T> void runStreamProcessingTest(
+            final KafkaTestInstanceRule kafka,
+            final String statementsTopic,
+            final String resultsTopic,
+            final TopologyBuilder builder,
+            final int startupMs,
+            final List<VisibilityStatement> statements,
+            final Set<T> expected,
+            final Class<? extends Deserializer<T>> expectedDeserializerClass) throws Exception {
+        requireNonNull(kafka);
+        requireNonNull(statementsTopic);
+        requireNonNull(resultsTopic);
+        requireNonNull(builder);
+        requireNonNull(statements);
+        requireNonNull(expected);
+        requireNonNull(expectedDeserializerClass);
+
+        // Explicitly create the topics that are being used.
+        kafka.createTopic(statementsTopic);
+        kafka.createTopic(resultsTopic);
+
+        // Start the streams program.
+        final Properties props = kafka.createBootstrapServerConfig();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StatementPatternProcessorIT");
+
+        final KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
+        streams.cleanUp();
+        try {
+            streams.start();
+
+            // Wait for the streams application to start. Streams only see data after their consumers are connected.
+            Thread.sleep(startupMs);
+
+            // Load the statements into the input topic.
+            try(Producer<String, VisibilityStatement> producer = KafkaTestUtil.makeProducer(
+                    kafka, StringSerializer.class, VisibilityStatementSerializer.class)) {
+                new KafkaLoadStatements(statementsTopic, producer).fromCollection(statements);
+            }
+
+            // Wait for the final results to appear in the output topic and verify the expected Binding Sets were found.
+            try(Consumer<String, T> consumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, expectedDeserializerClass)) {
+                // Register the topic.
+                consumer.subscribe(Arrays.asList(resultsTopic));
+
+                // Poll for the result.
+                final Set<T> results = Sets.newHashSet( KafkaTestUtil.pollForResults(500, 6, expected.size(), consumer) );
+
+                // Show the correct binding sets results from the job.
+                assertEquals(expected, results);
+            }
+        } finally {
+            streams.close();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
index 67889e9..c740ba2 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
@@ -31,10 +31,10 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.streams.api.entity.QueryResultStream;
 import org.apache.rya.streams.api.interactor.GetQueryResultStream;
-import org.apache.rya.streams.kafka.KafkaTestUtil;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
 import org.junit.Rule;
 import org.junit.Test;
 import org.openrdf.model.ValueFactory;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java
index b48addd..7bfa560 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java
@@ -31,11 +31,10 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.KafkaTestUtil;
 import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
 import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
-import org.apache.rya.test.kafka.KafkaITBase;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
 import org.junit.Rule;
 import org.junit.Test;
 import org.openrdf.model.ValueFactory;
@@ -45,7 +44,7 @@ import org.openrdf.rio.UnsupportedRDFormatException;
 /**
  * Integration tests the {@link KafkaLoadStatements} command
  */
-public class KafkaLoadStatementsIT extends KafkaITBase {
+public class KafkaLoadStatementsIT {
     private static final Path TURTLE_FILE = Paths.get("src/test/resources/statements.ttl");
 
     private static final Path INVALID = Paths.get("src/test/resources/invalid.INVALID");

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
new file mode 100644
index 0000000..33b3a92
--- /dev/null
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.interactor;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+import org.apache.rya.streams.api.interactor.LoadStatements;
+import org.apache.rya.streams.api.interactor.RunQuery;
+import org.apache.rya.streams.api.queries.InMemoryQueryChangeLog;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Integration tests the methods of {@link KafkaRunQuery}.
+ */
+public class KafkaRunQueryIT {
+
+    @Rule
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+    private Producer<String, VisibilityStatement> producer;
+    private Consumer<String, VisibilityBindingSet> consumer;
+
+    @Before
+    public void setup() {
+        producer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class);
+        consumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class);
+    }
+
+    @After
+    public void cleanup() {
+        producer.close();
+        consumer.close();
+    }
+
+    @Test
+    public void runQuery() throws Exception {
+        // Setup some constant that will be used through the test.
+        final String ryaInstance = UUID.randomUUID().toString();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+
+        // This query is completely in memory, so it doesn't need to be closed.
+        final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() );
+
+        // Add the query to the query repository.
+        final StreamsQuery sQuery = queries.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }");
+        final UUID queryId = sQuery.getQueryId();
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // The thread that will run the tested interactor.
+        final Thread testThread = new Thread() {
+            @Override
+            public void run() {
+                final RunQuery runQuery = new KafkaRunQuery(
+                        kafka.getKafkaHostname(),
+                        kafka.getKafkaPort(),
+                        statementsTopic,
+                        resultsTopic,
+                        queries,
+                        new TopologyFactory());
+                try {
+                    runQuery.run(queryId);
+                } catch (final RyaStreamsException e) {
+                    // Do nothing. Test will still fail because the expected results will be missing.
+                }
+            }
+        };
+
+        // Create the topics.
+        kafka.createTopic(statementsTopic);
+        kafka.createTopic(resultsTopic);
+
+        // Create the statements that will be loaded.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add(new VisibilityStatement(vf.createStatement(
+                vf.createURI("urn:Alice"),
+                vf.createURI("urn:worksAt"),
+                vf.createURI("urn:BurgerJoint")), "a"));
+        statements.add(new VisibilityStatement(vf.createStatement(
+                vf.createURI("urn:Bob"),
+                vf.createURI("urn:worksAt"),
+                vf.createURI("urn:TacoShop")), "a"));
+        statements.add(new VisibilityStatement(vf.createStatement(
+                vf.createURI("urn:Charlie"),
+                vf.createURI("urn:worksAt"),
+                vf.createURI("urn:TacoShop")), "a"));
+
+        // Create the expected results.
+        final List<VisibilityBindingSet> expected = new ArrayList<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
+        expected.add(new VisibilityBindingSet(bs, "a"));
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:TacoShop"));
+        expected.add(new VisibilityBindingSet(bs, "a"));
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Charlie"));
+        bs.addBinding("business", vf.createURI("urn:TacoShop"));
+        expected.add(new VisibilityBindingSet(bs, "a"));
+
+        // Execute the test. This will result in a set of results that were read from the results topic.
+        final List<VisibilityBindingSet> results;
+        try {
+            // Wait for the program to start.
+            testThread.start();
+            Thread.sleep(2000);
+
+            // Write some statements to the program.
+            final LoadStatements loadStatements = new KafkaLoadStatements(statementsTopic, producer);
+            loadStatements.fromCollection(statements);
+
+            // Read the output of the streams program.
+            consumer.subscribe( Lists.newArrayList(resultsTopic) );
+            results = KafkaTestUtil.pollForResults(500, 6, 3, consumer);
+        } finally {
+            // Tear down the test.
+            testThread.interrupt();
+            testThread.join(3000);
+        }
+
+        // Show the read results matched the expected ones.
+        assertEquals(expected, results);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
index 3e0e64d..80b6e42 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
@@ -28,8 +28,8 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.rya.api.function.projection.RandomUUIDFactory;
 import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.KafkaTestUtil;
 import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
 import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
 import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
 import org.apache.rya.streams.kafka.topology.TopologyFactory;
@@ -75,7 +75,7 @@ public class StatementPatternProcessorIT {
         expected.add( new VisibilityBindingSet(bs, "a") );
 
         // Run the test.
-        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -113,7 +113,7 @@ public class StatementPatternProcessorIT {
         expected.add( new VisibilityBindingSet(bs, "a|b") );
 
         // Run the test.
-        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -147,7 +147,7 @@ public class StatementPatternProcessorIT {
         expected.add( new VisibilityBindingSet(bs, "a") );
 
         // Run the test.
-        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -190,6 +190,6 @@ public class StatementPatternProcessorIT {
         expected.add(new VisibilityBindingSet(bs, "a"));
 
         // Run the test.
-        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
index 0348dcd..fb5305f 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java
@@ -28,8 +28,8 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.rya.api.function.projection.RandomUUIDFactory;
 import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.KafkaTestUtil;
 import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
 import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor;
 import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
 import org.apache.rya.streams.kafka.topology.TopologyFactory;
@@ -81,6 +81,6 @@ public class FilterProcessorIT {
         expected.add( new VisibilityBindingSet(bs, "a") );
 
         // Run the test.
-        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
index b137a9a..51bb0ae 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
@@ -29,8 +29,8 @@ import org.apache.rya.api.function.join.NaturalJoin;
 import org.apache.rya.api.function.projection.RandomUUIDFactory;
 import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.KafkaTestUtil;
 import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
 import org.apache.rya.streams.kafka.processors.ProcessorResult;
 import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
 import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier.JoinProcessor;
@@ -111,7 +111,7 @@ public class JoinProcessorIT {
         expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
 
         // Run the test.
-        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -162,7 +162,7 @@ public class JoinProcessorIT {
         expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
 
         // Run the test.
-        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -219,7 +219,7 @@ public class JoinProcessorIT {
         expected.add( new VisibilityBindingSet(bs, "a&c") );
 
         // Run the test.
-        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -260,7 +260,7 @@ public class JoinProcessorIT {
         expected.add( new VisibilityBindingSet(bs, "a") );
 
         // Run the test.
-        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 3000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 3000, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 
     @Test
@@ -311,6 +311,6 @@ public class JoinProcessorIT {
         expected.add( new VisibilityBindingSet(bs, "c") );
 
         // Run the test.
-        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
index d71577b..c96919c 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
@@ -26,8 +26,8 @@ import java.util.UUID;
 
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.KafkaTestUtil;
 import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
 import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier.MultiProjectionProcessor;
 import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
 import org.apache.rya.streams.kafka.topology.TopologyFactory;
@@ -87,6 +87,6 @@ public class MultiProjectionProcessorIT {
         expected.add(new VisibilityStatement(vf.createStatement(blankNode, vf.createURI("urn:location"), vf.createURI("urn:corner1")), "a"));
 
         // Run the test.
-        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityStatementDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityStatementDeserializer.class);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
index bc5f115..63c2cc7 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
@@ -28,8 +28,8 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.rya.api.function.projection.RandomUUIDFactory;
 import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.KafkaTestUtil;
 import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
 import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
 import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
 import org.apache.rya.streams.kafka.topology.TopologyFactory;
@@ -80,6 +80,6 @@ public class ProjectionProcessorIT {
         expectedBs.addBinding("otherPerson", vf.createURI("urn:Bob"));
         expected.add(new VisibilityBindingSet(expectedBs, "a"));
 
-        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, Sets.newHashSet(expected), VisibilityBindingSetDeserializer.class);
+        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, Sets.newHashSet(expected), VisibilityBindingSetDeserializer.class);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
index ff2b59b..04c81ed 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
@@ -33,11 +33,11 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.streams.api.queries.ChangeLogEntry;
 import org.apache.rya.streams.api.queries.QueryChange;
 import org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException;
-import org.apache.rya.streams.kafka.KafkaTestUtil;
 import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
 import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
 import org.apache.rya.test.kafka.KafkaITBase;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java
index f9129ff..70cba1c 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java
@@ -29,8 +29,8 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.streams.kafka.KafkaTestUtil;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
 import org.junit.Rule;
 import org.junit.Test;
 import org.openrdf.model.ValueFactory;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java
index b85eb0c..9e85f52 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java
@@ -29,8 +29,8 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.KafkaTestUtil;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
 import org.junit.Rule;
 import org.junit.Test;
 import org.openrdf.model.ValueFactory;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/test/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/test/kafka/pom.xml b/test/kafka/pom.xml
index 44773a7..8492629 100644
--- a/test/kafka/pom.xml
+++ b/test/kafka/pom.xml
@@ -70,6 +70,11 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
+        
+        <dependency>
+            <groupId>com.github.stephenc.findbugs</groupId>
+            <artifactId>findbugs-annotations</artifactId>
+        </dependency>
     
         <!-- Testing dependencies. -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java
----------------------------------------------------------------------
diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java
index f76fa2b..252c288 100644
--- a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java
+++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java
@@ -105,6 +105,13 @@ public class KafkaTestInstanceRule extends ExternalResource {
     }
 
     /**
+     * @return The hostnames of the Zookeeper servers.
+     */
+    public String getZookeeperServers() {
+        return kafkaInstance.getZookeeperConnect();
+    }
+
+    /**
      * @return The hostname of the Kafka Broker.
      */
     public String getKafkaHostname() {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestUtil.java
----------------------------------------------------------------------
diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestUtil.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestUtil.java
new file mode 100644
index 0000000..4b41f1a
--- /dev/null
+++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestUtil.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.test.kafka;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A set of utility functions that are useful when writing tests against a Kafka instance.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class KafkaTestUtil {
+
+    private KafkaTestUtil() { }
+
+    /**
+     * Create a {@link Producer} that is able to write to a topic that is hosted within an embedded instance of Kafka.
+     *
+     * @param kafka - The Kafka rule used to connect to the embedded Kafkfa instance. (not null)
+     * @param keySerializerClass - Serializes the keys. (not null)
+     * @param valueSerializerClass - Serializes the values. (not null)
+     * @return A {@link Producer} that can be used to write records to a topic.
+     */
+    public static <K, V> Producer<K, V> makeProducer(
+            final KafkaTestInstanceRule kafka,
+            final Class<? extends Serializer<K>> keySerializerClass,
+            final Class<? extends Serializer<V>> valueSerializerClass) {
+        requireNonNull(kafka);
+        requireNonNull(keySerializerClass);
+        requireNonNull(valueSerializerClass);
+
+        final Properties props = kafka.createBootstrapServerConfig();
+        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName());
+        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName());
+        return new KafkaProducer<>(props);
+    }
+
+    /**
+     * Create a {@link Consumer} that has a unique group ID and reads everything from a topic that is hosted within an
+     * embedded instance of Kafka starting at the earliest point by default.
+     *
+     * @param kafka - The Kafka rule used to connect to the embedded Kafkfa instance. (not null)
+     * @param keyDeserializerClass - Deserializes the keys. (not null)
+     * @param valueDeserializerClass - Deserializes the values. (not null)
+     * @return A {@link Consumer} that can be used to read records from a topic.
+     */
+    public static <K, V> Consumer<K, V> fromStartConsumer(
+            final KafkaTestInstanceRule kafka,
+            final Class<? extends Deserializer<K>> keyDeserializerClass,
+            final Class<? extends Deserializer<V>> valueDeserializerClass) {
+        requireNonNull(kafka);
+        requireNonNull(keyDeserializerClass);
+        requireNonNull(valueDeserializerClass);
+
+        final Properties props = kafka.createBootstrapServerConfig();
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass.getName());
+        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass.getName());
+        return new KafkaConsumer<>(props);
+    }
+
+    /**
+     * Polls a {@link Consumer> until it has either polled too many times without hitting the target number
+     * of results, or it hits the target number of results.
+     *
+     * @param pollMs - How long each poll could take.
+     * @param pollIterations - The maximum number of polls that will be attempted.
+     * @param targetSize - The number of results to read before stopping.
+     * @param consumer - The consumer that will be polled.
+     * @return The results that were read frmo the consumer.
+     * @throws Exception If the poll failed.
+     */
+    public static <K, V> List<V> pollForResults(
+            final int pollMs,
+            final int pollIterations,
+            final int targetSize,
+            final Consumer<K, V> consumer) throws Exception {
+        requireNonNull(consumer);
+
+        final List<V> values = new ArrayList<>();
+
+        int i = 0;
+        while(values.size() < targetSize && i < pollIterations) {
+            for(final ConsumerRecord<K, V> record : consumer.poll(pollMs)) {
+                values.add( record.value() );
+            }
+            i++;
+        }
+
+        return values;
+    }
+}
\ No newline at end of file