You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/18 21:00:28 UTC

incubator-rya git commit: RYA-442 Implementing the Start and Stop Query interactors. Closes #265.

Repository: incubator-rya
Updated Branches:
  refs/heads/master 6ec8cd2aa -> 3d4a5d0e6


RYA-442 Implementing the Start and Stop Query interactors. Closes #265.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/3d4a5d0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/3d4a5d0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/3d4a5d0e

Branch: refs/heads/master
Commit: 3d4a5d0e6e5766c2285fc5e359fb0d645818a8e9
Parents: 6ec8cd2
Author: kchilton2 <ke...@gmail.com>
Authored: Fri Jan 12 14:48:55 2018 -0500
Committer: caleb <ca...@parsons.com>
Committed: Thu Jan 18 14:28:24 2018 -0500

----------------------------------------------------------------------
 .../rya/streams/api/entity/StreamsQuery.java    |  19 ++-
 .../rya/streams/api/interactor/AddQuery.java    |   4 +-
 .../interactor/defaults/DefaultAddQuery.java    |   4 +-
 .../interactor/defaults/DefaultStartQuery.java  |  54 +++++++
 .../interactor/defaults/DefaultStopQuery.java   |  54 +++++++
 .../api/queries/InMemoryQueryRepository.java    | 119 ++++++++++-----
 .../rya/streams/api/queries/QueryChange.java    |  46 +++++-
 .../streams/api/queries/QueryRepository.java    |  15 +-
 .../defaults/DefaultAddQueryTest.java           |   6 +-
 .../queries/InMemoryQueryRepositoryTest.java    |  39 +++--
 .../streams/client/command/AddQueryCommand.java |   6 +-
 .../client/command/AddQueryCommandIT.java       |   6 +-
 .../client/command/DeleteQueryCommandIT.java    | 146 +++++++++----------
 .../client/command/ListQueryCommandIT.java      |  12 +-
 .../client/command/RunQueryCommandIT.java       |   2 +-
 .../kafka/queries/KafkaQueryChangeLog.java      |  25 ++--
 .../kafka/interactor/KafkaRunQueryIT.java       |   2 +-
 .../kafka/queries/KafkaQueryChangeLogIT.java    |  38 ++++-
 18 files changed, 424 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
