You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/09 21:49:01 UTC

[44/50] [abbrv] incubator-rya git commit: RYA-377 Implement a command for running a Rya Streams query out of the command line client.

RYA-377 Implement a command for running a Rya Streams query out of the command line client.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/94423229
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/94423229
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/94423229

Branch: refs/heads/master
Commit: 94423229ebe7b34e0fb6c17fbe022e080cfe79d9
Parents: a5e3618
Author: kchilton2 <ke...@gmail.com>
Authored: Tue Nov 14 18:32:53 2017 -0500
Committer: caleb <ca...@parsons.com>
Committed: Tue Jan 9 15:13:01 2018 -0500

----------------------------------------------------------------------
 .../rya/streams/api/interactor/RunQuery.java    |  41 ++++
 .../api/queries/InMemoryQueryRepository.java    |  21 +-
 .../streams/api/queries/QueryRepository.java    |  14 +-
 .../queries/InMemoryQueryRepositoryTest.java    | 121 ++++++-----
 .../apache/rya/streams/client/CLIDriver.java    |   2 +
 .../client/command/LoadStatementsCommand.java   |   1 +
 .../streams/client/command/RunQueryCommand.java | 155 ++++++++++++++
 .../client/command/StreamResultsCommand.java    |   2 +-
 .../client/command/AddQueryCommandIT.java       |  55 ++---
 .../client/command/DeleteQueryCommandIT.java    | 183 +++++++---------
 .../client/command/ListQueryCommandIT.java      |  56 ++---
 .../client/command/LoadStatementsCommandIT.java |  78 ++-----
 .../client/command/RunQueryCommandIT.java       | 196 +++++++++++++++++
 extras/rya.streams/kafka/pom.xml                |  10 +
 .../apache/rya/streams/kafka/KafkaTopics.java   |  39 ++++
 .../kafka/interactor/KafkaLoadStatements.java   |   5 +-
 .../streams/kafka/interactor/KafkaRunQuery.java | 136 ++++++++++++
 .../processors/join/KeyValueJoinStateStore.java |   5 +-
 .../apache/rya/streams/kafka/KafkaTestUtil.java | 211 -------------------
 .../rya/streams/kafka/RyaStreamsTestUtil.java   | 124 +++++++++++
 .../interactor/KafkaGetQueryResultStreamIT.java |   2 +-
 .../kafka/interactor/KafkaLoadStatementsIT.java |   5 +-
 .../kafka/interactor/KafkaRunQueryIT.java       | 170 +++++++++++++++
 .../processors/StatementPatternProcessorIT.java |  10 +-
 .../processors/filter/FilterProcessorIT.java    |   4 +-
 .../kafka/processors/join/JoinProcessorIT.java  |  12 +-
 .../projection/MultiProjectionProcessorIT.java  |   4 +-
 .../projection/ProjectionProcessorIT.java       |   4 +-
 .../kafka/queries/KafkaQueryChangeLogIT.java    |   2 +-
 .../VisibilityBindingSetKafkaIT.java            |   2 +-
 .../VisibilityStatementKafkaIT.java             |   2 +-
 test/kafka/pom.xml                              |   5 +
 .../rya/test/kafka/KafkaTestInstanceRule.java   |   7 +
 .../apache/rya/test/kafka/KafkaTestUtil.java    | 126 +++++++++++
 34 files changed, 1269 insertions(+), 541 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/RunQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/RunQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/RunQuery.java
