You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2018/03/09 18:20:56 UTC

[10/22] incubator-rya git commit: Rya 451 Query manager

Rya 451 Query manager

QueryManager with tests
updated InMemoryQueryRepository and its tests

Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/e355f73a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/e355f73a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/e355f73a

Branch: refs/heads/master
Commit: e355f73ae6c56eb439e1d56722cd20c5287cbe04
Parents: a11ca4a
Author: Andrew Smith <sm...@gmail.com>
Authored: Tue Jan 30 14:01:54 2018 -0500
Committer: Valiyil <Pu...@parsons.com>
Committed: Fri Mar 9 12:59:47 2018 -0500

----------------------------------------------------------------------
 .../rya/streams/api/entity/StreamsQuery.java    |  15 ++
 .../api/queries/InMemoryQueryRepository.java    |   7 +-
 .../api/queries/QueryChangeLogListener.java     |   8 +-
 .../queries/InMemoryQueryRepositoryTest.java    |  55 ++--
 extras/rya.streams/query-manager/pom.xml        |   2 +-
 .../rya/streams/querymanager/QueryManager.java  | 255 +++++++++++++++++++
 .../streams/querymanager/QueryManagerTest.java  | 195 ++++++++++++++
 7 files changed, 504 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e355f73a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
index bd750a6..7194834 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
@@ -85,4 +85,19 @@ public class StreamsQuery {
         }
         return false;
     }
+    
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder();
+        sb.append("ID: ");
+        sb.append(getQueryId().toString() + "\n");
+        sb.append("Query: ");
+        sb.append(getSparql() + "\n");
+        sb.append("Is ");
+        if (!isActive) {
+            sb.append(" Not ");
+        }
+        sb.append("Running.\n");
+        return sb.toString();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e355f73a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
index dca040f..5fb0297 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
@@ -188,8 +188,8 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements
 
             // Our internal cache is already up to date, so just return its values.
             return queriesCache.values()
-                        .stream()
-                        .collect(Collectors.toSet());
+                    .stream()
+                    .collect(Collectors.toSet());
 
         } finally {
             lock.unlock();
@@ -253,7 +253,8 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements
                         break;
                 }
 
-                listeners.forEach(listener -> listener.notify(entry));
+                final Optional<StreamsQuery> newQueryState = Optional.ofNullable(queriesCache.get(queryId));
+                listeners.forEach(listener -> listener.notify(entry, newQueryState));
 
                 cachePosition = Optional.of( entry.getPosition() );
             }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e355f73a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java
index 2b61227..2f0bcc3 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java
@@ -18,6 +18,10 @@
  */
 package org.apache.rya.streams.api.queries;
 
+import java.util.Optional;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
 