index 8239025..bd750a6 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -34,16 +34,19 @@ public class StreamsQuery {
 
     private final UUID queryId;
     private final String sparql;
+    private final boolean isActive;
 
     /**
      * Constructs an instance of {@link StreamsQuery}.
      *
      * @param queryId - Uniquely identifies the query within Rya Streams. (not null)
      * @param sparql - The SPARQL query that defines how statements will be processed. (not null)
+     * @param isActive - {@code true} if Rya Streams should process this query; otherwise {@code false}.
      */
-    public StreamsQuery(final UUID queryId, final String sparql) {
+    public StreamsQuery(final UUID queryId, final String sparql, final boolean isActive) {
         this.queryId = requireNonNull(queryId);
         this.sparql = requireNonNull(sparql);
+        this.isActive = isActive;
     }
 
     /**
@@ -60,9 +63,16 @@ public class StreamsQuery {
         return sparql;
     }
 
+    /**
+     * @return {@code true} if Rya Streams should process this query; otherwise {@code false}.
+     */
+    public boolean isActive() {
+        return isActive;
+    }
+
     @Override
     public int hashCode() {
-        return Objects.hash(queryId, sparql);
+        return Objects.hash(queryId, sparql, isActive);
     }
 
     @Override
@@ -70,7 +80,8 @@ public class StreamsQuery {
         if(o instanceof StreamsQuery) {
             final StreamsQuery other = (StreamsQuery) o;
             return Objects.equals(queryId, other.queryId) &&
-                    Objects.equals(sparql, other.sparql);
+                    Objects.equals(sparql, other.sparql) &&
+                    isActive == other.isActive;
         }
         return false;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/AddQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/AddQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/AddQuery.java
index 8915d98..9889fd0 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/AddQuery.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/AddQuery.java
@@ -34,8 +34,10 @@ public interface AddQuery {
      * Adds a query to the Rya Streams system.
      *
      * @param query - The SPARQL query that will be added. (not null)
+     * @param isActive - {@code true} if the query needs to be maintained by
+     *   Rya Streams; otherwise {@code false}.
      * @return The {@link StreamsQuery} used by Rya Streams for this query.
      * @throws RyaStreamsException The query could not be added to Rya Streams.
      */
-    public StreamsQuery addQuery(final String query) throws RyaStreamsException;
+    public StreamsQuery addQuery(final String query, boolean isActive) throws RyaStreamsException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java
index f94835c..edd90fd 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java
@@ -49,7 +49,7 @@ public class DefaultAddQuery implements AddQuery {
     }
 
     @Override
-    public StreamsQuery addQuery(final String query) throws RyaStreamsException {
+    public StreamsQuery addQuery(final String query, final boolean isActive) throws RyaStreamsException {
         requireNonNull(query);
 
         // Make sure the SPARQL is valid.
@@ -60,6 +60,6 @@ public class DefaultAddQuery implements AddQuery {
         }
 
         // If it is, then store it in the repository.
-        return repository.add(query);
+        return repository.add(query, isActive);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultStartQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultStartQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultStartQuery.java
new file mode 100644
index 0000000..3ee693e
--- /dev/null
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultStartQuery.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api.interactor.defaults;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.UUID;
+
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+import org.apache.rya.streams.api.interactor.StartQuery;
+import org.apache.rya.streams.api.queries.QueryRepository;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Start a query that is managed by Rya Streams.
+ */
+@DefaultAnnotation(NonNull.class)
+public class DefaultStartQuery implements StartQuery {
+
+    private final QueryRepository repository;
+
+    /**
+     * Constructs an instance of {@link DefaultStartQuery}.
+     *
+     * @param repository - The {@link QueryRepository} that will be interacted with. (not null)
+     */
+    public DefaultStartQuery(final QueryRepository repository) {
+        this.repository = requireNonNull(repository);
+    }
+
+    @Override
+    public void start(final UUID queryId) throws RyaStreamsException {
+        requireNonNull(queryId);
+        repository.updateIsActive(queryId, true);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultStopQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultStopQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultStopQuery.java
new file mode 100644
index 0000000..382b0f1
--- /dev/null
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultStopQuery.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api.interactor.defaults;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.UUID;
+
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+import org.apache.rya.streams.api.interactor.StopQuery;
+import org.apache.rya.streams.api.queries.QueryRepository;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Stop a query that is managed by Rya Streams.
+ */
+@DefaultAnnotation(NonNull.class)
+public class DefaultStopQuery implements StopQuery {
+
+    private final QueryRepository repository;
+
+    /**
+     * Constructs an instance of {@link DefaultStopQuery}.
+     *
+     * @param repository - The {@link QueryRepository} that will be interacted with. (not null)
+     */
+    public DefaultStopQuery(final QueryRepository repository) {
+        this.repository = requireNonNull(repository);
+    }
+
+    @Override
+    public void stop(final UUID queryId) throws RyaStreamsException {
+        requireNonNull(queryId);
+        repository.updateIsActive(queryId, false);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
index 80678de..f4b7b25 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
@@ -33,16 +33,15 @@ import org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
 import info.aduna.iteration.CloseableIteration;
 
 /**
- * An in memory implementation of {@link QueryRepository}. It is lazily initialized the first time one of its
- * functions is invoked.
+ * An in memory implementation of {@link QueryRepository}. It is lazily
+ * initialized the first time one of its functions is invoked and it updates
+ * its view of the {@link QueryChangeLog} any time a method is invoked that
+ * requires the latest view of the queries.
  * </p>
  * Thread safe.
  */
@@ -51,39 +50,47 @@ public class InMemoryQueryRepository implements QueryRepository {
     private static final Logger LOG = LoggerFactory.getLogger(InMemoryQueryRepository.class);
 
     private final ReentrantLock lock = new ReentrantLock();
-    private final Supplier<Map<UUID, StreamsQuery>> queriesCache;
 
+    /**
+     * The change log that is the ground truth for describing what the queries look like.
+     */
     private final QueryChangeLog changeLog;
 
     /**
+     * The position within the {@link QueryChangeLog} that {@code queriesCache} currently reflects.
+     */
+    private Optional<Long> cachePosition = Optional.empty();
+
+    /**
+     * The most recently cached view of the queries within this repository.
+     */
+    private final Map<UUID, StreamsQuery> queriesCache = new HashMap<>();
+
+    /**
      * Constructs an instance of {@link InMemoryQueryRepository}.
      *
      * @param changeLog - The change log that this repository will maintain and be based on. (not null)
      */
     public InMemoryQueryRepository(final QueryChangeLog changeLog) {
         this.changeLog = requireNonNull(changeLog);
-
-        // Lazily initialize the queries cache the first time you try to use it.
-        queriesCache = Suppliers.memoize(() -> initializeCache(changeLog));
     }
 
     @Override
-    public StreamsQuery add(final String query) throws QueryRepositoryException {
+    public StreamsQuery add(final String query, final boolean isActive) throws QueryRepositoryException {
         requireNonNull(query);
 
         lock.lock();
         try {
             // First record the change to the log.
             final UUID queryId = UUID.randomUUID();
-            final QueryChange change = QueryChange.create(queryId, query);
+            final QueryChange change = QueryChange.create(queryId, query, isActive);
             changeLog.write(change);
 
-            // Then update the view of the change log within the repository.
-            final StreamsQuery streamsQuery = new StreamsQuery(queryId, query);
-            queriesCache.get().put(queryId, streamsQuery);
+            // Update the cache to represent what is currently in the log.
+            updateCache();
 
-            // Return the SreamsQuery that represents the just added query.
-            return streamsQuery;
+            // Return the StreamsQuery that represents the just added query.
+            return queriesCache.get(queryId);
 
         } catch (final QueryChangeLogException e) {
             throw new QueryRepositoryException("Could not create a Rya Streams query for the SPARQL string: " + query, e);
@@ -98,7 +105,35 @@ public class InMemoryQueryRepository implements QueryRepository {
 
         lock.lock();
         try {
-            return Optional.ofNullable( queriesCache.get().get(queryId) );
+            // Update the cache to represent what is currently in the log.
+            updateCache();
+
+            return Optional.ofNullable( queriesCache.get(queryId) );
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void updateIsActive(final UUID queryId, final boolean isActive) throws QueryRepositoryException {
+        requireNonNull(queryId);
+
+        lock.lock();
+        try {
+            // Update the cache to represent what is currently in the log.
+            updateCache();
+
+            // Ensure the query is in the log.
+            if(!queriesCache.containsKey(queryId)) {
+                throw new QueryRepositoryException("No query exists for ID " + queryId + ".");
+            }
+
+            // First record the change to the log.
+            final QueryChange change = QueryChange.update(queryId, isActive);
+            changeLog.write(change);
+
+        } catch (final QueryChangeLogException e) {
+            throw new QueryRepositoryException("Could not update the Rya Streams query for with ID: " + queryId, e);
         } finally {
             lock.unlock();
         }
@@ -114,9 +149,6 @@ public class InMemoryQueryRepository implements QueryRepository {
             final QueryChange change = QueryChange.delete(queryId);
             changeLog.write(change);
 
-            // Then update the view of the change log within the repository.
-            queriesCache.get().remove(queryId);
-
         } catch (final QueryChangeLogException e) {
             throw new QueryRepositoryException("Could not delete a Rya Streams query for the Query ID: " + queryId, e);
         } finally {
@@ -128,8 +160,11 @@ public class InMemoryQueryRepository implements QueryRepository {
     public Set<StreamsQuery> list() throws QueryRepositoryException {
         lock.lock();
         try {
-            // Our internal cache is already up to date, so just return it's values.
-            return queriesCache.get().values()
+            // Update the cache to represent what is currently in the log.
+            updateCache();
+
+            // Our internal cache is already up to date, so just return its values.
+            return queriesCache.values()
                         .stream()
                         .collect(Collectors.toSet());
 
@@ -149,22 +184,19 @@ public class InMemoryQueryRepository implements QueryRepository {
     }
 
     /**
-     * A {@link Map} from query id to the {@link StreamsQuery} that is represented by that id based on what
-     * is already in a {@link QueryChangeLog}.
-     *
-     * @param changeLog - The change log the cache will represent. (not null)
-     * @return The most recent view of the change log.
+     * Updates the {@link #queriesCache} to reflect the latest position within the {@link #changeLog}.
      */
-    private static Map<UUID, StreamsQuery> initializeCache(final QueryChangeLog changeLog) {
+    private void updateCache() {
         requireNonNull(changeLog);
 
-        // The Map that will be initialized and then returned by this supplier.
-        final Map<UUID, StreamsQuery> queriesCache = new HashMap<>();
-
         CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLogException> it = null;
         try {
-            // Iterate over everything that is already in the change log.
-            it = changeLog.readFromStart();
+            // Iterate over everything since the last position that was handled within the change log.
+            if(cachePosition.isPresent()) {
+                it = changeLog.readFromPosition(cachePosition.get() + 1);
+            } else {
+                it = changeLog.readFromStart();
+            }
 
             // Apply each change to the cache.
             while(it.hasNext()) {
@@ -174,18 +206,31 @@ public class InMemoryQueryRepository implements QueryRepository {
 
                 switch(change.getChangeType()) {
                     case CREATE:
-                        final StreamsQuery query = new StreamsQuery(queryId, change.getSparql().get());
+                        final StreamsQuery query = new StreamsQuery(
+                                queryId,
+                                change.getSparql().get(),
+                                change.getIsActive().get());
                         queriesCache.put(queryId, query);
                         break;
 
+                    case UPDATE:
+                        if(queriesCache.containsKey(queryId)) {
+                            final StreamsQuery old = queriesCache.get(queryId);
+                            final StreamsQuery updated = new StreamsQuery(
+                                    old.getQueryId(),
+                                    old.getSparql(),
+                                    change.getIsActive().get());
+                            queriesCache.put(queryId, updated);
+                        }
+                        break;
+
                     case DELETE:
                         queriesCache.remove(queryId);
                         break;
                 }
-            }
 
-            // Return the initialized cache.
-            return queriesCache;
+                cachePosition = Optional.of( entry.getPosition() );
+            }
 
         } catch (final QueryChangeLogException e) {
             // Rethrow the exception because the object the supplier tried to create could not be created.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
index 90af79c..d283957 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -37,9 +37,11 @@ import edu.umd.cs.findbugs.annotations.NonNull;
 @DefaultAnnotation(NonNull.class)
 public final class QueryChange implements Serializable {
     private static final long serialVersionUID = 1L;
+
     private final UUID queryId;
     private final ChangeType changeType;
     private final Optional<String> sparql;
+    private final Optional<Boolean> isActive;
 
     /**
      * Constructs an instance of {@link QueryChange}. Use the {@link #create(UUID, String)} or {@link #delete(UUID)}
@@ -48,14 +50,18 @@ public final class QueryChange implements Serializable {
      * @param queryId - Uniquely identifies the query within Rya Streams. (not null)
      * @param changeType - Indicates the type of change this object represents. (not null)
      * @param sparql - If this is a create change, then the SPARQL query that will be evaluated within Rya Streams. (not null)
+     * @param isActive - If this is a create or update change, then the active state that defines if the
+     *   query will be evaluated by RyaStreams. (not null)
      */
     private QueryChange(
             final UUID queryId,
             final ChangeType changeType,
-            final Optional<String> sparql) {
+            final Optional<String> sparql,
+            final Optional<Boolean> isActive) {
         this.queryId = requireNonNull(queryId);
         this.changeType = requireNonNull(changeType);
         this.sparql = requireNonNull(sparql);
+        this.isActive = requireNonNull(isActive);
     }
 
     /**
@@ -79,9 +85,17 @@ public final class QueryChange implements Serializable {
         return sparql;
     }
 
+    /**
+     * @return If this is a create or update change, then the active state that defines if the
+     *   query will be evaluated by RyaStreams. (not null)
+     */
+    public Optional<Boolean> getIsActive() {
+        return isActive;
+    }
+
     @Override
     public int hashCode() {
-        return Objects.hash(queryId, changeType, sparql);
+        return Objects.hash(queryId, changeType, sparql, isActive);
     }
 
     @Override
@@ -90,7 +104,8 @@ public final class QueryChange implements Serializable {
             final QueryChange change = (QueryChange) o;
             return Objects.equals(queryId, change.queryId) &&
                     Objects.equals(changeType, change.changeType) &&
-                    Objects.equals(sparql, change.sparql);
+                    Objects.equals(sparql, change.sparql) &&
+                    Objects.equals(isActive, change.isActive);
         }
         return false;
     }
@@ -100,10 +115,22 @@ public final class QueryChange implements Serializable {
      *
      * @param queryId - Uniquely identifies the query within the streaming system. (not null)
      * @param sparql - The query that will be evaluated. (not null)
+     * @param isActive - The active state that defines if the query will be evaluated by RyaStreams.
+     * @return A {@link QueryChange} built using the provided values.
+     */
+    public static QueryChange create(final UUID queryId, final String sparql, final boolean isActive) {
+        return new QueryChange(queryId, ChangeType.CREATE, Optional.of(sparql), Optional.of(isActive));
+    }
+
+    /**
+     * Create a {@link QueryChange} that represents a query in Rya Streams whose active state has changed.
+     *
+     * @param queryId - Uniquely identifies the query within the streaming system. (not null)
+     * @param isActive - The active state that defines if the query will be evaluated by RyaStreams.
      * @return A {@link QueryChange} built using the provided values.
      */
-    public static QueryChange create(final UUID queryId, final String sparql) {
-        return new QueryChange(queryId, ChangeType.CREATE, Optional.of(sparql));
+    public static QueryChange update(final UUID queryId, final boolean isActive) {
+        return new QueryChange(queryId, ChangeType.UPDATE, Optional.absent(), Optional.of(isActive));
     }
 
     /**
@@ -113,7 +140,7 @@ public final class QueryChange implements Serializable {
      * @return A {@link QueryChange} built using the provided values.
      */
     public static QueryChange delete(final UUID queryId) {
-        return new QueryChange(queryId, ChangeType.DELETE, Optional.absent());
+        return new QueryChange(queryId, ChangeType.DELETE, Optional.absent(), Optional.absent());
     }
 
     /**
@@ -126,6 +153,11 @@ public final class QueryChange implements Serializable {
         CREATE,
 
         /**
+         * The {@link QueryChange} indicates something about a registered query changed.
+         */
+        UPDATE,
+
+        /**
          * The {@link QueryChange} indicates a SPARQL query no longer needs to be processed by Rya Streams.
          */
         DELETE;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
index 7269588..fd51b2f 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
@@ -38,10 +38,23 @@ public interface QueryRepository extends AutoCloseable {
      * Adds a new query to Rya Streams.
      *
      * @param query - The SPARQL query to add. (not null)
+     * @param isActive - {@code true} if the query should be processed after it is added;
+     *   otherwise {@code false}.
      * @return The {@link StreamsQuery} used in Rya Streams.
      * @throws QueryRepositoryException Could not add the query.
      */
-    public StreamsQuery add(final String query) throws QueryRepositoryException;
+    public StreamsQuery add(final String query, boolean isActive) throws QueryRepositoryException;
+
+    /**
+     * Updates the isActive state of a {@link StreamsQuery}. Setting this value to {@code true}
+     * means Rya Streams will start processing the query. Setting it to {@code false} will stop
+     * the processing.
+     *
+     * @param queryId - Identifies which query will be updated. (not null)
+     * @param isActive - The new isActive state for the query.
+     * @throws QueryRepositoryException If the query does not exist or something else caused the change to fail.
+     */
+    public void updateIsActive(UUID queryId, boolean isActive) throws QueryRepositoryException;
 
     /**
      * Get an existing query from Rya Streams.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java
index 88be6e7..77a0a15 100644
--- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java
+++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java
@@ -43,10 +43,10 @@ public class DefaultAddQueryTest {
         final AddQuery addQuery = new DefaultAddQuery(repo);
 
         // Add the query.
-        addQuery.addQuery(sparql);
+        addQuery.addQuery(sparql, true);
 
         // Verify the call was forwarded to the repository.
-        verify(repo, times(1)).add(eq(sparql));
+        verify(repo, times(1)).add(eq(sparql), eq(true));
     }
 
     @Test(expected = RyaStreamsException.class)
@@ -59,6 +59,6 @@ public class DefaultAddQueryTest {
         final AddQuery addQuery = new DefaultAddQuery(repo);
 
         // Add the query.
-        addQuery.addQuery(sparql);
+        addQuery.addQuery(sparql, true);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
index 92193ca..22e616d 100644
--- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
+++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
@@ -43,9 +43,9 @@ public class InMemoryQueryRepositoryTest {
         try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
             // Add some queries to it.
             final Set<StreamsQuery> expected = new HashSet<>();
-            expected.add( queries.add("query 1") );
-            expected.add( queries.add("query 2") );
-            expected.add( queries.add("query 3") );
+            expected.add( queries.add("query 1", true) );
+            expected.add( queries.add("query 2", false) );
+            expected.add( queries.add("query 3", true) );
 
             // Show they are in the list of all queries.
             final Set<StreamsQuery> stored = queries.list();
@@ -59,9 +59,9 @@ public class InMemoryQueryRepositoryTest {
         try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
             // Add some queries to it. The second one we will delete.
             final Set<StreamsQuery> expected = new HashSet<>();
-            expected.add( queries.add("query 1") );
-            final UUID deletedMeId = queries.add("query 2").getQueryId();
-            expected.add( queries.add("query 3") );
+            expected.add( queries.add("query 1", true) );
+            final UUID deletedMeId = queries.add("query 2", false).getQueryId();
+            expected.add( queries.add("query 3", true) );
 
             // Delete the second query.
             queries.delete( deletedMeId );
@@ -73,15 +73,15 @@ public class InMemoryQueryRepositoryTest {
     }
 
     @Test
-    public void initializedWithPopulatedChnageLog() throws Exception {
+    public void initializedWithPopulatedChangeLog() throws Exception {
         // Setup a totally in memory QueryRepository. Hold onto the change log so that we can use it again later.
         final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
         try(final QueryRepository queries = new InMemoryQueryRepository( changeLog )) {
             // Add some queries and deletes to it.
             final Set<StreamsQuery> expected = new HashSet<>();
-            expected.add( queries.add("query 1") );
-            final UUID deletedMeId = queries.add("query 2").getQueryId();
-            expected.add( queries.add("query 3") );
+            expected.add( queries.add("query 1", true) );
+            final UUID deletedMeId = queries.add("query 2", false).getQueryId();
+            expected.add( queries.add("query 3", true) );
             queries.delete( deletedMeId );
 
             // Create a new totally in memory QueryRepository.
@@ -110,7 +110,7 @@ public class InMemoryQueryRepositoryTest {
         // Setup a totally in memory QueryRepository.
         try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
             // Add a query to it.
-            final StreamsQuery query = queries.add("query 1");
+            final StreamsQuery query = queries.add("query 1", true);
 
             // Show the fetched query matches the expected ones.
             final Optional<StreamsQuery> fetched = queries.get(query.getQueryId());
@@ -129,4 +129,21 @@ public class InMemoryQueryRepositoryTest {
             assertFalse(query.isPresent());
         }
     }
+
+    @Test
+    public void update() throws Exception {
+        // Setup a totally in memory QueryRepository.
+        try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
+            // Add a query to it.
+            final StreamsQuery query = queries.add("query 1", true);
+
+            // Change the isActive state of that query.
+            queries.updateIsActive(query.getQueryId(), false);
+
+            // Show the fetched query matches the expected one.
+            final Optional<StreamsQuery> fetched = queries.get(query.getQueryId());
+            final StreamsQuery expected = new StreamsQuery(query.getQueryId(), query.getSparql(), false);
+            assertEquals(expected, fetched.get());
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
index c72e6a2..275a975 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
@@ -52,6 +52,9 @@ public class AddQueryCommand implements RyaStreamsCommand {
         @Parameter(names = { "--query", "-q" }, required = true, description = "The SPARQL query to add to Rya Streams.")
         private String query;
 
+        @Parameter(names = {"--isActive", "-a"}, required = false, description = "True if the added query will be started.")
+        private String isActive;
+
         @Override
         public String toString() {
             final StringBuilder parameters = new StringBuilder();
@@ -60,6 +63,7 @@ public class AddQueryCommand implements RyaStreamsCommand {
             if (!Strings.isNullOrEmpty(query)) {
                 parameters.append("\tQuery: " + query + "\n");
             }
+            parameters.append("\tIs Active: " + isActive + "\n");
             return parameters.toString();
         }
     }
@@ -115,7 +119,7 @@ public class AddQueryCommand implements RyaStreamsCommand {
         try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) {
             final AddQuery addQuery = new DefaultAddQuery(queryRepo);
             try {
-                final StreamsQuery query = addQuery.addQuery(params.query);
+                final StreamsQuery query = addQuery.addQuery(params.query, Boolean.parseBoolean(params.isActive));
                 System.out.println("Added query: " + query.getSparql());
             } catch (final RyaStreamsException e) {
                 System.err.println("Unable to parse query: " + params.query);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
index 3a412d2..8b4f074 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
@@ -80,7 +80,8 @@ public class AddQueryCommandIT {
                 "-r", "" + ryaInstance,
                 "-i", kafka.getKafkaHostname(),
                 "-p", kafka.getKafkaPort(),
-                "-q", query
+                "-q", query,
+                "-a", "true"
         };
 
         // Execute the command.
@@ -101,7 +102,8 @@ public class AddQueryCommandIT {
                 "--ryaInstance", "" + ryaInstance,
                 "--kafkaHostname", kafka.getKafkaHostname(),
                 "--kafkaPort", kafka.getKafkaPort(),
-                "--query", query
+                "--query", query,
+                "--isActive", "true"
         };
 
         // Execute the command.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
index 91647f2..6083543 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,7 +18,6 @@
  */
 package org.apache.rya.streams.client.command;
 
-import static java.util.Objects.requireNonNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
@@ -40,6 +39,8 @@ import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserialize
 import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 import org.apache.rya.test.kafka.KafkaTestUtil;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -48,103 +49,90 @@ import org.junit.Test;
  */
 public class DeleteQueryCommandIT {
 
+    private final String ryaInstance = UUID.randomUUID().toString();
+    private QueryRepository queryRepo;
+
     @Rule
     public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
 
-    /**
-     * This test simulates executing many commands and each of them use their own InMemoryQueryRepository. We need
-     * to re-create the repo outside of the command to ensure it has the most up to date values inside of it.
-     *
-     * @param ryaInstance - The rya instance the repository is connected to. (not null)
-     * @param createTopic - Set this to true if the topic doesn't exist yet.
-     */
-    private QueryRepository makeQueryRepository(final String ryaInstance, final boolean createTopic) {
-        requireNonNull(ryaInstance);
-
+    @Before
+    public void setup() {
         // Make sure the topic that the change log uses exists.
-        final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
-        if(createTopic) {
-            kafka.createTopic(changeLogTopic);
-        }
+        final String changeLogTopic = KafkaTopics.queryChangeLogTopic(ryaInstance);
+        System.out.println("Test Change Log Topic: " + changeLogTopic);
+        kafka.createTopic(changeLogTopic);
 
         // Setup the QueryRepository used by the test.
         final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
         final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
         final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
-        return new InMemoryQueryRepository(changeLog);
+        queryRepo = new InMemoryQueryRepository(changeLog);
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        queryRepo.close();
     }
 
     @Test
     public void shortParams() throws Exception {
-        final String ryaInstance = UUID.randomUUID().toString();
-
         // Add a few queries to Rya Streams.
-        try(QueryRepository repo = makeQueryRepository(ryaInstance, true)) {
-            repo.add("query1");
-            final UUID query2Id = repo.add("query2").getQueryId();
-            repo.add("query3");
-
-            // Show that all three of the queries were added.
-            Set<StreamsQuery> queries = repo.list();
-            assertEquals(3, queries.size());
-
-            // Delete query 2 using the delete query command.
-            final String[] deleteArgs = new String[] {
-                    "-r", "" + ryaInstance,
-                    "-i", kafka.getKafkaHostname(),
-                    "-p", kafka.getKafkaPort(),
-                    "-q", query2Id.toString()
-            };
-
-            final DeleteQueryCommand deleteCommand = new DeleteQueryCommand();
-            deleteCommand.execute(deleteArgs);
-
-            // Show query2 was deleted.
-            try(QueryRepository repo2 = makeQueryRepository(ryaInstance, false)) {
-                queries = repo2.list();
-                assertEquals(2, queries.size());
-
-                for(final StreamsQuery query : queries) {
-                    assertNotEquals(query2Id, query.getQueryId());
-                }
-            }
+        queryRepo.add("query1", true);
+        final UUID query2Id = queryRepo.add("query2", false).getQueryId();
+        queryRepo.add("query3", true);
+
+        // Show that all three of the queries were added.
+        Set<StreamsQuery> queries = queryRepo.list();
+        assertEquals(3, queries.size());
+
+        // Delete query 2 using the delete query command.
+        final String[] deleteArgs = new String[] {
+                "-r", ryaInstance,
+                "-i", kafka.getKafkaHostname(),
+                "-p", kafka.getKafkaPort(),
+                "-q", query2Id.toString()
+        };
+
+        final DeleteQueryCommand deleteCommand = new DeleteQueryCommand();
+        deleteCommand.execute(deleteArgs);
+
+        // Show query2 was deleted.
+        queries = queryRepo.list();
+        assertEquals(2, queries.size());
+
+        for(final StreamsQuery query : queries) {
+            assertNotEquals(query2Id, query.getQueryId());
         }
     }
 
     @Test
     public void longParams() throws Exception {
-        final String ryaInstance = UUID.randomUUID().toString();
-
         // Add a few queries to Rya Streams.
-        try(QueryRepository repo = makeQueryRepository(ryaInstance, true)) {
-            repo.add("query1");
-            final UUID query2Id = repo.add("query2").getQueryId();
-            repo.add("query3");
-
-            // Show that all three of the queries were added.
-            Set<StreamsQuery> queries = repo.list();
-            assertEquals(3, queries.size());
-
-            // Delete query 2 using the delete query command.
-            final String[] deleteArgs = new String[] {
-                    "--ryaInstance", "" + ryaInstance,
-                    "--kafkaHostname", kafka.getKafkaHostname(),
-                    "--kafkaPort", kafka.getKafkaPort(),
-                    "--queryID", query2Id.toString()
-            };
-
-            final DeleteQueryCommand deleteCommand = new DeleteQueryCommand();
-            deleteCommand.execute(deleteArgs);
-
-            // Show query2 was deleted.
-            try(QueryRepository repo2 = makeQueryRepository(ryaInstance, false)) {
-                queries = repo2.list();
-                assertEquals(2, queries.size());
-
-                for(final StreamsQuery query : queries) {
-                    assertNotEquals(query2Id, query.getQueryId());
-                }
-            }
+        queryRepo.add("query1", true);
+        final UUID query2Id = queryRepo.add("query2", false).getQueryId();
+        queryRepo.add("query3", true);
+
+        // Show that all three of the queries were added.
+        Set<StreamsQuery> queries = queryRepo.list();
+        assertEquals(3, queries.size());
+
+        // Delete query 2 using the delete query command.
+        final String[] deleteArgs = new String[] {
+                "--ryaInstance", "" + ryaInstance,
+                "--kafkaHostname", kafka.getKafkaHostname(),
+                "--kafkaPort", kafka.getKafkaPort(),
+                "--queryID", query2Id.toString()
+        };
+
+        final DeleteQueryCommand deleteCommand = new DeleteQueryCommand();
+        deleteCommand.execute(deleteArgs);
+
+        // Show query2 was deleted.
+        queries = queryRepo.list();
+        assertEquals(2, queries.size());
+
+        for(final StreamsQuery query : queries) {
+            assertNotEquals(query2Id, query.getQueryId());
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
index 00b4ce0..1399142 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
@@ -71,9 +71,9 @@ public class ListQueryCommandIT {
     @Test
     public void shortParams() throws Exception {
         // Add a few queries to Rya Streams.
-        queryRepo.add("query1");
-        queryRepo.add("query2");
-        queryRepo.add("query3");
+        queryRepo.add("query1", true);
+        queryRepo.add("query2", false);
+        queryRepo.add("query3", true);
 
         // Execute the List Queries command.
         final String[] args = new String[] {
@@ -89,9 +89,9 @@ public class ListQueryCommandIT {
     @Test
     public void longParams() throws Exception {
         // Add a few queries to Rya Streams.
-        queryRepo.add("query1");
-        queryRepo.add("query2");
-        queryRepo.add("query3");
+        queryRepo.add("query1", true);
+        queryRepo.add("query2", false);
+        queryRepo.add("query3", true);
 
         // Execute the List Queries command.
         final String[] args = new String[] {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
index f2100e8..3389d6b 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
@@ -114,7 +114,7 @@ public class RunQueryCommandIT {
     @Test
     public void runQuery() throws Exception {
         // Register a query with the Query Repository.
-        final StreamsQuery sQuery = queryRepo.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }");
+        final StreamsQuery sQuery = queryRepo.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }", true);
 
         // Arguments that run the query we just registered with Rya Streams.
         final String[] args = new String[] {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java
index 9403e4b..2822272 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java
@@ -140,18 +140,13 @@ public class KafkaQueryChangeLog implements QueryChangeLog {
 
         @Override
         public boolean hasNext() throws QueryChangeLogException {
-            if (iterCache == null || !iterCache.hasNext()) {
-                populateCache();
-            }
+            maybePopulateCache();
             return iterCache.hasNext();
         }
 
         @Override
         public ChangeLogEntry<QueryChange> next() throws QueryChangeLogException {
-            if (iterCache == null && iterCache.hasNext()) {
-                populateCache();
-            }
-
+            maybePopulateCache();
             if (iterCache.hasNext()) {
                 return iterCache.next();
             }
@@ -167,14 +162,14 @@ public class KafkaQueryChangeLog implements QueryChangeLog {
             consumer.unsubscribe();
         }
 
-        private void populateCache() {
-            final ConsumerRecords<?, QueryChange> records = consumer.poll(3000L);
-            final List<ChangeLogEntry<QueryChange>> changes = new ArrayList<>();
-            records.forEach(
-                    record ->
-                        changes.add(new ChangeLogEntry<>(record.offset(), record.value()))
-                    );
-            iterCache = changes.iterator();
+        private void maybePopulateCache() {
+            // If the cache isn't initialized yet, or it is empty, then check to see if there is more to put into it.
+            if (iterCache == null || !iterCache.hasNext()) {
+                final ConsumerRecords<?, QueryChange> records = consumer.poll(3000L);
+                final List<ChangeLogEntry<QueryChange>> changes = new ArrayList<>();
+                records.forEach(record -> changes.add(new ChangeLogEntry<>(record.offset(), record.value())));
+                iterCache = changes.iterator();
+            }
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
index 33b3a92..9a773f0 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
@@ -86,7 +86,7 @@ public class KafkaRunQueryIT {
         final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() );
 
         // Add the query to the query repository.
-        final StreamsQuery sQuery = queries.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }");
+        final StreamsQuery sQuery = queries.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }", true);
         final UUID queryId = sQuery.getQueryId();
         final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
index 04c81ed..c2b821f 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
@@ -19,6 +19,7 @@
 package org.apache.rya.streams.kafka.queries;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -78,7 +79,7 @@ public class KafkaQueryChangeLogIT extends KafkaITBase {
     public void testWrite() throws Exception {
         final String sparql = "SOME QUERY HERE";
         final UUID uuid = UUID.randomUUID();
-        final QueryChange newChange = QueryChange.create(uuid, sparql);
+        final QueryChange newChange = QueryChange.create(uuid, sparql, true);
         changeLog.write(newChange);
 
         consumer.subscribe(Lists.newArrayList(topic));
@@ -90,6 +91,17 @@ public class KafkaQueryChangeLogIT extends KafkaITBase {
     }
 
     @Test
+    public void readSingleWrite() throws Exception {
+        // Write a single change to the log.
+        final QueryChange change = QueryChange.create(UUID.randomUUID(), "query", true);
+        changeLog.write(change);
+
+        // Read that entry from the log.
+        final QueryChange readChange = changeLog.readFromStart().next().getEntry();
+        assertEquals(change, readChange);
+    }
+
+    @Test
     public void readFromBegining() throws Exception {
         final List<QueryChange> expected = write10ChangesToChangeLog();
 
@@ -175,12 +187,34 @@ public class KafkaQueryChangeLogIT extends KafkaITBase {
         assertEquals(0, count);
     }
 
+    @Test
+    public void multipleClients() throws Exception {
+        // Create a second KafkaQueryChangeLog object that connects to the same change log.
+        final Producer<?, QueryChange> producer2 = KafkaTestUtil.makeProducer(rule, StringSerializer.class, QueryChangeSerializer.class);
+        final Consumer<?, QueryChange> consumer2 = KafkaTestUtil.fromStartConsumer(rule, StringDeserializer.class, QueryChangeDeserializer.class);
+        try(final KafkaQueryChangeLog changeLog2 = new KafkaQueryChangeLog(producer2, consumer2, topic)) {
+            // Show both of them report empty.
+            assertFalse( changeLog.readFromStart().hasNext() );
+            assertFalse( changeLog2.readFromStart().hasNext() );
+
+            // Write a change to the first log.
+            final QueryChange change = QueryChange.create(UUID.randomUUID(), "query", true);
+            changeLog.write(change);
+
+            // Show it's in the first log.
+            assertEquals(change, changeLog.readFromStart().next().getEntry());
+
+            // Show it is also seen in the second log.
+            assertEquals(change, changeLog2.readFromStart().next().getEntry());
+        }
+    }
+
     private List<QueryChange> write10ChangesToChangeLog() throws Exception {
         final List<QueryChange> changes = new ArrayList<>();
         for (int ii = 0; ii < 10; ii++) {
             final String sparql = "SOME QUERY HERE_" + ii;
             final UUID uuid = UUID.randomUUID();
-            final QueryChange newChange = QueryChange.create(uuid, sparql);
+            final QueryChange newChange = QueryChange.create(uuid, sparql, true);
             changeLog.write(newChange);
             changes.add(newChange);
         }