new file mode 100644
index 0000000..7f47095
--- /dev/null
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/RunQuery.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api.interactor;
+
+import java.util.UUID;
+
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Runs a Rya Streams processing topology on the machine this class is invoked on.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface RunQuery {
+
+    /**
+     * Runs the specified query on the machine this method was invoked on.
+     *
+     * @param queryId - The id of the query that will be processed. (not null)
+     * @throws RyaStreamsException The query could not processed.
+     */
+    public void run(UUID queryId) throws RyaStreamsException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
index c1048fc..80678de 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
@@ -22,6 +22,7 @@ import static java.util.Objects.requireNonNull;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.locks.ReentrantLock;
@@ -63,13 +64,7 @@ public class InMemoryQueryRepository implements QueryRepository {
         this.changeLog = requireNonNull(changeLog);
 
         // Lazily initialize the queries cache the first time you try to use it.
-        queriesCache = Suppliers.memoize(new Supplier<Map<UUID, StreamsQuery>>() {
-            @Override
-            public Map<UUID, StreamsQuery> get() {
-                // Initialize the queries cache using the current state of the change log.
-                return initializeCache(changeLog);
-            }
-        });
+        queriesCache = Suppliers.memoize(() -> initializeCache(changeLog));
     }
 
     @Override
@@ -98,6 +93,18 @@ public class InMemoryQueryRepository implements QueryRepository {
     }
 
     @Override
+    public Optional<StreamsQuery> get(final UUID queryId) throws QueryRepositoryException {
+        requireNonNull(queryId);
+
+        lock.lock();
+        try {
+            return Optional.ofNullable( queriesCache.get().get(queryId) );
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
     public void delete(final UUID queryId) throws QueryRepositoryException {
         requireNonNull(queryId);
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
index 850b2bc..7269588 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
@@ -18,6 +18,7 @@
  */
 package org.apache.rya.streams.api.queries;
 
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 
@@ -32,6 +33,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  */
 @DefaultAnnotation(NonNull.class)
 public interface QueryRepository extends AutoCloseable {
+
     /**
      * Adds a new query to Rya Streams.
      *
@@ -42,6 +44,15 @@ public interface QueryRepository extends AutoCloseable {
     public StreamsQuery add(final String query) throws QueryRepositoryException;
 
     /**
+     * Get an existing query from Rya Streams.
+     *
+     * @param queryId - Identifies which query will be fetched.
+     * @return the {@link StreamsQuery} for the id if one exists; otherwise empty.
+     * @throws QueryRepositoryException The query could not be fetched.
+     */
+    public Optional<StreamsQuery> get(UUID queryId) throws QueryRepositoryException;
+
+    /**
      * Removes an existing query from Rya Streams.
      *
      * @param queryID - The {@link UUID} of the query to remove. (not null)
@@ -53,8 +64,7 @@ public interface QueryRepository extends AutoCloseable {
      * Lists all existing queries in Rya Streams.
      *
      * @return - A List of the current {@link StreamsQuery}s
-     * @throws QueryRepositoryException The {@link StreamsQuery}s could not be
-     *         listed.
+     * @throws QueryRepositoryException The {@link StreamsQuery}s could not be listed.
      */
     public Set<StreamsQuery> list() throws QueryRepositoryException;
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
index 25cbab2..92193ca 100644
--- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
+++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
@@ -19,16 +19,17 @@
 package org.apache.rya.streams.api.queries;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException;
-import org.apache.rya.streams.api.queries.QueryRepository.QueryRepositoryException;
 import org.junit.Test;
 
 /**
@@ -37,69 +38,95 @@ import org.junit.Test;
 public class InMemoryQueryRepositoryTest {
 
     @Test
-    public void canReadAddedQueries() throws QueryRepositoryException {
+    public void canReadAddedQueries() throws Exception {
         // Setup a totally in memory QueryRepository.
-        final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() );
-
-        // Add some queries to it.
-        final Set<StreamsQuery> expected = new HashSet<>();
-        expected.add( queries.add("query 1") );
-        expected.add( queries.add("query 2") );
-        expected.add( queries.add("query 3") );
-
-        // Show they are in the list of all queries.
-        final Set<StreamsQuery> stored = queries.list();
-        assertEquals(expected, stored);
+        try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
+            // Add some queries to it.
+            final Set<StreamsQuery> expected = new HashSet<>();
+            expected.add( queries.add("query 1") );
+            expected.add( queries.add("query 2") );
+            expected.add( queries.add("query 3") );
+
+            // Show they are in the list of all queries.
+            final Set<StreamsQuery> stored = queries.list();
+            assertEquals(expected, stored);
+        }
     }
 
     @Test
-    public void deletedQueriesDisappear() throws QueryRepositoryException {
+    public void deletedQueriesDisappear() throws Exception {
         // Setup a totally in memory QueryRepository.
-        final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() );
-
-        // Add some queries to it. The second one we will delete.
-        final Set<StreamsQuery> expected = new HashSet<>();
-        expected.add( queries.add("query 1") );
-        final UUID deletedMeId = queries.add("query 2").getQueryId();
-        expected.add( queries.add("query 3") );
-
-        // Delete the second query.
-        queries.delete( deletedMeId );
-
-        // Show only queries 1 and 3 are in the list.
-        final Set<StreamsQuery> stored = queries.list();
-        assertEquals(expected, stored);
+        try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
+            // Add some queries to it. The second one we will delete.
+            final Set<StreamsQuery> expected = new HashSet<>();
+            expected.add( queries.add("query 1") );
+            final UUID deletedMeId = queries.add("query 2").getQueryId();
+            expected.add( queries.add("query 3") );
+
+            // Delete the second query.
+            queries.delete( deletedMeId );
+
+            // Show only queries 1 and 3 are in the list.
+            final Set<StreamsQuery> stored = queries.list();
+            assertEquals(expected, stored);
+        }
     }
 
     @Test
-    public void initializedWithPopulatedChnageLog() throws QueryRepositoryException {
+    public void initializedWithPopulatedChnageLog() throws Exception {
         // Setup a totally in memory QueryRepository. Hold onto the change log so that we can use it again later.
         final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
-        final QueryRepository queries = new InMemoryQueryRepository( changeLog );
-
-        // Add some queries and deletes to it.
-        final Set<StreamsQuery> expected = new HashSet<>();
-        expected.add( queries.add("query 1") );
-        final UUID deletedMeId = queries.add("query 2").getQueryId();
-        expected.add( queries.add("query 3") );
-        queries.delete( deletedMeId );
-
-        // Create a new totally in memory QueryRepository.
-        final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog );
-
-        // Listing the queries should work using an initialized change log.
-        final Set<StreamsQuery> stored = initializedQueries.list();
-        assertEquals(expected, stored);
+        try(final QueryRepository queries = new InMemoryQueryRepository( changeLog )) {
+            // Add some queries and deletes to it.
+            final Set<StreamsQuery> expected = new HashSet<>();
+            expected.add( queries.add("query 1") );
+            final UUID deletedMeId = queries.add("query 2").getQueryId();
+            expected.add( queries.add("query 3") );
+            queries.delete( deletedMeId );
+
+            // Create a new totally in memory QueryRepository.
+            try(final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog )) {
+                // Listing the queries should work using an initialized change log.
+                final Set<StreamsQuery> stored = initializedQueries.list();
+                assertEquals(expected, stored);
+            }
+        }
     }
 
     @Test(expected = RuntimeException.class)
-    public void changeLogThrowsExceptions() throws QueryChangeLogException, QueryRepositoryException {
+    public void changeLogThrowsExceptions() throws Exception {
         // Create a mock change log that throws an exception when you try to list what is in it.
         final QueryChangeLog changeLog = mock(QueryChangeLog.class);
         when(changeLog.readFromStart()).thenThrow(new QueryChangeLogException("Mocked exception."));
 
         // Create the QueryRepository and invoke one of the methods.
-        final QueryRepository queries = new InMemoryQueryRepository( changeLog );
-        queries.list();
+        try(final QueryRepository queries = new InMemoryQueryRepository( changeLog )) {
+            queries.list();
+        }
+    }
+
+    @Test
+    public void get_present() throws Exception {
+        // Setup a totally in memory QueryRepository.
+        try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
+            // Add a query to it.
+            final StreamsQuery query = queries.add("query 1");
+
+            // Show the fetched query matches the expected ones.
+            final Optional<StreamsQuery> fetched = queries.get(query.getQueryId());
+            assertEquals(query, fetched.get());
+        }
+    }
+
+    @Test
+    public void get_notPresent() throws Exception {
+        // Setup a totally in memory QueryRepository.
+        try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
+            // Fetch a query that was never added to the repository.
+            final Optional<StreamsQuery> query = queries.get(UUID.randomUUID());
+
+            // Show it could not be found.
+            assertFalse(query.isPresent());
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java
index 5c0816f..05e75d9 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java
@@ -30,6 +30,7 @@ import org.apache.rya.streams.client.command.AddQueryCommand;
 import org.apache.rya.streams.client.command.DeleteQueryCommand;
 import org.apache.rya.streams.client.command.ListQueriesCommand;
 import org.apache.rya.streams.client.command.LoadStatementsCommand;
+import org.apache.rya.streams.client.command.RunQueryCommand;
 import org.apache.rya.streams.client.command.StreamResultsCommand;
 
 import com.google.common.collect.ImmutableMap;
@@ -63,6 +64,7 @@ public class CLIDriver {
         commandClasses.add(DeleteQueryCommand.class);
         commandClasses.add(ListQueriesCommand.class);
         commandClasses.add(LoadStatementsCommand.class);
+        commandClasses.add(RunQueryCommand.class);
         commandClasses.add(StreamResultsCommand.class);
         final ImmutableMap.Builder<String, RyaStreamsCommand> builder = ImmutableMap.builder();
         for(final Class<? extends RyaStreamsCommand> commandClass : commandClasses) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
index 9414b28..42020b3 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
@@ -127,6 +127,7 @@ public class LoadStatementsCommand implements RyaStreamsCommand {
         final Properties producerProps = buildProperties(params);
         try (final Producer<Object, VisibilityStatement> producer = new KafkaProducer<>(producerProps)) {
             final LoadStatements statements = new KafkaLoadStatements(KafkaTopics.statementsTopic(params.ryaInstance), producer);
+            System.out.printf("Loading statements from file `%s` using visibilities `%s`.\n", statementsPath, params.visibilities);
             statements.fromFile(statementsPath, params.visibilities);
         } catch (final Exception e) {
             System.err.println("Unable to parse statements file: " + statementsPath.toString());

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
new file mode 100644
index 0000000..8f7f162
--- /dev/null
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.client.command;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.client.RyaStreamsCommand;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.interactor.KafkaRunQuery;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.google.common.base.Strings;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A command that runs a Rya Streams processing topology on the node the client is executed on until it has finished.
+ */
+@DefaultAnnotation(NonNull.class)
+public class RunQueryCommand implements RyaStreamsCommand {
+
+    private class RunParameters extends RyaStreamsCommand.KafkaParameters {
+        @Parameter(names = { "--queryID", "-q" }, required = true, description = "The ID of the query to run.")
+        private String queryId;
+
+        @Parameter(names = {"--zookeepers", "-z"}, required = true, description = "The servers that Zookeeper runs on.")
+        private String zookeeperServers;
+
+        @Override
+        public String toString() {
+            final StringBuilder parameters = new StringBuilder();
+            parameters.append(super.toString());
+
+            if (!Strings.isNullOrEmpty(queryId)) {
+                parameters.append("\tQueryID: " + queryId);
+                parameters.append("\n");
+            }
+            return parameters.toString();
+        }
+    }
+
+    @Override
+    public String getCommand() {
+        return "run-query";
+    }
+
+    @Override
+    public String getDescription() {
+        return "Runs a Rya Streams query until the command is killed. This command also creates the input and output " +
+                "topics required to execute the query.";
+    }
+
+    @Override
+    public String getUsage() {
+        final JCommander parser = new JCommander(new RunParameters());
+
+        final StringBuilder usage = new StringBuilder();
+        parser.usage(usage);
+        return usage.toString();
+    }
+
+    @Override
+    public boolean validArguments(final String[] args) {
+        boolean valid = true;
+        try {
+            new JCommander(new RunParameters(), args);
+        } catch(final ParameterException e) {
+            valid = false;
+        }
+        return valid;
+    }
+
+    @Override
+    public void execute(final String[] args) throws ArgumentsException, ExecutionException {
+        requireNonNull(args);
+
+        // Parse the command line arguments.
+        final RunParameters params = new RunParameters();
+        try {
+            new JCommander(params, args);
+        } catch(final ParameterException e) {
+            throw new ArgumentsException("Could not add a new query because of invalid command line parameters.", e);
+        }
+
+        // Create the Kafka backed QueryChangeLog.
+        final String bootstrapServers = params.kafkaIP + ":" + params.kafkaPort;
+        final String topic = KafkaTopics.queryChangeLogTopic(params.ryaInstance);
+        final QueryChangeLog queryChangeLog = KafkaQueryChangeLogFactory.make(bootstrapServers, topic);
+
+        // Look up the query to be executed from the change log.
+        try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) {
+            try {
+                final UUID queryId = UUID.fromString( params.queryId );
+                final Optional<StreamsQuery> query = queryRepo.get(queryId);
+
+                if(!query.isPresent()) {
+                    throw new ArgumentsException("There is no registered query for queryId " + params.queryId);
+                }
+
+                // Make sure the topics required by the application exists for the specified Rya instances.
+                final Set<String> topics = new HashSet<>();
+                topics.add( KafkaTopics.statementsTopic(params.ryaInstance) );
+                topics.add( KafkaTopics.queryResultsTopic(queryId) );
+                KafkaTopics.createTopic(params.zookeeperServers, topics, 1, 1);
+
+                // Run the query that uses those topics.
+                final KafkaRunQuery runQuery = new KafkaRunQuery(
+                        params.kafkaIP,
+                        params.kafkaPort,
+                        KafkaTopics.statementsTopic(params.ryaInstance),
+                        KafkaTopics.queryResultsTopic(queryId),
+                        queryRepo,
+                        new TopologyFactory());
+                runQuery.run(queryId);
+            } catch(final Exception e) {
+                throw new ExecutionException("Could not execute the Run Query command.", e);
+            }
+        } catch(final ArgumentsException | ExecutionException e) {
+            // Rethrow the exceptions that are advertised by execute.
+            throw e;
+        } catch (final Exception e) {
+            throw new ExecutionException("Problem encountered while closing the QueryRepository.", e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
index 9de978b..64f78a3 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
@@ -126,7 +126,7 @@ public class StreamResultsCommand implements RyaStreamsCommand {
         // Execute the command.
         final GetQueryResultStream getQueryResultStream = new KafkaGetQueryResultStream(params.kafkaIP, params.kafkaPort);
 
-        try (final QueryResultStream stream = getQueryResultStream.fromNow(queryId)) {
+        try (final QueryResultStream stream = getQueryResultStream.fromStart(queryId)) {
             while(!finished.get()) {
                 for(final VisibilityBindingSet visBs : stream.poll(1000)) {
                     System.out.println(visBs);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
index ee4378e..3a412d2 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
@@ -20,17 +20,11 @@ package org.apache.rya.streams.client.command;
 
 import static org.junit.Assert.assertEquals;
 
-import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.streams.api.entity.StreamsQuery;
@@ -43,6 +37,7 @@ import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
 import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
 import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -54,49 +49,27 @@ import org.junit.Test;
 public class AddQueryCommandIT {
 
     private final String ryaInstance = UUID.randomUUID().toString();
-
-    private String kafkaIp;
-    private String kafkaPort;
     private QueryRepository queryRepo;
 
-    private Producer<?, QueryChange> queryProducer = null;
-    private Consumer<?, QueryChange> queryConsumer = null;
-
     @Rule
-    public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true);
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
 
     @Before
     public void setup() {
-        final Properties props = rule.createBootstrapServerConfig();
-        final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
-        final String[] tokens = location.split(":");
-
-        kafkaIp = tokens[0];
-        kafkaPort = tokens[1];
-
-        // Initialize the QueryRepository.
-        final Properties producerProperties = new Properties();
-        producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
-        producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName());
-
-        final Properties consumerProperties = new Properties();
-        consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
-        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName());
-
-        queryProducer = new KafkaProducer<>(producerProperties);
-        queryConsumer = new KafkaConsumer<>(consumerProperties);
-
+        // Make sure the topic that the change log uses exists.
         final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
+        kafka.createTopic(changeLogTopic);
+
+        // Setup the QueryRepository used by the test.
+        final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
+        final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
         final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
         queryRepo = new InMemoryQueryRepository(changeLog);
     }
 
     @After
-    public void cleanup() {
-        queryProducer.close();
-        queryConsumer.close();
+    public void cleanup() throws Exception {
+        queryRepo.close();
     }
 
     @Test
@@ -105,8 +78,8 @@ public class AddQueryCommandIT {
         final String query = "SELECT * WHERE { ?person <urn:name> ?name }";
         final String[] args = new String[] {
                 "-r", "" + ryaInstance,
-                "-i", kafkaIp,
-                "-p", kafkaPort,
+                "-i", kafka.getKafkaHostname(),
+                "-p", kafka.getKafkaPort(),
                 "-q", query
         };
 
@@ -126,8 +99,8 @@ public class AddQueryCommandIT {
         final String query = "SELECT * WHERE { ?person <urn:name> ?name }";
         final String[] args = new String[] {
                 "--ryaInstance", "" + ryaInstance,
-                "--kafkaHostname", kafkaIp,
-                "--kafkaPort", kafkaPort,
+                "--kafkaHostname", kafka.getKafkaHostname(),
+                "--kafkaPort", kafka.getKafkaPort(),
                 "--query", query
         };
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
index c5dad3d..91647f2 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
@@ -18,20 +18,15 @@
  */
 package org.apache.rya.streams.client.command;
 
+import static java.util.Objects.requireNonNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
-import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.streams.api.entity.StreamsQuery;
@@ -44,8 +39,7 @@ import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
 import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
 import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.rya.test.kafka.KafkaTestUtil;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -54,124 +48,103 @@ import org.junit.Test;
  */
 public class DeleteQueryCommandIT {
 
-    private final String ryaInstance = UUID.randomUUID().toString();
-
-    private String kafkaIp;
-    private String kafkaPort;
-
-    private Producer<?, QueryChange> queryProducer = null;
-    private Consumer<?, QueryChange> queryConsumer = null;
-
     @Rule
-    public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true);
-
-    @Before
-    public void setup() {
-        final Properties props = rule.createBootstrapServerConfig();
-        final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
-        final String[] tokens = location.split(":");
-
-        kafkaIp = tokens[0];
-        kafkaPort = tokens[1];
-    }
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
 
     /**
      * This test simulates executing many commands and each of them use their own InMemoryQueryRepository. We need
      * to re-create the repo outside of the command to ensure it has the most up to date values inside of it.
+     *
+     * @param ryaInstance - The rya instance the repository is connected to. (not null)
+     * @param createTopic - Set this to true if the topic doesn't exist yet.
      */
-    private QueryRepository makeQueryRepository() {
-        final Properties producerProperties = new Properties();
-        producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
-        producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName());
-
-        final Properties consumerProperties = new Properties();
-        consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
-        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName());
-
-        cleanup();
-        queryProducer = new KafkaProducer<>(producerProperties);
-        queryConsumer = new KafkaConsumer<>(consumerProperties);
+    private QueryRepository makeQueryRepository(final String ryaInstance, final boolean createTopic) {
+        requireNonNull(ryaInstance);
 
+        // Make sure the topic that the change log uses exists.
         final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
+        if(createTopic) {
+            kafka.createTopic(changeLogTopic);
+        }
+
+        // Setup the QueryRepository used by the test.
+        final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
+        final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
         final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
         return new InMemoryQueryRepository(changeLog);
     }
 
-    @After
-    public void cleanup() {
-        if(queryProducer != null) {
-            queryProducer.close();
-        }
-        if(queryConsumer != null) {
-            queryConsumer.close();
-        }
-    }
-
     @Test
     public void shortParams() throws Exception {
+        final String ryaInstance = UUID.randomUUID().toString();
+
         // Add a few queries to Rya Streams.
-        QueryRepository repo = makeQueryRepository();
-        repo.add("query1");
-        final UUID query2Id = repo.add("query2").getQueryId();
-        repo.add("query3");
-
-        // Show that all three of the queries were added.
-        Set<StreamsQuery> queries = repo.list();
-        assertEquals(3, queries.size());
-
-        // Delete query 2 using the delete query command.
-        final String[] deleteArgs = new String[] {
-                "-r", "" + ryaInstance,
-                "-i", kafkaIp,
-                "-p", kafkaPort,
-                "-q", query2Id.toString()
-        };
-
-        final DeleteQueryCommand deleteCommand = new DeleteQueryCommand();
-        deleteCommand.execute(deleteArgs);
-
-        // Show query2 was deleted.
-        repo = makeQueryRepository();
-        queries = repo.list();
-        assertEquals(2, queries.size());
-
-        for(final StreamsQuery query : queries) {
-            assertNotEquals(query2Id, query.getQueryId());
+        try(QueryRepository repo = makeQueryRepository(ryaInstance, true)) {
+            repo.add("query1");
+            final UUID query2Id = repo.add("query2").getQueryId();
+            repo.add("query3");
+
+            // Show that all three of the queries were added.
+            Set<StreamsQuery> queries = repo.list();
+            assertEquals(3, queries.size());
+
+            // Delete query 2 using the delete query command.
+            final String[] deleteArgs = new String[] {
+                    "-r", "" + ryaInstance,
+                    "-i", kafka.getKafkaHostname(),
+                    "-p", kafka.getKafkaPort(),
+                    "-q", query2Id.toString()
+            };
+
+            final DeleteQueryCommand deleteCommand = new DeleteQueryCommand();
+            deleteCommand.execute(deleteArgs);
+
+            // Show query2 was deleted.
+            try(QueryRepository repo2 = makeQueryRepository(ryaInstance, false)) {
+                queries = repo2.list();
+                assertEquals(2, queries.size());
+
+                for(final StreamsQuery query : queries) {
+                    assertNotEquals(query2Id, query.getQueryId());
+                }
+            }
         }
     }
 
     @Test
     public void longParams() throws Exception {
+        final String ryaInstance = UUID.randomUUID().toString();
+
         // Add a few queries to Rya Streams.
-        QueryRepository repo = makeQueryRepository();
-        repo.add("query1");
-        final UUID query2Id = repo.add("query2").getQueryId();
-        repo.add("query3");
-
-        // Show that all three of the queries were added.
-        Set<StreamsQuery> queries = repo.list();
-        assertEquals(3, queries.size());
-
-        // Delete query 2 using the delete query command.
-        final String[] deleteArgs = new String[] {
-                "--ryaInstance", "" + ryaInstance,
-                "--kafkaHostname", kafkaIp,
-                "--kafkaPort", kafkaPort,
-                "--queryID", query2Id.toString()
-        };
-
-        final DeleteQueryCommand deleteCommand = new DeleteQueryCommand();
-        deleteCommand.execute(deleteArgs);
-
-        // Show query2 was deleted.
-        repo = makeQueryRepository();
-        queries = repo.list();
-        assertEquals(2, queries.size());
-
-        for(final StreamsQuery query : queries) {
-            assertNotEquals(query2Id, query.getQueryId());
+        try(QueryRepository repo = makeQueryRepository(ryaInstance, true)) {
+            repo.add("query1");
+            final UUID query2Id = repo.add("query2").getQueryId();
+            repo.add("query3");
+
+            // Show that all three of the queries were added.
+            Set<StreamsQuery> queries = repo.list();
+            assertEquals(3, queries.size());
+
+            // Delete query 2 using the delete query command.
+            final String[] deleteArgs = new String[] {
+                    "--ryaInstance", "" + ryaInstance,
+                    "--kafkaHostname", kafka.getKafkaHostname(),
+                    "--kafkaPort", kafka.getKafkaPort(),
+                    "--queryID", query2Id.toString()
+            };
+
+            final DeleteQueryCommand deleteCommand = new DeleteQueryCommand();
+            deleteCommand.execute(deleteArgs);
+
+            // Show query2 was deleted.
+            try(QueryRepository repo2 = makeQueryRepository(ryaInstance, false)) {
+                queries = repo2.list();
+                assertEquals(2, queries.size());
+
+                for(final StreamsQuery query : queries) {
+                    assertNotEquals(query2Id, query.getQueryId());
+                }
+            }
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
index b32967e..00b4ce0 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
@@ -18,16 +18,10 @@
  */
 package org.apache.rya.streams.client.command;
 
-import java.util.Properties;
 import java.util.UUID;
 
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
@@ -39,6 +33,7 @@ import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
 import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
 import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -50,52 +45,29 @@ import org.junit.Test;
 public class ListQueryCommandIT {
 
     private final String ryaInstance = UUID.randomUUID().toString();
-
-    private String kafkaIp;
-    private String kafkaPort;
     private QueryRepository queryRepo;
 
-    private Producer<?, QueryChange> queryProducer = null;
-    private Consumer<?, QueryChange> queryConsumer = null;
-
     @Rule
-    public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true);
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
 
     @Before
     public void setup() {
-        final Properties props = rule.createBootstrapServerConfig();
-        final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
-        final String[] tokens = location.split(":");
-
-        kafkaIp = tokens[0];
-        kafkaPort = tokens[1];
-
-        // Initialize the QueryRepository.
-        final Properties producerProperties = new Properties();
-        producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
-        producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName());
-
-        final Properties consumerProperties = new Properties();
-        consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
-        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName());
-
-        queryProducer = new KafkaProducer<>(producerProperties);
-        queryConsumer = new KafkaConsumer<>(consumerProperties);
-
+        // Make sure the topic that the change log uses exists.
         final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
+        kafka.createTopic(changeLogTopic);
+
+        // Setup the QueryRepository used by the test.
+        final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
+        final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
         final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
         queryRepo = new InMemoryQueryRepository(changeLog);
     }
 
     @After
-    public void cleanup() {
-        queryProducer.close();
-        queryConsumer.close();
+    public void cleanup() throws Exception {
+        queryRepo.close();
     }
 
-
     @Test
     public void shortParams() throws Exception {
         // Add a few queries to Rya Streams.
@@ -106,8 +78,8 @@ public class ListQueryCommandIT {
         // Execute the List Queries command.
         final String[] args = new String[] {
                 "-r", "" + ryaInstance,
-                "-i", kafkaIp,
-                "-p", kafkaPort
+                "-i", kafka.getKafkaHostname(),
+                "-p", kafka.getKafkaPort()
         };
 
         final ListQueriesCommand command = new ListQueriesCommand();
@@ -124,8 +96,8 @@ public class ListQueryCommandIT {
         // Execute the List Queries command.
         final String[] args = new String[] {
                 "--ryaInstance", "" + ryaInstance,
-                "--kafkaHostname", kafkaIp,
-                "--kafkaPort", kafkaPort
+                "--kafkaHostname", kafka.getKafkaHostname(),
+                "--kafkaPort", kafka.getKafkaPort()
         };
 
         final ListQueriesCommand command = new ListQueriesCommand();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java
index 95a4876..03c31b4 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java
@@ -26,21 +26,16 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Properties;
 import java.util.UUID;
 
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.rya.api.model.VisibilityStatement;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
-import org.junit.Before;
+import org.apache.rya.test.kafka.KafkaTestUtil;
 import org.junit.Rule;
 import org.junit.Test;
 import org.openrdf.model.ValueFactory;
@@ -55,21 +50,8 @@ public class LoadStatementsCommandIT {
 
     private final String ryaInstance = UUID.randomUUID().toString();
 
-    private String kafkaIp;
-    private String kafkaPort;
-
     @Rule
-    public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true);
-
-    @Before
-    public void setup() {
-        final Properties props = rule.createBootstrapServerConfig();
-        final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
-        final String[] tokens = location.split(":");
-
-        kafkaIp = tokens[0];
-        kafkaPort = tokens[1];
-    }
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
 
     @Test
     public void shortParams() throws Exception {
@@ -77,35 +59,27 @@ public class LoadStatementsCommandIT {
         final String visibilities = "a|b|c";
         final String[] args = new String[] {
                 "-r", "" + ryaInstance,
-                "-i", kafkaIp,
-                "-p", kafkaPort,
+                "-i", kafka.getKafkaHostname(),
+                "-p", kafka.getKafkaPort(),
                 "-f", TURTLE_FILE.toString(),
                 "-v", visibilities
         };
 
+        // Load the file of statements into the Statements topic.
         new LoadStatementsCommand().execute(args);
 
         // Show that the statements were loaded into the topic.
-        // Read a VisibilityBindingSet from the test topic.
         final List<VisibilityStatement> read = new ArrayList<>();
 
-        final Properties consumerProps = new Properties();
-        consumerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
-        consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
-        consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VisibilityStatementDeserializer.class.getName());
-
-        try(final Consumer<String, VisibilityStatement> consumer = new KafkaConsumer<>(consumerProps)) {
-            final String topic = KafkaTopics.statementsTopic(ryaInstance);
-            consumer.subscribe(Arrays.asList(topic));
-            final ConsumerRecords<String, VisibilityStatement> records = consumer.poll(3000);
+        try(final Consumer<String, VisibilityStatement> consumer =
+                KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, VisibilityStatementDeserializer.class)) {
+            // Subscribe for messages.
+            consumer.subscribe( Arrays.asList(KafkaTopics.statementsTopic(ryaInstance)) );
 
-            assertEquals(3, records.count());
-            final Iterator<ConsumerRecord<String, VisibilityStatement>> iter = records.iterator();
+            // Read the messages and extract their values.
+            final Iterator<ConsumerRecord<String, VisibilityStatement>> iter = consumer.poll(3000).iterator();
             while(iter.hasNext()) {
-                final VisibilityStatement visiSet = iter.next().value();
-                read.add(visiSet);
+                read.add( iter.next().value() );
             }
         }
 
@@ -131,35 +105,27 @@ public class LoadStatementsCommandIT {
         final String visibilities = "a|b|c";
         final String[] args = new String[] {
                 "--ryaInstance", "" + ryaInstance,
-                "--kafkaHostname", kafkaIp,
-                "--kafkaPort", kafkaPort,
+                "--kafkaHostname", kafka.getKafkaHostname(),
+                "--kafkaPort", kafka.getKafkaPort(),
                 "--statementsFile", TURTLE_FILE.toString(),
                 "--visibilities", visibilities
         };
 
+        // Load the file of statements into the Statements topic.
         new LoadStatementsCommand().execute(args);
 
         // Show that the statements were loaded into the topic.
-        // Read a VisibilityBindingSet from the test topic.
         final List<VisibilityStatement> read = new ArrayList<>();
 
-        final Properties consumerProps = new Properties();
-        consumerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
-        consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
-        consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VisibilityStatementDeserializer.class.getName());
-
-        try(final Consumer<String, VisibilityStatement> consumer = new KafkaConsumer<>(consumerProps)) {
-            final String topic = KafkaTopics.statementsTopic(ryaInstance);
-            consumer.subscribe(Arrays.asList(topic));
-            final ConsumerRecords<String, VisibilityStatement> records = consumer.poll(3000);
+        try(final Consumer<String, VisibilityStatement> consumer =
+                KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, VisibilityStatementDeserializer.class)) {
+            // Subscribe for messages.
+            consumer.subscribe( Arrays.asList(KafkaTopics.statementsTopic(ryaInstance)) );
 
-            assertEquals(3, records.count());
-            final Iterator<ConsumerRecord<String, VisibilityStatement>> iter = records.iterator();
+            // Read the messages and extract their values.
+            final Iterator<ConsumerRecord<String, VisibilityStatement>> iter = consumer.poll(3000).iterator();
             while(iter.hasNext()) {
-                final VisibilityStatement visiSet = iter.next().value();
-                read.add(visiSet);
+                read.add( iter.next().value() );
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
new file mode 100644
index 0000000..788b41f
--- /dev/null
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.client.command;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.interactor.LoadStatements;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.client.RyaStreamsCommand.ArgumentsException;
+import org.apache.rya.streams.client.RyaStreamsCommand.ExecutionException;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Integration tests the methods of {@link RunQueryCommand}.
+ */
+public class RunQueryCommandIT {
+
+    private final String ryaInstance = UUID.randomUUID().toString();
+
+    private QueryRepository queryRepo;
+    private Producer<String, VisibilityStatement> stmtProducer = null;
+    private Consumer<String, VisibilityBindingSet> resultConsumer = null;
+
+    @Rule
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+    @Before
+    public void setup() {
+        // Make sure the topic that the change log uses exists.
+        final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
+        kafka.createTopic(changeLogTopic);
+
+        // Setup the QueryRepository used by the test.
+        final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
+        final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
+        final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
+        queryRepo = new InMemoryQueryRepository(changeLog);
+
+        // Initialize the Statements Producer and the Results Consumer.
+        stmtProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class);
+        resultConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class);
+    }
+
+    @After
+    public void cleanup() throws Exception{
+        stmtProducer.close();
+        resultConsumer.close();
+        queryRepo.close();
+    }
+
+    @Test(expected = ExecutionException.class)
+    public void runUnregisteredQuery() throws Exception {
+        // Arguments that run a query that is not registered with Rya Streams.
+        final String[] args = new String[] {
+                "--ryaInstance", "" + ryaInstance,
+                "--kafkaHostname", kafka.getKafkaHostname(),
+                "--kafkaPort", kafka.getKafkaPort(),
+                "--queryID", UUID.randomUUID().toString()
+        };
+
+        // Run the test. This will throw an exception.
+        final RunQueryCommand command = new RunQueryCommand();
+        command.execute(args);
+    }
+
+    @Test
+    public void runQuery() throws Exception {
+        // Register a query with the Query Repository.
+        final StreamsQuery sQuery = queryRepo.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }");
+
+        // Arguments that run the query we just registered with Rya Streams.
+        final String[] args = new String[] {
+                "--ryaInstance", "" + ryaInstance,
+                "--kafkaHostname", kafka.getKafkaHostname(),
+                "--kafkaPort", kafka.getKafkaPort(),
+                "--queryID", sQuery.getQueryId().toString(),
+                "--zookeepers", kafka.getZookeeperServers()
+        };
+
+        // Create a new Thread that runs the command.
+        final Thread commandThread = new Thread() {
+            @Override
+            public void run() {
+                final RunQueryCommand command = new RunQueryCommand();
+                try {
+                    command.execute(args);
+                } catch (ArgumentsException | ExecutionException e) {
+                    // Do nothing. Test will still fail because the expected results will be missing.
+                }
+            }
+        };
+
+        // Create the statements that will be loaded.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add(new VisibilityStatement(vf.createStatement(
+                vf.createURI("urn:Alice"),
+                vf.createURI("urn:worksAt"),
+                vf.createURI("urn:BurgerJoint")), "a"));
+        statements.add(new VisibilityStatement(vf.createStatement(
+                vf.createURI("urn:Bob"),
+                vf.createURI("urn:worksAt"),
+                vf.createURI("urn:TacoShop")), "a"));
+        statements.add(new VisibilityStatement(vf.createStatement(
+                vf.createURI("urn:Charlie"),
+                vf.createURI("urn:worksAt"),
+                vf.createURI("urn:TacoShop")), "a"));
+
+        // Create the expected results.
+        final List<VisibilityBindingSet> expected = new ArrayList<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
+        expected.add(new VisibilityBindingSet(bs, "a"));
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:TacoShop"));
+        expected.add(new VisibilityBindingSet(bs, "a"));
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Charlie"));
+        bs.addBinding("business", vf.createURI("urn:TacoShop"));
+        expected.add(new VisibilityBindingSet(bs, "a"));
+
+        // Execute the test. This will result in a set of results that were read from the results topic.
+        final List<VisibilityBindingSet> results;
+        try {
+            // Wait for the program to start.
+            commandThread.start();
+            Thread.sleep(5000);
+
+            // Write some statements to the program.
+            final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+            final LoadStatements loadStatements = new KafkaLoadStatements(statementsTopic, stmtProducer);
+            loadStatements.fromCollection(statements);
+
+            // Read the output of the streams program.
+            final String resultsTopic = KafkaTopics.queryResultsTopic(sQuery.getQueryId());
+            resultConsumer.subscribe( Lists.newArrayList(resultsTopic) );
+            results = KafkaTestUtil.pollForResults(500, 6, 3, resultConsumer);
+        } finally {
+            // Tear down the test.
+            commandThread.interrupt();
+            commandThread.join(3000);
+        }
+
+        // Show the read results matched the expected ones.
+        assertEquals(expected, results);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/pom.xml b/extras/rya.streams/kafka/pom.xml
index 16a8b8e..0ccbb6e 100644
--- a/extras/rya.streams/kafka/pom.xml
+++ b/extras/rya.streams/kafka/pom.xml
@@ -55,6 +55,16 @@ under the License.
         <!-- Kafka dependencies -->
         <dependency>
             <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
index a8fbf23..3e0df50 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
@@ -20,12 +20,19 @@ package org.apache.rya.streams.kafka;
 
 import static java.util.Objects.requireNonNull;
 
+import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
 
+import org.I0Itec.zkclient.ZkClient;
 import org.apache.rya.streams.api.queries.QueryChangeLog;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
 
 /**
  * Creates the Kafka topic names that are used for Rya Streams systems.
@@ -66,4 +73,36 @@ public class KafkaTopics {
         requireNonNull(queryId);
         return "QueryResults-" + queryId.toString();
     }
+
+    /**
+     * Creates a set of Kafka topics for each topic that does not already exist.
+     *
+     * @param zookeeperServers - The Zookeeper servers that are used by the Kafka Streams program. (not null)
+     * @param topicNames - The topics that will be created. (not null)
+     * @param partitions - The number of partitions that each of the topics will have.
+     * @param replicationFactor - The replication factor of the topics that are created.
+     */
+    public static void createTopic(
+            final String zookeeperServers,
+            final Set<String> topicNames,
+            final int partitions,
+            final int replicationFactor) {
+        requireNonNull(zookeeperServers);
+        requireNonNull(topicNames);
+
+        ZkUtils zkUtils = null;
+        try {
+            zkUtils = ZkUtils.apply(new ZkClient(zookeeperServers, 30000, 30000, ZKStringSerializer$.MODULE$), false);
+            for(final String topicName : topicNames) {
+                if(!AdminUtils.topicExists(zkUtils, topicName)) {
+                    AdminUtils.createTopic(zkUtils, topicName, partitions, replicationFactor, new Properties(), RackAwareMode.Disabled$.MODULE$);
+                }
+            }
+        }
+        finally {
+            if(zkUtils != null) {
+                zkUtils.close();
+            }
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java
index 8ab3ab6..d3ec650 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java
@@ -64,7 +64,6 @@ public class KafkaLoadStatements implements LoadStatements {
         this.producer = requireNonNull(producer);
     }
 
-
     @Override
     public void fromFile(final Path statementsPath, final String visibilities) throws RyaStreamsException {
         requireNonNull(statementsPath);
@@ -77,7 +76,7 @@ public class KafkaLoadStatements implements LoadStatements {
         parser.setRDFHandler(new RDFHandlerBase() {
             @Override
             public void startRDF() throws RDFHandlerException {
-                log.trace("starting loading statements.");
+                log.trace("Starting loading statements.");
             }
 
             @Override
@@ -89,7 +88,7 @@ public class KafkaLoadStatements implements LoadStatements {
             @Override
             public void endRDF() throws RDFHandlerException {
                 producer.flush();
-                log.trace("done.");
+                log.trace("Done.");
             }
         });
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java
new file mode 100644
index 0000000..e587998
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.interactor;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+import org.apache.rya.streams.api.interactor.RunQuery;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka Streams implementation of {@link RunQuery}.
+ */
+@DefaultAnnotation(NonNull.class)
+public class KafkaRunQuery implements RunQuery {
+    private static final Logger log = LoggerFactory.getLogger(KafkaRunQuery.class);
+
+    private final String kafkaHostname;
+    private final String kafkaPort;
+    private final String statementsTopic;
+    private final String resultsTopic;
+    private final TopologyBuilderFactory topologyFactory;
+    private final QueryRepository queryRepo;
+
+    /**
+     * Constructs an instance of {@link KafkaRunQuery}.
+     *
+     * @param kafkaHostname - The hostname of the Kafka Broker to connect to. (not null)
+     * @param kafkaPort - The port of the Kafka Broker to connect to. (not null)
+     * @param statementsTopic - The name of the topic that statements will be read from. (not null)
+     * @param resultsTopic - The name of the topic that query results will be writen to. (not null)
+     * @param queryRepo - The query repository that holds queries that are registered. (not null)
+     * @param topologyFactory - Builds Kafka Stream processing topologies from SPARQL. (not null)
+     */
+    public KafkaRunQuery(
+            final String kafkaHostname,
+            final String kafkaPort,
+            final String statementsTopic,
+            final String resultsTopic,
+            final QueryRepository queryRepo,
+            final TopologyBuilderFactory topologyFactory) {
+        this.kafkaHostname = requireNonNull( kafkaHostname );
+        this.kafkaPort = requireNonNull( kafkaPort );
+        this.statementsTopic = requireNonNull(statementsTopic );
+        this.resultsTopic = requireNonNull( resultsTopic );
+        this.topologyFactory = requireNonNull( topologyFactory );
+        this.queryRepo = requireNonNull( queryRepo );
+    }
+
+    @Override
+    public void run(final UUID queryId) throws RyaStreamsException {
+        requireNonNull(queryId);
+
+        // Fetch the query from the repository. Throw an exception if it isn't present.
+        final Optional<StreamsQuery> query = queryRepo.get(queryId);
+        if(!query.isPresent()) {
+            throw new RyaStreamsException("Could not run the Query with ID " + queryId + " because no such query " +
+                    "is currently registered.");
+        }
+
+        // Build a processing topology using the SPARQL, provided statements topic, and provided results topic.
+        final String sparql = query.get().getSparql();
+        final TopologyBuilder topologyBuilder;
+        try {
+            topologyBuilder = topologyFactory.build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+        } catch (final Exception e) {
+            throw new RyaStreamsException("Could not run the Query with ID " + queryId + " because a processing " +
+                    "topolgoy could not be built for the SPARQL " + sparql, e);
+        }
+
+        // Setup the Kafka Stream program.
+        final Properties streamsProps = new Properties();
+        streamsProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kafkaPort);
+        streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "KafkaRunQuery-" + UUID.randomUUID());
+
+        final KafkaStreams streams = new KafkaStreams(topologyBuilder, new StreamsConfig(streamsProps));
+
+        // If an unhandled exception is thrown, rethrow it.
+        streams.setUncaughtExceptionHandler((t, e) -> {
+            // Log the problem and kill the program.
+            log.error("Unhandled exception while processing the Rya Streams query. Shutting down.", e);
+            System.exit(1);
+        });
+
+        // Setup a shutdown hook that kills the streams program at shutdown.
+        final CountDownLatch awaitTermination = new CountDownLatch(1);
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                awaitTermination.countDown();
+            }
+        });
+
+        // Run the streams program and wait for termination.
+        streams.start();
+        try {
+            awaitTermination.await();
+        } catch (final InterruptedException e) {
+            log.warn("Interrupted while waiting for termination. Shutting down.");
+        }
+        streams.close();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
index d73b40e..d12957a 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
@@ -34,6 +34,7 @@ import org.openrdf.query.impl.MapBindingSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -70,12 +71,12 @@ public class KeyValueJoinStateStore implements JoinStateStore {
     /**
      * This is the minimum value in UTF-8 character.
      */
-    private static final String START_RANGE_SUFFIX = new String(new byte[] { 0x00 } );
+    private static final String START_RANGE_SUFFIX = new String(new byte[] { 0x00 }, Charsets.UTF_8);
 
     /**
      * This is the maximum value of a UTF-8 character.
      */
-    private static final String END_RANGE_SUFFIX = new String(new byte[] { (byte) 0XFF } );
+    private static final String END_RANGE_SUFFIX = new String(new byte[] { (byte) 0XFF }, Charsets.UTF_8);
 
     /**
      * A default empty value that is stored for a start of range or end of range marker.