You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2018/03/09 18:21:01 UTC

[15/22] incubator-rya git commit: RYA-451 Fixing threading issues with the QueryManager class.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
index 947a215..3a59636 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
@@ -32,10 +32,15 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.kafka.KafkaStreamsFactory;
 import org.apache.rya.streams.kafka.KafkaStreamsFactory.KafkaStreamsFactoryException;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
 import org.apache.rya.streams.querymanager.QueryExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.AbstractIdleService;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -50,6 +55,7 @@ import kafka.consumer.KafkaStream;
  */
 @DefaultAnnotation(NonNull.class)
 public class LocalQueryExecutor extends AbstractIdleService implements QueryExecutor {
+    private static final Logger log = LoggerFactory.getLogger(LocalQueryExecutor.class);
 
     /**
      * Provides thread safety when interacting with this class.
@@ -72,6 +78,11 @@ public class LocalQueryExecutor extends AbstractIdleService implements QueryExec
     private final Map<UUID, KafkaStreams> byQueryId = new HashMap<>();
 
     /**
+     * Used to create the input and output topics for a Kafka Streams job.
+     */
+    private final CreateKafkaTopic createKafkaTopic;
+
+    /**
      * Builds the {@link KafkaStreams} objects that execute {@link KafkaStream}s.
      */
     private final KafkaStreamsFactory streamsFactory;
@@ -79,23 +90,31 @@ public class LocalQueryExecutor extends AbstractIdleService implements QueryExec
     /**
      * Constructs an instance of {@link LocalQueryExecutor}.
      *
+     * @param createKafkaTopic - Used to create the input and output topics for a Kafka Streams job. (not null)
      * @param streamsFactory - Builds the {@link KafkaStreams} objects that execute {@link KafkaStream}s. (not null)
      */
-    public LocalQueryExecutor(final KafkaStreamsFactory streamsFactory) {
+    public LocalQueryExecutor(
+            final CreateKafkaTopic createKafkaTopic,
+            final KafkaStreamsFactory streamsFactory) {
+        this.createKafkaTopic = requireNonNull(createKafkaTopic);
         this.streamsFactory = requireNonNull(streamsFactory);
     }
 
     @Override
     protected void startUp() throws Exception {
-        // Nothing to do.
+        log.info("Local Query Executor starting up.");
     }
 
     @Override
     protected void shutDown() throws Exception {
+        log.info("Local Query Executor shutting down. Stopping all jobs...");
+
         // Stop all of the running queries.
         for(final KafkaStreams job : byQueryId.values()) {
             job.close();
         }
+
+        log.info("Local Query Executor shut down.");
     }
 
     @Override
@@ -106,6 +125,14 @@ public class LocalQueryExecutor extends AbstractIdleService implements QueryExec
 
         lock.lock();
         try {
+            // The Statements topic and the Query Results topic for the query.
+            final Set<String> topics = Sets.newHashSet(
+                    KafkaTopics.statementsTopic(ryaInstance),
+                    KafkaTopics.queryResultsTopic(query.getQueryId()));
+
+            // Make sure the Statements and Query Results topics exist for the query.
+            createKafkaTopic.createTopics(topics, 1, 1);
+
             // Setup the Kafka Streams job that will execute.
             final KafkaStreams streams = streamsFactory.make(ryaInstance, query);
             streams.start();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd b/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd
index c1285d4..21170bb 100644
--- a/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd
+++ b/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd
@@ -30,6 +30,13 @@ under the License.
             </xs:choice>
           </xs:complexType>
         </xs:element>
+        <xs:element name="queryExecutor">
+            <xs:complexType>
+              <xs:choice>
+                <xs:element name="localKafkaStreams" type="localKafkaStreams"/>
+              </xs:choice>
+            </xs:complexType>
+        </xs:element>
         <xs:element name="performanceTunning">
           <xs:complexType>
             <xs:sequence>
@@ -65,6 +72,13 @@ under the License.
     </xs:sequence>
   </xs:complexType>
   
+  <!-- Define what a local Kafka Streams query executor looks like. -->
+  <xs:complexType name="localKafkaStreams">
+    <xs:sequence>
+      <xs:element name="zookeepers" type="xs:string"/>
+    </xs:sequence>
+  </xs:complexType>
+  
   <!-- Define the legal range for a TCP port. -->
   <xs:simpleType name="tcpPort">
     <xs:restriction base="xs:int">

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkGeneratorTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkGeneratorTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkGeneratorTest.java
new file mode 100644
index 0000000..da9be78
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkGeneratorTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.querymanager.QueryManager.LogEvent;
+import org.apache.rya.streams.querymanager.QueryManager.LogEvent.LogEventType;
+import org.apache.rya.streams.querymanager.QueryManager.LogEventWorkGenerator;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link LogEventWorkGenerator}.
+ */
+public class LogEventWorkGeneratorTest {
+
+    @Test
+    public void shutdownSignalKillsThread() {
+        // The signal that will kill the notifying thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue generated work is offered to.
+        final BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The listener that will perform the LogEventWorkGenerator work.
+        final LogEventWorkGenerator generator = new LogEventWorkGenerator(queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+        // A thread that will attempt to notify the generator with a created change log.
+        final Thread notifyThread = new Thread(() -> {
+            generator.notifyCreate("rya", mock(QueryChangeLog.class));
+        });
+
+        // Fill the queue so that nothing may be offered to it.
+        queue.offer(LogEvent.delete("rya"));
+
+        // Start the thread and show that it is still alive after the offer period.
+        notifyThread.start();
+        assertTrue( ThreadUtil.stillAlive(notifyThread, 200) );
+
+        // Set the shutdown signal to true and join the thread. If we were able to join, then it shut down.
+        shutdownSignal.set(true);
+        assertFalse( ThreadUtil.stillAlive(notifyThread, 1000) );
+    }
+
+    @Test
+    public void notifyCreate() throws Exception {
+        // The signal that will kill the notifying thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue generated work is offered to.
+        final BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The listener that will perform the LogEventWorkGenerator work.
+        final LogEventWorkGenerator generator = new LogEventWorkGenerator(queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+        // A thread that will attempt to notify the generator with a created change log.
+        final CountDownLatch notified = new CountDownLatch(1);
+        final Thread notifyThread = new Thread(() -> {
+            generator.notifyCreate("rya", mock(QueryChangeLog.class));
+            notified.countDown();
+        });
+
+        try {
+            // Start the thread that performs the notification.
+            notifyThread.start();
+
+            // Wait for the thread to indicate it has notified and check the queue for the value.
+            notified.await(200, TimeUnit.MILLISECONDS);
+            final LogEvent event = queue.poll(200, TimeUnit.MILLISECONDS);
+            assertEquals(LogEventType.CREATE, event.getEventType());
+            assertEquals("rya", event.getRyaInstanceName());
+        } finally {
+            shutdownSignal.set(true);
+            notifyThread.join();
+        }
+    }
+
+    @Test
+    public void notifyDelete() throws Exception {
+        // The signal that will kill the notifying thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue generated work is offered to.
+        final BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The listener that will perform the LogEventWorkGenerator work.
+        final LogEventWorkGenerator generator = new LogEventWorkGenerator(queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+        // A thread that will attempt to notify the generator with a deleted change log.
+        final CountDownLatch notified = new CountDownLatch(1);
+        final Thread notifyThread = new Thread(() -> {
+            generator.notifyDelete("rya");
+            notified.countDown();
+        });
+
+        try {
+            // Start the thread that performs the notification.
+            notifyThread.start();
+
+            // Wait for the thread to indicate it has notified and check the queue for the value.
+            notified.await(200, TimeUnit.MILLISECONDS);
+            final LogEvent event = queue.poll(200, TimeUnit.MILLISECONDS);
+            assertEquals(LogEventType.DELETE, event.getEventType());
+            assertEquals("rya", event.getRyaInstanceName());
+        } finally {
+            shutdownSignal.set(true);
+            notifyThread.join();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java
new file mode 100644
index 0000000..cb708ed
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.InMemoryQueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.querymanager.QueryManager.LogEvent;
+import org.apache.rya.streams.querymanager.QueryManager.LogEventWorker;
+import org.apache.rya.streams.querymanager.QueryManager.QueryEvent;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link LogEventWorker}.
+ */
+public class LogEventWorkerTest {
+
+    @Test
+    public void shutdownSignalKillsThread() {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The thread that will perform the LogEventWorker task.
+        final Thread logEventWorker = new Thread(new LogEventWorker(new ArrayBlockingQueue<>(1),
+                new ArrayBlockingQueue<>(1), 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        logEventWorker.start();
+
+        // Wait longer than the poll time to see if the thread died. Show that it is still running.
+        assertTrue(ThreadUtil.stillAlive(logEventWorker, 200));
+
+        // Set the shutdown signal to true and join the thread. If we were able to join, then it shut down.
+        shutdownSignal.set(true);
+        assertFalse(ThreadUtil.stillAlive(logEventWorker, 500));
+    }
+
+    @Test
+    public void nofity_logCreated_doesNotExist() throws Exception {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue used to feed work.
+        final BlockingQueue<LogEvent> logEventQueue = new ArrayBlockingQueue<>(10);
+
+        // The queue work is written to.
+        final BlockingQueue<QueryEvent> queryEventQueue = new ArrayBlockingQueue<>(10);
+
+        // The Query Change Log that will be watched.
+        final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
+
+        // Write a message that indicates a new query should be active.
+        final UUID firstQueryId = UUID.randomUUID();
+        changeLog.write(QueryChange.create(firstQueryId, "select * where { ?a ?b ?c . }", true));
+
+        // Write a message that adds an active query, but then makes it inactive. Because both of these
+        // events are written to the log before the worker subscribes to the repository for updates, they
+        // must result in a single query stopped event.
+        final UUID secondQueryId = UUID.randomUUID();
+        changeLog.write(QueryChange.create(secondQueryId, "select * where { ?d ?e ?f . }", true));
+        changeLog.write(QueryChange.update(secondQueryId, false));
+
+        // Start the worker that will be tested.
+        final Thread logEventWorker = new Thread(new LogEventWorker(logEventQueue,
+                queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        logEventWorker.start();
+
+        try {
+            // Write a unit of work that indicates a log was created.
+            final LogEvent createLogEvent = LogEvent.create("rya", changeLog);
+            logEventQueue.offer(createLogEvent);
+
+            // We must see the following Query Events added to the work queue.
+            // Query 1, executing.
+            // Query 2, stopped.
+            Set<QueryEvent> expectedEvents = new HashSet<>();
+            expectedEvents.add(QueryEvent.executing("rya",
+                    new StreamsQuery(firstQueryId, "select * where { ?a ?b ?c . }", true)));
+            expectedEvents.add(QueryEvent.stopped("rya", secondQueryId));
+
+            Set<QueryEvent> queryEvents = new HashSet<>();
+            queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) );
+            queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) );
+
+            assertEquals(expectedEvents, queryEvents);
+
+            // Write an event to the change log that stops the first query.
+            changeLog.write(QueryChange.update(firstQueryId, false));
+
+            // Show it was also reflected in the changes.
+            // Query 1, stopped.
+            expectedEvents = new HashSet<>();
+            expectedEvents.add(QueryEvent.stopped("rya", firstQueryId));
+
+            queryEvents = new HashSet<>();
+            queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) );
+
+            assertEquals(expectedEvents, queryEvents);
+        } finally {
+            shutdownSignal.set(true);
+            logEventWorker.join();
+        }
+    }
+
+    @Test
+    public void nofity_logCreated_exists() throws Exception {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue used to feed work.
+        final BlockingQueue<LogEvent> logEventQueue = new ArrayBlockingQueue<>(10);
+
+        // The queue work is written to.
+        final BlockingQueue<QueryEvent> queryEventQueue = new ArrayBlockingQueue<>(10);
+
+        // The Query Change Log that will be watched.
+        final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
+
+        // Write a message that indicates a new query should be active.
+        final UUID firstQueryId = UUID.randomUUID();
+        changeLog.write(QueryChange.create(firstQueryId, "select * where { ?a ?b ?c . }", true));
+
+        // Start the worker that will be tested.
+        final Thread logEventWorker = new Thread(new LogEventWorker(logEventQueue,
+                queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        logEventWorker.start();
+
+        try {
+            // Write a unit of work that indicates a log was created.
+            final LogEvent createLogEvent = LogEvent.create("rya", changeLog);
+            logEventQueue.offer(createLogEvent);
+
+            // Say the same log was created a second time.
+            logEventQueue.offer(createLogEvent);
+
+            // Show that only a single unit of work was added for the log. This indicates the
+            // second message was effectively skipped as it would have had its work added twice otherwise.
+            final Set<QueryEvent> expectedEvents = new HashSet<>();
+            expectedEvents.add(QueryEvent.executing("rya",
+                    new StreamsQuery(firstQueryId, "select * where { ?a ?b ?c . }", true)));
+
+            final Set<QueryEvent> queryEvents = new HashSet<>();
+            queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) );
+
+            assertNull(queryEventQueue.poll(500, TimeUnit.MILLISECONDS));
+            assertEquals(expectedEvents, queryEvents);
+        } finally {
+            shutdownSignal.set(true);
+            logEventWorker.join();
+        }
+    }
+
+    @Test
+    public void notify_logDeleted_exists() throws Exception {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue used to feed work.
+        final BlockingQueue<LogEvent> logEventQueue = new ArrayBlockingQueue<>(10);
+
+        // The queue work is written to.
+        final BlockingQueue<QueryEvent> queryEventQueue = new ArrayBlockingQueue<>(10);
+
+        // The Query Change Log that will be watched.
+        final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
+
+        // Start the worker that will be tested.
+        final Thread logEventWorker = new Thread(new LogEventWorker(logEventQueue,
+                queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        logEventWorker.start();
+
+        try {
+            // Write a unit of work that indicates a log was created.
+            final LogEvent createLogEvent = LogEvent.create("rya", changeLog);
+            logEventQueue.offer(createLogEvent);
+
+            // Write a unit of work that indicates a log was deleted.
+            logEventQueue.offer(LogEvent.delete("rya"));
+
+            // Show that a single unit of work was created for deleting everything for "rya".
+            assertEquals(QueryEvent.stopALL("rya"),  queryEventQueue.poll(500, TimeUnit.MILLISECONDS));
+            assertNull(queryEventQueue.poll(500, TimeUnit.MILLISECONDS));
+        } finally {
+            shutdownSignal.set(true);
+            logEventWorker.join();
+        }
+    }
+
+    @Test
+    public void notify_logDeleted_doesNotExist() throws Exception {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue used to feed work.
+        final BlockingQueue<LogEvent> logEventQueue = new ArrayBlockingQueue<>(10);
+
+        // The queue work is written to.
+        final BlockingQueue<QueryEvent> queryEventQueue = new ArrayBlockingQueue<>(10);
+
+        // Start the worker that will be tested.
+        final Thread logEventWorker = new Thread(new LogEventWorker(logEventQueue,
+                queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        logEventWorker.start();
+
+        try {
+            // Write a unit of work that indicates a log was deleted. Since it was never created,
+            // this will not cause anything to be written to the QueryEvent queue.
+            logEventQueue.offer(LogEvent.delete("rya"));
+
+            // Show that nothing was written to the QueryEvent queue.
+            assertNull(queryEventQueue.poll(500, TimeUnit.MILLISECONDS));
+        } finally {
+            shutdownSignal.set(true);
+            logEventWorker.join();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java
new file mode 100644
index 0000000..4495e19
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.ChangeLogEntry;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.querymanager.QueryManager.QueryEvent;
+import org.apache.rya.streams.querymanager.QueryManager.QueryEventWorkGenerator;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link QueryEventWorkGenerator}.
+ */
+public class QueryEventWorkGeneratorTest {
+
+    @Test
+    public void shutdownSignalKillsThread() throws Exception {
+        // The signal that will kill the notifying thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue generated work is offered to.
+        final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The listener that will perform the QueryEventWorkGenerator work.
+        final QueryEventWorkGenerator generator =
+                new QueryEventWorkGenerator("rya", new CountDownLatch(1), queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+        // A thread that will attempt to notify the generator with a created query.
+        final Thread notifyThread = new Thread(() -> {
+            generator.notify(mock(ChangeLogEntry.class), Optional.empty());
+        });
+
+        // Fill the queue so that nothing may be offered to it.
+        queue.offer(QueryEvent.stopALL("rya"));
+
+        // Start the thread and show that it is still alive after the offer period.
+        notifyThread.start();
+        assertTrue( ThreadUtil.stillAlive(notifyThread, 200) );
+
+        // Set the shutdown signal to true and join the thread. If we were able to join, then it shut down.
+        shutdownSignal.set(true);
+        assertFalse( ThreadUtil.stillAlive(notifyThread, 1000) );
+    }
+
+    @Test
+    public void waitsForSubscriptionWork() throws Exception {
+        // The signal that will kill the notifying thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue generated work is offered to.
+        final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The listener that will perform the QueryEventWorkGenerator work.
+        final CountDownLatch latch = new CountDownLatch(1);
+        final QueryEventWorkGenerator generator =
+                new QueryEventWorkGenerator("rya", latch, queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+        // A thread that will attempt to notify the generator with a created query.
+        final UUID queryId = UUID.randomUUID();
+        final StreamsQuery query = new StreamsQuery(queryId, "query", true);
+        final Thread notifyThread = new Thread(() -> {
+            final QueryChange change = QueryChange.create(queryId, query.getSparql(), query.isActive());
+            final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);
+            generator.notify(entry, Optional.of(query));
+        });
+
+        // Start the thread.
+        notifyThread.start();
+
+        try {
+            // Wait longer than the blocking period and show the thread is still alive and nothing has been added
+            // to the work queue.
+            Thread.sleep(150);
+            assertTrue( notifyThread.isAlive() );
+
+            // Count down the latch.
+            latch.countDown();
+
+            // Show work was added to the queue and the notifying thread died.
+            final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
+            final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive()));
+            assertEquals(expected, event);
+        } finally {
+            shutdownSignal.set(true);
+            notifyThread.join();
+        }
+    }
+
+    @Test
+    public void notifyCreate() throws Exception {
+        // The signal that will kill the notifying thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue generated work is offered to.
+        final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The listener that will perform the QueryEventWorkGenerator work.
+        final CountDownLatch latch = new CountDownLatch(1);
+        latch.countDown();
+        final QueryEventWorkGenerator generator =
+                new QueryEventWorkGenerator("rya", latch, queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+        // A thread that will attempt to notify the generator with a created query.
+        final UUID queryId = UUID.randomUUID();
+        final StreamsQuery query = new StreamsQuery(queryId, "query", true);
+        final Thread notifyThread = new Thread(() -> {
+            final QueryChange change = QueryChange.create(queryId, query.getSparql(), query.isActive());
+            final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);
+            generator.notify(entry, Optional.of(query));
+        });
+
+        // Start the thread.
+        notifyThread.start();
+
+        try {
+            // Show work was added to the queue and the notifying thread died.
+            final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
+            final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive()));
+            assertEquals(expected, event);
+        } finally {
+            shutdownSignal.set(true);
+            notifyThread.join();
+        }
+    }
+
+    @Test
+    public void notifyDelete() throws Exception {
+        // The flag that, once set, kills the notifying thread.
+        final AtomicBoolean stop = new AtomicBoolean(false);
+
+        // Single-slot queue that receives the work produced by the generator.
+        final BlockingQueue<QueryEvent> workQueue = new ArrayBlockingQueue<>(1);
+
+        // The generator under test; its latch has already been counted down to zero.
+        final CountDownLatch releasedLatch = new CountDownLatch(1);
+        releasedLatch.countDown();
+        final QueryEventWorkGenerator workGenerator = new QueryEventWorkGenerator(
+                "rya", releasedLatch, workQueue, 50, TimeUnit.MILLISECONDS, stop);
+
+        // The thread that reports the deletion of a query to the generator.
+        final UUID id = UUID.randomUUID();
+        final Thread notifier = new Thread(() -> {
+            final ChangeLogEntry<QueryChange> deleteEntry =
+                    new ChangeLogEntry<>(0, QueryChange.delete(id));
+            workGenerator.notify(deleteEntry, Optional.empty());
+        });
+
+        // Kick off the notification.
+        notifier.start();
+
+        try {
+            // A stopped event for the deleted query should reach the queue.
+            final QueryEvent wanted = QueryEvent.stopped("rya", id);
+            final QueryEvent actual = workQueue.poll(500, TimeUnit.MILLISECONDS);
+            assertEquals(wanted, actual);
+        } finally {
+            stop.set(true);
+            notifier.join();
+        }
+    }
+
+    @Test
+    public void notifyUpdate_isActive() throws Exception {
+        // The flag that, once set, kills the notifying thread.
+        final AtomicBoolean stop = new AtomicBoolean(false);
+
+        // Single-slot queue that receives the work produced by the generator.
+        final BlockingQueue<QueryEvent> workQueue = new ArrayBlockingQueue<>(1);
+
+        // The generator under test; its latch has already been counted down to zero.
+        final CountDownLatch releasedLatch = new CountDownLatch(1);
+        releasedLatch.countDown();
+        final QueryEventWorkGenerator workGenerator = new QueryEventWorkGenerator(
+                "rya", releasedLatch, workQueue, 50, TimeUnit.MILLISECONDS, stop);
+
+        // The active query being updated, and the thread that reports the update.
+        final UUID id = UUID.randomUUID();
+        final StreamsQuery active = new StreamsQuery(id, "query", true);
+        final Thread notifier = new Thread(() -> {
+            final ChangeLogEntry<QueryChange> updateEntry =
+                    new ChangeLogEntry<>(0, QueryChange.update(id, true));
+            workGenerator.notify(updateEntry, Optional.of(active));
+        });
+
+        // Kick off the notification.
+        notifier.start();
+
+        try {
+            // An executing event for the updated query should reach the queue.
+            final QueryEvent wanted = QueryEvent.executing("rya", new StreamsQuery(id, active.getSparql(), active.isActive()));
+            final QueryEvent actual = workQueue.poll(500, TimeUnit.MILLISECONDS);
+            assertEquals(wanted, actual);
+        } finally {
+            stop.set(true);
+            notifier.join();
+        }
+    }
+
+    @Test
+    public void notifyUpdate_isNotActive() throws Exception {
+        // The flag that, once set, kills the notifying thread.
+        final AtomicBoolean stop = new AtomicBoolean(false);
+
+        // Single-slot queue that receives the work produced by the generator.
+        final BlockingQueue<QueryEvent> workQueue = new ArrayBlockingQueue<>(1);
+
+        // The generator under test; its latch has already been counted down to zero.
+        final CountDownLatch releasedLatch = new CountDownLatch(1);
+        releasedLatch.countDown();
+        final QueryEventWorkGenerator workGenerator = new QueryEventWorkGenerator(
+                "rya", releasedLatch, workQueue, 50, TimeUnit.MILLISECONDS, stop);
+
+        // The inactive query being updated, and the thread that reports the update.
+        final UUID id = UUID.randomUUID();
+        final StreamsQuery inactive = new StreamsQuery(id, "query", false);
+        final Thread notifier = new Thread(() -> {
+            final ChangeLogEntry<QueryChange> updateEntry =
+                    new ChangeLogEntry<>(0, QueryChange.update(id, false));
+            workGenerator.notify(updateEntry, Optional.of(inactive));
+        });
+
+        // Kick off the notification.
+        notifier.start();
+
+        try {
+            // A stopped event for the deactivated query should reach the queue.
+            final QueryEvent wanted = QueryEvent.stopped("rya", id);
+            final QueryEvent actual = workQueue.poll(500, TimeUnit.MILLISECONDS);
+            assertEquals(wanted, actual);
+        } finally {
+            stop.set(true);
+            notifier.join();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java
new file mode 100644
index 0000000..33c0719
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.querymanager.QueryManager.QueryEvent;
+import org.apache.rya.streams.querymanager.QueryManager.QueryEventWorker;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link QueryManager.QueryEventWorker}.
+ */
+public class QueryEventWorkerTest {
+
+    @Test
+    public void shutdownSignalKillsThread() {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The thread that will perform the QueryEventWorker task.
+        final Thread queryEventWorker = new Thread(new QueryEventWorker(new ArrayBlockingQueue<>(1),
+                mock(QueryExecutor.class), 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        queryEventWorker.start();
+
+        // Wait longer than the poll time to see if the thread died. Show that it is still running.
+        assertTrue(ThreadUtil.stillAlive(queryEventWorker, 200));
+
+        // Set the shutdown signal to true and join the thread. If we were able to join, then it shut down.
+        shutdownSignal.set(true);
+        assertFalse(ThreadUtil.stillAlive(queryEventWorker, 1000));
+    }
+
+    @Test
+    public void executingWork() throws Exception {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue used to send the execute work to the thread.
+        final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The message that indicates a query needs to be executed.
+        final String ryaInstance = "rya";
+        final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "sparql", true);
+        final QueryEvent executingEvent = QueryEvent.executing(ryaInstance, query);
+
+        // Release a latch if the startQuery method on the queryExecutor is invoked with the correct values.
+        final CountDownLatch startQueryInvoked = new CountDownLatch(1);
+        final QueryExecutor queryExecutor = mock(QueryExecutor.class);
+        doAnswer(invocation -> {
+            startQueryInvoked.countDown();
+            return null;
+        }).when(queryExecutor).startQuery(ryaInstance, query);
+
+        // The thread that will perform the QueryEventWorker task.
+        final Thread queryEventWorker = new Thread(new QueryEventWorker(queue,
+                queryExecutor, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        try {
+            queryEventWorker.start();
+
+            // Provide a message indicating a query needs to be executed.
+            queue.put(executingEvent);
+
+            // Verify the Query Executor was told to start the query (await returns as soon as the latch is released).
+            assertTrue( startQueryInvoked.await(5, TimeUnit.SECONDS) );
+        } finally {
+            shutdownSignal.set(true);
+            queryEventWorker.join();
+        }
+    }
+
+    @Test
+    public void stoppedWork() throws Exception {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue used to send the stop work to the thread.
+        final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The message that indicates a query needs to be stopped.
+        final UUID queryId = UUID.randomUUID();
+        final QueryEvent stoppedEvent = QueryEvent.stopped("rya", queryId);
+
+        // Release a latch if the stopQuery method on the queryExecutor is invoked with the correct values.
+        final CountDownLatch stopQueryInvoked = new CountDownLatch(1);
+        final QueryExecutor queryExecutor = mock(QueryExecutor.class);
+        doAnswer(invocation -> {
+            stopQueryInvoked.countDown();
+            return null;
+        }).when(queryExecutor).stopQuery(queryId);
+
+        final Thread queryEventWorker = new Thread(new QueryEventWorker(queue,
+                queryExecutor, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        try {
+            // Start the thread that will perform the QueryEventWorker task.
+            queryEventWorker.start();
+
+            // Provide a message indicating a query needs to be stopped.
+            queue.put(stoppedEvent);
+
+            // Verify the Query Executor was told to stop the query (await returns as soon as the latch is released).
+            assertTrue( stopQueryInvoked.await(5, TimeUnit.SECONDS) );
+        } finally {
+            shutdownSignal.set(true);
+            queryEventWorker.join();
+        }
+    }
+
+    @Test
+    public void stopAllWork() throws Exception {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue used to send the stop-all work to the thread.
+        final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The message that indicates all queries for a rya instance need to be stopped.
+        final String ryaInstance = "rya";
+        final QueryEvent stopAllEvent = QueryEvent.stopALL(ryaInstance);
+
+        // Release a latch if the stopAll method on the queryExecutor is invoked with the correct values.
+        final CountDownLatch testMethodInvoked = new CountDownLatch(1);
+        final QueryExecutor queryExecutor = mock(QueryExecutor.class);
+        doAnswer(invocation -> {
+            testMethodInvoked.countDown();
+            return null;
+        }).when(queryExecutor).stopAll(ryaInstance);
+
+        final Thread queryEventWorker = new Thread(new QueryEventWorker(queue,
+                queryExecutor, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        try {
+            // Start the thread that will perform the QueryEventWorker task.
+            queryEventWorker.start();
+
+            // Provide a message indicating all queries for the rya instance need to be stopped.
+            queue.put(stopAllEvent);
+
+            // Verify the Query Executor was told to stop all the queries (await returns as soon as the latch is released).
+            assertTrue( testMethodInvoked.await(5, TimeUnit.SECONDS) );
+        } finally {
+            shutdownSignal.set(true);
+            queryEventWorker.join();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
index a1203a0..04e70c0 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
@@ -1,18 +1,20 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.rya.streams.querymanager;
 
@@ -34,13 +36,10 @@ import org.apache.rya.streams.api.queries.QueryChangeLog;
 import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
 import org.junit.Test;
 
-import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
-
 /**
- * Test for the {@link QueryManager}
+ * Unit tests the methods of {@link QueryManager}.
  */
 public class QueryManagerTest {
-    private static final Scheduler TEST_SCHEDULER = Scheduler.newFixedRateSchedule(0, 100, TimeUnit.MILLISECONDS);
 
     /**
      * Tests when the query manager is notified to create a new query, the query
@@ -74,7 +73,7 @@ public class QueryManagerTest {
             return null;
         }).when(source).subscribe(any(SourceListener.class));
 
-        final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER);
+        final QueryManager qm = new QueryManager(qe, source, 50, TimeUnit.MILLISECONDS);
         try {
             qm.startAndWait();
             queryStarted.await(5, TimeUnit.SECONDS);
@@ -128,7 +127,7 @@ public class QueryManagerTest {
             return null;
         }).when(source).subscribe(any(SourceListener.class));
 
-        final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER);
+        final QueryManager qm = new QueryManager(qe, source, 50, TimeUnit.MILLISECONDS);
         try {
             qm.startAndWait();
             queryDeleted.await(5, TimeUnit.SECONDS);
@@ -183,7 +182,7 @@ public class QueryManagerTest {
             return null;
         }).when(source).subscribe(any(SourceListener.class));
 
-        final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER);
+        final QueryManager qm = new QueryManager(qe, source, 50, TimeUnit.MILLISECONDS);
         try {
             qm.startAndWait();
             queryDeleted.await(10, TimeUnit.SECONDS);
@@ -192,4 +191,4 @@ public class QueryManagerTest {
             qm.stopAndWait();
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java
new file mode 100644
index 0000000..9896e31
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Utilities that are useful for interacting with {@link Thread}s while testing.
+ */
+public class ThreadUtil {
+
+    /** Private constructor to prevent instantiation. */
+    private ThreadUtil() { }
+
+    /**
+     * Returns whether a thread is still alive after waiting up to {@code millis} to join it.
+     * If the wait is interrupted, the caller's interrupt status is restored before returning.
+     *
+     * @param thread - The thread that will be joined. (not null)
+     * @param millis - How long to wait to join the thread.
+     * @return {@code true} if the thread is still alive, otherwise {@code false}.
+     */
+    public static boolean stillAlive(final Thread thread, final long millis) {
+        requireNonNull(thread);
+        try {
+            thread.join(millis);
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt(); // Keep the interrupt visible to the caller.
+        }
+        return thread.isAlive();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
index 3cbe894..f9c8a03 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
@@ -35,6 +35,7 @@ import org.apache.rya.streams.api.interactor.LoadStatements;
 import org.apache.rya.streams.kafka.KafkaStreamsFactory;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory;
+import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
 import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
 import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
 import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
@@ -119,9 +120,10 @@ public class LocalQueryExecutorIT {
         expected.add(new VisibilityBindingSet(bs, "a"));
 
         // Start the executor that will be tested.
+        final CreateKafkaTopic createKafkaTopic = new CreateKafkaTopic( kafka.getZookeeperServers() );
         final String kafkaServers = kafka.getKafkaHostname() + ":" + kafka.getKafkaPort();
         final KafkaStreamsFactory jobFactory = new SingleThreadKafkaStreamsFactory(kafkaServers);
-        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        final QueryExecutor executor = new LocalQueryExecutor(createKafkaTopic, jobFactory);
         executor.startAndWait();
         try {
             // Start the query.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
index 0df5794..c0f888e 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
@@ -32,6 +32,7 @@ import java.util.UUID;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.kafka.KafkaStreamsFactory;
+import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
 import org.apache.rya.streams.querymanager.QueryExecutor;
 import org.junit.Test;
 
@@ -44,7 +45,7 @@ public class LocalQueryExecutorTest {
 
     @Test(expected = IllegalStateException.class)
     public void startQuery_serviceNotStarted() throws Exception {
-        final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+        final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
         executor.startQuery("rya", new StreamsQuery(UUID.randomUUID(), "query", true));
     }
 
@@ -60,7 +61,7 @@ public class LocalQueryExecutorTest {
         when(jobFactory.make(eq(ryaInstance), eq(query))).thenReturn(queryJob);
 
         // Start the executor that will be tested.
-        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
         executor.startAndWait();
         try {
             // Tell the executor to start the query.
@@ -75,14 +76,14 @@ public class LocalQueryExecutorTest {
 
     @Test(expected = IllegalStateException.class)
     public void stopQuery_serviceNotStarted() throws Exception {
-        final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+        final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
         executor.stopQuery(UUID.randomUUID());
     }
 
     @Test
     public void stopQuery_queryNotRunning() throws Exception {
         // Start an executor.
-        final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+        final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
         executor.startAndWait();
         try {
             // Try to stop a query that was never stareted.
@@ -104,7 +105,7 @@ public class LocalQueryExecutorTest {
         when(jobFactory.make(eq(ryaInstance), eq(query))).thenReturn(queryJob);
 
         // Start the executor that will be tested.
-        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
         executor.startAndWait();
         try {
             // Tell the executor to start the query.
@@ -122,7 +123,7 @@ public class LocalQueryExecutorTest {
 
     @Test(expected = IllegalStateException.class)
     public void stopAll_serviceNotStarted() throws Exception {
-        final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+        final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
         executor.stopAll("rya");
     }
 
@@ -141,7 +142,7 @@ public class LocalQueryExecutorTest {
         when(jobFactory.make(eq(ryaInstance), eq(query2))).thenReturn(queryJob2);
 
         // Start the executor that will be tested.
-        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
         executor.startAndWait();
         try {
             // Tell the executor to start the queries.
@@ -180,7 +181,7 @@ public class LocalQueryExecutorTest {
         when(jobFactory.make(eq(ryaInstance2), eq(query2))).thenReturn(queryJob2);
 
         // Start the executor that will be tested.
-        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
         executor.startAndWait();
         try {
             // Tell the executor to start the queries.
@@ -205,14 +206,14 @@ public class LocalQueryExecutorTest {
 
     @Test(expected = IllegalStateException.class)
     public void getRunningQueryIds_serviceNotStarted() throws Exception {
-        final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+        final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
         executor.getRunningQueryIds();
     }
 
     @Test
     public void getRunningQueryIds_noneStarted() throws Exception {
         // Start an executor.
-        final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+        final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
         executor.startAndWait();
         try {
             // Get the list of running queries.
@@ -240,7 +241,7 @@ public class LocalQueryExecutorTest {
         when(jobFactory.make(eq(ryaInstance), eq(query3))).thenReturn(mock(KafkaStreams.class));
 
         // Start the executor that will be tested.
-        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
         executor.startAndWait();
         try {
             // Start the queries.
@@ -275,7 +276,7 @@ public class LocalQueryExecutorTest {
         when(jobFactory.make(eq(ryaInstance), eq(query3))).thenReturn(mock(KafkaStreams.class));
 
         // Start the executor that will be tested.
-        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
         executor.startAndWait();
         try {
             // Start the queries.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
index f2b50ab..de6b9f3 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
@@ -45,6 +45,11 @@ public class QueryManagerConfigMarshallerTest {
                 "            <port>6</port>\n" +
                 "        </kafka>\n" +
                 "    </queryChangeLogSource>\n" +
+                "    <queryExecutor>\n" +
+                "        <localKafkaStreams>\n" +
+                "            <zookeepers>zoo1,zoo2,zoo3</zookeepers>\n" +
+                "        </localKafkaStreams>\n" +
+                "    </queryExecutor>\n" +
                 "    <performanceTunning>\n" +
                 "        <queryChanngeLogDiscoveryPeriod>\n" +
                 "            <value>1</value>\n" +