You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2018/03/09 18:21:01 UTC
[15/22] incubator-rya git commit: RYA-451 Fixing threading issues
with the QueryManager class.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
index 947a215..3a59636 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
@@ -32,10 +32,15 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.kafka.KafkaStreamsFactory;
import org.apache.rya.streams.kafka.KafkaStreamsFactory.KafkaStreamsFactoryException;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
import org.apache.rya.streams.querymanager.QueryExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractIdleService;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -50,6 +55,7 @@ import kafka.consumer.KafkaStream;
*/
@DefaultAnnotation(NonNull.class)
public class LocalQueryExecutor extends AbstractIdleService implements QueryExecutor {
+ private static final Logger log = LoggerFactory.getLogger(LocalQueryExecutor.class);
/**
* Provides thread safety when interacting with this class.
@@ -72,6 +78,11 @@ public class LocalQueryExecutor extends AbstractIdleService implements QueryExec
private final Map<UUID, KafkaStreams> byQueryId = new HashMap<>();
/**
+ * Used to create the input and output topics for a Kafka Streams job.
+ */
+ private final CreateKafkaTopic createKafkaTopic;
+
+ /**
* Builds the {@link KafkaStreams} objects that execute {@link KafkaStream}s.
*/
private final KafkaStreamsFactory streamsFactory;
@@ -79,23 +90,31 @@ public class LocalQueryExecutor extends AbstractIdleService implements QueryExec
/**
* Constructs an instance of {@link LocalQueryExecutor}.
*
+ * @param createKafkaTopic - Used to create the input and output topics for a Kafka Streams job. (not null)
* @param streamsFactory - Builds the {@link KafkaStreams} objects that execute {@link KafkaStream}s. (not null)
*/
- public LocalQueryExecutor(final KafkaStreamsFactory streamsFactory) {
+ public LocalQueryExecutor(
+ final CreateKafkaTopic createKafkaTopic,
+ final KafkaStreamsFactory streamsFactory) {
+ this.createKafkaTopic = requireNonNull(createKafkaTopic);
this.streamsFactory = requireNonNull(streamsFactory);
}
@Override
protected void startUp() throws Exception {
- // Nothing to do.
+ log.info("Local Query Executor starting up.");
}
@Override
protected void shutDown() throws Exception {
+ log.info("Local Query Executor shutting down. Stopping all jobs...");
+
// Stop all of the running queries.
for(final KafkaStreams job : byQueryId.values()) {
job.close();
}
+
+ log.info("Local Query Executor shut down.");
}
@Override
@@ -106,6 +125,14 @@ public class LocalQueryExecutor extends AbstractIdleService implements QueryExec
lock.lock();
try {
+ // The Statements topic and the Query Results topic that the query's job depends on.
+ final Set<String> topics = Sets.newHashSet(
+ KafkaTopics.statementsTopic(ryaInstance),
+ KafkaTopics.queryResultsTopic(query.getQueryId()));
+
+ // Make sure the topics the query depends on exist before the job is started.
+ createKafkaTopic.createTopics(topics, 1, 1);
+
// Setup the Kafka Streams job that will execute.
final KafkaStreams streams = streamsFactory.make(ryaInstance, query);
streams.start();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd b/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd
index c1285d4..21170bb 100644
--- a/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd
+++ b/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd
@@ -30,6 +30,13 @@ under the License.
</xs:choice>
</xs:complexType>
</xs:element>
+ <xs:element name="queryExecutor">
+ <xs:complexType>
+ <xs:choice>
+ <xs:element name="localKafkaStreams" type="localKafkaStreams"/>
+ </xs:choice>
+ </xs:complexType>
+ </xs:element>
<xs:element name="performanceTunning">
<xs:complexType>
<xs:sequence>
@@ -65,6 +72,13 @@ under the License.
</xs:sequence>
</xs:complexType>
+ <!-- Define what a local Kafka Streams query executor looks like. -->
+ <xs:complexType name="localKafkaStreams">
+ <xs:sequence>
+ <xs:element name="zookeepers" type="xs:string"/>
+ </xs:sequence>
+ </xs:complexType>
+
<!-- Define the legal range for a TCP port. -->
<xs:simpleType name="tcpPort">
<xs:restriction base="xs:int">
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkGeneratorTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkGeneratorTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkGeneratorTest.java
new file mode 100644
index 0000000..da9be78
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkGeneratorTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.querymanager.QueryManager.LogEvent;
+import org.apache.rya.streams.querymanager.QueryManager.LogEvent.LogEventType;
+import org.apache.rya.streams.querymanager.QueryManager.LogEventWorkGenerator;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link LogEventWorkGenerator}.
+ */
+public class LogEventWorkGeneratorTest {
+
+ @Test
+ public void shutdownSignalKillsThread() {
+ // The signal that will kill the notifying thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue generated work is offered to.
+ final BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<>(1);
+
+ // The listener that will perform the LogEventWorkGenerator work.
+ final LogEventWorkGenerator generator = new LogEventWorkGenerator(queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+ // A thread that will attempt to notify the generator with a created change log.
+ final Thread notifyThread = new Thread(() -> {
+ generator.notifyCreate("rya", mock(QueryChangeLog.class));
+ });
+
+ // Fill the queue so that nothing may be offered to it.
+ queue.offer(LogEvent.delete("rya"));
+
+ // Start the thread and show that it is still alive after the offer period.
+ notifyThread.start();
+ assertTrue( ThreadUtil.stillAlive(notifyThread, 200) );
+
+ // Set the shutdown signal to true and join the thread. If we were able to join, then it shut down.
+ shutdownSignal.set(true);
+ assertFalse( ThreadUtil.stillAlive(notifyThread, 1000) );
+ }
+
+ @Test
+ public void notifyCreate() throws Exception {
+ // The signal that will kill the notifying thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue generated work is offered to.
+ final BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<>(1);
+
+ // The listener that will perform the LogEventWorkGenerator work.
+ final LogEventWorkGenerator generator = new LogEventWorkGenerator(queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+ // A thread that will attempt to notify the generator with a created change log.
+ final CountDownLatch notified = new CountDownLatch(1);
+ final Thread notifyThread = new Thread(() -> {
+ generator.notifyCreate("rya", mock(QueryChangeLog.class));
+ notified.countDown();
+ });
+
+ try {
+ // Start the thread that performs the notification.
+ notifyThread.start();
+
+ // Wait for the thread to indicate it has notified and check the queue for the value.
+ notified.await(200, TimeUnit.MILLISECONDS);
+ final LogEvent event = queue.poll(200, TimeUnit.MILLISECONDS);
+ assertEquals(LogEventType.CREATE, event.getEventType());
+ assertEquals("rya", event.getRyaInstanceName());
+ } finally {
+ shutdownSignal.set(true);
+ notifyThread.join();
+ }
+ }
+
+ @Test
+ public void notifyDelete() throws Exception {
+ // The signal that will kill the notifying thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue generated work is offered to.
+ final BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<>(1);
+
+ // The listener that will perform the LogEventWorkGenerator work.
+ final LogEventWorkGenerator generator = new LogEventWorkGenerator(queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+ // A thread that will attempt to notify the generator with a deleted change log.
+ final CountDownLatch notified = new CountDownLatch(1);
+ final Thread notifyThread = new Thread(() -> {
+ generator.notifyDelete("rya");
+ notified.countDown();
+ });
+
+ try {
+ // Start the thread that performs the notification.
+ notifyThread.start();
+
+ // Wait for the thread to indicate it has notified and check the queue for the value.
+ notified.await(200, TimeUnit.MILLISECONDS);
+ final LogEvent event = queue.poll(200, TimeUnit.MILLISECONDS);
+ assertEquals(LogEventType.DELETE, event.getEventType());
+ assertEquals("rya", event.getRyaInstanceName());
+ } finally {
+ shutdownSignal.set(true);
+ notifyThread.join();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java
new file mode 100644
index 0000000..cb708ed
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.InMemoryQueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.querymanager.QueryManager.LogEvent;
+import org.apache.rya.streams.querymanager.QueryManager.LogEventWorker;
+import org.apache.rya.streams.querymanager.QueryManager.QueryEvent;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link LogEventWorker}.
+ */
+public class LogEventWorkerTest {
+
+ @Test
+ public void shutdownSignalKillsThread() {
+ // The signal that will kill the working thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The thread that will perform the LogEventWorker task.
+ final Thread logEventWorker = new Thread(new LogEventWorker(new ArrayBlockingQueue<>(1),
+ new ArrayBlockingQueue<>(1), 50, TimeUnit.MILLISECONDS, shutdownSignal));
+ logEventWorker.start();
+
+ // Wait longer than the poll time to see if the thread died. Show that it is still running.
+ assertTrue(ThreadUtil.stillAlive(logEventWorker, 200));
+
+ // Set the shutdown signal to true and join the thread. If we were able to join, then it shut down.
+ shutdownSignal.set(true);
+ assertFalse(ThreadUtil.stillAlive(logEventWorker, 500));
+ }
+
+ @Test
+ public void nofity_logCreated_doesNotExist() throws Exception {
+ // The signal that will kill the working thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue used to feed work.
+ final BlockingQueue<LogEvent> logEventQueue = new ArrayBlockingQueue<>(10);
+
+ // The queue work is written to.
+ final BlockingQueue<QueryEvent> queryEventQueue = new ArrayBlockingQueue<>(10);
+
+ // The Query Change Log that will be watched.
+ final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
+
+ // Write a message that indicates a new query should be active.
+ final UUID firstQueryId = UUID.randomUUID();
+ changeLog.write(QueryChange.create(firstQueryId, "select * where { ?a ?b ?c . }", true));
+
+ // Write a message that adds an active query, but then makes it inactive. Because both of these
+ // events are written to the log before the worker subscribes to the repository for updates, they
+ // must result in a single query stopped event.
+ final UUID secondQueryId = UUID.randomUUID();
+ changeLog.write(QueryChange.create(secondQueryId, "select * where { ?d ?e ?f . }", true));
+ changeLog.write(QueryChange.update(secondQueryId, false));
+
+ // Start the worker that will be tested.
+ final Thread logEventWorker = new Thread(new LogEventWorker(logEventQueue,
+ queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+ logEventWorker.start();
+
+ try {
+ // Write a unit of work that indicates a log was created.
+ final LogEvent createLogEvent = LogEvent.create("rya", changeLog);
+ logEventQueue.offer(createLogEvent);
+
+ // We must see the following Query Events added to the work queue.
+ // Query 1, executing.
+ // Query 2, stopped.
+ Set<QueryEvent> expectedEvents = new HashSet<>();
+ expectedEvents.add(QueryEvent.executing("rya",
+ new StreamsQuery(firstQueryId, "select * where { ?a ?b ?c . }", true)));
+ expectedEvents.add(QueryEvent.stopped("rya", secondQueryId));
+
+ Set<QueryEvent> queryEvents = new HashSet<>();
+ queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) );
+ queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) );
+
+ assertEquals(expectedEvents, queryEvents);
+
+ // Write an event to the change log that stops the first query.
+ changeLog.write(QueryChange.update(firstQueryId, false));
+
+ // Show it was also reflected in the changes.
+ // Query 1, stopped.
+ expectedEvents = new HashSet<>();
+ expectedEvents.add(QueryEvent.stopped("rya", firstQueryId));
+
+ queryEvents = new HashSet<>();
+ queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) );
+
+ assertEquals(expectedEvents, queryEvents);
+ } finally {
+ shutdownSignal.set(true);
+ logEventWorker.join();
+ }
+ }
+
+ @Test
+ public void nofity_logCreated_exists() throws Exception {
+ // The signal that will kill the working thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue used to feed work.
+ final BlockingQueue<LogEvent> logEventQueue = new ArrayBlockingQueue<>(10);
+
+ // The queue work is written to.
+ final BlockingQueue<QueryEvent> queryEventQueue = new ArrayBlockingQueue<>(10);
+
+ // The Query Change Log that will be watched.
+ final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
+
+ // Write a message that indicates a new query should be active.
+ final UUID firstQueryId = UUID.randomUUID();
+ changeLog.write(QueryChange.create(firstQueryId, "select * where { ?a ?b ?c . }", true));
+
+ // Start the worker that will be tested.
+ final Thread logEventWorker = new Thread(new LogEventWorker(logEventQueue,
+ queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+ logEventWorker.start();
+
+ try {
+ // Write a unit of work that indicates a log was created.
+ final LogEvent createLogEvent = LogEvent.create("rya", changeLog);
+ logEventQueue.offer(createLogEvent);
+
+ // Say the same log was created a second time.
+ logEventQueue.offer(createLogEvent);
+
+ // Show that only a single unit of work was added for the log. This indicates the
+            // second message was effectively skipped as it would have had its work added twice otherwise.
+ final Set<QueryEvent> expectedEvents = new HashSet<>();
+ expectedEvents.add(QueryEvent.executing("rya",
+ new StreamsQuery(firstQueryId, "select * where { ?a ?b ?c . }", true)));
+
+ final Set<QueryEvent> queryEvents = new HashSet<>();
+ queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) );
+
+ assertNull(queryEventQueue.poll(500, TimeUnit.MILLISECONDS));
+ assertEquals(expectedEvents, queryEvents);
+ } finally {
+ shutdownSignal.set(true);
+ logEventWorker.join();
+ }
+ }
+
+ @Test
+ public void notify_logDeleted_exists() throws Exception {
+ // The signal that will kill the working thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue used to feed work.
+ final BlockingQueue<LogEvent> logEventQueue = new ArrayBlockingQueue<>(10);
+
+ // The queue work is written to.
+ final BlockingQueue<QueryEvent> queryEventQueue = new ArrayBlockingQueue<>(10);
+
+ // The Query Change Log that will be watched.
+ final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
+
+ // Start the worker that will be tested.
+ final Thread logEventWorker = new Thread(new LogEventWorker(logEventQueue,
+ queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+ logEventWorker.start();
+
+ try {
+ // Write a unit of work that indicates a log was created.
+ final LogEvent createLogEvent = LogEvent.create("rya", changeLog);
+ logEventQueue.offer(createLogEvent);
+
+ // Write a unit of work that indicates a log was deleted.
+ logEventQueue.offer(LogEvent.delete("rya"));
+
+ // Show that a single unit of work was created for deleting everything for "rya".
+ assertEquals(QueryEvent.stopALL("rya"), queryEventQueue.poll(500, TimeUnit.MILLISECONDS));
+ assertNull(queryEventQueue.poll(500, TimeUnit.MILLISECONDS));
+ } finally {
+ shutdownSignal.set(true);
+ logEventWorker.join();
+ }
+ }
+
+ @Test
+ public void notify_logDeleted_doesNotExist() throws Exception {
+ // The signal that will kill the working thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue used to feed work.
+ final BlockingQueue<LogEvent> logEventQueue = new ArrayBlockingQueue<>(10);
+
+ // The queue work is written to.
+ final BlockingQueue<QueryEvent> queryEventQueue = new ArrayBlockingQueue<>(10);
+
+ // Start the worker that will be tested.
+ final Thread logEventWorker = new Thread(new LogEventWorker(logEventQueue,
+ queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+ logEventWorker.start();
+
+ try {
+ // Write a unit of work that indicates a log was deleted. Since it was never created,
+ // this will not cause anything to be written to the QueryEvent queue.
+ logEventQueue.offer(LogEvent.delete("rya"));
+
+            // Show that no work was added to the QueryEvent queue for "rya".
+ assertNull(queryEventQueue.poll(500, TimeUnit.MILLISECONDS));
+ } finally {
+ shutdownSignal.set(true);
+ logEventWorker.join();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java
new file mode 100644
index 0000000..4495e19
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.ChangeLogEntry;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.querymanager.QueryManager.QueryEvent;
+import org.apache.rya.streams.querymanager.QueryManager.QueryEventWorkGenerator;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link QueryEventWorkGenerator}.
+ */
+public class QueryEventWorkGeneratorTest {
+
+ @Test
+ public void shutdownSignalKillsThread() throws Exception {
+ // The signal that will kill the notifying thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue generated work is offered to.
+ final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+ // The listener that will perform the QueryEventWorkGenerator work.
+ final QueryEventWorkGenerator generator =
+ new QueryEventWorkGenerator("rya", new CountDownLatch(1), queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+ // A thread that will attempt to notify the generator with a created query.
+ final Thread notifyThread = new Thread(() -> {
+ generator.notify(mock(ChangeLogEntry.class), Optional.empty());
+ });
+
+ // Fill the queue so that nothing may be offered to it.
+ queue.offer(QueryEvent.stopALL("rya"));
+
+ // Start the thread and show that it is still alive after the offer period.
+ notifyThread.start();
+ assertTrue( ThreadUtil.stillAlive(notifyThread, 200) );
+
+ // Set the shutdown signal to true and join the thread. If we were able to join, then it shut down.
+ shutdownSignal.set(true);
+ assertFalse( ThreadUtil.stillAlive(notifyThread, 1000) );
+ }
+
+ @Test
+ public void waitsForSubscriptionWork() throws Exception {
+ // The signal that will kill the notifying thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue generated work is offered to.
+ final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+ // The listener that will perform the QueryEventWorkGenerator work.
+ final CountDownLatch latch = new CountDownLatch(1);
+ final QueryEventWorkGenerator generator =
+ new QueryEventWorkGenerator("rya", latch, queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+ // A thread that will attempt to notify the generator with a created query.
+ final UUID queryId = UUID.randomUUID();
+ final StreamsQuery query = new StreamsQuery(queryId, "query", true);
+ final Thread notifyThread = new Thread(() -> {
+ final QueryChange change = QueryChange.create(queryId, query.getSparql(), query.isActive());
+ final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);
+ generator.notify(entry, Optional.of(query));
+ });
+
+ // Start the thread.
+ notifyThread.start();
+
+ try {
+ // Wait longer than the blocking period and show the thread is still alive and nothing has been added
+ // to the work queue.
+ Thread.sleep(150);
+ assertTrue( notifyThread.isAlive() );
+
+ // Count down the latch.
+ latch.countDown();
+
+ // Show work was added to the queue and the notifying thread died.
+ final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
+ final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive()));
+ assertEquals(expected, event);
+ } finally {
+ shutdownSignal.set(true);
+ notifyThread.join();
+ }
+ }
+
+ @Test
+ public void notifyCreate() throws Exception {
+ // The signal that will kill the notifying thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue generated work is offered to.
+ final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+ // The listener that will perform the QueryEventWorkGenerator work.
+ final CountDownLatch latch = new CountDownLatch(1);
+ latch.countDown();
+ final QueryEventWorkGenerator generator =
+ new QueryEventWorkGenerator("rya", latch, queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+ // A thread that will attempt to notify the generator with a created query.
+ final UUID queryId = UUID.randomUUID();
+ final StreamsQuery query = new StreamsQuery(queryId, "query", true);
+ final Thread notifyThread = new Thread(() -> {
+ final QueryChange change = QueryChange.create(queryId, query.getSparql(), query.isActive());
+ final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);
+ generator.notify(entry, Optional.of(query));
+ });
+
+ // Start the thread.
+ notifyThread.start();
+
+ try {
+ // Show work was added to the queue and the notifying thread died.
+ final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
+ final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive()));
+ assertEquals(expected, event);
+ } finally {
+ shutdownSignal.set(true);
+ notifyThread.join();
+ }
+ }
+
+ @Test
+ public void notifyDelete() throws Exception {
+ // The signal that will kill the notifying thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue generated work is offered to.
+ final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+ // The listener that will perform the QueryEventWorkGenerator work.
+ final CountDownLatch latch = new CountDownLatch(1);
+ latch.countDown();
+ final QueryEventWorkGenerator generator =
+ new QueryEventWorkGenerator("rya", latch, queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+ // A thread that will attempt to notify the generator with a deleted query.
+ final UUID queryId = UUID.randomUUID();
+ final Thread notifyThread = new Thread(() -> {
+ final QueryChange change = QueryChange.delete(queryId);
+ final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);
+ generator.notify(entry, Optional.empty());
+ });
+
+ // Start the thread.
+ notifyThread.start();
+
+ try {
+ // Show work was added to the queue and the notifying thread died.
+ final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
+ final QueryEvent expected = QueryEvent.stopped("rya", queryId);
+ assertEquals(expected, event);
+ } finally {
+ shutdownSignal.set(true);
+ notifyThread.join();
+ }
+ }
+
+ @Test
+ public void notifyUpdate_isActive() throws Exception {
+ // The signal that will kill the notifying thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue generated work is offered to.
+ final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+ // The listener that will perform the QueryEventWorkGenerator work.
+ final CountDownLatch latch = new CountDownLatch(1);
+ latch.countDown();
+ final QueryEventWorkGenerator generator =
+ new QueryEventWorkGenerator("rya", latch, queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+ // A thread that will attempt to notify the generator with an update query change.
+ final UUID queryId = UUID.randomUUID();
+ final StreamsQuery query = new StreamsQuery(queryId, "query", true);
+ final Thread notifyThread = new Thread(() -> {
+ final QueryChange change = QueryChange.update(queryId, true);
+ final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);
+ generator.notify(entry, Optional.of(query));
+ });
+
+ // Start the thread.
+ notifyThread.start();
+
+ try {
+ // Show work was added to the queue and the notifying thread died.
+ final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
+ final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive()));
+ assertEquals(expected, event);
+ } finally {
+ shutdownSignal.set(true);
+ notifyThread.join();
+ }
+ }
+
+ @Test
+ public void notifyUpdate_isNotActive() throws Exception {
+ // The signal that will kill the notifying thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue generated work is offered to.
+ final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+ // The listener that will perform the QueryEventWorkGenerator work.
+ final CountDownLatch latch = new CountDownLatch(1);
+ latch.countDown();
+ final QueryEventWorkGenerator generator =
+ new QueryEventWorkGenerator("rya", latch, queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+ // A thread that will attempt to notify the generator with an update query change.
+ final UUID queryId = UUID.randomUUID();
+ final StreamsQuery query = new StreamsQuery(queryId, "query", false);
+ final Thread notifyThread = new Thread(() -> {
+ final QueryChange change = QueryChange.update(queryId, false);
+ final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);
+ generator.notify(entry, Optional.of(query));
+ });
+
+ // Start the thread.
+ notifyThread.start();
+
+ try {
+ // Show work was added to the queue and the notifying thread died.
+ final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
+ final QueryEvent expected = QueryEvent.stopped("rya", queryId);
+ assertEquals(expected, event);
+ } finally {
+ shutdownSignal.set(true);
+ notifyThread.join();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java
new file mode 100644
index 0000000..33c0719
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.querymanager.QueryManager.QueryEvent;
+import org.apache.rya.streams.querymanager.QueryManager.QueryEventWorker;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link QueryManager.QueryEventWorker}.
+ */
+public class QueryEventWorkerTest {
+
+ @Test
+ public void shutdownSignalKillsThread() {
+ // The signal that will kill the working thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The thread that will perform the QueryEventWorker task.
+ final Thread queryEventWorker = new Thread(new QueryEventWorker(new ArrayBlockingQueue<>(1),
+ mock(QueryExecutor.class), 50, TimeUnit.MILLISECONDS, shutdownSignal));
+ queryEventWorker.start();
+
+ // Wait longer than the 50 ms poll time and show the thread is still running while the signal is false.
+ assertTrue(ThreadUtil.stillAlive(queryEventWorker, 200));
+
+ // Set the shutdown signal to true and join the thread. If we were able to join, then it shut down.
+ shutdownSignal.set(true);
+ assertFalse(ThreadUtil.stillAlive(queryEventWorker, 1000));
+ }
+
+ @Test
+ public void executingWork() throws Exception {
+ // The signal that will kill the working thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue used to send the execute work to the thread.
+ final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+ // The message that indicates a query needs to be executed.
+ final String ryaInstance = "rya";
+ final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "sparql", true);
+ final QueryEvent executingEvent = QueryEvent.executing(ryaInstance, query);
+
+ // Release a latch if the startQuery method on the queryExecutor is invoked with the correct values.
+ final CountDownLatch startQueryInvoked = new CountDownLatch(1);
+ final QueryExecutor queryExecutor = mock(QueryExecutor.class);
+ doAnswer(invocation -> {
+ startQueryInvoked.countDown();
+ return null;
+ }).when(queryExecutor).startQuery(ryaInstance, query);
+
+ // The thread that will perform the QueryEventWorker task.
+ final Thread queryEventWorker = new Thread(new QueryEventWorker(queue,
+ queryExecutor, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+ try {
+ queryEventWorker.start();
+
+ // Provide a message indicating a query needs to be executed.
+ queue.put(executingEvent);
+
+ // Verify the Query Executor was told to start the query.
+ assertTrue( startQueryInvoked.await(150, TimeUnit.MILLISECONDS) );
+ } finally {
+ shutdownSignal.set(true);
+ queryEventWorker.join();
+ }
+ }
+
+ @Test
+ public void stoppedWork() throws Exception {
+ // The signal that will kill the working thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue used to send the stop work to the thread.
+ final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+ // The message that indicates a query needs to be stopped.
+ final UUID queryId = UUID.randomUUID();
+ final QueryEvent stoppedEvent = QueryEvent.stopped("rya", queryId);
+
+ // Release a latch if the stopQuery method on the queryExecutor is invoked with the correct values.
+ final CountDownLatch stopQueryInvoked = new CountDownLatch(1);
+ final QueryExecutor queryExecutor = mock(QueryExecutor.class);
+ doAnswer(invocation -> {
+ stopQueryInvoked.countDown();
+ return null;
+ }).when(queryExecutor).stopQuery(queryId);
+
+ final Thread queryEventWorker = new Thread(new QueryEventWorker(queue,
+ queryExecutor, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+ try {
+ // Start the thread that will perform the QueryEventWorker task.
+ queryEventWorker.start();
+
+ // Provide a message indicating a query needs to be stopped.
+ queue.put(stoppedEvent);
+
+ // Verify the Query Executor was told to stop the query.
+ assertTrue( stopQueryInvoked.await(150, TimeUnit.MILLISECONDS) );
+ } finally {
+ shutdownSignal.set(true);
+ queryEventWorker.join();
+ }
+ }
+
+ @Test
+ public void stopAllWork() throws Exception {
+ // The signal that will kill the working thread.
+ final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ // The queue used to send the stop all work to the thread.
+ final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+ // The message that indicates all queries for a rya instance need to be stopped.
+ final String ryaInstance = "rya";
+ final QueryEvent stopAllEvent = QueryEvent.stopALL(ryaInstance);
+
+ // Release a latch if the stopAll method on the queryExecutor is invoked with the correct values.
+ final CountDownLatch testMethodInvoked = new CountDownLatch(1);
+ final QueryExecutor queryExecutor = mock(QueryExecutor.class);
+ doAnswer(invocation -> {
+ testMethodInvoked.countDown();
+ return null;
+ }).when(queryExecutor).stopAll(ryaInstance);
+
+ final Thread queryEventWorker = new Thread(new QueryEventWorker(queue,
+ queryExecutor, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+ try {
+ // Start the thread that will perform the QueryEventWorker task.
+ queryEventWorker.start();
+
+ // Provide a message indicating all queries for the rya instance need to be stopped.
+ queue.put(stopAllEvent);
+
+ // Verify the Query Executor was told to stop all the queries.
+ assertTrue( testMethodInvoked.await(150, TimeUnit.MILLISECONDS) );
+ } finally {
+ shutdownSignal.set(true);
+ queryEventWorker.join();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
index a1203a0..04e70c0 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
@@ -1,18 +1,20 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.rya.streams.querymanager;
@@ -34,13 +36,10 @@ import org.apache.rya.streams.api.queries.QueryChangeLog;
import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
import org.junit.Test;
-import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
-
/**
- * Test for the {@link QueryManager}
+ * Unit tests the methods of {@link QueryManager}.
*/
public class QueryManagerTest {
- private static final Scheduler TEST_SCHEDULER = Scheduler.newFixedRateSchedule(0, 100, TimeUnit.MILLISECONDS);
/**
* Tests when the query manager is notified to create a new query, the query
@@ -74,7 +73,7 @@ public class QueryManagerTest {
return null;
}).when(source).subscribe(any(SourceListener.class));
- final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER);
+ final QueryManager qm = new QueryManager(qe, source, 50, TimeUnit.MILLISECONDS);
try {
qm.startAndWait();
queryStarted.await(5, TimeUnit.SECONDS);
@@ -128,7 +127,7 @@ public class QueryManagerTest {
return null;
}).when(source).subscribe(any(SourceListener.class));
- final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER);
+ final QueryManager qm = new QueryManager(qe, source, 50, TimeUnit.MILLISECONDS);
try {
qm.startAndWait();
queryDeleted.await(5, TimeUnit.SECONDS);
@@ -183,7 +182,7 @@ public class QueryManagerTest {
return null;
}).when(source).subscribe(any(SourceListener.class));
- final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER);
+ final QueryManager qm = new QueryManager(qe, source, 50, TimeUnit.MILLISECONDS);
try {
qm.startAndWait();
queryDeleted.await(10, TimeUnit.SECONDS);
@@ -192,4 +191,4 @@ public class QueryManagerTest {
qm.stopAndWait();
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java
new file mode 100644
index 0000000..9896e31
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Utilities that are useful for interacting with {@link Thread}s while testing.
+ */
+public class ThreadUtil {
+
+ /** Private constructor to prevent instantiation. */
+ private ThreadUtil() { }
+
+ /**
+ * Waits up to the specified period of time to join a thread, then reports whether it is still alive.
+ * If the wait is interrupted, the calling thread's interrupt status is restored rather than swallowed.
+ *
+ * @param thread - The thread that will be joined. (not null)
+ * @param millis - How long to wait to join the thread, in milliseconds.
+ * @return {@code true} if the thread is still alive, otherwise {@code false}.
+ */
+ public static boolean stillAlive(final Thread thread, final long millis) {
+ requireNonNull(thread);
+ try {
+ thread.join(millis);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return thread.isAlive();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
index 3cbe894..f9c8a03 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
@@ -35,6 +35,7 @@ import org.apache.rya.streams.api.interactor.LoadStatements;
import org.apache.rya.streams.kafka.KafkaStreamsFactory;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory;
+import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
@@ -119,9 +120,10 @@ public class LocalQueryExecutorIT {
expected.add(new VisibilityBindingSet(bs, "a"));
// Start the executor that will be tested.
+ final CreateKafkaTopic createKafkaTopic = new CreateKafkaTopic( kafka.getZookeeperServers() );
final String kafkaServers = kafka.getKafkaHostname() + ":" + kafka.getKafkaPort();
final KafkaStreamsFactory jobFactory = new SingleThreadKafkaStreamsFactory(kafkaServers);
- final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+ final QueryExecutor executor = new LocalQueryExecutor(createKafkaTopic, jobFactory);
executor.startAndWait();
try {
// Start the query.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
index 0df5794..c0f888e 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
@@ -32,6 +32,7 @@ import java.util.UUID;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.kafka.KafkaStreamsFactory;
+import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
import org.apache.rya.streams.querymanager.QueryExecutor;
import org.junit.Test;
@@ -44,7 +45,7 @@ public class LocalQueryExecutorTest {
@Test(expected = IllegalStateException.class)
public void startQuery_serviceNotStarted() throws Exception {
- final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+ final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
executor.startQuery("rya", new StreamsQuery(UUID.randomUUID(), "query", true));
}
@@ -60,7 +61,7 @@ public class LocalQueryExecutorTest {
when(jobFactory.make(eq(ryaInstance), eq(query))).thenReturn(queryJob);
// Start the executor that will be tested.
- final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+ final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
executor.startAndWait();
try {
// Tell the executor to start the query.
@@ -75,14 +76,14 @@ public class LocalQueryExecutorTest {
@Test(expected = IllegalStateException.class)
public void stopQuery_serviceNotStarted() throws Exception {
- final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+ final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
executor.stopQuery(UUID.randomUUID());
}
@Test
public void stopQuery_queryNotRunning() throws Exception {
// Start an executor.
- final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+ final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
executor.startAndWait();
try {
// Try to stop a query that was never stareted.
@@ -104,7 +105,7 @@ public class LocalQueryExecutorTest {
when(jobFactory.make(eq(ryaInstance), eq(query))).thenReturn(queryJob);
// Start the executor that will be tested.
- final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+ final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
executor.startAndWait();
try {
// Tell the executor to start the query.
@@ -122,7 +123,7 @@ public class LocalQueryExecutorTest {
@Test(expected = IllegalStateException.class)
public void stopAll_serviceNotStarted() throws Exception {
- final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+ final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
executor.stopAll("rya");
}
@@ -141,7 +142,7 @@ public class LocalQueryExecutorTest {
when(jobFactory.make(eq(ryaInstance), eq(query2))).thenReturn(queryJob2);
// Start the executor that will be tested.
- final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+ final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
executor.startAndWait();
try {
// Tell the executor to start the queries.
@@ -180,7 +181,7 @@ public class LocalQueryExecutorTest {
when(jobFactory.make(eq(ryaInstance2), eq(query2))).thenReturn(queryJob2);
// Start the executor that will be tested.
- final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+ final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
executor.startAndWait();
try {
// Tell the executor to start the queries.
@@ -205,14 +206,14 @@ public class LocalQueryExecutorTest {
@Test(expected = IllegalStateException.class)
public void getRunningQueryIds_serviceNotStarted() throws Exception {
- final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+ final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
executor.getRunningQueryIds();
}
@Test
public void getRunningQueryIds_noneStarted() throws Exception {
// Start an executor.
- final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+ final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
executor.startAndWait();
try {
// Get the list of running queries.
@@ -240,7 +241,7 @@ public class LocalQueryExecutorTest {
when(jobFactory.make(eq(ryaInstance), eq(query3))).thenReturn(mock(KafkaStreams.class));
// Start the executor that will be tested.
- final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+ final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
executor.startAndWait();
try {
// Start the queries.
@@ -275,7 +276,7 @@ public class LocalQueryExecutorTest {
when(jobFactory.make(eq(ryaInstance), eq(query3))).thenReturn(mock(KafkaStreams.class));
// Start the executor that will be tested.
- final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+ final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
executor.startAndWait();
try {
// Start the queries.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
index f2b50ab..de6b9f3 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
@@ -45,6 +45,11 @@ public class QueryManagerConfigMarshallerTest {
" <port>6</port>\n" +
" </kafka>\n" +
" </queryChangeLogSource>\n" +
+ " <queryExecutor>\n" +
+ " <localKafkaStreams>\n" +
+ " <zookeepers>zoo1,zoo2,zoo3</zookeepers>\n" +
+ " </localKafkaStreams>\n" +
+ " </queryExecutor>\n" +
" <performanceTunning>\n" +
" <queryChanngeLogDiscoveryPeriod>\n" +
" <value>1</value>\n" +