You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2018/03/09 18:21:02 UTC

[16/22] incubator-rya git commit: RYA-451 Fixing threading issues with the QueryManager class.

RYA-451 Fixing threading issues with the QueryManager class.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/1cd8db32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/1cd8db32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/1cd8db32

Branch: refs/heads/master
Commit: 1cd8db32e04f116934fef22fe9b465f7cd807755
Parents: a342fe2
Author: kchilton2 <ke...@gmail.com>
Authored: Fri Feb 2 22:47:59 2018 -0500
Committer: Valiyil <Pu...@parsons.com>
Committed: Fri Mar 9 12:59:55 2018 -0500

----------------------------------------------------------------------
 .../rya/streams/api/entity/StreamsQuery.java    |   4 +-
 .../api/queries/InMemoryQueryRepository.java    |  30 +-
 .../rya/streams/api/queries/QueryChange.java    |  10 +
 .../queries/InMemoryQueryRepositoryTest.java    |  16 +-
 .../client/command/ListQueriesCommand.java      |  12 +-
 .../streams/client/command/RunQueryCommand.java |   2 +-
 .../apache/rya/streams/kafka/KafkaTopics.java   |   4 +-
 .../kafka/SingleThreadKafkaStreamsFactory.java  |   2 +-
 .../kafka/interactor/CreateKafkaTopic.java      |  60 ++
 .../query-manager/src/main/README.txt           |   7 +-
 .../src/main/config/configuration.xml           |  10 +-
 .../query-manager/src/main/config/log4j.xml     |  17 +
 .../querymanager/QueryChangeLogSource.java      |   5 +-
 .../rya/streams/querymanager/QueryManager.java  | 929 ++++++++++++++++---
 .../querymanager/QueryManagerDaemon.java        |   8 +-
 .../kafka/KafkaQueryChangeLogSource.java        |  46 +-
 .../querymanager/kafka/LocalQueryExecutor.java  |  31 +-
 .../src/main/xsd/QueryManagerConfig.xsd         |  14 +
 .../querymanager/LogEventWorkGeneratorTest.java | 136 +++
 .../querymanager/LogEventWorkerTest.java        | 245 +++++
 .../QueryEventWorkGeneratorTest.java            | 265 ++++++
 .../querymanager/QueryEventWorkerTest.java      | 172 ++++
 .../streams/querymanager/QueryManagerTest.java  |  41 +-
 .../rya/streams/querymanager/ThreadUtil.java    |  48 +
 .../kafka/LocalQueryExecutorIT.java             |   4 +-
 .../kafka/LocalQueryExecutorTest.java           |  25 +-
 .../xml/QueryManagerConfigMarshallerTest.java   |   5 +
 27 files changed, 1907 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
index 7194834..11423bd 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
@@ -85,7 +85,7 @@ public class StreamsQuery {
         }
         return false;
     }
