You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2018/03/09 18:20:56 UTC
[10/22] incubator-rya git commit: Rya 451 Query manager
Rya 451 Query manager
QueryManager with tests
updated InMemoryQueryRepository and its tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/e355f73a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/e355f73a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/e355f73a
Branch: refs/heads/master
Commit: e355f73ae6c56eb439e1d56722cd20c5287cbe04
Parents: a11ca4a
Author: Andrew Smith <sm...@gmail.com>
Authored: Tue Jan 30 14:01:54 2018 -0500
Committer: Valiyil <Pu...@parsons.com>
Committed: Fri Mar 9 12:59:47 2018 -0500
----------------------------------------------------------------------
.../rya/streams/api/entity/StreamsQuery.java | 15 ++
.../api/queries/InMemoryQueryRepository.java | 7 +-
.../api/queries/QueryChangeLogListener.java | 8 +-
.../queries/InMemoryQueryRepositoryTest.java | 55 ++--
extras/rya.streams/query-manager/pom.xml | 2 +-
.../rya/streams/querymanager/QueryManager.java | 255 +++++++++++++++++++
.../streams/querymanager/QueryManagerTest.java | 195 ++++++++++++++
7 files changed, 504 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e355f73a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
index bd750a6..7194834 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
@@ -85,4 +85,19 @@ public class StreamsQuery {
}
return false;
}
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("ID: ");
+ sb.append(getQueryId().toString() + "\n");
+ sb.append("Query: ");
+ sb.append(getSparql() + "\n");
+ sb.append("Is ");
+ if (!isActive) {
+ sb.append(" Not ");
+ }
+ sb.append("Running.\n");
+ return sb.toString();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e355f73a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
index dca040f..5fb0297 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
@@ -188,8 +188,8 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements
// Our internal cache is already up to date, so just return its values.
return queriesCache.values()
- .stream()
- .collect(Collectors.toSet());
+ .stream()
+ .collect(Collectors.toSet());
} finally {
lock.unlock();
@@ -253,7 +253,8 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements
break;
}
- listeners.forEach(listener -> listener.notify(entry));
+ final Optional<StreamsQuery> newQueryState = Optional.ofNullable(queriesCache.get(queryId));
+ listeners.forEach(listener -> listener.notify(entry, newQueryState));
cachePosition = Optional.of( entry.getPosition() );
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e355f73a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java
index 2b61227..2f0bcc3 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java
@@ -18,6 +18,10 @@
*/
package org.apache.rya.streams.api.queries;
+import java.util.Optional;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -36,6 +40,8 @@ public interface QueryChangeLogListener {
* on the repository.
*
* @param queryChangeEvent - The event that occurred. (not null)
+ * @param newQueryState - The new state of the query after the query change event, this will be
+ * absent if the change type is DELETE. (not null)
*/
- public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent);
+ public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent, final Optional<StreamsQuery> newQueryState);
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e355f73a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
index 76c3216..3b3d48a 100644
--- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
+++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
@@ -165,13 +165,14 @@ public class InMemoryQueryRepositoryTest {
// Add a query to it.
final StreamsQuery query = queries.add("query 1", true);
- final Set<StreamsQuery> existing = queries.subscribe(new QueryChangeLogListener() {
- @Override
- public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent) {
- final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(1L,
- QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
- assertEquals(expected, queryChangeEvent);
- }
+ final Set<StreamsQuery> existing = queries.subscribe((queryChangeEvent, newQueryState) -> {
+ final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(1L,
+ QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+ final Optional<StreamsQuery> expectedQueryState = Optional.of(
+ new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+
+ assertEquals(expected, queryChangeEvent);
+ assertEquals(expectedQueryState, newQueryState);
});
assertEquals(Sets.newHashSet(query), existing);
@@ -195,26 +196,28 @@ public class InMemoryQueryRepositoryTest {
//show listener on repo that query was added to is being notified of the new query.
final CountDownLatch repo1Latch = new CountDownLatch(1);
- queries.subscribe(new QueryChangeLogListener() {
- @Override
- public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent) {
- final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L,
- QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
- assertEquals(expected, queryChangeEvent);
- repo1Latch.countDown();
- }
+ queries.subscribe((queryChangeEvent, newQueryState) -> {
+ final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L,
+ QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+ final Optional<StreamsQuery> expectedQueryState = Optional.of(
+ new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+
+ assertEquals(expected, queryChangeEvent);
+ assertEquals(expectedQueryState, newQueryState);
+ repo1Latch.countDown();
});
//show listener not on the repo that query was added to is being notified as well.
final CountDownLatch repo2Latch = new CountDownLatch(1);
- queries2.subscribe(new QueryChangeLogListener() {
- @Override
- public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent) {
- final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L,
- QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
- assertEquals(expected, queryChangeEvent);
- repo2Latch.countDown();
- }
+ queries2.subscribe((queryChangeEvent, newQueryState) -> {
+ final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L,
+ QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+ final Optional<StreamsQuery> expectedQueryState = Optional.of(
+ new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+
+ assertEquals(expected, queryChangeEvent);
+ assertEquals(expectedQueryState, newQueryState);
+ repo2Latch.countDown();
});
queries.add("query 2", true);
@@ -222,7 +225,6 @@ public class InMemoryQueryRepositoryTest {
assertTrue(repo1Latch.await(5, TimeUnit.SECONDS));
assertTrue(repo2Latch.await(5, TimeUnit.SECONDS));
} catch(final InterruptedException e ) {
- System.out.println("PING");
} finally {
queries.stop();
queries2.stop();
@@ -233,10 +235,7 @@ public class InMemoryQueryRepositoryTest {
public void subscribe_notStarted() throws Exception {
// Setup a totally in memory QueryRepository.
final QueryRepository queries = new InMemoryQueryRepository(new InMemoryQueryChangeLog(), SCHEDULE);
- queries.subscribe(new QueryChangeLogListener() {
- @Override
- public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent) {}
- });
+ queries.subscribe((queryChangeEvent, newQueryState) -> {});
queries.add("query 2", true);
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e355f73a/extras/rya.streams/query-manager/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/pom.xml b/extras/rya.streams/query-manager/pom.xml
index d321ab5..2141a3a 100644
--- a/extras/rya.streams/query-manager/pom.xml
+++ b/extras/rya.streams/query-manager/pom.xml
@@ -39,7 +39,7 @@ under the License.
<groupId>org.apache.rya</groupId>
<artifactId>rya.streams.kafka</artifactId>
</dependency>
-
+
<!-- Apache Daemon dependencies -->
<dependency>
<groupId>commons-daemon</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e355f73a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
new file mode 100644
index 0000000..30b4538
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.ChangeLogEntry;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChange.ChangeType;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryChangeLogListener;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
+import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A service for managing {@link StreamsQuery} running on a Rya Streams system.
+ * <p>
+ * Only one QueryManager needs to be running to manage any number of rya
+ * instances/rya streams instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryManager extends AbstractIdleService {
+ private static final Logger LOG = LoggerFactory.getLogger(QueryManager.class);
+
+ private final QueryExecutor queryExecutor;
+ private final Scheduler scheduler;
+
+ /**
+ * Map of Rya Instance name to {@link QueryRepository}.
+ */
+ private final Map<String, QueryRepository> queryRepos = new HashMap<>();
+
+ private final ReentrantLock lock = new ReentrantLock();
+
+ private final QueryChangeLogSource source;
+
+ /**
+ * Creates a new {@link QueryManager}.
+ *
+ * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null)
+ * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null)
+ * @param scheduler - The {@link Scheduler} used to discover query changes
+ * within the {@link QueryChangeLog}s (not null)
+ */
+ public QueryManager(final QueryExecutor queryExecutor, final QueryChangeLogSource source, final Scheduler scheduler) {
+ this.source = requireNonNull(source);
+ this.queryExecutor = requireNonNull(queryExecutor);
+ this.scheduler = requireNonNull(scheduler);
+ }
+
+ /**
+ * Starts running a query.
+ *
+ * @param ryaInstanceName - The Rya instance the query belongs to. (not null)
+ * @param query - The query to run.(not null)
+ */
+ private void runQuery(final String ryaInstanceName, final StreamsQuery query) {
+ requireNonNull(ryaInstanceName);
+ requireNonNull(query);
+ LOG.info("Starting Query: " + query.toString() + " on the rya instance: " + ryaInstanceName);
+
+ try {
+ queryExecutor.startQuery(ryaInstanceName, query);
+ } catch (final QueryExecutorException e) {
+ LOG.error("Failed to start query.", e);
+ }
+ }
+
+ /**
+ * Stops the specified query from running.
+ *
+ * @param queryId - The ID of the query to stop running. (not null)
+ */
+ private void stopQuery(final UUID queryId) {
+ requireNonNull(queryId);
+
+ LOG.info("Stopping query: " + queryId.toString());
+
+ try {
+ queryExecutor.stopQuery(queryId);
+ } catch (final QueryExecutorException e) {
+ LOG.error("Failed to stop query.", e);
+ }
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ lock.lock();
+ try {
+ LOG.info("Starting Query Manager.");
+ queryExecutor.startAndWait();
+ source.startAndWait();
+
+ // subscribe to the sources to be notified of changes.
+ source.subscribe(new QueryManagerSourceListener());
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ lock.lock();
+ try {
+ LOG.info("Stopping Query Manager.");
+ source.stopAndWait();
+ queryExecutor.stopAndWait();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * An implementation of {@link QueryChangeLogListener} for the
+ * {@link QueryManager}.
+ * <p>
+ * When notified of a {@link ChangeType} performs one of the following:
+ * <li>{@link ChangeType#CREATE}: Creates a new query using the
+ * {@link QueryExecutor} provided to the {@link QueryManager}</li>
+ * <li>{@link ChangeType#DELETE}: Deletes a running query by stopping the
+ * {@link QueryExecutor} service of the queryID in the event</li>
+ * <li>{@link ChangeType#UPDATE}: If the query is running and the update is
+ * to stop the query, stops the query. Otherwise, if the query is not
+ * running, it is removed.</li>
+ */
+ private class QueryExecutionForwardingListener implements QueryChangeLogListener {
+ private final String ryaInstanceName;
+
+ /**
+ * Creates a new {@link QueryExecutionForwardingListener}.
+ *
+ * @param ryaInstanceName - The rya instance the query change is
+ * performed on. (not null)
+ */
+ public QueryExecutionForwardingListener(final String ryaInstanceName) {
+ this.ryaInstanceName = requireNonNull(ryaInstanceName);
+ }
+
+ @Override
+ public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent, final Optional<StreamsQuery> newQueryState) {
+ LOG.debug("New query change event.");
+ final QueryChange entry = queryChangeEvent.getEntry();
+
+ lock.lock();
+ try {
+
+ switch (entry.getChangeType()) {
+ case CREATE:
+ if(!newQueryState.isPresent()) {
+ LOG.error("The query with ID: " + entry.getQueryId() + " must be present with the change to be created.");
+ LOG.debug("newQueryState is not allowed to be absent with a CREATE QueryChange, there might be a bug in the QueryRepository.");
+ } else {
+ runQuery(ryaInstanceName, newQueryState.get());
+ }
+ break;
+ case DELETE:
+ stopQuery(entry.getQueryId());
+ break;
+ case UPDATE:
+ if (!newQueryState.isPresent()) {
+ LOG.error("The query with ID: " + entry.getQueryId() + " must be provided with the update, cannot perform update.");
+ LOG.debug("newQueryState is not allowed to be absent with a UPDATE QueryChange, there might be a bug in the QueryRepository.");
+ } else {
+ final StreamsQuery updatedQuery = newQueryState.get();
+ if (updatedQuery.isActive()) {
+ runQuery(ryaInstanceName, updatedQuery);
+ LOG.info("Starting query: " + updatedQuery.toString());
+ } else {
+ stopQuery(updatedQuery.getQueryId());
+ LOG.info("Stopping query: " + updatedQuery.toString());
+ }
+ }
+ break;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ /**
+ * Listener used by the {@link QueryManager} to be notified when
+ * {@link QueryChangeLog}s are created or deleted.
+ */
+ private class QueryManagerSourceListener implements SourceListener {
+ @Override
+ public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) {
+ lock.lock();
+ try {
+ LOG.info("Discovered new Query Change Log for Rya Instance " + ryaInstanceName + ".");
+ final QueryRepository repo = new InMemoryQueryRepository(log, scheduler);
+ repo.startAndWait();
+ final Set<StreamsQuery> queries = repo.subscribe(new QueryExecutionForwardingListener(ryaInstanceName));
+ queries.forEach(query -> {
+ if (query.isActive()) {
+ try {
+ queryExecutor.startQuery(ryaInstanceName, query);
+ } catch (IllegalStateException | QueryExecutorException e) {
+ LOG.error("Unable to start query for rya instance " + ryaInstanceName, e);
+ }
+ }
+ });
+ queryRepos.put(ryaInstanceName, repo);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void notifyDelete(final String ryaInstanceName) {
+ lock.lock();
+ try {
+ LOG.info("Notified of deleting QueryChangeLog, stopping all queries belonging to the change log for "
+ + ryaInstanceName + ".");
+ queryExecutor.stopAll(ryaInstanceName);
+ } catch (final QueryExecutorException e) {
+ LOG.error("Failed to stop all queries belonging to: " + ryaInstanceName, e);
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e355f73a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
new file mode 100644
index 0000000..a1203a0
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.InMemoryQueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
+import org.junit.Test;
+
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+
+/**
+ * Test for the {@link QueryManager}
+ */
+public class QueryManagerTest {
+ private static final Scheduler TEST_SCHEDULER = Scheduler.newFixedRateSchedule(0, 100, TimeUnit.MILLISECONDS);
+
+ /**
+ * Tests when the query manager is notified to create a new query, the query
+ * is created and started.
+ */
+ @Test
+ public void testCreateQuery() throws Exception {
+ //The new QueryChangeLog
+ final QueryChangeLog newChangeLog = new InMemoryQueryChangeLog();
+ final String ryaInstance = "ryaTestInstance";
+ final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "some query", true);
+
+ // when the query executor is told to start the test query on the test
+ // rya instance, count down on the countdown latch
+ final QueryExecutor qe = mock(QueryExecutor.class);
+ when(qe.isRunning()).thenReturn(true);
+
+ final CountDownLatch queryStarted = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ queryStarted.countDown();
+ return null;
+ }).when(qe).startQuery(eq(ryaInstance), eq(query));
+ final QueryChangeLogSource source = mock(QueryChangeLogSource.class);
+
+ //When the QueryChangeLogSource is subscribed to in the QueryManager, mock notify of a new QueryChangeLog
+ doAnswer(invocation -> {
+ //The listener created by the Query Manager
+ final SourceListener listener = (SourceListener) invocation.getArguments()[0];
+ listener.notifyCreate(ryaInstance, newChangeLog);
+ newChangeLog.write(QueryChange.create(query.getQueryId(), query.getSparql(), query.isActive()));
+ return null;
+ }).when(source).subscribe(any(SourceListener.class));
+
+ final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER);
+ try {
+ qm.startAndWait();
+ queryStarted.await(5, TimeUnit.SECONDS);
+ verify(qe).startQuery(ryaInstance, query);
+ } finally {
+ qm.stopAndWait();
+ }
+ }
+
+ /**
+ * Tests when the query manager is notified to delete a new query, the query
+ * is stopped and deleted.
+ */
+ @Test
+ public void testDeleteQuery() throws Exception {
+ //The new QueryChangeLog
+ final QueryChangeLog newChangeLog = new InMemoryQueryChangeLog();
+ final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "some query", true);
+ final String ryaInstance = "ryaTestInstance";
+
+ // when the query executor is told to start the test query on the test
+ // rya instance, count down on the countdown latch
+ final QueryExecutor qe = mock(QueryExecutor.class);
+ when(qe.isRunning()).thenReturn(true);
+
+ final CountDownLatch queryStarted = new CountDownLatch(1);
+ final CountDownLatch queryDeleted = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ queryDeleted.countDown();
+ return null;
+ }).when(qe).stopQuery(query.getQueryId());
+ final QueryChangeLogSource source = mock(QueryChangeLogSource.class);
+
+ // when the query executor is told to start the test query on the test
+ // rya instance, count down on the countdown latch
+ doAnswer(invocation -> {
+ queryStarted.countDown();
+ return null;
+ }).when(qe).startQuery(eq(ryaInstance), eq(query));
+
+ //When the QueryChangeLogSource is subscribed to in the QueryManager, mock notify of a new QueryChangeLog
+ // add the query, so it can be removed
+ doAnswer(invocation -> {
+ //The listener created by the Query Manager
+ final SourceListener listener = (SourceListener) invocation.getArguments()[0];
+ listener.notifyCreate(ryaInstance, newChangeLog);
+ Thread.sleep(1000);
+ newChangeLog.write(QueryChange.create(query.getQueryId(), query.getSparql(), query.isActive()));
+ queryStarted.await(5, TimeUnit.SECONDS);
+ newChangeLog.write(QueryChange.delete(query.getQueryId()));
+ return null;
+ }).when(source).subscribe(any(SourceListener.class));
+
+ final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER);
+ try {
+ qm.startAndWait();
+ queryDeleted.await(5, TimeUnit.SECONDS);
+ verify(qe).stopQuery(query.getQueryId());
+ } finally {
+ qm.stopAndWait();
+ }
+ }
+
+ /**
+ * Tests when the query manager is notified to update an existing query, the
+ * query is stopped.
+ */
+ @Test
+ public void testUpdateQuery() throws Exception {
+ // The new QueryChangeLog
+ final QueryChangeLog newChangeLog = new InMemoryQueryChangeLog();
+ final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "some query", true);
+ final String ryaInstance = "ryaTestInstance";
+
+ // when the query executor is told to start the test query on the test
+ // rya instance, count down on the countdown latch
+ final QueryExecutor qe = mock(QueryExecutor.class);
+ when(qe.isRunning()).thenReturn(true);
+
+ final CountDownLatch queryStarted = new CountDownLatch(1);
+ final CountDownLatch queryDeleted = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ queryDeleted.countDown();
+ return null;
+ }).when(qe).stopQuery(query.getQueryId());
+ final QueryChangeLogSource source = mock(QueryChangeLogSource.class);
+
+ // when the query executor is told to start the test query on the test
+ // rya instance, count down on the countdown latch
+ doAnswer(invocation -> {
+ queryStarted.countDown();
+ return null;
+ }).when(qe).startQuery(eq(ryaInstance), eq(query));
+
+ // When the QueryChangeLogSource is subscribed to in the QueryManager,
+ // mock notify of a new QueryChangeLog
+ // add the query, so it can be removed
+ doAnswer(invocation -> {
+ // The listener created by the Query Manager
+ final SourceListener listener = (SourceListener) invocation.getArguments()[0];
+ listener.notifyCreate(ryaInstance, newChangeLog);
+ Thread.sleep(1000);
+ newChangeLog.write(QueryChange.create(query.getQueryId(), query.getSparql(), query.isActive()));
+ queryStarted.await(5, TimeUnit.SECONDS);
+ newChangeLog.write(QueryChange.update(query.getQueryId(), false));
+ return null;
+ }).when(source).subscribe(any(SourceListener.class));
+
+ final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER);
+ try {
+ qm.startAndWait();
+ queryDeleted.await(10, TimeUnit.SECONDS);
+ verify(qe).stopQuery(query.getQueryId());
+ } finally {
+ qm.stopAndWait();
+ }
+ }
+}