@@ -36,6 +40,8 @@ public interface QueryChangeLogListener {
      * on the repository.
      *
      * @param queryChangeEvent - The event that occurred. (not null)
+     * @param newQueryState - The new state of the query after the query change event, this will be
+     *     absent if the change type is DELETE. (not null)
      */
-    public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent);
+    public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent, final Optional<StreamsQuery> newQueryState);
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e355f73a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
index 76c3216..3b3d48a 100644
--- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
+++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
@@ -165,13 +165,14 @@ public class InMemoryQueryRepositoryTest {
             // Add a query to it.
             final StreamsQuery query = queries.add("query 1", true);
 
-            final Set<StreamsQuery> existing = queries.subscribe(new QueryChangeLogListener() {
-                @Override
-                public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent) {
-                    final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(1L,
-                            QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
-                    assertEquals(expected, queryChangeEvent);
-                }
+            final Set<StreamsQuery> existing = queries.subscribe((queryChangeEvent, newQueryState) -> {
+                final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(1L,
+                        QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+                final Optional<StreamsQuery> expectedQueryState = Optional.of(
+                        new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+
+                assertEquals(expected, queryChangeEvent);
+                assertEquals(expectedQueryState, newQueryState);
             });
 
             assertEquals(Sets.newHashSet(query), existing);
@@ -195,26 +196,28 @@ public class InMemoryQueryRepositoryTest {
 
             //show listener on repo that query was added to is being notified of the new query.
             final CountDownLatch repo1Latch = new CountDownLatch(1);
-            queries.subscribe(new QueryChangeLogListener() {
-                @Override
-                public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent) {
-                    final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L,
-                            QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
-                    assertEquals(expected, queryChangeEvent);
-                    repo1Latch.countDown();
-                }
+            queries.subscribe((queryChangeEvent, newQueryState) -> {
+                final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L,
+                        QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+                final Optional<StreamsQuery> expectedQueryState = Optional.of(
+                        new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+
+                assertEquals(expected, queryChangeEvent);
+                assertEquals(expectedQueryState, newQueryState);
+                repo1Latch.countDown();
             });
 
             //show listener not on the repo that query was added to is being notified as well.
             final CountDownLatch repo2Latch = new CountDownLatch(1);
-            queries2.subscribe(new QueryChangeLogListener() {
-                @Override
-                public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent) {
-                    final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L,
-                            QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
-                    assertEquals(expected, queryChangeEvent);
-                    repo2Latch.countDown();
-                }
+            queries2.subscribe((queryChangeEvent, newQueryState) -> {
+                final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L,
+                        QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+                final Optional<StreamsQuery> expectedQueryState = Optional.of(
+                        new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+
+                assertEquals(expected, queryChangeEvent);
+                assertEquals(expectedQueryState, newQueryState);
+                repo2Latch.countDown();
             });
 
             queries.add("query 2", true);
@@ -222,7 +225,6 @@ public class InMemoryQueryRepositoryTest {
             assertTrue(repo1Latch.await(5, TimeUnit.SECONDS));
             assertTrue(repo2Latch.await(5, TimeUnit.SECONDS));
         } catch(final InterruptedException e ) {
-            System.out.println("PING");
         } finally {
             queries.stop();
             queries2.stop();
@@ -233,10 +235,7 @@ public class InMemoryQueryRepositoryTest {
     public void subscribe_notStarted() throws Exception {
         // Setup a totally in memory QueryRepository.
         final QueryRepository queries = new InMemoryQueryRepository(new InMemoryQueryChangeLog(), SCHEDULE);
-        queries.subscribe(new QueryChangeLogListener() {
-            @Override
-            public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent) {}
-        });
+        queries.subscribe((queryChangeEvent, newQueryState) -> {});
 
         queries.add("query 2", true);
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e355f73a/extras/rya.streams/query-manager/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/pom.xml b/extras/rya.streams/query-manager/pom.xml
index d321ab5..2141a3a 100644
--- a/extras/rya.streams/query-manager/pom.xml
+++ b/extras/rya.streams/query-manager/pom.xml
@@ -39,7 +39,7 @@ under the License.
             <groupId>org.apache.rya</groupId>
             <artifactId>rya.streams.kafka</artifactId>
         </dependency>
-        
+
         <!-- Apache Daemon dependencies -->
         <dependency>
             <groupId>commons-daemon</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e355f73a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
new file mode 100644
index 0000000..30b4538
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.ChangeLogEntry;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChange.ChangeType;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryChangeLogListener;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
+import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A service for managing {@link StreamsQuery} running on a Rya Streams system.
+ * <p>
+ * Only one QueryManager needs to be running to manage any number of rya
+ * instances/rya streams instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryManager extends AbstractIdleService {
+    private static final Logger LOG = LoggerFactory.getLogger(QueryManager.class);
+
+    private final QueryExecutor queryExecutor;
+    private final Scheduler scheduler;
+
+    /**
+     * Map of Rya Instance name to {@link QueryRepository}.
+     */
+    private final Map<String, QueryRepository> queryRepos = new HashMap<>();
+
+    private final ReentrantLock lock = new ReentrantLock();
+
+    private final QueryChangeLogSource source;
+
+    /**
+     * Creates a new {@link QueryManager}.
+     *
+     * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null)
+     * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null)
+     * @param scheduler - The {@link Scheduler} used to discover query changes
+     *      within the {@link QueryChangeLog}s (not null)
+     */
+    public QueryManager(final QueryExecutor queryExecutor, final QueryChangeLogSource source, final Scheduler scheduler) {
+        this.source = requireNonNull(source);
+        this.queryExecutor = requireNonNull(queryExecutor);
+        this.scheduler = requireNonNull(scheduler);
+    }
+
+    /**
+     * Starts running a query.
+     *
+     * @param ryaInstanceName - The Rya instance the query belongs to. (not null)
+     * @param query - The query to run.(not null)
+     */
+    private void runQuery(final String ryaInstanceName, final StreamsQuery query) {
+        requireNonNull(ryaInstanceName);
+        requireNonNull(query);
+        LOG.info("Starting Query: " + query.toString() + " on the rya instance: " + ryaInstanceName);
+
+        try {
+            queryExecutor.startQuery(ryaInstanceName, query);
+        } catch (final QueryExecutorException e) {
+            LOG.error("Failed to start query.", e);
+        }
+    }
+
+    /**
+     * Stops the specified query from running.
+     *
+     * @param queryId - The ID of the query to stop running. (not null)
+     */
+    private void stopQuery(final UUID queryId) {
+        requireNonNull(queryId);
+
+        LOG.info("Stopping query: " + queryId.toString());
+
+        try {
+            queryExecutor.stopQuery(queryId);
+        } catch (final QueryExecutorException e) {
+            LOG.error("Failed to stop query.", e);
+        }
+    }
+
+    @Override
+    protected void startUp() throws Exception {
+        lock.lock();
+        try {
+            LOG.info("Starting Query Manager.");
+            queryExecutor.startAndWait();
+            source.startAndWait();
+
+            // subscribe to the sources to be notified of changes.
+            source.subscribe(new QueryManagerSourceListener());
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+        lock.lock();
+        try {
+            LOG.info("Stopping Query Manager.");
+            source.stopAndWait();
+            queryExecutor.stopAndWait();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * An implementation of {@link QueryChangeLogListener} for the
+     * {@link QueryManager}.
+     * <p>
+     * When notified of a {@link ChangeType} performs one of the following:
+     * <li>{@link ChangeType#CREATE}: Creates a new query using the
+     * {@link QueryExecutor} provided to the {@link QueryManager}</li>
+     * <li>{@link ChangeType#DELETE}: Deletes a running query by stopping the
+     * {@link QueryExecutor} service of the queryID in the event</li>
+     * <li>{@link ChangeType#UPDATE}: If the query is running and the update is
+     * to stop the query, stops the query. Otherwise, if the query is not
+     * running, it is removed.</li>
+     */
+    private class QueryExecutionForwardingListener implements QueryChangeLogListener {
+        private final String ryaInstanceName;
+
+        /**
+         * Creates a new {@link QueryExecutionForwardingListener}.
+         *
+         * @param ryaInstanceName - The rya instance the query change is
+         *        performed on. (not null)
+         */
+        public QueryExecutionForwardingListener(final String ryaInstanceName) {
+            this.ryaInstanceName = requireNonNull(ryaInstanceName);
+        }
+
+        @Override
+        public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent, final Optional<StreamsQuery> newQueryState) {
+            LOG.debug("New query change event.");
+            final QueryChange entry = queryChangeEvent.getEntry();
+
+            lock.lock();
+            try {
+
+                switch (entry.getChangeType()) {
+                    case CREATE:
+                        if(!newQueryState.isPresent()) {
+                            LOG.error("The query with ID: " + entry.getQueryId() + " must be present with the change to be created.");
+                            LOG.debug("newQueryState is not allowed to be absent with a CREATE QueryChange, there might be a bug in the QueryRepository.");
+                        } else {
+                            runQuery(ryaInstanceName, newQueryState.get());
+                        }
+                        break;
+                    case DELETE:
+                        stopQuery(entry.getQueryId());
+                        break;
+                    case UPDATE:
+                        if (!newQueryState.isPresent()) {
+                            LOG.error("The query with ID: " + entry.getQueryId() + " must be provided with the update, cannot perform update.");
+                            LOG.debug("newQueryState is not allowed to be absent with a UPDATE QueryChange, there might be a bug in the QueryRepository.");
+                        } else {
+                            final StreamsQuery updatedQuery = newQueryState.get();
+                            if (updatedQuery.isActive()) {
+                                runQuery(ryaInstanceName, updatedQuery);
+                                LOG.info("Starting query: " + updatedQuery.toString());
+                            } else {
+                                stopQuery(updatedQuery.getQueryId());
+                                LOG.info("Stopping query: " + updatedQuery.toString());
+                            }
+                        }
+                        break;
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+    }
+
+    /**
+     * Listener used by the {@link QueryManager} to be notified when
+     * {@link QueryChangeLog}s are created or deleted.
+     */
+    private class QueryManagerSourceListener implements SourceListener {
+        @Override
+        public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) {
+            lock.lock();
+            try {
+                LOG.info("Discovered new Query Change Log for Rya Instance " + ryaInstanceName + ".");
+                final QueryRepository repo = new InMemoryQueryRepository(log, scheduler);
+                repo.startAndWait();
+                final Set<StreamsQuery> queries = repo.subscribe(new QueryExecutionForwardingListener(ryaInstanceName));
+                queries.forEach(query -> {
+                    if (query.isActive()) {
+                        try {
+                            queryExecutor.startQuery(ryaInstanceName, query);
+                        } catch (IllegalStateException | QueryExecutorException e) {
+                            LOG.error("Unable to start query for rya instance " + ryaInstanceName, e);
+                        }
+                    }
+                });
+                queryRepos.put(ryaInstanceName, repo);
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        @Override
+        public void notifyDelete(final String ryaInstanceName) {
+            lock.lock();
+            try {
+                LOG.info("Notified of deleting QueryChangeLog, stopping all queries belonging to the change log for "
+                        + ryaInstanceName + ".");
+                queryExecutor.stopAll(ryaInstanceName);
+            } catch (final QueryExecutorException e) {
+                LOG.error("Failed to stop all queries belonging to: " + ryaInstanceName, e);
+            } finally {
+                lock.unlock();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e355f73a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
new file mode 100644
index 0000000..a1203a0
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.InMemoryQueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
+import org.junit.Test;
+
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+
+/**
+ * Test for the {@link QueryManager}
+ */
+public class QueryManagerTest {
+    private static final Scheduler TEST_SCHEDULER = Scheduler.newFixedRateSchedule(0, 100, TimeUnit.MILLISECONDS);
+
+    /**
+     * Tests when the query manager is notified to create a new query, the query
+     * is created and started.
+     */
+    @Test
+    public void testCreateQuery() throws Exception {
+        //The new QueryChangeLog
+        final QueryChangeLog newChangeLog = new InMemoryQueryChangeLog();
+        final String ryaInstance = "ryaTestInstance";
+        final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "some query", true);
+
+        // when the query executor is told to start the test query on the test
+        // rya instance, count down on the countdown latch
+        final QueryExecutor qe = mock(QueryExecutor.class);
+        when(qe.isRunning()).thenReturn(true);
+
+        final CountDownLatch queryStarted = new CountDownLatch(1);
+        doAnswer(invocation -> {
+            queryStarted.countDown();
+            return null;
+        }).when(qe).startQuery(eq(ryaInstance), eq(query));
+        final QueryChangeLogSource source = mock(QueryChangeLogSource.class);
+
+        //When the QueryChangeLogSource is subscribed to in the QueryManager, mock notify of a new QueryChangeLog
+        doAnswer(invocation -> {
+            //The listener created by the Query Manager
+            final SourceListener listener = (SourceListener) invocation.getArguments()[0];
+            listener.notifyCreate(ryaInstance, newChangeLog);
+            newChangeLog.write(QueryChange.create(query.getQueryId(), query.getSparql(), query.isActive()));
+            return null;
+        }).when(source).subscribe(any(SourceListener.class));
+
+        final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER);
+        try {
+            qm.startAndWait();
+            queryStarted.await(5, TimeUnit.SECONDS);
+            verify(qe).startQuery(ryaInstance, query);
+        } finally {
+            qm.stopAndWait();
+        }
+    }
+
+    /**
+     * Tests when the query manager is notified to delete a new query, the query
+     * is stopped and deleted.
+     */
+    @Test
+    public void testDeleteQuery() throws Exception {
+        //The new QueryChangeLog
+        final QueryChangeLog newChangeLog = new InMemoryQueryChangeLog();
+        final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "some query", true);
+        final String ryaInstance = "ryaTestInstance";
+
+        // when the query executor is told to start the test query on the test
+        // rya instance, count down on the countdown latch
+        final QueryExecutor qe = mock(QueryExecutor.class);
+        when(qe.isRunning()).thenReturn(true);
+
+        final CountDownLatch queryStarted = new CountDownLatch(1);
+        final CountDownLatch queryDeleted = new CountDownLatch(1);
+        doAnswer(invocation -> {
+            queryDeleted.countDown();
+            return null;
+        }).when(qe).stopQuery(query.getQueryId());
+        final QueryChangeLogSource source = mock(QueryChangeLogSource.class);
+
+        // when the query executor is told to start the test query on the test
+        // rya instance, count down on the countdown latch
+        doAnswer(invocation -> {
+            queryStarted.countDown();
+            return null;
+        }).when(qe).startQuery(eq(ryaInstance), eq(query));
+
+        //When the QueryChangeLogSource is subscribed to in the QueryManager, mock notify of a new QueryChangeLog
+        // add the query, so it can be removed
+        doAnswer(invocation -> {
+            //The listener created by the Query Manager
+            final SourceListener listener = (SourceListener) invocation.getArguments()[0];
+            listener.notifyCreate(ryaInstance, newChangeLog);
+            Thread.sleep(1000);
+            newChangeLog.write(QueryChange.create(query.getQueryId(), query.getSparql(), query.isActive()));
+            queryStarted.await(5, TimeUnit.SECONDS);
+            newChangeLog.write(QueryChange.delete(query.getQueryId()));
+            return null;
+        }).when(source).subscribe(any(SourceListener.class));
+
+        final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER);
+        try {
+            qm.startAndWait();
+            queryDeleted.await(5, TimeUnit.SECONDS);
+            verify(qe).stopQuery(query.getQueryId());
+        } finally {
+            qm.stopAndWait();
+        }
+    }
+
+    /**
+     * Tests when the query manager is notified to update an existing query, the
+     * query is stopped.
+     */
+    @Test
+    public void testUpdateQuery() throws Exception {
+        // The new QueryChangeLog
+        final QueryChangeLog newChangeLog = new InMemoryQueryChangeLog();
+        final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "some query", true);
+        final String ryaInstance = "ryaTestInstance";
+
+        // when the query executor is told to start the test query on the test
+        // rya instance, count down on the countdown latch
+        final QueryExecutor qe = mock(QueryExecutor.class);
+        when(qe.isRunning()).thenReturn(true);
+
+        final CountDownLatch queryStarted = new CountDownLatch(1);
+        final CountDownLatch queryDeleted = new CountDownLatch(1);
+        doAnswer(invocation -> {
+            queryDeleted.countDown();
+            return null;
+        }).when(qe).stopQuery(query.getQueryId());
+        final QueryChangeLogSource source = mock(QueryChangeLogSource.class);
+
+        // when the query executor is told to start the test query on the test
+        // rya instance, count down on the countdown latch
+        doAnswer(invocation -> {
+            queryStarted.countDown();
+            return null;
+        }).when(qe).startQuery(eq(ryaInstance), eq(query));
+
+        // When the QueryChangeLogSource is subscribed to in the QueryManager,
+        // mock notify of a new QueryChangeLog
+        // add the query, so it can be removed
+        doAnswer(invocation -> {
+            // The listener created by the Query Manager
+            final SourceListener listener = (SourceListener) invocation.getArguments()[0];
+            listener.notifyCreate(ryaInstance, newChangeLog);
+            Thread.sleep(1000);
+            newChangeLog.write(QueryChange.create(query.getQueryId(), query.getSparql(), query.isActive()));
+            queryStarted.await(5, TimeUnit.SECONDS);
+            newChangeLog.write(QueryChange.update(query.getQueryId(), false));
+            return null;
+        }).when(source).subscribe(any(SourceListener.class));
+
+        final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER);
+        try {
+            qm.startAndWait();
+            queryDeleted.await(10, TimeUnit.SECONDS);
+            verify(qe).stopQuery(query.getQueryId());
+        } finally {
+            qm.stopAndWait();
+        }
+    }
+}