-    
+
     @Override
     public String toString() {
         final StringBuilder sb = new StringBuilder();
@@ -95,7 +95,7 @@ public class StreamsQuery {
         sb.append(getSparql() + "\n");
         sb.append("Is ");
         if (!isActive) {
-            sb.append(" Not ");
+            sb.append("Not ");
         }
         sb.append("Running.\n");
         return sb.toString();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
index 5fb0297..95c1922 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
@@ -51,9 +51,9 @@ import info.aduna.iteration.CloseableIteration;
  */
 @DefaultAnnotation(NonNull.class)
 public class InMemoryQueryRepository extends AbstractScheduledService implements QueryRepository {
-    private static final Logger LOG = LoggerFactory.getLogger(InMemoryQueryRepository.class);
+    private static final Logger log = LoggerFactory.getLogger(InMemoryQueryRepository.class);
 
-    private final ReentrantLock lock = new ReentrantLock();
+    private final ReentrantLock lock = new ReentrantLock(true);
 
     /**
      * The change log that is the ground truth for describing what the queries look like.
@@ -198,7 +198,6 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements
 
     @Override
     protected void shutDown() throws Exception {
-        super.shutDown();
         lock.lock();
         try {
             changeLog.close();
@@ -211,11 +210,12 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements
      * Updates the {@link #queriesCache} to reflect the latest position within the {@link #changeLog}.
      */
     private void updateCache() {
-        requireNonNull(changeLog);
+        log.trace("updateCache() - Enter");
 
         CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLogException> it = null;
         try {
             // Iterate over everything since the last position that was handled within the change log.
+            log.debug("Starting cache position:" + cachePosition);
             if(cachePosition.isPresent()) {
                 it = changeLog.readFromPosition(cachePosition.get() + 1);
             } else {
@@ -228,6 +228,8 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements
                 final QueryChange change = entry.getEntry();
                 final UUID queryId = change.getQueryId();
 
+                log.debug("Updating the cache to reflect:\n" + change);
+
                 switch(change.getChangeType()) {
                     case CREATE:
                         final StreamsQuery query = new StreamsQuery(
@@ -253,15 +255,17 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements
                         break;
                 }
 
+                log.debug("Notifying listeners with the updated state.");
                 final Optional<StreamsQuery> newQueryState = Optional.ofNullable(queriesCache.get(queryId));
                 listeners.forEach(listener -> listener.notify(entry, newQueryState));
 
                 cachePosition = Optional.of( entry.getPosition() );
+                log.debug("New chache position: " + cachePosition);
             }
 
         } catch (final QueryChangeLogException e) {
             // Rethrow the exception because the object the supplier tried to create could not be created.
-            throw new RuntimeException("Could not initialize the " + InMemoryQueryRepository.class.getName(), e);
+            throw new RuntimeException("Could not update the cache of " + InMemoryQueryRepository.class.getName(), e);
 
         } finally {
             // Try to close the iteration if it was opened.
@@ -270,18 +274,22 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements
                     it.close();
                 }
             } catch (final QueryChangeLogException e) {
-                LOG.error("Could not close the " + CloseableIteration.class.getName(), e);
+                log.error("Could not close the " + CloseableIteration.class.getName(), e);
             }
+
+            log.trace("updateCache() - Exit");
         }
     }
 
     @Override
     protected void runOneIteration() throws Exception {
+        log.trace("runOneIteration() - Enter");
         lock.lock();
         try {
             updateCache();
         } finally {
             lock.unlock();
+            log.trace("runOneIteration() - Exit");
         }
     }
 
@@ -292,17 +300,25 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements
 
     @Override
     public Set<StreamsQuery> subscribe(final QueryChangeLogListener listener) {
+        log.trace("subscribe(listener) - Enter");
+
         //locks to prevent the current state from changing while subscribing.
         lock.lock();
+        log.trace("subscribe(listener) - Acquired lock");
         try {
             listeners.add(listener);
+            log.trace("subscribe(listener) - Listener Registered");
 
             //return the current state of the query repository
-            return queriesCache.values()
+            final Set<StreamsQuery> queries = queriesCache.values()
                     .stream()
                     .collect(Collectors.toSet());
+            log.trace("subscribe(listener) - Returning " + queries.size() + " existing queries");
+            return queries;
         } finally {
+            log.trace("subscribe(listener) - Releasing lock");
             lock.unlock();
+            log.trace("subscribe(listener) - Exit");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
index d283957..d34a394 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
@@ -110,6 +110,16 @@ public final class QueryChange implements Serializable {
         return false;
     }
 
+    @Override
+    public String toString() {
+        return "QueryChange: {" +
+               "    Query ID: " + queryId + ",\n" +
+               "    Change Type: " + changeType + ",\n" +
+               "    Is Active: " + isActive + ",\n" +
+               "    SPARQL: " + sparql + "\n" +
+               "}";
+    }
+
     /**
      * Create a {@link QueryChange} that represents a new SPARQL query that will be managed by Rya Streams.
      *

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
index 3b3d48a..d7e116b 100644
--- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
+++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
@@ -93,13 +93,9 @@ public class InMemoryQueryRepositoryTest {
 
             // Create a new totally in memory QueryRepository.
             final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog, SCHEDULE );
-            try {
-                // Listing the queries should work using an initialized change log.
-                final Set<StreamsQuery> stored = initializedQueries.list();
-                assertEquals(expected, stored);
-            } finally {
-                queries.stop();
-            }
+            // Listing the queries should work using an initialized change log.
+            final Set<StreamsQuery> stored = initializedQueries.list();
+            assertEquals(expected, stored);
         } finally {
             queries.stop();
         }
@@ -166,7 +162,7 @@ public class InMemoryQueryRepositoryTest {
             final StreamsQuery query = queries.add("query 1", true);
 
             final Set<StreamsQuery> existing = queries.subscribe((queryChangeEvent, newQueryState) -> {
-                final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(1L,
+                final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<>(1L,
                         QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
                 final Optional<StreamsQuery> expectedQueryState = Optional.of(
                         new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
@@ -197,7 +193,7 @@ public class InMemoryQueryRepositoryTest {
             //show listener on repo that query was added to is being notified of the new query.
             final CountDownLatch repo1Latch = new CountDownLatch(1);
             queries.subscribe((queryChangeEvent, newQueryState) -> {
-                final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L,
+                final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<>(0L,
                         QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
                 final Optional<StreamsQuery> expectedQueryState = Optional.of(
                         new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
@@ -210,7 +206,7 @@ public class InMemoryQueryRepositoryTest {
             //show listener not on the repo that query was added to is being notified as well.
             final CountDownLatch repo2Latch = new CountDownLatch(1);
             queries2.subscribe((queryChangeEvent, newQueryState) -> {
-                final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L,
+                final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<>(0L,
                         QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
                 final Optional<StreamsQuery> expectedQueryState = Optional.of(
                         new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true));

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
index cd78975..a5507a6 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
@@ -23,6 +23,7 @@ import static java.util.Objects.requireNonNull;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.api.exception.RyaStreamsException;
 import org.apache.rya.streams.api.interactor.ListQueries;
@@ -112,12 +113,11 @@ public class ListQueriesCommand implements RyaStreamsCommand {
         sb.append("Queries in Rya Streams:\n");
         sb.append("---------------------------------------------------------\n");
         queries.forEach(query -> {
-            sb.append("ID: ");
-            sb.append(query.getQueryId());
-            sb.append("\t\t");
-            sb.append("Query: ");
-            sb.append(query.getSparql());
-            sb.append("\n");
+            sb.append("ID: ").append(query.getQueryId())
+                .append("    ")
+                .append("Is Active: ").append(query.isActive())
+                .append(StringUtils.rightPad("" + query.isActive(), 9))
+                .append("Query: ").append(query.getSparql()).append("\n");
         });
         return sb.toString();
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
index ddaf647..7b311f6 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
@@ -136,7 +136,7 @@ public class RunQueryCommand implements RyaStreamsCommand {
                 final Set<String> topics = new HashSet<>();
                 topics.add( KafkaTopics.statementsTopic(params.ryaInstance) );
                 topics.add( KafkaTopics.queryResultsTopic(queryId) );
-                KafkaTopics.createTopic(params.zookeeperServers, topics, 1, 1);
+                KafkaTopics.createTopics(params.zookeeperServers, topics, 1, 1);
 
                 // Run the query that uses those topics.
                 final KafkaRunQuery runQuery = new KafkaRunQuery(

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
index 095465c..989799a 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
@@ -55,7 +55,7 @@ public class KafkaTopics {
     }
 
     /**
-     * Get the Rya instance name from a Kafka topic name that has been used for a {@link QueryChnageLog}.
+     * Get the Rya instance name from a Kafka topic name that has been used for a {@link QueryChangeLog}.
      * <p/>
      * This is the inverse function of {@link #queryChangeLogTopic(String)}.
      *
@@ -106,7 +106,7 @@ public class KafkaTopics {
      * @param partitions - The number of partitions that each of the topics will have.
      * @param replicationFactor - The replication factor of the topics that are created.
      */
-    public static void createTopic(
+    public static void createTopics(
             final String zookeeperServers,
             final Set<String> topicNames,
             final int partitions,

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
index 7ab7e90..8093951 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
@@ -83,7 +83,7 @@ public class SingleThreadKafkaStreamsFactory implements KafkaStreamsFactory {
         try {
             final TopologyBuilder topologyBuilder = topologyFactory.build(query.getSparql(), statementsTopic, resultsTopic, new RandomUUIDFactory());
             return new KafkaStreams(topologyBuilder, new StreamsConfig(streamsProps));
-        } catch (MalformedQueryException | TopologyBuilderException e) {
+        } catch (final MalformedQueryException | TopologyBuilderException e) {
             throw new KafkaStreamsFactoryException("Could not create a KafkaStreams processing topology for query " + query.getQueryId(), e);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java
new file mode 100644
index 0000000..771e1c8
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.interactor;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Set;
+
+import org.apache.rya.streams.kafka.KafkaTopics;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Creates topics in Kafka.
+ */
+@DefaultAnnotation(NonNull.class)
+public class CreateKafkaTopic {
+
+    private final String zookeeperServers;
+
+    /**
+     * Constructs an instance of {@link CreateKafkaTopic}.
+     *
+     * @param zookeeperServers - The Zookeeper servers that are used to manage the Kafka instance. (not null)
+     */
+    public CreateKafkaTopic(final String zookeeperServers) {
+        this.zookeeperServers = requireNonNull(zookeeperServers);
+    }
+
+    /**
+     * Creates each of the given Kafka topics that does not already exist.
+     *
+     * @param topicNames - The topics that will be created. (not null)
+     * @param partitions - The number of partitions that each of the topics will have.
+     * @param replicationFactor - The replication factor of the topics that are created.
+     */
+    public void createTopics(
+            final Set<String> topicNames,
+            final int partitions,
+            final int replicationFactor) {
+        KafkaTopics.createTopics(zookeeperServers, topicNames, partitions, replicationFactor);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/README.txt
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/README.txt b/extras/rya.streams/query-manager/src/main/README.txt
index 3b2dbfe..93d5ac5 100644
--- a/extras/rya.streams/query-manager/src/main/README.txt
+++ b/extras/rya.streams/query-manager/src/main/README.txt
@@ -41,8 +41,11 @@ Java 8
     yum install -y ${project.artifactId}-${project.version}.noarch.rpm
     
  3. Update the configuration file:
-    Replace "[Kafka Broker Hostname]" with the IP address of the Kafka broker.
-    Replace the Kafka port if using something other than the default of 9092.
+    A. Replace "[Zookeepers used to manage Kafka. E.g.: zoo1,zoo2,zoo3]" with 
+       the zookeepers used to manage the Kafka cluster. It is a comma separated 
+       list.
+    B. Replace "[Kafka Broker Hostname]" with the IP address of the Kafka broker.
+    C. Replace the Kafka port if using something other than the default of 9092.
     
  4. Start the service:
     systemctl start rya-streams-query-manager.service

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/config/configuration.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/config/configuration.xml b/extras/rya.streams/query-manager/src/main/config/configuration.xml
index 96da501..7077125 100644
--- a/extras/rya.streams/query-manager/src/main/config/configuration.xml
+++ b/extras/rya.streams/query-manager/src/main/config/configuration.xml
@@ -18,7 +18,7 @@ specific language governing permissions and limitations
 under the License.
 -->
 <queryManagerConfig>
-    <!-- The  Query Change Log Sources. The source defines a system where Rya
+    <!-- The Query Change Log Sources. The source defines a system where Rya
        - Streams Query Change Logs are managed. The query manager will manage 
        - queries for all Rya instances whose change logs are stored within the
        - source.
@@ -29,6 +29,14 @@ under the License.
             <port>9092</port>
         </kafka>
     </queryChangeLogSource>
+
+    <!-- The Query Executor. The executor defines a system for executing the
+         Rya Streams queries. -->    
+    <queryExecutor>
+        <localKafkaStreams>
+            <zookeepers>[Zookeepers used to manage Kafka. E.g.: zoo1,zoo2,zoo3]</zookeepers>
+        </localKafkaStreams>
+    </queryExecutor>
     
     <!-- This section defines performance related tuning values. Sensible
        - default have been provided to simplify configuration. 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/config/log4j.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/config/log4j.xml b/extras/rya.streams/query-manager/src/main/config/log4j.xml
index 2021638..96ea56c 100644
--- a/extras/rya.streams/query-manager/src/main/config/log4j.xml
+++ b/extras/rya.streams/query-manager/src/main/config/log4j.xml
@@ -28,6 +28,23 @@ under the License.
         </layout>
     </appender>
 
+    <!-- Kafka's configuration logging is loud, so it is turned off. -->
+    <logger name="org.apache.kafka.streams.StreamsConfig">
+        <level value="OFF"/>
+    </logger>
+    <logger name="org.apache.kafka.clients.consumer.ConsumerConfig">
+        <level value="OFF"/>
+    </logger>
+    <logger name="org.apache.kafka.clients.producer.ProducerConfig">
+        <level value="OFF"/>
+    </logger>
+
+    <!-- Change this level to DEBUG to see more information about what the 
+         QueryManager is doing. -->
+    <logger name="org.apache.rya.streams.querymanager.QueryManager">
+        <level value="INFO"/>
+    </logger>
+
     <root>
         <level value="INFO" />
         <appender-ref ref="console" />

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryChangeLogSource.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryChangeLogSource.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryChangeLogSource.java
index 73e5d12..eb5ca08 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryChangeLogSource.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryChangeLogSource.java
@@ -51,8 +51,9 @@ public interface QueryChangeLogSource extends Service {
     public void unsubscribe(final SourceListener listener);
 
     /**
-     * A listener that is notified when a {@link QueryChangeLog} has
-     * been added or removed from a {@link QueryChangeLogSource}.
+     * A listener that is notified when a {@link QueryChangeLog} has been added or
+     * removed from a {@link QueryChangeLogSource}. The listener receives the only
+     * copy of the change log and is responsible for shutting it down.
      */
     @DefaultAnnotation(NonNull.class)
     public interface SourceListener {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
index 30b4538..e6bd800 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
@@ -1,18 +1,20 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.rya.streams.querymanager;
 
@@ -20,16 +22,23 @@ import static java.util.Objects.requireNonNull;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.api.queries.ChangeLogEntry;
 import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
 import org.apache.rya.streams.api.queries.QueryChange;
-import org.apache.rya.streams.api.queries.QueryChange.ChangeType;
 import org.apache.rya.streams.api.queries.QueryChangeLog;
 import org.apache.rya.streams.api.queries.QueryChangeLogListener;
 import org.apache.rya.streams.api.queries.QueryRepository;
@@ -38,8 +47,10 @@ import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.UncheckedExecutionException;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -51,205 +62,823 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  * instances/rya streams instances.
  */
 @DefaultAnnotation(NonNull.class)
-public class QueryManager extends AbstractIdleService {
-    private static final Logger LOG = LoggerFactory.getLogger(QueryManager.class);
+public class QueryManager extends AbstractService {
+    private static final Logger log = LoggerFactory.getLogger(QueryManager.class);
+
+    /**
+     * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific
+     * Rya instance.
+     */
+    private final QueryChangeLogSource changeLogSource;
 
+    /**
+     * The engine that is responsible for executing {@link StreamsQuery}s.
+     */
     private final QueryExecutor queryExecutor;
-    private final Scheduler scheduler;
 
     /**
-     * Map of Rya Instance name to {@link QueryRepository}.
+     * How long blocking operations will be attempted before potentially trying again.
      */
-    private final Map<String, QueryRepository> queryRepos = new HashMap<>();
+    private final long blockingValue;
 
-    private final ReentrantLock lock = new ReentrantLock();
+    /**
+     * The units for {@link #blockingValue}.
+     */
+    private final TimeUnit blockingUnits;
 
-    private final QueryChangeLogSource source;
+    /**
+     * Used to inform threads that the application is shutting down, so they must stop work.
+     */
+    private final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+    /**
+     * This thread pool manages the two threads used to work the {@link LogEvent}s
+     * and the {@link QueryEvent}s.
+     */
+    private final ExecutorService executor = Executors.newFixedThreadPool(2);
 
     /**
      * Creates a new {@link QueryManager}.
      *
      * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null)
      * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null)
-     * @param scheduler - The {@link Scheduler} used to discover query changes
-     *      within the {@link QueryChangeLog}s (not null)
+     * @param blockingValue - How long blocking operations will try before looping. (> 0)
+     * @param blockingUnits - The units of the {@code blockingValue}. (not null)
      */
-    public QueryManager(final QueryExecutor queryExecutor, final QueryChangeLogSource source, final Scheduler scheduler) {
-        this.source = requireNonNull(source);
+    public QueryManager(
+            final QueryExecutor queryExecutor,
+            final QueryChangeLogSource source,
+            final long blockingValue,
+            final TimeUnit blockingUnits) {
+        this.changeLogSource = requireNonNull(source);
         this.queryExecutor = requireNonNull(queryExecutor);
-        this.scheduler = requireNonNull(scheduler);
+        Preconditions.checkArgument(blockingValue > 0, "The blocking value must be > 0. Was: " + blockingValue);
+        this.blockingValue = blockingValue;
+        this.blockingUnits = requireNonNull(blockingUnits);
+    }
+
+    /**
+     * Starts the {@link LogEventWorker} and {@link QueryEventWorker} threads, the
+     * {@link QueryExecutor}, and the {@link QueryChangeLogSource}, then subscribes a
+     * {@link LogEventWorkGenerator} to the source.
+     * <p/>
+     * If any of those steps fail, the workers are told to stop, the service is marked
+     * as failed, and this method returns without reporting a successful start.
+     */
+    @Override
+    protected void doStart() {
+        log.info("Starting a QueryManager.");
+
+        // A work queue of discovered Query Change Logs that need to be handled.
+        // This queue exists so that the source notifying thread may be released
+        // immediately instead of calling into blocking functions.
+        final BlockingQueue<LogEvent> logEvents = new ArrayBlockingQueue<>(1024);
+
+        // A work queue of discovered Query Changes from the monitored Query Change Logs
+        // that need to be handled. This queue exists so that the Query Repository notifying
+        // thread may be released immediately instead of calling into blocking functions.
+        final BlockingQueue<QueryEvent> queryEvents = new ArrayBlockingQueue<>(1024);
+
+        try {
+            // Start up a LogEventWorker using the executor service.
+            executor.submit(new LogEventWorker(logEvents, queryEvents, blockingValue, blockingUnits, shutdownSignal));
+
+            // Start up a QueryEvent Worker using the executor service.
+            executor.submit(new QueryEventWorker(queryEvents, queryExecutor, blockingValue, blockingUnits, shutdownSignal));
+
+            // Start up the query execution framework.
+            queryExecutor.startAndWait();
+
+            // Startup the source that discovers new Query Change Logs.
+            changeLogSource.startAndWait();
+
+            // Subscribe the source a listener that writes to the LogEventWorker's work queue.
+            changeLogSource.subscribe(new LogEventWorkGenerator(logEvents, blockingValue, blockingUnits, shutdownSignal));
+        } catch(final RejectedExecutionException | UncheckedExecutionException e) {
+            log.error("Could not start up a QueryManager.", e);
+
+            // Release any worker threads that were already submitted so they do not
+            // outlive the failed start. NOTE(review): assumes doStop() is not run for a
+            // service that failed while starting - confirm against the Guava version in use.
+            shutdownSignal.set(true);
+            executor.shutdownNow();
+
+            // Report the failure and stop here. Falling through to notifyStarted() would
+            // report a successful start for a service that has already failed.
+            notifyFailed(e);
+            return;
+        }
+
+        // Notify the service was successfully started.
+        notifyStarted();
+
+        log.info("QueryManager has finished starting.");
+    }
+
+    /**
+     * Signals shutdown to the workers, waits up to 10 seconds for them to terminate,
+     * then stops the {@link QueryChangeLogSource} and the {@link QueryExecutor}.
+     * <p/>
+     * Failures while stopping the source or the executor are logged and do not prevent
+     * the service from being reported as stopped. If the calling thread is interrupted
+     * while waiting for the workers, its interrupt status is restored before returning.
+     */
+    @Override
+    protected void doStop() {
+        log.info("Stopping a QueryManager.");
+
+        // Set the shutdown flag so that all components that rely on that signal will stop processing.
+        shutdownSignal.set(true);
+
+        // Stop the workers and wait for them to die.
+        executor.shutdownNow();
+        boolean interrupted = false;
+        try {
+            if(!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+                log.warn("Waited 10 seconds for the worker threads to die, but they are still running.");
+            }
+        } catch (final InterruptedException e) {
+            // The wait was cut short, so nothing is known about whether the workers finished.
+            log.warn("Interrupted while waiting for the worker threads to die. They may still be running.", e);
+            interrupted = true;
+        }
+
+        // Stop the source of new Change Logs.
+        try {
+            changeLogSource.stopAndWait();
+        } catch(final UncheckedExecutionException e) {
+            log.warn("Could not stop the Change Log Source.", e);
+        }
+
+        // Stop the query execution framework.
+        try {
+            queryExecutor.stopAndWait();
+        } catch(final UncheckedExecutionException e) {
+            log.warn("Could not stop the Query Executor", e);
+        }
+
+        // Notify the service was successfully stopped.
+        notifyStopped();
+
+        log.info("QueryManager has finished stopping.");
+
+        // Restore the interrupt status only after the remaining stop steps have run,
+        // so the caller can still observe the interruption.
+        if(interrupted) {
+            Thread.currentThread().interrupt();
+        }
     }
 
     /**
-     * Starts running a query.
+     * Offer a unit of work to a blocking queue until it is either accepted, or the
+     * shutdown signal is set.
      *
-     * @param ryaInstanceName - The Rya instance the query belongs to. (not null)
-     * @param query - The query to run.(not null)
+     * @param workQueue - The blocking work queue to write to. (not null)
+     * @param event - The event that will be offered to the work queue. (not null)
+     * @param offerValue - How long to wait when offering new work.
+     * @param offerUnits - The unit for the {@code offerValue}. (not null)
+     * @param shutdownSignal - Used to signal application shutdown has started, so
+     *   this method may terminate without ever placing the event on the queue. (not null)
+     * @return {@code true} if the event was added to the queue, otherwise {@code false}.
      */
-    private void runQuery(final String ryaInstanceName, final StreamsQuery query) {
-        requireNonNull(ryaInstanceName);
-        requireNonNull(query);
-        LOG.info("Starting Query: " + query.toString() + " on the rya instance: " + ryaInstanceName);
+    private static <T> boolean offerUntilAcceptedOrShutdown(
+            final BlockingQueue<T> workQueue,
+            final T event,
+            final long offerValue,
+            final TimeUnit offerUnits,
+            final AtomicBoolean shutdownSignal) {
+        requireNonNull(workQueue);
+        requireNonNull(event);
+        requireNonNull(shutdownSignal);
 
-        try {
-            queryExecutor.startQuery(ryaInstanceName, query);
-        } catch (final QueryExecutorException e) {
-            LOG.error("Failed to start query.", e);
+        boolean submitted = false;
+        while(!submitted && !shutdownSignal.get()) {
+            try {
+                submitted = workQueue.offer(event, offerValue, offerUnits);
+                if(!submitted) {
+                    log.debug("An event could not be added to a work queue after waiting 5 seconds. Trying again...");
+                }
+            } catch (final InterruptedException e) {
+                log.debug("An event could not be added to a work queue after waiting 5 seconds. Trying again...");
+            }
         }
+        return submitted;
     }
 
     /**
-     * Stops the specified query from running.
-     *
-     * @param queryId - The ID of the query to stop running. (not null)
+     * An observation that a {@link QueryChangeLog} was created within or
+     * removed from a {@link QueryChangeLogSource}.
      */
-    private void stopQuery(final UUID queryId) {
-        requireNonNull(queryId);
+    @DefaultAnnotation(NonNull.class)
+    static class LogEvent {
+
+        /**
+         * The types of events that may be observed.
+         */
+        static enum LogEventType {
+            /**
+             * A {@link QueryChangeLog} was created within a {@link QueryChangeLogSource}.
+             */
+            CREATE,
+
+            /**
+             * A {@link QueryChangeLog} was deleted from a {@link QueryChangeLogSource}.
+             */
+            DELETE;
+        }
 
-        LOG.info("Stopping query: " + queryId.toString());
+        private final String ryaInstance;
+        private final LogEventType eventType;
+        private final Optional<QueryChangeLog> log;
 
-        try {
-            queryExecutor.stopQuery(queryId);
-        } catch (final QueryExecutorException e) {
-            LOG.error("Failed to stop query.", e);
+        /**
+         * Constructs an instance of {@link LogEvent}.
+         *
+         * @param ryaInstance - The Rya Instance the log is/was for. (not null)
+         * @param eventType - The type of event that was observed. (not null)
+         * @param log - The log if this is a create event. (not null)
+         */
+        private LogEvent(final String ryaInstance, final LogEventType eventType, final Optional<QueryChangeLog> log) {
+            this.ryaInstance = requireNonNull(ryaInstance);
+            this.eventType = requireNonNull(eventType);
+            this.log = requireNonNull(log);
+        }
+
+        /**
+         * @return The Rya Instance whose log was either created or deleted.
+         */
+        public String getRyaInstanceName() {
+            return ryaInstance;
+        }
+
+        /**
+         * @return The type of event that was observed.
+         */
+        public LogEventType getEventType() {
+            return eventType;
+        }
+
+        /**
+         * @return The {@link QueryChangeLog} if this is a CREATE event.
+         */
+        public Optional<QueryChangeLog> getQueryChangeLog() {
+            return log;
+        }
+
+        @Override
+        public String toString() {
+            return "LogEvent {\n" +
+                   "    Rya Instance: " + ryaInstance + ",\n" +
+                   "    Event Type: " + eventType + "\n" +
+                   "}";
+        }
+
+        /**
+         * Make a {@link LogEvent} that indicates a {@link QueryChangeLog} was created within a
+         * {@link QueryChangeLogSource}.
+         *
+         * @param ryaInstance - The Rya Instance the created log is for. (not null)
+         * @param log - The created {@link QueryChangeLog}. (not null)
+         * @return A {@link LogEvent} built using the provided values.
+         */
+        public static LogEvent create(final String ryaInstance, final QueryChangeLog log) {
+            return new LogEvent(ryaInstance, LogEventType.CREATE, Optional.of(log));
+        }
+
+        /**
+         * Make a {@link LogEvent} that indicates a {@link QueryChangeLog} was deleted from
+         * a {@link QueryChangeLogSource}.
+         *
+         * @param ryaInstance - The Rya Instance whose log was deleted. (not null)
+         * @return A {@link LogEvent} built using the provided values.
+         */
+        public static LogEvent delete(final String ryaInstance) {
+            return new LogEvent(ryaInstance, LogEventType.DELETE, Optional.empty());
         }
     }
 
-    @Override
-    protected void startUp() throws Exception {
-        lock.lock();
-        try {
-            LOG.info("Starting Query Manager.");
-            queryExecutor.startAndWait();
-            source.startAndWait();
+    /**
+     * An observation that a {@link StreamsQuery} needs to be executing or not
+     * via the provided {@link QueryExecutor}.
+     */
+    @DefaultAnnotation(NonNull.class)
+    static class QueryEvent {
+
+        /**
+         * The type of events that may be observed.
+         */
+        public static enum QueryEventType {
+            /**
+             * Indicates a {@link StreamsQuery} needs to be executing.
+             */
+            EXECUTING,
+
+            /**
+             * Indicates a {@link StreamsQuery} needs to be stopped.
+             */
+            STOPPED,
+
+            /**
+             * Indicates all {@link StreamsQuery}s for a Rya instance need to be stopped.
+             */
+            STOP_ALL;
+        }
+
+        private final String ryaInstance;
+        private final QueryEventType type;
+        private final Optional<UUID> queryId;
+        private final Optional<StreamsQuery> query;
+
+        /**
+         * Constructs an instance of {@link QueryEvent}.
+         *
+         * @param ryaInstance - The Rya instance that generated the event. (not null)
+         * @param type - Indicates whether the query needs to be executing or not. (not null)
+         * @param queryId - If stopped, the ID of the query that must not be running. (not null)
+         * @param query - If executing, the StreamsQuery that defines what should be executing. (not null)
+         */
+        private QueryEvent(
+                final String ryaInstance,
+                final QueryEventType type,
+                final Optional<UUID> queryId,
+                final Optional<StreamsQuery> query) {
+            this.ryaInstance = requireNonNull(ryaInstance);
+            this.type = requireNonNull(type);
+            this.queryId = requireNonNull(queryId);
+            this.query = requireNonNull(query);
+        }
+
+        /**
+         * @return The Rya instance that generated the event.
+         */
+        public String getRyaInstance() {
+            return ryaInstance;
+        }
+
+        /**
+         * @return Indicates whether the query needs to be executing or not.
+         */
+        public QueryEventType getType() {
+            return type;
+        }
+
+        /**
+         * @return If stopped, the ID of the query that must not be running. Otherwise absent.
+         */
+        public Optional<UUID> getQueryId() {
+            return queryId;
+        }
+
+        /**
+         * @return If executing, the StreamsQuery that defines what should be executing. Otherwise absent.
+         */
+        public Optional<StreamsQuery> getStreamsQuery() {
+            return query;
+        }
 
-            // subscribe to the sources to be notified of changes.
-            source.subscribe(new QueryManagerSourceListener());
-        } finally {
-            lock.unlock();
+        @Override
+        public int hashCode() {
+            return Objects.hash(ryaInstance, type, queryId, query);
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if(o instanceof QueryEvent) {
+                final QueryEvent other = (QueryEvent) o;
+                return Objects.equals(ryaInstance, other.ryaInstance) &&
+                        Objects.equals(type, other.type) &&
+                        Objects.equals(queryId, other.queryId) &&
+                        Objects.equals(query, other.query);
+            }
+            return false;
+        }
+
+        /**
+         * @return A multi-line description of this event: the Rya instance, the event type,
+         *   and whichever of the query ID / Streams Query applies to that type.
+         */
+        @Override
+        public String toString() {
+            final StringBuilder string = new StringBuilder();
+            string.append("Query Event {\n")
+                  .append("    Rya Instance: ").append(ryaInstance).append(",\n")
+                  .append("    Type: ").append(type).append(",\n");
+            switch(type) {
+                case EXECUTING:
+                    append(string, query.get());
+                    break;
+                case STOPPED:
+                    string.append("    Query ID: ").append(queryId.get()).append("\n");
+                    break;
+                case STOP_ALL:
+                    break;
+                default:
+                    // Default to showing everything that is in the object. Either value may be
+                    // absent for an unrecognized type, so never call get() on them here.
+                    queryId.ifPresent(id -> string.append("    Query ID: ").append(id).append("\n"));
+                    query.ifPresent(streamsQuery -> append(string, streamsQuery));
+                    break;
+            }
+            string.append("}");
+            return string.toString();
+        }
+
+        /**
+         * Appends an indented, multi-line description of a {@link StreamsQuery} to a builder.
+         * The description ends with a newline so that whatever is appended next starts on
+         * its own line.
+         *
+         * @param string - The builder that will be appended to. (not null)
+         * @param query - The query whose ID, active flag, and SPARQL will be written. (not null)
+         */
+        private void append(final StringBuilder string, final StreamsQuery query) {
+            requireNonNull(string);
+            requireNonNull(query);
+            string.append("    Streams Query {\n")
+                  .append("        Query ID: ").append(query.getQueryId()).append(",\n")
+                  .append("        Is Active: ").append(query.isActive()).append(",\n")
+                  .append("        SPARQL: ").append(query.getSparql()).append("\n")
+                  .append("    }\n");
+        }
+
+        /**
+         * Create a {@link QueryEvent} that indicates a query needs to be executing.
+         *
+         * @param ryaInstance - The Rya instance that generated the event. (not null)
+         * @param query - The StreamsQuery that defines what should be executing. (not null)
+         * @return A {@link QueryEvent} built using the provided values.
+         */
+        public static QueryEvent executing(final String ryaInstance, final StreamsQuery query) {
+            return new QueryEvent(ryaInstance, QueryEventType.EXECUTING, Optional.empty(), Optional.of(query));
+        }
+
+        /**
+         * Create a {@link QueryEvent} that indicates a query needs to be stopped.
+         *
+         * @param ryaInstance - The Rya instance that generated the event. (not null)
+         * @param queryId - The ID of the query that must not be running. (not null)
+         * @return A {@link QueryEvent} built using the provided values.
+         */
+        public static QueryEvent stopped(final String ryaInstance, final UUID queryId) {
+            return new QueryEvent(ryaInstance, QueryEventType.STOPPED, Optional.of(queryId), Optional.empty());
+        }
+
+        /**
+         * Create a {@link QueryEvent} that indicates all queries for a Rya instance need to be stopped.
+         *
+         * @param ryaInstance - The Rya instance that generated the event. (not null)
+         * @return A {@link QueryEvent} built using the provided values.
+         */
+        public static QueryEvent stopALL(final String ryaInstance) {
+            return new QueryEvent(ryaInstance, QueryEventType.STOP_ALL, Optional.empty(), Optional.empty());
         }
     }
 
-    @Override
-    protected void shutDown() throws Exception {
-        lock.lock();
-        try {
-            LOG.info("Stopping Query Manager.");
-            source.stopAndWait();
-            queryExecutor.stopAndWait();
-        } finally {
-            lock.unlock();
+    /**
+     * Listens to a {@link QueryChangeLogSource} and adds observations to the provided
+     * work queue. It does so until the provided shutdown signal is set.
+     */
+    @DefaultAnnotation(NonNull.class)
+    static class LogEventWorkGenerator implements SourceListener {
+
+        private final BlockingQueue<LogEvent> workQueue;
+        private final AtomicBoolean shutdownSignal;
+        private final long offerValue;
+        private final TimeUnit offerUnits;
+
+        /**
+         * Constructs an instance of {@link LogEventWorkGenerator}.
+         *
+         * @param workQueue - A blocking queue that will have {@link LogEvent}s offered to it. (not null)
+         * @param offerValue - How long to wait when offering new work.
+         * @param offerUnits - The unit for the {@code offerValue}. (not null)
+         * @param shutdownSignal - Indicates to this listener that it needs to stop adding events
+         *   to the work queue because the application is shutting down. (not null)
+         */
+        public LogEventWorkGenerator(
+                final BlockingQueue<LogEvent> workQueue,
+                final long offerValue,
+                final TimeUnit offerUnits,
+                final AtomicBoolean shutdownSignal) {
+            this.workQueue = requireNonNull(workQueue);
+            this.shutdownSignal = requireNonNull(shutdownSignal);
+            this.offerValue = offerValue;
+            this.offerUnits = requireNonNull(offerUnits);
+        }
+
+        @Override
+        public void notifyCreate(final String ryaInstanceName, final QueryChangeLog changeLog) {
+            log.info("A new Query Change Log has been discovered for Rya Instance " + ryaInstanceName + ". All " +
+                    "queries that are set to active within it will be started.");
+
+            // Create an event that summarizes this notification.
+            final LogEvent event = LogEvent.create(ryaInstanceName, changeLog);
+
+            // Offer it to the worker until there is room for it in the work queue, or we are shutting down.
+            offerUntilAcceptedOrShutdown(workQueue, event, offerValue, offerUnits, shutdownSignal);
+        }
+
+        @Override
+        public void notifyDelete(final String ryaInstanceName) {
+            log.info("The Query Change Log for Rya Instance " + ryaInstanceName + " has been deleted. All of the " +
+                    "queries related to that instance will be stopped.");
+
+            // Create an event that summarizes this notification.
+            final LogEvent event = LogEvent.delete(ryaInstanceName);
+
+            // Offer it to the worker until there is room for it in the work queue, or we are shutting down.
+            offerUntilAcceptedOrShutdown(workQueue, event, offerValue, offerUnits, shutdownSignal);
         }
     }
 
     /**
-     * An implementation of {@link QueryChangeLogListener} for the
-     * {@link QueryManager}.
-     * <p>
-     * When notified of a {@link ChangeType} performs one of the following:
-     * <li>{@link ChangeType#CREATE}: Creates a new query using the
-     * {@link QueryExecutor} provided to the {@link QueryManager}</li>
-     * <li>{@link ChangeType#DELETE}: Deletes a running query by stopping the
-     * {@link QueryExecutor} service of the queryID in the event</li>
-     * <li>{@link ChangeType#UPDATE}: If the query is running and the update is
-     * to stop the query, stops the query. Otherwise, if the query is not
-     * running, it is removed.</li>
+     * Processes a work queue of {@link LogEvent}s.
+     * <p/>
+     * Whenever a new log has been created, then it registers a {@link QueryEventWorkGenerator}
+     * that generates {@link QueryEvent}s based on the content and updates to the discovered
+     * {@link QueryChangeLog}.
+     * <p/>
+     * Whenever a log is deleted, then the generator is stopped and a stop all {@link QueryEvent}
+     * is written to the work queue.
      */
-    private class QueryExecutionForwardingListener implements QueryChangeLogListener {
-        private final String ryaInstanceName;
+    @DefaultAnnotation(NonNull.class)
+    static class LogEventWorker implements Runnable {
 
         /**
-         * Creates a new {@link QueryExecutionForwardingListener}.
+         * A map of Rya Instance name to the Query Repository for that instance.
+         */
+        private final Map<String, QueryRepository> repos = new HashMap<>();
+
+        private final BlockingQueue<LogEvent> logWorkQueue;
+        private final BlockingQueue<QueryEvent> queryWorkQueue;
+        private final long blockingValue;
+        private final TimeUnit blockingUnits;
+        private final AtomicBoolean shutdownSignal;
+
+        /**
+         * Constructs an instance of {@link LogEventWorker}.
          *
-         * @param ryaInstanceName - The rya instance the query change is
-         *        performed on. (not null)
+         * @param logWorkQueue - A queue of {@link LogEvent}s that will be worked by this object. (not null)
+         * @param queryWorkQueue - A queue where {@link QueryEvent}s will be placed by this object. (not null)
+         * @param blockingValue - How long to wait when polling/offering new work.
+         * @param blockingUnits - The unit for the {@code blockingValue}. (not null)
+         * @param shutdownSignal - Indicates when the application has been shutdown, so the executing thread
+         *   may exit the {@link #run()} method. (not null)
          */
-        public QueryExecutionForwardingListener(final String ryaInstanceName) {
-            this.ryaInstanceName = requireNonNull(ryaInstanceName);
+        public LogEventWorker(
+                final BlockingQueue<LogEvent> logWorkQueue,
+                final BlockingQueue<QueryEvent> queryWorkQueue,
+                final long blockingValue,
+                final TimeUnit blockingUnits,
+                final AtomicBoolean shutdownSignal) {
+            this.logWorkQueue = requireNonNull(logWorkQueue);
+            this.queryWorkQueue = requireNonNull(queryWorkQueue);
+            this.blockingValue = blockingValue;
+            this.blockingUnits = requireNonNull(blockingUnits);
+            this.shutdownSignal = requireNonNull(shutdownSignal);
         }
 
         @Override
-        public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent, final Optional<StreamsQuery> newQueryState) {
-            LOG.debug("New query change event.");
-            final QueryChange entry = queryChangeEvent.getEntry();
+        public void run() {
+            // Run until the shutdown signal is set.
+            while(!shutdownSignal.get()) {
+                try {
+                    // Pull a unit of work from the queue.
+                    log.debug("LogEventWorker - Polling the work queue for a new LogEvent.");
+                    final LogEvent logEvent = logWorkQueue.poll(blockingValue, blockingUnits);
+                    if(logEvent == null) {
+                        // Poll again if nothing was found.
+                        continue;
+                    }
 
-            lock.lock();
-            try {
+                    log.info("LogEventWorker - handling: \n" + logEvent);
+                    final String ryaInstance = logEvent.getRyaInstanceName();
 
-                switch (entry.getChangeType()) {
-                    case CREATE:
-                        if(!newQueryState.isPresent()) {
-                            LOG.error("The query with ID: " + entry.getQueryId() + " must be present with the change to be created.");
-                            LOG.debug("newQueryState is not allowed to be absent with a CREATE QueryChange, there might be a bug in the QueryRepository.");
-                        } else {
-                            runQuery(ryaInstanceName, newQueryState.get());
-                        }
-                        break;
-                    case DELETE:
-                        stopQuery(entry.getQueryId());
-                        break;
-                    case UPDATE:
-                        if (!newQueryState.isPresent()) {
-                            LOG.error("The query with ID: " + entry.getQueryId() + " must be provided with the update, cannot perform update.");
-                            LOG.debug("newQueryState is not allowed to be absent with a UPDATE QueryChange, there might be a bug in the QueryRepository.");
-                        } else {
-                            final StreamsQuery updatedQuery = newQueryState.get();
-                            if (updatedQuery.isActive()) {
-                                runQuery(ryaInstanceName, updatedQuery);
-                                LOG.info("Starting query: " + updatedQuery.toString());
-                            } else {
-                                stopQuery(updatedQuery.getQueryId());
-                                LOG.info("Stopping query: " + updatedQuery.toString());
+                    switch(logEvent.getEventType()) {
+                        case CREATE:
+                            // If we see a create message for a Rya Instance we are already maintaining,
+                            // then don't do anything.
+                            if(repos.containsKey(ryaInstance)) {
+                                log.warn("LogEventWorker - A repository is already being managed for the Rya Instance " +
+                                        ryaInstance + ". This message will be ignored.");
+                                continue;
                             }
-                        }
-                        break;
+
+                            // Create and start a QueryRepository for the discovered log. Hold onto the repository
+                            // so that it may be shutdown later.
+                            final Scheduler scheduler = Scheduler.newFixedRateSchedule(0, blockingValue, blockingUnits);
+                            final QueryRepository repo = new InMemoryQueryRepository(logEvent.getQueryChangeLog().get(), scheduler);
+                            repo.startAndWait();
+                            repos.put(ryaInstance, repo);
+
+                            // Subscribe a worker that adds the Query Events to the queryWorkQueue queue.
+                            // A count down latch is used to ensure the returned set of queries are handled
+                            // prior to any notifications from the repository.
+                            final CountDownLatch subscriptionWorkFinished = new CountDownLatch(1);
+                            final QueryEventWorkGenerator queryWorkGenerator =
+                                    new QueryEventWorkGenerator(ryaInstance, subscriptionWorkFinished, queryWorkQueue,
+                                            blockingValue, blockingUnits, shutdownSignal);
+
+                            log.debug("LogEventWorker - Setting up a QueryWorkGenerator...");
+                            final Set<StreamsQuery> queries = repo.subscribe(queryWorkGenerator);
+                            log.debug("LogEventWorker - Finished setting up a QueryWorkGenerator.");
+
+                            // Handle the view of the queries within the repository as it existed when
+                            // the subscription was registered.
+                            queries.stream()
+                            .forEach(query -> {
+                                // Create a QueryEvent that represents the active state of the existing query.
+                                final QueryEvent queryEvent = query.isActive() ?
+                                        QueryEvent.executing(ryaInstance, query) : QueryEvent.stopped(ryaInstance, query.getQueryId());
+                                log.debug("LogEventWorker - offering: " + queryEvent);
+
+                                // Offer it to the worker until there is room for it in the work queue, or we are shutting down.
+                                offerUntilAcceptedOrShutdown(queryWorkQueue, queryEvent, blockingValue, blockingUnits, shutdownSignal);
+                            });
+
+                            // Indicate the subscription work is finished so that the registered listener may start
+                            // adding work to the queue.
+                            log.info("LogEventWorker - Counting down the subscription work latch.");
+                            subscriptionWorkFinished.countDown();
+                            break;
+
+                        case DELETE:
+                            if(repos.containsKey(ryaInstance)) {
+                                // Shut down the query repository for the Rya instance. This ensures the listener will
+                                // not receive any more work that needs to be done.
+                                final QueryRepository deletedRepo = repos.remove(ryaInstance);
+                                deletedRepo.stopAndWait();
+
+                                // Add work that stops all of the queries related to the instance.
+                                final QueryEvent stopAllEvent = QueryEvent.stopALL(ryaInstance);
+                                offerUntilAcceptedOrShutdown(queryWorkQueue, stopAllEvent, blockingValue, blockingUnits, shutdownSignal);
+                            }
+                            break;
+                    }
+                } catch (final InterruptedException e) {
+                    log.debug("LogEventWorker did not see any new events over the past 5 seconds. Polling again...");
                 }
-            } finally {
-                lock.unlock();
             }
+
+            log.info("LogEventWorker shutting down...");
+
+            // Shutdown all of the QueryRepositories that were started.
+            repos.values().forEach(repo -> repo.stopAndWait());
+
+            log.info("LogEventWorker shut down.");
         }
     }
 
     /**
-     * Listener used by the {@link QueryManager} to be notified when
-     * {@link QueryChangeLog}s are created or deleted.
+     * Listens to a {@link QueryRepository} and adds observations to the provided work queue.
+     * It does so until the provided shutdown signal is set.
      */
-    private class QueryManagerSourceListener implements SourceListener {
+    @DefaultAnnotation(NonNull.class)
+    static class QueryEventWorkGenerator implements QueryChangeLogListener {
+
+        private final String ryaInstance;
+        private final CountDownLatch subscriptionWorkFinished;
+        private final BlockingQueue<QueryEvent> queryWorkQueue;
+        private final long blockingValue;
+        private final TimeUnit blockingUnits;
+        private final AtomicBoolean shutdownSignal;
+
+        /**
+         * Constructs an instance of {@link QueryEventWorkGenerator}.
+         *
+         * @param ryaInstance - The Rya instance whose log this object is watching. (not null)
+         * @param subscriptionWorkFinished - Indicates when work that needs to be completed before this
+         *   listener handles notifications is completed. (not null)
+         * @param queryWorkQueue - A queue where {@link QueryEvent}s will be placed by this object. (not null)
+         * @param blockingValue - How long to wait when polling/offering new work.
+         * @param blockingUnits - The unit for the {@code blockingValue}. (not null)
+         * @param shutdownSignal - Indicates to this listener that it needs to stop adding events
+         *   to the work queue because the application is shutting down. (not null)
+         */
+        public QueryEventWorkGenerator(
+                final String ryaInstance,
+                final CountDownLatch subscriptionWorkFinished,
+                final BlockingQueue<QueryEvent> queryWorkQueue,
+                final long blockingValue,
+                final TimeUnit blockingUnits,
+                final AtomicBoolean shutdownSignal) {
+            this.ryaInstance = requireNonNull(ryaInstance);
+            this.subscriptionWorkFinished = requireNonNull(subscriptionWorkFinished);
+            this.queryWorkQueue = requireNonNull(queryWorkQueue);
+            this.blockingValue = blockingValue;
+            this.blockingUnits = requireNonNull(blockingUnits);
+            this.shutdownSignal = requireNonNull(shutdownSignal);
+        }
+
         @Override
-        public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) {
-            lock.lock();
+        public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent, final Optional<StreamsQuery> newQueryState) {
+            requireNonNull(queryChangeEvent);
+            requireNonNull(newQueryState);
+
+            // Wait for the subscription work to be finished.
             try {
-                LOG.info("Discovered new Query Change Log for Rya Instance " + ryaInstanceName + ".");
-                final QueryRepository repo = new InMemoryQueryRepository(log, scheduler);
-                repo.startAndWait();
-                final Set<StreamsQuery> queries = repo.subscribe(new QueryExecutionForwardingListener(ryaInstanceName));
-                queries.forEach(query -> {
-                    if (query.isActive()) {
-                        try {
-                            queryExecutor.startQuery(ryaInstanceName, query);
-                        } catch (IllegalStateException | QueryExecutorException e) {
-                            LOG.error("Unable to start query for rya instance " + ryaInstanceName, e);
+                log.debug("Waiting for Subscription Work Finished latch to release...");
+                while(!shutdownSignal.get() && !subscriptionWorkFinished.await(blockingValue, blockingUnits)) {
+                    log.debug("Still waiting...");
+                }
+                log.debug("Subscription Work Finished latch to released.");
+            } catch (final InterruptedException e) {
+                log.warn("Interrupted while waiting for the Subscription Work Finished latch to be " +
+                        "released. Shutting down?", e);
+            }
+
+            // If we left the loop because of a shutdown, return immediately.
+            if(shutdownSignal.get()) {
+                log.debug("Not processing notification. Shutting down.");
+                return;
+            }
+
+            // Generate work from the notification.
+            final QueryChange change = queryChangeEvent.getEntry();
+            switch(change.getChangeType()) {
+                case CREATE:
+                    if(newQueryState.isPresent()) {
+                        log.info("Rya Instance " + ryaInstance + " created Rya Streams query " + newQueryState + ".");
+                        final StreamsQuery newQuery = newQueryState.get();
+                        if(newQuery.isActive()) {
+                            final QueryEvent executeNewQuery = QueryEvent.executing(ryaInstance, newQuery);
+                            offerUntilAcceptedOrShutdown(queryWorkQueue, executeNewQuery, blockingValue, blockingUnits, shutdownSignal);
                         }
+                    } else {
+                        log.error("Received a CREATE QueryChange for Rya Instance: " + ryaInstance +
+                                ", Query ID: " + change.getQueryId() + ", but the QueryRepository did not supply a " +
+                                "StreamsQuery representing the created query. The query will not be processed.");
                     }
-                });
-                queryRepos.put(ryaInstanceName, repo);
-            } finally {
-                lock.unlock();
+                    break;
+
+                case DELETE:
+                    final UUID deletedQueryId = change.getQueryId();
+                    log.info("Rya Instance " + ryaInstance + " deleted Rya Streams query with ID " + deletedQueryId);
+                    final QueryEvent stopDeletedQuery = QueryEvent.stopped(ryaInstance, deletedQueryId);
+                    offerUntilAcceptedOrShutdown(queryWorkQueue, stopDeletedQuery, blockingValue, blockingUnits, shutdownSignal);
+                    break;
+
+                case UPDATE:
+                    if(newQueryState.isPresent()) {
+                        final StreamsQuery updatedQuery = newQueryState.get();
+                        if(updatedQuery.isActive()) {
+                            log.info("Rya Instance " + ryaInstance + " updated Rya Streams query with ID " +
+                                    updatedQuery.getQueryId() + " to be active.");
+                            final QueryEvent executeUpdatedQuery = QueryEvent.executing(ryaInstance, updatedQuery);
+                            offerUntilAcceptedOrShutdown(queryWorkQueue, executeUpdatedQuery, blockingValue, blockingUnits, shutdownSignal);
+                        } else {
+                            log.info("Rya Instance " + ryaInstance + " updated Rya Streams query with ID " +
+                                    updatedQuery.getQueryId() + " to be inactive.");
+                            final QueryEvent stopUpdatedQuery = QueryEvent.stopped(ryaInstance, updatedQuery.getQueryId());
+                            offerUntilAcceptedOrShutdown(queryWorkQueue, stopUpdatedQuery, blockingValue, blockingUnits, shutdownSignal);
+                        }
+                    } else {
+                        log.error("Received an UPDATE QueryChange for Rya Instance: " + ryaInstance +
+                                ", Query ID: " + change.getQueryId() + ", but the QueryRepository did not supply a " +
+                                "StreamsQuery representing the created query. The query will not be processed.");
+                    }
+                    break;
             }
         }
+    }
+
+    /**
+     * Processes a work queue of {@link QueryEvent}s.
+     * <p/>
+     * Each type of event maps to the corresponding method on {@link QueryExecutor}, which is then invoked.
+     */
+    @DefaultAnnotation(NonNull.class)
+    static class QueryEventWorker implements Runnable {
+
+        private final BlockingQueue<QueryEvent> workQueue;
+        private final QueryExecutor queryExecutor;
+        private final long pollingValue;
+        private final TimeUnit pollingUnits;
+        private final AtomicBoolean shutdownSignal;
+
+        /**
+         * Constructs an instance of {@link QueryEventWorker}.
+         *
+         * @param workQueue - A queue of {@link QueryEvent}s that will be worked by this object. (not null)
+         * @param queryExecutor - Responsible for executing the {@link StreamsQuery}s. (not null)
+         * @param pollingValue - How long to wait when polling for new work.
+         * @param pollingUnits - The units for the {@code pollingValue}. (not null)
+         * @param shutdownSignal - Indicates when the application has been shutdown, so the executing thread
+         *   may exit the {@link #run()} method. (not null)
+         */
+        public QueryEventWorker(
+                final BlockingQueue<QueryEvent> workQueue,
+                final QueryExecutor queryExecutor,
+                final long pollingValue,
+                final TimeUnit pollingUnits,
+                final AtomicBoolean shutdownSignal) {
+            this.workQueue = requireNonNull(workQueue);
+            this.queryExecutor = requireNonNull(queryExecutor);
+            this.pollingValue = pollingValue;
+            this.pollingUnits = requireNonNull(pollingUnits);
+            this.shutdownSignal = requireNonNull(shutdownSignal);
+        }
 
         @Override
-        public void notifyDelete(final String ryaInstanceName) {
-            lock.lock();
-            try {
-                LOG.info("Notified of deleting QueryChangeLog, stopping all queries belonging to the change log for "
-                        + ryaInstanceName + ".");
-                queryExecutor.stopAll(ryaInstanceName);
-            } catch (final QueryExecutorException e) {
-                LOG.error("Failed to stop all queries belonging to: " + ryaInstanceName, e);
-            } finally {
-                lock.unlock();
+        public void run() {
+            log.info("QueryEventWorker starting.");
+
+            // Run until the shutdown signal is set.
+            while(!shutdownSignal.get()) {
+                // Pull a unit of work from the queue.
+                try {
+                    log.debug("Polling the work queue for a new QueryEvent.");
+                    final QueryEvent event = workQueue.poll(pollingValue, pollingUnits);
+                    if(event == null) {
+                        // Poll again if nothing was found.
+                        continue;
+                    }
+
+                    log.info("QueryEventWorker handling:\n" + event);
+
+                    // Ensure the state within the executor matches the query event's state.
+                    switch(event.getType()) {
+                        case EXECUTING:
+                            try {
+                                queryExecutor.startQuery(event.getRyaInstance(), event.getStreamsQuery().get());
+                            } catch (final IllegalStateException | QueryExecutorException e) {
+                                log.error("Could not start a query represented by the following work: " + event, e);
+                            }
+                            break;
+
+                        case STOPPED:
+                            try {
+                                queryExecutor.stopQuery(event.getQueryId().get());
+                            } catch (final IllegalStateException | QueryExecutorException e) {
+                                log.error("Could not stop a query represented by the following work: " + event, e);
+                            }
+                            break;
+
+                        case STOP_ALL:
+                            try {
+                                queryExecutor.stopAll(event.getRyaInstance());
+                            } catch (final IllegalStateException | QueryExecutorException e) {
+                                log.error("Could not stop all queries represented by the following work: " + event, e);
+                            }
+                        break;
+                    }
+                } catch (final InterruptedException e) {
+                    log.debug("QueryEventWorker interrupted. Probably shutting down.");
+                }
             }
+            log.info("QueryEventWorker shut down.");
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
index 515d699..04a0382 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
@@ -33,6 +33,7 @@ import org.apache.commons.daemon.DaemonContext;
 import org.apache.commons.daemon.DaemonInitException;
 import org.apache.rya.streams.kafka.KafkaStreamsFactory;
 import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory;
+import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
 import org.apache.rya.streams.querymanager.kafka.KafkaQueryChangeLogSource;
 import org.apache.rya.streams.querymanager.kafka.LocalQueryExecutor;
 import org.apache.rya.streams.querymanager.xml.Kafka;
@@ -91,7 +92,7 @@ public class QueryManagerDaemon implements Daemon {
 
         // Unmarshall the configuration file into an object.
         final QueryManagerConfig config;
-        try(InputStream stream = Files.newInputStream(configFile)) {
+        try(final InputStream stream = Files.newInputStream(configFile)) {
             config = QueryManagerConfigUnmarshaller.unmarshall(stream);
         } catch(final JAXBException | SAXException e) {
             throw new DaemonInitException("Unable to marshall the configuration XML file: " + configFile, e);
@@ -110,11 +111,12 @@ public class QueryManagerDaemon implements Daemon {
         final QueryChangeLogSource source = new KafkaQueryChangeLogSource(kafka.getHostname(), kafka.getPort(), scheduler);
 
         // Initialize a QueryExecutor.
+        final String zookeeperServers = config.getQueryExecutor().getLocalKafkaStreams().getZookeepers();
         final KafkaStreamsFactory streamsFactory = new SingleThreadKafkaStreamsFactory(kafka.getHostname() + ":" + kafka.getPort());
-        final QueryExecutor queryExecutor = new LocalQueryExecutor(streamsFactory);
+        final QueryExecutor queryExecutor = new LocalQueryExecutor(new CreateKafkaTopic(zookeeperServers), streamsFactory);
 
         // Initialize the QueryManager using the configured resources.
-        manager = new QueryManager(queryExecutor, source, scheduler);
+        manager = new QueryManager(queryExecutor, source, period, units);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
index 32305f5..e746baf 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
@@ -33,11 +33,12 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.rya.streams.api.queries.QueryChangeLog;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
 import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
 import org.apache.rya.streams.querymanager.QueryChangeLogSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.AbstractScheduledService;
@@ -53,6 +54,8 @@ import edu.umd.cs.findbugs.annotations.NonNull;
 @DefaultAnnotation(NonNull.class)
 public class KafkaQueryChangeLogSource extends AbstractScheduledService implements QueryChangeLogSource {
 
+    private static final Logger log = LoggerFactory.getLogger(KafkaQueryChangeLogSource.class);
+
     /**
      * Ensures thread safe interactions with this object.
      */
@@ -74,10 +77,10 @@ public class KafkaQueryChangeLogSource extends AbstractScheduledService implemen
     private final Set<SourceListener> listeners = new HashSet<>();
 
     /**
-     * Maps Rya instance name to a Query Change Log for that instance. This map is used to keep
-     * track of how the change logs change over time within the Kafka Server.
+     * Maps Rya instance names to the Query Change Log topic name in Kafka. This map is used to
+     * keep track of how the change logs change over time within the Kafka Server.
      */
-    private final HashMap<String, QueryChangeLog> knownChangeLogs = new HashMap<>();
+    private final HashMap<String, String> knownChangeLogs = new HashMap<>();
 
     /**
      * A consumer that is used to poll the Kafka Server for topics.
@@ -101,6 +104,8 @@ public class KafkaQueryChangeLogSource extends AbstractScheduledService implemen
 
     @Override
     protected void startUp() throws Exception {
+        log.info("Kafka Query Change Log Source watching " + kafkaBootstrapServer + " starting up...");
+
         // Setup the consumer that is used to list topics for the source.
         final Properties consumerProperties = new Properties();
         consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer);
@@ -108,17 +113,23 @@ public class KafkaQueryChangeLogSource extends AbstractScheduledService implemen
         consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         listTopicsConsumer = new KafkaConsumer<>(consumerProperties);
+
+        log.info("Kafka Query Change Log Source watching " + kafkaBootstrapServer + " started.");
     }
 
     @Override
     protected void shutDown() throws Exception {
-        // Shut down the consumer that's used to list topics.
-        listTopicsConsumer.close();
+        log.info("Kafka Query Change Log Source watching " + kafkaBootstrapServer + " shutting down...");
 
-        // Shut down all of the change logs that were created within this class.
-        for(final QueryChangeLog changeLog : knownChangeLogs.values()) {
-            changeLog.close();
+        lock.lock();
+        try {
+            // Shut down the consumer that's used to list topics.
+            listTopicsConsumer.close();
+        } finally {
+            lock.unlock();
         }
+
+        log.info("Kafka Query Change Log Source watching " + kafkaBootstrapServer + " shut down.");
     }
 
     @Override
@@ -130,8 +141,10 @@ public class KafkaQueryChangeLogSource extends AbstractScheduledService implemen
             listeners.add(listener);
 
             // Notify it with everything that already exists.
-            for(final Entry<String, QueryChangeLog> entry : knownChangeLogs.entrySet()) {
-                listener.notifyCreate(entry.getKey(), entry.getValue());
+            for(final Entry<String, String> entry : knownChangeLogs.entrySet()) {
+                final String changeLogTopic = entry.getValue();
+                final KafkaQueryChangeLog changeLog = KafkaQueryChangeLogFactory.make(kafkaBootstrapServer, changeLogTopic);
+                listener.notifyCreate(entry.getKey(), changeLog);
             }
         } finally {
             lock.unlock();
@@ -174,26 +187,23 @@ public class KafkaQueryChangeLogSource extends AbstractScheduledService implemen
             // Handle the deletes.
             for(final String deletedRyaInstance : deletedRyaInstances) {
                 // Remove the change log from the set of known logs.
-                final QueryChangeLog removed = knownChangeLogs.remove(deletedRyaInstance);
+                knownChangeLogs.remove(deletedRyaInstance);
 
-                // Notify the listeners of the update.
+                // Notify the listeners of the update so that they may close the previously provided change log.
                 for(final SourceListener listener : listeners) {
                     listener.notifyDelete(deletedRyaInstance);
                 }
-
-                // Ensure the change log is closed.
-                removed.close();
             }
 
             // Handle the adds.
             for(final String createdRyaInstance : createdRyaInstances) {
                 // Create and store the ChangeLog.
                 final String changeLogTopic = KafkaTopics.queryChangeLogTopic(createdRyaInstance);
-                final KafkaQueryChangeLog changeLog = KafkaQueryChangeLogFactory.make(kafkaBootstrapServer, changeLogTopic);
-                knownChangeLogs.put(createdRyaInstance, changeLog);
+                knownChangeLogs.put(createdRyaInstance, changeLogTopic);
 
                 // Notify the listeners of the update.
                 for(final SourceListener listener : listeners) {
+                    final KafkaQueryChangeLog changeLog = KafkaQueryChangeLogFactory.make(kafkaBootstrapServer, changeLogTopic);
                     listener.notifyCreate(createdRyaInstance, changeLog);
                 }
             }