You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2018/03/09 18:20:50 UTC
[04/22] incubator-rya git commit: Rya 452 Updated QueryRepository
Rya 452 Updated QueryRepository
Updated QueryRepository to be a Service
Updated InMemoryQueryRepository to be an AbstractScheduledService
Added listeners to InMemoryQueryRepository
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/36af1153
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/36af1153
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/36af1153
Branch: refs/heads/master
Commit: 36af1153758c943e08808b720adece86278de41f
Parents: eb07bf6
Author: Andrew Smith <sm...@gmail.com>
Authored: Tue Jan 23 15:20:50 2018 -0500
Committer: Valiyil <Pu...@parsons.com>
Committed: Fri Mar 9 12:59:37 2018 -0500
----------------------------------------------------------------------
.../api/queries/InMemoryQueryRepository.java | 90 ++++++++-
.../api/queries/QueryChangeLogListener.java | 41 ++++
.../streams/api/queries/QueryRepository.java | 38 +++-
.../queries/InMemoryQueryRepositoryTest.java | 194 ++++++++++++++-----
.../streams/client/command/AddQueryCommand.java | 8 +-
.../client/command/DeleteQueryCommand.java | 7 +-
.../client/command/ListQueriesCommand.java | 7 +-
.../streams/client/command/RunQueryCommand.java | 9 +-
.../client/command/StreamResultsCommand.java | 7 +-
.../client/command/AddQueryCommandIT.java | 11 +-
.../client/command/DeleteQueryCommandIT.java | 11 +-
.../client/command/ListQueryCommandIT.java | 11 +-
.../client/command/RunQueryCommandIT.java | 5 +-
.../kafka/interactor/KafkaRunQueryIT.java | 4 +-
14 files changed, 349 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
index f4b7b25..dca040f 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
@@ -20,7 +20,9 @@ package org.apache.rya.streams.api.queries;
import static java.util.Objects.requireNonNull;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -33,6 +35,8 @@ import org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.AbstractScheduledService;
+
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import info.aduna.iteration.CloseableIteration;
@@ -46,7 +50,7 @@ import info.aduna.iteration.CloseableIteration;
* Thread safe.
*/
@DefaultAnnotation(NonNull.class)
-public class InMemoryQueryRepository implements QueryRepository {
+public class InMemoryQueryRepository extends AbstractScheduledService implements QueryRepository {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryQueryRepository.class);
private final ReentrantLock lock = new ReentrantLock();
@@ -67,20 +71,34 @@ public class InMemoryQueryRepository implements QueryRepository {
private final Map<UUID, StreamsQuery> queriesCache = new HashMap<>();
/**
+ * The listeners to be notified when new QueryChangeLogs come in.
+ */
+ private final List<QueryChangeLogListener> listeners = new ArrayList<>();
+
+ /**
+ * The {@link Scheduler} the repository uses to periodically poll for query updates.
+ */
+ private final Scheduler scheduler;
+
+ /**
* Constructs an instance of {@link InMemoryQueryRepository}.
*
* @param changeLog - The change log that this repository will maintain and be based on. (not null)
+ * @param scheduler - The {@link Scheduler} this service uses to periodically check for query updates. (not null)
*/
- public InMemoryQueryRepository(final QueryChangeLog changeLog) {
+ public InMemoryQueryRepository(final QueryChangeLog changeLog, final Scheduler scheduler) {
this.changeLog = requireNonNull(changeLog);
+ this.scheduler = requireNonNull(scheduler);
}
@Override
- public StreamsQuery add(final String query, final boolean isActive) throws QueryRepositoryException {
+ public StreamsQuery add(final String query, final boolean isActive)
+ throws QueryRepositoryException, IllegalStateException {
requireNonNull(query);
lock.lock();
try {
+ checkState();
// First record the change to the log.
final UUID queryId = UUID.randomUUID();
final QueryChange change = QueryChange.create(queryId, query, isActive);
@@ -100,11 +118,12 @@ public class InMemoryQueryRepository implements QueryRepository {
}
@Override
- public Optional<StreamsQuery> get(final UUID queryId) throws QueryRepositoryException {
+ public Optional<StreamsQuery> get(final UUID queryId) throws QueryRepositoryException, IllegalStateException {
requireNonNull(queryId);
lock.lock();
try {
+ checkState();
// Update the cache to represent what is currently in the log.
updateCache();
@@ -115,11 +134,13 @@ public class InMemoryQueryRepository implements QueryRepository {
}
@Override
- public void updateIsActive(final UUID queryId, final boolean isActive) throws QueryRepositoryException {
+ public void updateIsActive(final UUID queryId, final boolean isActive)
+ throws QueryRepositoryException, IllegalStateException {
requireNonNull(queryId);
lock.lock();
try {
+ checkState();
// Update the cache to represent what is currently in the log.
updateCache();
@@ -140,11 +161,12 @@ public class InMemoryQueryRepository implements QueryRepository {
}
@Override
- public void delete(final UUID queryId) throws QueryRepositoryException {
+ public void delete(final UUID queryId) throws QueryRepositoryException, IllegalStateException {
requireNonNull(queryId);
lock.lock();
try {
+ checkState();
// First record the change to the log.
final QueryChange change = QueryChange.delete(queryId);
changeLog.write(change);
@@ -157,9 +179,10 @@ public class InMemoryQueryRepository implements QueryRepository {
}
@Override
- public Set<StreamsQuery> list() throws QueryRepositoryException {
+ public Set<StreamsQuery> list() throws QueryRepositoryException, IllegalStateException {
lock.lock();
try {
+ checkState();
// Update the cache to represent what is currently in the log.
updateCache();
@@ -174,7 +197,8 @@ public class InMemoryQueryRepository implements QueryRepository {
}
@Override
- public void close() throws Exception {
+ protected void shutDown() throws Exception {
+ super.shutDown();
lock.lock();
try {
changeLog.close();
@@ -229,6 +253,8 @@ public class InMemoryQueryRepository implements QueryRepository {
break;
}
+ listeners.forEach(listener -> listener.notify(entry));
+
cachePosition = Optional.of( entry.getPosition() );
}
@@ -247,4 +273,52 @@ public class InMemoryQueryRepository implements QueryRepository {
}
}
}
+
+ /**
+ * Body of one scheduled iteration of this service: takes the repository lock and
+ * calls {@code updateCache()}.
+ *
+ * @throws Exception whatever {@code updateCache()} throws.
+ */
+ @Override
+ protected void runOneIteration() throws Exception {
+ // Same lock the public repository operations take before touching the cache.
+ lock.lock();
+ try {
+ updateCache();
+ } finally {
+ // Always release the lock, even when the update throws.
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Supplies the {@link Scheduler} that was handed to the constructor.
+ *
+ * @return the value of the {@code scheduler} field, which the constructor null-checks.
+ */
+ @Override
+ protected Scheduler scheduler() {
+ return scheduler;
+ }
+
+    /**
+     * Registers a listener and returns a snapshot of the queries currently held in the cache.
+     * <p>
+     * The cache is not refreshed from the change log by this call; the snapshot reflects
+     * whatever the cache held at the moment the lock was acquired.
+     *
+     * @param listener - The listener to register. (not null)
+     * @return A new {@link Set} holding the cached {@link StreamsQuery}s.
+     * @throws NullPointerException if {@code listener} is null.
+     */
+    @Override
+    public Set<StreamsQuery> subscribe(final QueryChangeLogListener listener) {
+        // Fail fast: a null stored in the list would otherwise only blow up later, inside
+        // the notification loop, while the repository lock is held.
+        requireNonNull(listener);
+
+        //locks to prevent the current state from changing while subscribing.
+        lock.lock();
+        try {
+            listeners.add(listener);
+
+            //return the current state of the query repository
+            return queriesCache.values()
+                    .stream()
+                    .collect(Collectors.toSet());
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Removes a previously registered listener. Removing a listener that is not
+     * registered leaves the listener list unchanged.
+     *
+     * @param listener - The listener to remove. (not null)
+     * @throws NullPointerException if {@code listener} is null.
+     */
+    @Override
+    public void unsubscribe(final QueryChangeLogListener listener) {
+        // Enforce the documented "(not null)" contract, matching the other public methods.
+        requireNonNull(listener);
+
+        lock.lock();
+        try {
+            listeners.remove(listener);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Guards the repository operations against being used while listeners are subscribed
+     * but the service is not running.
+     *
+     * @throws IllegalStateException if the service is not running and at least one listener
+     *   is subscribed.
+     */
+    private void checkState() {
+        // Nothing to enforce while the service is running.
+        if (super.isRunning()) {
+            return;
+        }
+
+        // A repository nobody subscribed to may be used without being started.
+        if (listeners.isEmpty()) {
+            return;
+        }
+
+        throw new IllegalStateException(
+                "The Query Repository is subscribed to, but the service has not been started.");
+    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java
new file mode 100644
index 0000000..2b61227
--- /dev/null
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api.queries;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Callback interface for parties that want to hear about each {@link QueryChange}
+ * occurring on a {@link QueryChangeLog}.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface QueryChangeLogListener {
+    /**
+     * Called when a query change event has occurred in the change log.
+     * <p>
+     * <b>Note:</b>
+     * <p>
+     * The QueryRepository blocks while it notifies this listener, so implementations
+     * must return quickly. A long lasting operation performed here blocks all
+     * operations on the repository.
+     *
+     * @param queryChangeEvent - The event that occurred. (not null)
+     */
+    void notify(ChangeLogEntry<QueryChange> queryChangeEvent);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
index fd51b2f..4d8b2db 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
@@ -25,14 +25,20 @@ import java.util.UUID;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.exception.RyaStreamsException;
+import com.google.common.util.concurrent.Service;
+
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
/**
* Repository for adding, deleting, and listing active queries in Rya Streams.
+ *
+ * This service only needs to be started if it is being subscribed to. An
+ * {@link IllegalStateException} will be thrown if the service is subscribed to
+ * and used without being started.
*/
@DefaultAnnotation(NonNull.class)
-public interface QueryRepository extends AutoCloseable {
+public interface QueryRepository extends Service {
/**
* Adds a new query to Rya Streams.
@@ -42,8 +48,9 @@ public interface QueryRepository extends AutoCloseable {
* otherwise {@code false}.
* @return The {@link StreamsQuery} used in Rya Streams.
* @throws QueryRepositoryException Could not add the query.
+ * @throws IllegalStateException The Service has not been started, but has been subscribed to.
*/
- public StreamsQuery add(final String query, boolean isActive) throws QueryRepositoryException;
+ public StreamsQuery add(final String query, boolean isActive) throws QueryRepositoryException, IllegalStateException;
/**
* Updates the isActive state of a {@link StreamsQuery}. Setting this value to {@code true}
@@ -53,8 +60,9 @@ public interface QueryRepository extends AutoCloseable {
* @param queryId - Identifies which query will be updated. (not null)
* @param isActive - The new isActive state for the query.
* @throws QueryRepositoryException If the query does not exist or something else caused the change to fail.
+ * @throws IllegalStateException The Service has not been started, but has been subscribed to.
*/
- public void updateIsActive(UUID queryId, boolean isActive) throws QueryRepositoryException;
+ public void updateIsActive(UUID queryId, boolean isActive) throws QueryRepositoryException, IllegalStateException;
/**
* Get an existing query from Rya Streams.
@@ -62,24 +70,42 @@ public interface QueryRepository extends AutoCloseable {
* @param queryId - Identifies which query will be fetched.
* @return the {@link StreamsQuery} for the id if one exists; otherwise empty.
* @throws QueryRepositoryException The query could not be fetched.
+ * @throws IllegalStateException The Service has not been started, but has been subscribed to.
*/
- public Optional<StreamsQuery> get(UUID queryId) throws QueryRepositoryException;
+ public Optional<StreamsQuery> get(UUID queryId) throws QueryRepositoryException, IllegalStateException;
/**
* Removes an existing query from Rya Streams.
*
* @param queryID - The {@link UUID} of the query to remove. (not null)
* @throws QueryRepositoryException Could not delete the query.
+ * @throws IllegalStateException The Service has not been started, but has been subscribed to.
*/
- public void delete(UUID queryID) throws QueryRepositoryException;
+ public void delete(UUID queryID) throws QueryRepositoryException, IllegalStateException;
/**
* Lists all existing queries in Rya Streams.
*
* @return - A List of the current {@link StreamsQuery}s
* @throws QueryRepositoryException The {@link StreamsQuery}s could not be listed.
+ * @throws IllegalStateException The Service has not been started, but has been subscribed to.
+ */
+ public Set<StreamsQuery> list() throws QueryRepositoryException, IllegalStateException;
+
+ /**
+ * Subscribes a {@link QueryChangeLogListener} to the {@link QueryRepository}.
+ *
+ * @param listener - The {@link QueryChangeLogListener} to subscribe to this {@link QueryRepository}. (not null)
+ * @return The current state of the repository in the form of {@link StreamsQuery}s.
+ */
+ public Set<StreamsQuery> subscribe(final QueryChangeLogListener listener);
+
+ /**
+ * Unsubscribe a {@link QueryChangeLogListener} from the {@link QueryRepository}.
+ *
+ * @param listener - The {@link QueryChangeLogListener} to unsubscribe from this {@link QueryRepository}. (not null)
*/
- public Set<StreamsQuery> list() throws QueryRepositoryException;
+ public void unsubscribe(final QueryChangeLogListener listener);
/**
* A function of {@link QueryRepository} was unable to perform a function.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
index 22e616d..76c3216 100644
--- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
+++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
@@ -20,6 +20,7 @@ package org.apache.rya.streams.api.queries;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -27,56 +28,62 @@ import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException;
import org.junit.Test;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+
/**
* Unit tests the methods of {@link InMemoryQueryRepository}.
*/
public class InMemoryQueryRepositoryTest {
+ private static final Scheduler SCHEDULE = Scheduler.newFixedRateSchedule(0L, 100, TimeUnit.MILLISECONDS);
@Test
public void canReadAddedQueries() throws Exception {
// Setup a totally in memory QueryRepository.
- try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
- // Add some queries to it.
- final Set<StreamsQuery> expected = new HashSet<>();
- expected.add( queries.add("query 1", true) );
- expected.add( queries.add("query 2", false) );
- expected.add( queries.add("query 3", true) );
-
- // Show they are in the list of all queries.
- final Set<StreamsQuery> stored = queries.list();
- assertEquals(expected, stored);
- }
+ final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE );
+ // Add some queries to it.
+ final Set<StreamsQuery> expected = new HashSet<>();
+ expected.add( queries.add("query 1", true) );
+ expected.add( queries.add("query 2", false) );
+ expected.add( queries.add("query 3", true) );
+
+ // Show they are in the list of all queries.
+ final Set<StreamsQuery> stored = queries.list();
+ assertEquals(expected, stored);
}
@Test
public void deletedQueriesDisappear() throws Exception {
// Setup a totally in memory QueryRepository.
- try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
- // Add some queries to it. The second one we will delete.
- final Set<StreamsQuery> expected = new HashSet<>();
- expected.add( queries.add("query 1", true) );
- final UUID deletedMeId = queries.add("query 2", false).getQueryId();
- expected.add( queries.add("query 3", true) );
-
- // Delete the second query.
- queries.delete( deletedMeId );
-
- // Show only queries 1 and 3 are in the list.
- final Set<StreamsQuery> stored = queries.list();
- assertEquals(expected, stored);
- }
+ final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE );
+ // Add some queries to it. The second one we will delete.
+ final Set<StreamsQuery> expected = new HashSet<>();
+ expected.add( queries.add("query 1", true) );
+ final UUID deletedMeId = queries.add("query 2", false).getQueryId();
+ expected.add( queries.add("query 3", true) );
+
+ // Delete the second query.
+ queries.delete( deletedMeId );
+
+ // Show only queries 1 and 3 are in the list.
+ final Set<StreamsQuery> stored = queries.list();
+ assertEquals(expected, stored);
}
@Test
public void initializedWithPopulatedChangeLog() throws Exception {
// Setup a totally in memory QueryRepository. Hold onto the change log so that we can use it again later.
final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
- try(final QueryRepository queries = new InMemoryQueryRepository( changeLog )) {
+ final QueryRepository queries = new InMemoryQueryRepository( changeLog, SCHEDULE );
+ try {
+ queries.startAndWait();
// Add some queries and deletes to it.
final Set<StreamsQuery> expected = new HashSet<>();
expected.add( queries.add("query 1", true) );
@@ -85,11 +92,16 @@ public class InMemoryQueryRepositoryTest {
queries.delete( deletedMeId );
// Create a new totally in memory QueryRepository.
- try(final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog )) {
+ final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog, SCHEDULE );
+ try {
// Listing the queries should work using an initialized change log.
final Set<StreamsQuery> stored = initializedQueries.list();
assertEquals(expected, stored);
+ } finally {
+ queries.stop();
}
+ } finally {
+ queries.stop();
}
}
@@ -100,50 +112,132 @@ public class InMemoryQueryRepositoryTest {
when(changeLog.readFromStart()).thenThrow(new QueryChangeLogException("Mocked exception."));
// Create the QueryRepository and invoke one of the methods.
- try(final QueryRepository queries = new InMemoryQueryRepository( changeLog )) {
- queries.list();
- }
+ final QueryRepository queries = new InMemoryQueryRepository( changeLog, SCHEDULE );
+ queries.list();
}
@Test
public void get_present() throws Exception {
// Setup a totally in memory QueryRepository.
- try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
- // Add a query to it.
- final StreamsQuery query = queries.add("query 1", true);
+ final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE );
+ // Add a query to it.
+ final StreamsQuery query = queries.add("query 1", true);
- // Show the fetched query matches the expected ones.
- final Optional<StreamsQuery> fetched = queries.get(query.getQueryId());
- assertEquals(query, fetched.get());
- }
+ // Show the fetched query matches the expected ones.
+ final Optional<StreamsQuery> fetched = queries.get(query.getQueryId());
+ assertEquals(query, fetched.get());
}
@Test
public void get_notPresent() throws Exception {
// Setup a totally in memory QueryRepository.
- try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
- // Fetch a query that was never added to the repository.
- final Optional<StreamsQuery> query = queries.get(UUID.randomUUID());
+ final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE );
+ // Fetch a query that was never added to the repository.
+ final Optional<StreamsQuery> query = queries.get(UUID.randomUUID());
- // Show it could not be found.
- assertFalse(query.isPresent());
- }
+ // Show it could not be found.
+ assertFalse(query.isPresent());
}
@Test
public void update() throws Exception {
// Setup a totally in memory QueryRepository.
- try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
+ final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE );
+ // Add a query to it.
+ final StreamsQuery query = queries.add("query 1", true);
+
+ // Change the isActive state of that query.
+ queries.updateIsActive(query.getQueryId(), false);
+
+ // Show the fetched query matches the expected one.
+ final Optional<StreamsQuery> fetched = queries.get(query.getQueryId());
+ final StreamsQuery expected = new StreamsQuery(query.getQueryId(), query.getSparql(), false);
+ assertEquals(expected, fetched.get());
+ }
+
+ @Test
+ public void updateListenerNotify() throws Exception {
+ // Setup a totally in memory QueryRepository.
+ final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE );
+ try {
+ queries.startAndWait();
+
// Add a query to it.
final StreamsQuery query = queries.add("query 1", true);
- // Change the isActive state of that query.
- queries.updateIsActive(query.getQueryId(), false);
+ final Set<StreamsQuery> existing = queries.subscribe(new QueryChangeLogListener() {
+ @Override
+ public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent) {
+ final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(1L,
+ QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+ assertEquals(expected, queryChangeEvent);
+ }
+ });
+
+ assertEquals(Sets.newHashSet(query), existing);
+
+ queries.add("query 2", true);
+ } finally {
+ queries.stop();
+ }
+ }
- // Show the fetched query matches the expected one.
- final Optional<StreamsQuery> fetched = queries.get(query.getQueryId());
- final StreamsQuery expected = new StreamsQuery(query.getQueryId(), query.getSparql(), false);
- assertEquals(expected, fetched.get());
+    /**
+     * Two repositories share one change log. A query added through the first one must be
+     * reported to a listener on each of them.
+     */
+    @Test
+    public void updateListenerNotify_multiClient() throws Exception {
+        // Setup a totally in memory QueryRepository.
+        final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
+        final QueryRepository queries = new InMemoryQueryRepository( changeLog, SCHEDULE );
+        final QueryRepository queries2 = new InMemoryQueryRepository( changeLog, SCHEDULE );
+
+        try {
+            queries.startAndWait();
+            queries2.startAndWait();
+
+            //show listener on repo that query was added to is being notified of the new query.
+            final CountDownLatch repo1Latch = new CountDownLatch(1);
+            queries.subscribe(new QueryChangeLogListener() {
+                @Override
+                public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent) {
+                    final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L,
+                            QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+                    assertEquals(expected, queryChangeEvent);
+                    repo1Latch.countDown();
+                }
+            });
+
+            //show listener not on the repo that query was added to is being notified as well.
+            final CountDownLatch repo2Latch = new CountDownLatch(1);
+            queries2.subscribe(new QueryChangeLogListener() {
+                @Override
+                public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent) {
+                    final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L,
+                            QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+                    assertEquals(expected, queryChangeEvent);
+                    repo2Latch.countDown();
+                }
+            });
+
+            queries.add("query 2", true);
+
+            // An InterruptedException from await() is deliberately not caught here: it
+            // propagates through "throws Exception" and fails the test instead of letting
+            // it pass without either latch having been verified.
+            assertTrue(repo1Latch.await(5, TimeUnit.SECONDS));
+            assertTrue(repo2Latch.await(5, TimeUnit.SECONDS));
+        } finally {
+            queries.stop();
+            queries2.stop();
+        }
+    }
+
+    /**
+     * Using a repository that has a subscriber but was never started must be rejected.
+     */
+    @Test(expected = IllegalStateException.class)
+    public void subscribe_notStarted() throws Exception {
+        // Build an in memory repository and deliberately leave it unstarted.
+        final QueryChangeLog emptyLog = new InMemoryQueryChangeLog();
+        final QueryRepository queries = new InMemoryQueryRepository(emptyLog, SCHEDULE);
+
+        // A listener that ignores every event is enough to put the repository into the subscribed state.
+        final QueryChangeLogListener ignoreEvents = queryChangeEvent -> {};
+        queries.subscribe(ignoreEvents);
+
+        // Any repository operation is now expected to throw.
+        queries.add("query 2", true);
+    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
index 275a975..9273c33 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
@@ -20,6 +20,8 @@ package org.apache.rya.streams.client.command;
import static java.util.Objects.requireNonNull;
+import java.util.concurrent.TimeUnit;
+
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.exception.RyaStreamsException;
import org.apache.rya.streams.api.interactor.AddQuery;
@@ -35,6 +37,7 @@ import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.google.common.base.Strings;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -115,8 +118,11 @@ public class AddQueryCommand implements RyaStreamsCommand {
final String topic = KafkaTopics.queryChangeLogTopic(params.ryaInstance);
final QueryChangeLog queryChangeLog = KafkaQueryChangeLogFactory.make(bootstrapServers, topic);
+ //The AddQuery command doesn't use the scheduled service feature.
+ final Scheduler scheduler = Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS);
+ final QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog, scheduler);
// Execute the add query command.
- try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) {
+ try {
final AddQuery addQuery = new DefaultAddQuery(queryRepo);
try {
final StreamsQuery query = addQuery.addQuery(params.query, Boolean.parseBoolean(params.isActive));
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java
index 2aeb90c..0d96df0 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java
@@ -21,6 +21,7 @@ package org.apache.rya.streams.client.command;
import static java.util.Objects.requireNonNull;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import org.apache.rya.streams.api.exception.RyaStreamsException;
import org.apache.rya.streams.api.interactor.DeleteQuery;
@@ -36,6 +37,7 @@ import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.google.common.base.Strings;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -113,8 +115,11 @@ public class DeleteQueryCommand implements RyaStreamsCommand {
final String topic = KafkaTopics.queryChangeLogTopic(params.ryaInstance);
final QueryChangeLog queryChangeLog = KafkaQueryChangeLogFactory.make(bootstrapServers, topic);
+ //The DeleteQuery command doesn't use the scheduled service feature.
+ final Scheduler scheduler = Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS);
+ final QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog, scheduler);
// Execute the delete query command.
- try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) {
+ try {
final DeleteQuery deleteQuery = new DefaultDeleteQuery(queryRepo);
try {
deleteQuery.delete(UUID.fromString(params.queryId));
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
index 670007b..cd78975 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
@@ -21,6 +21,7 @@ package org.apache.rya.streams.client.command;
import static java.util.Objects.requireNonNull;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.exception.RyaStreamsException;
@@ -35,6 +36,7 @@ import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.ParameterException;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -83,8 +85,11 @@ public class ListQueriesCommand implements RyaStreamsCommand {
final String topic = KafkaTopics.queryChangeLogTopic(params.ryaInstance);
final QueryChangeLog queryChangeLog = KafkaQueryChangeLogFactory.make(bootstrapServers, topic);
+ //The ListQueries command doesn't use the scheduled service feature.
+ final Scheduler scheduler = Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS);
+ final QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog, scheduler);
// Execute the list queries command.
- try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) {
+ try {
final ListQueries listQueries = new DefaultListQueries(queryRepo);
try {
final Set<StreamsQuery> queries = listQueries.all();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
index 8f7f162..ddaf647 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
@@ -39,6 +40,7 @@ import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.google.common.base.Strings;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -117,8 +119,11 @@ public class RunQueryCommand implements RyaStreamsCommand {
final String topic = KafkaTopics.queryChangeLogTopic(params.ryaInstance);
final QueryChangeLog queryChangeLog = KafkaQueryChangeLogFactory.make(bootstrapServers, topic);
+ //The RunQuery command doesn't use the scheduled service feature.
+ final Scheduler scheduler = Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS);
+ final QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog, scheduler);
// Look up the query to be executed from the change log.
- try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) {
+ try {
try {
final UUID queryId = UUID.fromString( params.queryId );
final Optional<StreamsQuery> query = queryRepo.get(queryId);
@@ -145,7 +150,7 @@ public class RunQueryCommand implements RyaStreamsCommand {
} catch(final Exception e) {
throw new ExecutionException("Could not execute the Run Query command.", e);
}
- } catch(final ArgumentsException | ExecutionException e) {
+ } catch(final ExecutionException e) {
// Rethrow the exceptions that are advertised by execute.
throw e;
} catch (final Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
index 7c548f1..3612dd0 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
@@ -22,6 +22,7 @@ import static java.util.Objects.requireNonNull;
import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rya.streams.api.entity.QueryResultStream;
@@ -45,6 +46,7 @@ import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.google.common.base.Strings;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -132,9 +134,12 @@ public class StreamResultsCommand implements RyaStreamsCommand {
throw new ArgumentsException("Invalid Query ID " + params.queryId);
}
+ //The StreamResults command doesn't use the scheduled service feature.
+ final Scheduler scheduler = Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS);
+ final QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog, scheduler);
// Fetch the SPARQL of the query whose results will be streamed.
final String sparql;
- try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) {
+ try {
final Optional<StreamsQuery> sQuery = queryRepo.get(queryId);
if(!sQuery.isPresent()) {
throw new ExecutionException("Could not read the results for query with ID " + queryId +
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
index 8b4f074..3bfbadc 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
@@ -38,11 +39,12 @@ import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserialize
import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
import org.apache.rya.test.kafka.KafkaTestUtil;
-import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+
/**
* Integration Test for adding a new query through a command.
*/
@@ -64,12 +66,7 @@ public class AddQueryCommandIT {
final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
- queryRepo = new InMemoryQueryRepository(changeLog);
- }
-
- @After
- public void cleanup() throws Exception {
- queryRepo.close();
+ queryRepo = new InMemoryQueryRepository(changeLog, Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
index 6083543..7bec080 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotEquals;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
@@ -39,11 +40,12 @@ import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserialize
import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
import org.apache.rya.test.kafka.KafkaTestUtil;
-import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+
/**
* Integration Test for deleting a query from Rya Streams through a command.
*/
@@ -66,12 +68,7 @@ public class DeleteQueryCommandIT {
final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
- queryRepo = new InMemoryQueryRepository(changeLog);
- }
-
- @After
- public void cleanup() throws Exception {
- queryRepo.close();
+ queryRepo = new InMemoryQueryRepository(changeLog, Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
index 1399142..f6ceb75 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
@@ -19,6 +19,7 @@
package org.apache.rya.streams.client.command;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
@@ -34,11 +35,12 @@ import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserialize
import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
import org.apache.rya.test.kafka.KafkaTestUtil;
-import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+
/**
* Integration Test for listing queries through a command.
*/
@@ -60,12 +62,7 @@ public class ListQueryCommandIT {
final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
- queryRepo = new InMemoryQueryRepository(changeLog);
- }
-
- @After
- public void cleanup() throws Exception {
- queryRepo.close();
+ queryRepo = new InMemoryQueryRepository(changeLog, Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
index 3389d6b..7e3b8bc 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
@@ -56,6 +57,7 @@ import org.openrdf.model.impl.ValueFactoryImpl;
import org.openrdf.query.impl.MapBindingSet;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
/**
* Integration tests the methods of {@link RunQueryCommand}.
@@ -81,7 +83,7 @@ public class RunQueryCommandIT {
final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
- queryRepo = new InMemoryQueryRepository(changeLog);
+ queryRepo = new InMemoryQueryRepository(changeLog, Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));
// Initialize the Statements Producer and the Results Consumer.
stmtProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class);
@@ -92,7 +94,6 @@ public class RunQueryCommandIT {
public void cleanup() throws Exception{
stmtProducer.close();
resultConsumer.close();
- queryRepo.close();
}
@Test(expected = ExecutionException.class)
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
index 9a773f0..5dbd27f 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
@@ -52,6 +53,7 @@ import org.openrdf.model.impl.ValueFactoryImpl;
import org.openrdf.query.impl.MapBindingSet;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
/**
* Integration tests the methods of {@link KafkaRunQuery}.
@@ -83,7 +85,7 @@ public class KafkaRunQueryIT {
final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
// This query is completely in memory, so it doesn't need to be closed.
- final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() );
+ final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS) );
// Add the query to the query repository.
final StreamsQuery sQuery = queries.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }", true);