You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2018/03/09 18:21:02 UTC
[16/22] incubator-rya git commit: RYA-451 Fixing threading issues
with the QueryManager class.
RYA-451 Fixing threading issues with the QueryManager class.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/1cd8db32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/1cd8db32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/1cd8db32
Branch: refs/heads/master
Commit: 1cd8db32e04f116934fef22fe9b465f7cd807755
Parents: a342fe2
Author: kchilton2 <ke...@gmail.com>
Authored: Fri Feb 2 22:47:59 2018 -0500
Committer: Valiyil <Pu...@parsons.com>
Committed: Fri Mar 9 12:59:55 2018 -0500
----------------------------------------------------------------------
.../rya/streams/api/entity/StreamsQuery.java | 4 +-
.../api/queries/InMemoryQueryRepository.java | 30 +-
.../rya/streams/api/queries/QueryChange.java | 10 +
.../queries/InMemoryQueryRepositoryTest.java | 16 +-
.../client/command/ListQueriesCommand.java | 12 +-
.../streams/client/command/RunQueryCommand.java | 2 +-
.../apache/rya/streams/kafka/KafkaTopics.java | 4 +-
.../kafka/SingleThreadKafkaStreamsFactory.java | 2 +-
.../kafka/interactor/CreateKafkaTopic.java | 60 ++
.../query-manager/src/main/README.txt | 7 +-
.../src/main/config/configuration.xml | 10 +-
.../query-manager/src/main/config/log4j.xml | 17 +
.../querymanager/QueryChangeLogSource.java | 5 +-
.../rya/streams/querymanager/QueryManager.java | 929 ++++++++++++++++---
.../querymanager/QueryManagerDaemon.java | 8 +-
.../kafka/KafkaQueryChangeLogSource.java | 46 +-
.../querymanager/kafka/LocalQueryExecutor.java | 31 +-
.../src/main/xsd/QueryManagerConfig.xsd | 14 +
.../querymanager/LogEventWorkGeneratorTest.java | 136 +++
.../querymanager/LogEventWorkerTest.java | 245 +++++
.../QueryEventWorkGeneratorTest.java | 265 ++++++
.../querymanager/QueryEventWorkerTest.java | 172 ++++
.../streams/querymanager/QueryManagerTest.java | 41 +-
.../rya/streams/querymanager/ThreadUtil.java | 48 +
.../kafka/LocalQueryExecutorIT.java | 4 +-
.../kafka/LocalQueryExecutorTest.java | 25 +-
.../xml/QueryManagerConfigMarshallerTest.java | 5 +
27 files changed, 1907 insertions(+), 241 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
index 7194834..11423bd 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
@@ -85,7 +85,7 @@ public class StreamsQuery {
}
return false;
}
-
+
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
@@ -95,7 +95,7 @@ public class StreamsQuery {
sb.append(getSparql() + "\n");
sb.append("Is ");
if (!isActive) {
- sb.append(" Not ");
+ sb.append("Not ");
}
sb.append("Running.\n");
return sb.toString();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
index 5fb0297..95c1922 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
@@ -51,9 +51,9 @@ import info.aduna.iteration.CloseableIteration;
*/
@DefaultAnnotation(NonNull.class)
public class InMemoryQueryRepository extends AbstractScheduledService implements QueryRepository {
- private static final Logger LOG = LoggerFactory.getLogger(InMemoryQueryRepository.class);
+ private static final Logger log = LoggerFactory.getLogger(InMemoryQueryRepository.class);
- private final ReentrantLock lock = new ReentrantLock();
+ private final ReentrantLock lock = new ReentrantLock(true);
/**
* The change log that is the ground truth for describing what the queries look like.
@@ -198,7 +198,6 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements
@Override
protected void shutDown() throws Exception {
- super.shutDown();
lock.lock();
try {
changeLog.close();
@@ -211,11 +210,12 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements
* Updates the {@link #queriesCache} to reflect the latest position within the {@link #changeLog}.
*/
private void updateCache() {
- requireNonNull(changeLog);
+ log.trace("updateCache() - Enter");
CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLogException> it = null;
try {
// Iterate over everything since the last position that was handled within the change log.
+ log.debug("Starting cache position:" + cachePosition);
if(cachePosition.isPresent()) {
it = changeLog.readFromPosition(cachePosition.get() + 1);
} else {
@@ -228,6 +228,8 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements
final QueryChange change = entry.getEntry();
final UUID queryId = change.getQueryId();
+ log.debug("Updating the cache to reflect:\n" + change);
+
switch(change.getChangeType()) {
case CREATE:
final StreamsQuery query = new StreamsQuery(
@@ -253,15 +255,17 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements
break;
}
+ log.debug("Notifying listeners with the updated state.");
final Optional<StreamsQuery> newQueryState = Optional.ofNullable(queriesCache.get(queryId));
listeners.forEach(listener -> listener.notify(entry, newQueryState));
cachePosition = Optional.of( entry.getPosition() );
+ log.debug("New cache position: " + cachePosition); // logged after listeners are notified so the position reflects the entry just handled
}
} catch (final QueryChangeLogException e) {
// Rethrow the exception because the object the supplier tried to create could not be created.
- throw new RuntimeException("Could not initialize the " + InMemoryQueryRepository.class.getName(), e);
+ throw new RuntimeException("Could not update the cache of " + InMemoryQueryRepository.class.getName(), e);
} finally {
// Try to close the iteration if it was opened.
@@ -270,18 +274,22 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements
it.close();
}
} catch (final QueryChangeLogException e) {
- LOG.error("Could not close the " + CloseableIteration.class.getName(), e);
+ log.error("Could not close the " + CloseableIteration.class.getName(), e);
}
+
+ log.trace("updateCache() - Exit");
}
}
@Override
protected void runOneIteration() throws Exception {
+ log.trace("runOneIteration() - Enter");
lock.lock();
try {
updateCache();
} finally {
lock.unlock();
+ log.trace("runOneIteration() - Exit");
}
}
@@ -292,17 +300,25 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements
@Override
public Set<StreamsQuery> subscribe(final QueryChangeLogListener listener) {
+ log.trace("subscribe(listener) - Enter");
+
//locks to prevent the current state from changing while subscribing.
lock.lock();
+ log.trace("subscribe(listener) - Acquired lock");
try {
listeners.add(listener);
+ log.trace("subscribe(listener) - Listener Registered");
//return the current state of the query repository
- return queriesCache.values()
+ final Set<StreamsQuery> queries = queriesCache.values()
.stream()
.collect(Collectors.toSet());
+ log.trace("subscribe(listener) - Returning " + queries.size() + " existing queries");
+ return queries;
} finally {
+ log.trace("subscribe(listener) - Releasing lock");
lock.unlock();
+ log.trace("subscribe(listener) - Exit");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
index d283957..d34a394 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
@@ -110,6 +110,16 @@ public final class QueryChange implements Serializable {
return false;
}
+ @Override
+ public String toString() {
+ return "QueryChange: {" +
+ " Query ID: " + queryId + ",\n" +
+ " Change Type: " + changeType + ",\n" +
+ " Is Active: " + isActive + ",\n" +
+ " SPARQL: " + sparql + "\n" +
+ "}";
+ }
+
/**
* Create a {@link QueryChange} that represents a new SPARQL query that will be managed by Rya Streams.
*
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
index 3b3d48a..d7e116b 100644
--- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
+++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
@@ -93,13 +93,9 @@ public class InMemoryQueryRepositoryTest {
// Create a new totally in memory QueryRepository.
final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog, SCHEDULE );
- try {
- // Listing the queries should work using an initialized change log.
- final Set<StreamsQuery> stored = initializedQueries.list();
- assertEquals(expected, stored);
- } finally {
- queries.stop();
- }
+ // Listing the queries should work using an initialized change log.
+ final Set<StreamsQuery> stored = initializedQueries.list();
+ assertEquals(expected, stored);
} finally {
queries.stop();
}
@@ -166,7 +162,7 @@ public class InMemoryQueryRepositoryTest {
final StreamsQuery query = queries.add("query 1", true);
final Set<StreamsQuery> existing = queries.subscribe((queryChangeEvent, newQueryState) -> {
- final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(1L,
+ final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<>(1L,
QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
final Optional<StreamsQuery> expectedQueryState = Optional.of(
new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
@@ -197,7 +193,7 @@ public class InMemoryQueryRepositoryTest {
//show listener on repo that query was added to is being notified of the new query.
final CountDownLatch repo1Latch = new CountDownLatch(1);
queries.subscribe((queryChangeEvent, newQueryState) -> {
- final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L,
+ final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<>(0L,
QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
final Optional<StreamsQuery> expectedQueryState = Optional.of(
new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
@@ -210,7 +206,7 @@ public class InMemoryQueryRepositoryTest {
//show listener not on the repo that query was added to is being notified as well.
final CountDownLatch repo2Latch = new CountDownLatch(1);
queries2.subscribe((queryChangeEvent, newQueryState) -> {
- final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L,
+ final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<>(0L,
QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
final Optional<StreamsQuery> expectedQueryState = Optional.of(
new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
index cd78975..a5507a6 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
@@ -23,6 +23,7 @@ import static java.util.Objects.requireNonNull;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.StringUtils;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.exception.RyaStreamsException;
import org.apache.rya.streams.api.interactor.ListQueries;
@@ -112,12 +113,11 @@ public class ListQueriesCommand implements RyaStreamsCommand {
sb.append("Queries in Rya Streams:\n");
sb.append("---------------------------------------------------------\n");
queries.forEach(query -> {
- sb.append("ID: ");
- sb.append(query.getQueryId());
- sb.append("\t\t");
- sb.append("Query: ");
- sb.append(query.getSparql());
- sb.append("\n");
+ sb.append("ID: ").append(query.getQueryId())
+ .append(" ")
+ .append("Is Active: ") // label only; the flag value is appended exactly once, right-padded, on the next line
+ .append(StringUtils.rightPad("" + query.isActive(), 9))
+ .append("Query: ").append(query.getSparql()).append("\n");
});
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
index ddaf647..7b311f6 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
@@ -136,7 +136,7 @@ public class RunQueryCommand implements RyaStreamsCommand {
final Set<String> topics = new HashSet<>();
topics.add( KafkaTopics.statementsTopic(params.ryaInstance) );
topics.add( KafkaTopics.queryResultsTopic(queryId) );
- KafkaTopics.createTopic(params.zookeeperServers, topics, 1, 1);
+ KafkaTopics.createTopics(params.zookeeperServers, topics, 1, 1);
// Run the query that uses those topics.
final KafkaRunQuery runQuery = new KafkaRunQuery(
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
index 095465c..989799a 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
@@ -55,7 +55,7 @@ public class KafkaTopics {
}
/**
- * Get the Rya instance name from a Kafka topic name that has been used for a {@link QueryChnageLog}.
+ * Get the Rya instance name from a Kafka topic name that has been used for a {@link QueryChangeLog}.
* <p/>
* This is the inverse function of {@link #queryChangeLogTopic(String)}.
*
@@ -106,7 +106,7 @@ public class KafkaTopics {
* @param partitions - The number of partitions that each of the topics will have.
* @param replicationFactor - The replication factor of the topics that are created.
*/
- public static void createTopic(
+ public static void createTopics(
final String zookeeperServers,
final Set<String> topicNames,
final int partitions,
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
index 7ab7e90..8093951 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
@@ -83,7 +83,7 @@ public class SingleThreadKafkaStreamsFactory implements KafkaStreamsFactory {
try {
final TopologyBuilder topologyBuilder = topologyFactory.build(query.getSparql(), statementsTopic, resultsTopic, new RandomUUIDFactory());
return new KafkaStreams(topologyBuilder, new StreamsConfig(streamsProps));
- } catch (MalformedQueryException | TopologyBuilderException e) {
+ } catch (final MalformedQueryException | TopologyBuilderException e) {
throw new KafkaStreamsFactoryException("Could not create a KafkaStreams processing topology for query " + query.getQueryId(), e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java
new file mode 100644
index 0000000..771e1c8
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.interactor;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Set;
+
+import org.apache.rya.streams.kafka.KafkaTopics;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Creates topics in Kafka.
+ */
+@DefaultAnnotation(NonNull.class)
+public class CreateKafkaTopic {
+
+ private final String zookeeperServers;
+
+ /**
+ * Constructs an instance of {@link CreateKafkaTopic}.
+ *
+ * @param zookeeperServers - The Zookeeper servers that are used to manage the Kafka instance. (not null)
+ */
+ public CreateKafkaTopic(final String zookeeperServers) {
+ this.zookeeperServers = requireNonNull(zookeeperServers);
+ }
+
+ /**
+ * Creates a set of Kafka topics for each topic that does not already exist.
+ *
+ * @param topicNames - The topics that will be created. (not null)
+ * @param partitions - The number of partitions that each of the topics will have.
+ * @param replicationFactor - The replication factor of the topics that are created.
+ */
+ public void createTopics(
+ final Set<String> topicNames,
+ final int partitions,
+ final int replicationFactor) {
+ KafkaTopics.createTopics(zookeeperServers, topicNames, partitions, replicationFactor);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/README.txt
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/README.txt b/extras/rya.streams/query-manager/src/main/README.txt
index 3b2dbfe..93d5ac5 100644
--- a/extras/rya.streams/query-manager/src/main/README.txt
+++ b/extras/rya.streams/query-manager/src/main/README.txt
@@ -41,8 +41,11 @@ Java 8
yum install -y ${project.artifactId}-${project.version}.noarch.rpm
3. Update the configuration file:
- Replace "[Kafka Broker Hostname]" with the IP address of the Kafka broker.
- Replace the Kafka port if using something other than the default of 9092.
+ A. Replace "[Zookeepers used to manage Kafka. E.g.: zoo1,zoo2,zoo3]" with
+ the zookeepers used to manage the Kafka cluster. It is a comma separated
+ list.
+ B. Replace "[Kafka Broker Hostname]" with the IP address of the Kafka broker.
+ C. Replace the Kafka port if using something other than the default of 9092.
4. Start the service:
systemctl start rya-streams-query-manager.service
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/config/configuration.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/config/configuration.xml b/extras/rya.streams/query-manager/src/main/config/configuration.xml
index 96da501..7077125 100644
--- a/extras/rya.streams/query-manager/src/main/config/configuration.xml
+++ b/extras/rya.streams/query-manager/src/main/config/configuration.xml
@@ -18,7 +18,7 @@ specific language governing permissions and limitations
under the License.
-->
<queryManagerConfig>
- <!-- The Query Change Log Sources. The source defines a system where Rya
+ <!-- The Query Change Log Sources. The source defines a system where Rya
- Streams Query Change Logs are managed. The query manager will manage
- queries for all Rya instances whose change logs are stored within the
- source.
@@ -29,6 +29,14 @@ under the License.
<port>9092</port>
</kafka>
</queryChangeLogSource>
+
+ <!-- The Query Executor. The executor defines a system for executing the
+ Rya Streams queries. -->
+ <queryExecutor>
+ <localKafkaStreams>
+ <zookeepers>[Zookeepers used to manage Kafka. E.g.: zoo1,zoo2,zoo3]</zookeepers>
+ </localKafkaStreams>
+ </queryExecutor>
<!-- This section defines performance related tuning values. Sensible
- default have been provided to simplify configuration.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/config/log4j.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/config/log4j.xml b/extras/rya.streams/query-manager/src/main/config/log4j.xml
index 2021638..96ea56c 100644
--- a/extras/rya.streams/query-manager/src/main/config/log4j.xml
+++ b/extras/rya.streams/query-manager/src/main/config/log4j.xml
@@ -28,6 +28,23 @@ under the License.
</layout>
</appender>
+ <!-- Kafka configuration configs are loud. -->
+ <logger name="org.apache.kafka.streams.StreamsConfig">
+ <level value="OFF"/>
+ </logger>
+ <logger name="org.apache.kafka.clients.consumer.ConsumerConfig">
+ <level value="OFF"/>
+ </logger>
+ <logger name="org.apache.kafka.clients.producer.ProducerConfig">
+ <level value="OFF"/>
+ </logger>
+
+ <!-- Change this level to DEBUG to see more information about what the
+ QueryManager is doing. -->
+ <logger name="org.apache.rya.streams.querymanager.QueryManager">
+ <level value="INFO"/>
+ </logger>
+
<root>
<level value="INFO" />
<appender-ref ref="console" />
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryChangeLogSource.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryChangeLogSource.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryChangeLogSource.java
index 73e5d12..eb5ca08 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryChangeLogSource.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryChangeLogSource.java
@@ -51,8 +51,9 @@ public interface QueryChangeLogSource extends Service {
public void unsubscribe(final SourceListener listener);
/**
- * A listener that is notified when a {@link QueryChangeLog} has
- * been added or removed from a {@link QueryChangeLogSource}.
+ * A listener that is notified when a {@link QueryChangeLog} has been added or
+ * removed from a {@link QueryChangeLogSource}. The listener receives the only
+ * copy of the change log and is responsible for shutting it down.
*/
@DefaultAnnotation(NonNull.class)
public interface SourceListener {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
index 30b4538..e6bd800 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
@@ -1,18 +1,20 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.rya.streams.querymanager;
@@ -20,16 +22,23 @@ import static java.util.Objects.requireNonNull;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.queries.ChangeLogEntry;
import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
import org.apache.rya.streams.api.queries.QueryChange;
-import org.apache.rya.streams.api.queries.QueryChange.ChangeType;
import org.apache.rya.streams.api.queries.QueryChangeLog;
import org.apache.rya.streams.api.queries.QueryChangeLogListener;
import org.apache.rya.streams.api.queries.QueryRepository;
@@ -38,8 +47,10 @@ import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.UncheckedExecutionException;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -51,205 +62,823 @@ import edu.umd.cs.findbugs.annotations.NonNull;
* instances/rya streams instances.
*/
@DefaultAnnotation(NonNull.class)
-public class QueryManager extends AbstractIdleService {
- private static final Logger LOG = LoggerFactory.getLogger(QueryManager.class);
+public class QueryManager extends AbstractService {
+ private static final Logger log = LoggerFactory.getLogger(QueryManager.class);
+
+ /**
+ * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific
+ * Rya instance.
+ */
+ private final QueryChangeLogSource changeLogSource;
+ /**
+ * The engine that is responsible for executing {@link StreamsQuery}s.
+ */
private final QueryExecutor queryExecutor;
- private final Scheduler scheduler;
/**
- * Map of Rya Instance name to {@link QueryRepository}.
+ * How long blocking operations will be attempted before potentially trying again.
*/
- private final Map<String, QueryRepository> queryRepos = new HashMap<>();
+ private final long blockingValue;
- private final ReentrantLock lock = new ReentrantLock();
+ /**
+ * The units for {@link #blockingValue}.
+ */
+ private final TimeUnit blockingUnits;
- private final QueryChangeLogSource source;
+ /**
+ * Used to inform threads that the application is shutting down, so they must stop work.
+ */
+ private final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ /**
+ * This thread pool manages the two threads used to work the {@link LogEvent}s
+ * and the {@link QueryEvent}s.
+ */
+ private final ExecutorService executor = Executors.newFixedThreadPool(2);
/**
* Creates a new {@link QueryManager}.
*
* @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null)
* @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null)
- * @param scheduler - The {@link Scheduler} used to discover query changes
- * within the {@link QueryChangeLog}s (not null)
+ * @param blockingValue - How long blocking operations will try before looping. (> 0)
+ * @param blockingUnits - The units of the {@code blockingValue}. (not null)
*/
- public QueryManager(final QueryExecutor queryExecutor, final QueryChangeLogSource source, final Scheduler scheduler) {
- this.source = requireNonNull(source);
+ public QueryManager(
+ final QueryExecutor queryExecutor,
+ final QueryChangeLogSource source,
+ final long blockingValue,
+ final TimeUnit blockingUnits) {
+ this.changeLogSource = requireNonNull(source);
this.queryExecutor = requireNonNull(queryExecutor);
- this.scheduler = requireNonNull(scheduler);
+ Preconditions.checkArgument(blockingValue > 0, "The blocking value must be > 0. Was: " + blockingValue);
+ this.blockingValue = blockingValue;
+ this.blockingUnits = requireNonNull(blockingUnits);
+ }
+
+ /**
+ * Submits the {@link LogEventWorker} and the {@link QueryEventWorker} to the executor, starts
+ * the {@link QueryExecutor} and the {@link QueryChangeLogSource}, and then subscribes a
+ * {@link LogEventWorkGenerator} to the source so that discovered logs are queued as work.
+ * <p/>
+ * Exactly one of {@code notifyStarted()} or {@code notifyFailed(Throwable)} is invoked. When
+ * startup fails, the shutdown signal is set and the worker threads are stopped before the
+ * failure is reported.
+ */
+ @Override
+ protected void doStart() {
+ log.info("Starting a QueryManager.");
+
+ // A work queue of discovered Query Change Logs that need to be handled.
+ // This queue exists so that the source notifying thread may be released
+ // immediately instead of calling into blocking functions.
+ final BlockingQueue<LogEvent> logEvents = new ArrayBlockingQueue<>(1024);
+
+ // A work queue of discovered Query Changes from the monitored Query Change Logs
+ // that need to be handled. This queue exists so that the Query Repository notifying
+ // thread may be released immediately instead of calling into blocking functions.
+ final BlockingQueue<QueryEvent> queryEvents = new ArrayBlockingQueue<>(1024);
+
+ try {
+ // Start up a LogEventWorker using the executor service.
+ executor.submit(new LogEventWorker(logEvents, queryEvents, blockingValue, blockingUnits, shutdownSignal));
+
+ // Start up a QueryEvent Worker using the executor service.
+ executor.submit(new QueryEventWorker(queryEvents, queryExecutor, blockingValue, blockingUnits, shutdownSignal));
+
+ // Start up the query execution framework.
+ queryExecutor.startAndWait();
+
+ // Startup the source that discovers new Query Change Logs.
+ changeLogSource.startAndWait();
+
+ // Subscribe the source a listener that writes to the LogEventWorker's work queue.
+ changeLogSource.subscribe(new LogEventWorkGenerator(logEvents, blockingValue, blockingUnits, shutdownSignal));
+ } catch(final RejectedExecutionException | UncheckedExecutionException e) {
+ log.error("Could not start up a QueryManager.", e);
+
+ // Startup failed, so release any worker threads that were already submitted. Nothing
+ // else will stop them because doStop() is not invoked for a service that failed to start.
+ shutdownSignal.set(true);
+ executor.shutdownNow();
+
+ // Report the failure and return so that the service is not also reported as started.
+ notifyFailed(e);
+ return;
+ }
+
+ // Notify the service was successfully started.
+ notifyStarted();
+
+ log.info("QueryManager has finished starting.");
+ }
+
+ /**
+ * Sets the shutdown signal, interrupts the worker threads and waits up to 10 seconds for them
+ * to exit, then stops the {@link QueryChangeLogSource} and the {@link QueryExecutor} before
+ * reporting that the service has stopped.
+ * <p/>
+ * If the calling thread is interrupted while waiting for the workers, the remaining shutdown
+ * steps are still performed and the interrupt status is restored before returning.
+ */
+ @Override
+ protected void doStop() {
+ log.info("Stopping a QueryManager.");
+
+ // Set the shutdown flag so that all components that rely on that signal will stop processing.
+ shutdownSignal.set(true);
+
+ // Stop the workers and wait for them to die.
+ boolean interrupted = false;
+ executor.shutdownNow();
+ try {
+ if(!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+ log.warn("Waited 10 seconds for the worker threads to die, but they are still running.");
+ }
+ } catch (final InterruptedException e) {
+ // The wait was cut short, so it is not known whether the workers have exited. Remember
+ // the interrupt so it may be restored once the rest of the shutdown has been performed.
+ interrupted = true;
+ log.warn("Interrupted while waiting for the worker threads to die. They may still be running.");
+ }
+
+ // Stop the source of new Change Logs.
+ try {
+ changeLogSource.stopAndWait();
+ } catch(final UncheckedExecutionException e) {
+ log.warn("Could not stop the Change Log Source.", e);
+ }
+
+ // Stop the query execution framework.
+ try {
+ queryExecutor.stopAndWait();
+ } catch(final UncheckedExecutionException e) {
+ log.warn("Could not stop the Query Executor", e);
+ }
+
+ // Notify the service was successfully stopped.
+ notifyStopped();
+
+ log.info("QueryManager has finished stopping.");
+
+ // Restore the interrupt status for the caller now that the shutdown work is complete.
+ if(interrupted) {
+ Thread.currentThread().interrupt();
+ }
}
/**
- * Starts running a query.
+ * Offer a unit of work to a blocking queue until it is either accepted, or the
+ * shutdown signal is set.
+ * <p/>
+ * If the calling thread is interrupted while offering, the offer is retried (unless the
+ * shutdown signal has been set) and the thread's interrupt status is restored before
+ * this method returns.
*
- * @param ryaInstanceName - The Rya instance the query belongs to. (not null)
- * @param query - The query to run.(not null)
+ * @param workQueue - The blocking work queue to write to. (not null)
+ * @param event - The event that will be offered to the work queue. (not null)
+ * @param offerValue - How long to wait during each attempt to offer the work.
+ * @param offerUnits - The unit for the {@code offerValue}. (not null)
+ * @param shutdownSignal - Used to signal application shutdown has started, so
+ * this method may terminate without ever placing the event on the queue. (not null)
+ * @return {@code true} if the event was added to the queue, otherwise {@code false}.
*/
- private void runQuery(final String ryaInstanceName, final StreamsQuery query) {
- requireNonNull(ryaInstanceName);
- requireNonNull(query);
- LOG.info("Starting Query: " + query.toString() + " on the rya instance: " + ryaInstanceName);
+ private static <T> boolean offerUntilAcceptedOrShutdown(
+ final BlockingQueue<T> workQueue,
+ final T event,
+ final long offerValue,
+ final TimeUnit offerUnits,
+ final AtomicBoolean shutdownSignal) {
+ requireNonNull(workQueue);
+ requireNonNull(event);
+ requireNonNull(offerUnits);
+ requireNonNull(shutdownSignal);
- try {
- queryExecutor.startQuery(ryaInstanceName, query);
- } catch (final QueryExecutorException e) {
- LOG.error("Failed to start query.", e);
+ // Tracks whether an interrupt was observed so that it may be restored before returning.
+ // It is not restored inside the loop because the next offer would then fail immediately.
+ boolean interrupted = false;
+ boolean submitted = false;
+ while(!submitted && !shutdownSignal.get()) {
+ try {
+ submitted = workQueue.offer(event, offerValue, offerUnits);
+ if(!submitted) {
+ log.debug("An event could not be added to a work queue after waiting " + offerValue + " " +
+ offerUnits + ". Trying again...");
+ }
+ } catch (final InterruptedException e) {
+ interrupted = true;
+ log.debug("Interrupted while offering an event to a work queue. Trying again unless shutting down...");
+ }
}
+ if(interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ return submitted;
}
/**
- * Stops the specified query from running.
- *
- * @param queryId - The ID of the query to stop running. (not null)
+ * An observation that a {@link QueryChangeLog} was created within or
+ * removed from a {@link QueryChangeLogSource}.
*/
- private void stopQuery(final UUID queryId) {
- requireNonNull(queryId);
+ @DefaultAnnotation(NonNull.class)
+ static class LogEvent {
+
+ /**
+ * The types of events that may be observed.
+ */
+ static enum LogEventType {
+ /**
+ * A {@link QueryChangeLog} was created within a {@link QueryChangeLogSource}.
+ */
+ CREATE,
+
+ /**
+ * A {@link QueryChangeLog} was deleted from a {@link QueryChangeLogSource}.
+ */
+ DELETE;
+ }
- LOG.info("Stopping query: " + queryId.toString());
+ private final String ryaInstance;
+ private final LogEventType eventType;
+ private final Optional<QueryChangeLog> log;
- try {
- queryExecutor.stopQuery(queryId);
- } catch (final QueryExecutorException e) {
- LOG.error("Failed to stop query.", e);
+ /**
+ * Constructs an instance of {@link LogEvent}.
+ *
+ * @param ryaInstance - The Rya Instance the log is/was for. (not null)
+ * @param eventType - The type of event that was observed. (not null)
+ * @param log - The log if this is a create event. (not null)
+ */
+ private LogEvent(final String ryaInstance, final LogEventType eventType, final Optional<QueryChangeLog> log) {
+ this.ryaInstance = requireNonNull(ryaInstance);
+ this.eventType = requireNonNull(eventType);
+ this.log = requireNonNull(log);
+ }
+
+ /**
+ * @return The Rya Instance whose log was either created or deleted.
+ */
+ public String getRyaInstanceName() {
+ return ryaInstance;
+ }
+
+ /**
+ * @return The type of event that was observed.
+ */
+ public LogEventType getEventType() {
+ return eventType;
+ }
+
+ /**
+ * @return The {@link QueryChangeLog} if this is a CREATE event.
+ */
+ public Optional<QueryChangeLog> getQueryChangeLog() {
+ return log;
+ }
+
+ @Override
+ public String toString() {
+ return "LogEvent {\n" +
+ " Rya Instance: " + ryaInstance + ",\n" +
+ " Event Type: " + eventType + "\n" +
+ "}";
+ }
+
+ /**
+ * Make a {@link LogEvent} that indicates a {@link QueryChangeLog} was created within a
+ * {@link QueryChangeLogSource}.
+ *
+ * @param ryaInstance - The Rya Instance the created log is for. (not null)
+ * @param log - The created {@link QueryChangeLog}. (not null)
+ * @return A {@link LogEvent} built using the provided values.
+ */
+ public static LogEvent create(final String ryaInstance, final QueryChangeLog log) {
+ return new LogEvent(ryaInstance, LogEventType.CREATE, Optional.of(log));
+ }
+
+ /**
+ * Make a {@link LogEvent} that indicates a {@link QueryChangeLog} was deleted from
+ * a {@link QueryChangeLogSource}.
+ *
+ * @param ryaInstance - The Rya Instance whose log was deleted. (not null)
+ * @return A {@link LogEvent} built using the provided values.
+ */
+ public static LogEvent delete(final String ryaInstance) {
+ return new LogEvent(ryaInstance, LogEventType.DELETE, Optional.empty());
}
}
- @Override
- protected void startUp() throws Exception {
- lock.lock();
- try {
- LOG.info("Starting Query Manager.");
- queryExecutor.startAndWait();
- source.startAndWait();
+ /**
+ * An observation that a {@link StreamsQuery} needs to be executing or not
+ * via the provided {@link QueryExecutor}.
+ */
+ @DefaultAnnotation(NonNull.class)
+ static class QueryEvent {
+
+ /**
+ * The type of events that may be observed.
+ */
+ public static enum QueryEventType {
+ /**
+ * Indicates a {@link StreamsQuery} needs to be executing.
+ */
+ EXECUTING,
+
+ /**
+ * Indicates a {@link StreamsQuery} needs to be stopped.
+ */
+ STOPPED,
+
+ /**
+ * Indicates all {@link StreamsQuery}s for a Rya instance need to be stopped.
+ */
+ STOP_ALL;
+ }
+
+ private final String ryaInstance;
+ private final QueryEventType type;
+ private final Optional<UUID> queryId;
+ private final Optional<StreamsQuery> query;
+
+ /**
+ * Constructs an instance of {@link QueryEvent}.
+ *
+ * @param ryaInstance - The Rya instance that generated the event. (not null)
+ * @param type - Indicates whether the query needs to be executing or not. (not null)
+ * @param queryId - If stopped, the ID of the query that must not be running. (not null)
+ * @param query - If executing, the StreamsQuery that defines what should be executing. (not null)
+ */
+ private QueryEvent(
+ final String ryaInstance,
+ final QueryEventType type,
+ final Optional<UUID> queryId,
+ final Optional<StreamsQuery> query) {
+ this.ryaInstance = requireNonNull(ryaInstance);
+ this.type = requireNonNull(type);
+ this.queryId = requireNonNull(queryId);
+ this.query = requireNonNull(query);
+ }
+
+ /**
+ * @return The Rya instance that generated the event.
+ */
+ public String getRyaInstance() {
+ return ryaInstance;
+ }
+
+ /**
+ * @return Indicates whether the query needs to be executing or not.
+ */
+ public QueryEventType getType() {
+ return type;
+ }
+
+ /**
+ * @return If stopped, the ID of the query that must not be running. Otherwise absent.
+ */
+ public Optional<UUID> getQueryId() {
+ return queryId;
+ }
+
+ /**
+ * @return If executing, the StreamsQuery that defines what should be executing. Otherwise absent.
+ */
+ public Optional<StreamsQuery> getStreamsQuery() {
+ return query;
+ }
- // subscribe to the sources to be notified of changes.
- source.subscribe(new QueryManagerSourceListener());
- } finally {
- lock.unlock();
+ @Override
+ public int hashCode() {
+ return Objects.hash(ryaInstance, type, queryId, query);
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if(o instanceof QueryEvent) {
+ final QueryEvent other = (QueryEvent) o;
+ return Objects.equals(ryaInstance, other.ryaInstance) &&
+ Objects.equals(type, other.type) &&
+ Objects.equals(queryId, other.queryId) &&
+ Objects.equals(query, other.query);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder string = new StringBuilder();
+ string.append("Query Event {\n")
+ .append(" Rya Instance: ").append(ryaInstance).append(",\n")
+ .append(" Type: ").append(type).append(",\n");
+ switch(type) {
+ case EXECUTING:
+ append(string, query.get());
+ break;
+ case STOPPED:
+ string.append(" Query ID: ").append(queryId.get()).append("\n");
+ break;
+ case STOP_ALL:
+ break;
+ default:
+ // Default to showing everything that is in the object.
+ string.append(" Query ID: ").append(queryId.get()).append("\n");
+ append(string, query.get());
+ break;
+ }
+ string.append("}");
+ return string.toString();
+ }
+
+ private void append(final StringBuilder string, final StreamsQuery query) {
+ requireNonNull(string);
+ requireNonNull(query);
+ string.append(" Streams Query {\n")
+ .append(" Query ID: ").append(query.getQueryId()).append(",\n")
+ .append(" Is Active: ").append(query.isActive()).append(",\n")
+ .append(" SPARQL: ").append(query.getSparql()).append("\n")
+ .append(" }");
+ }
+
+ /**
+ * Create a {@link QueryEvent} that indicates a query needs to be executing.
+ *
+ * @param ryaInstance - The Rya instance that generated the event. (not null)
+ * @param query - The StreamsQuery that defines what should be executing. (not null)
+ * @return A {@link QueryEvent} built using the provided values.
+ */
+ public static QueryEvent executing(final String ryaInstance, final StreamsQuery query) {
+ return new QueryEvent(ryaInstance, QueryEventType.EXECUTING, Optional.empty(), Optional.of(query));
+ }
+
+ /**
+ * Create a {@link QueryEvent} that indicates a query needs to be stopped.
+ *
+ * @param ryaInstance - The Rya instance that generated the event. (not null)
+ * @param queryId - The ID of the query that must not be running. (not null)
+ * @return A {@link QueryEvent} built using the provided values.
+ */
+ public static QueryEvent stopped(final String ryaInstance, final UUID queryId) {
+ return new QueryEvent(ryaInstance, QueryEventType.STOPPED, Optional.of(queryId), Optional.empty());
+ }
+
+ /**
+ * Create a {@link QueryEvent} that indicates all queries for a Rya instance needs to be stopped.
+ *
+ * @param ryaInstance - The Rya instance that generated the event. (not null)
+ * @return A {@link QueryEvent} built using the provided values.
+ */
+ public static QueryEvent stopALL(final String ryaInstance) {
+ return new QueryEvent(ryaInstance, QueryEventType.STOP_ALL, Optional.empty(), Optional.empty());
}
}
- @Override
- protected void shutDown() throws Exception {
- lock.lock();
- try {
- LOG.info("Stopping Query Manager.");
- source.stopAndWait();
- queryExecutor.stopAndWait();
- } finally {
- lock.unlock();
+ /**
+ * Listens to a {@link QueryChangeLogSource} and adds observations to the provided
+ * work queue. It does so until the provided shutdown signal is set.
+ */
+ @DefaultAnnotation(NonNull.class)
+ static class LogEventWorkGenerator implements SourceListener {
+
+ private final BlockingQueue<LogEvent> workQueue;
+ private final AtomicBoolean shutdownSignal;
+ private final long offerValue;
+ private final TimeUnit offerUnits;
+
+ /**
+ * Constructs an instance of {@link LogEventWorkGenerator}.
+ *
+ * @param workQueue - A blocking queue that will have {@link LogEvent}s offered to it. (not null)
+ * @param offerValue - How long to wait when offering new work.
+ * @param offerUnits - The unit for the {@code offerValue}. (not null)
+ * @param shutdownSignal - Indicates to this listener that it needs to stop adding events
+ * to the work queue because the application is shutting down. (not null)
+ */
+ public LogEventWorkGenerator(
+ final BlockingQueue<LogEvent> workQueue,
+ final long offerValue,
+ final TimeUnit offerUnits,
+ final AtomicBoolean shutdownSignal) {
+ this.workQueue = requireNonNull(workQueue);
+ this.shutdownSignal = requireNonNull(shutdownSignal);
+ this.offerValue = offerValue;
+ this.offerUnits = requireNonNull(offerUnits);
+ }
+
+ @Override
+ public void notifyCreate(final String ryaInstanceName, final QueryChangeLog changeLog) {
+ log.info("A new Query Change Log has been discovered for Rya Instance " + ryaInstanceName + ". All " +
+ "queries that are set to active within it will be started.");
+
+ // Create an event that summarizes this notification.
+ final LogEvent event = LogEvent.create(ryaInstanceName, changeLog);
+
+ // Offer it to the worker until there is room for it in the work queue, or we are shutting down.
+ offerUntilAcceptedOrShutdown(workQueue, event, offerValue, offerUnits, shutdownSignal);
+ }
+
+ @Override
+ public void notifyDelete(final String ryaInstanceName) {
+ log.info("The Query Change Log for Rya Instance " + ryaInstanceName + " has been deleted. All of the " +
+ "queries related to that instance will be stopped.");
+
+ // Create an event that summarizes this notification.
+ final LogEvent event = LogEvent.delete(ryaInstanceName);
+
+ // Offer it to the worker until there is room for it in the work queue, or we are shutting down.
+ offerUntilAcceptedOrShutdown(workQueue, event, offerValue, offerUnits, shutdownSignal);
}
}
/**
- * An implementation of {@link QueryChangeLogListener} for the
- * {@link QueryManager}.
- * <p>
- * When notified of a {@link ChangeType} performs one of the following:
- * <li>{@link ChangeType#CREATE}: Creates a new query using the
- * {@link QueryExecutor} provided to the {@link QueryManager}</li>
- * <li>{@link ChangeType#DELETE}: Deletes a running query by stopping the
- * {@link QueryExecutor} service of the queryID in the event</li>
- * <li>{@link ChangeType#UPDATE}: If the query is running and the update is
- * to stop the query, stops the query. Otherwise, if the query is not
- * running, it is removed.</li>
+ * Processes a work queue of {@link LogEvent}s.
+ * <p/>
+ * Whenever a new log has been created, then it registers a {@link QueryEventWorkGenerator}
+ * that generates {@link QueryEvent}s based on the content and updates to the discovered
+ * {@link QueryChangeLog}.
+ * <p/>
+ * Whenever a log is deleted, then the generator is stopped and a stop all {@link QueryEvent}
+ * is written to the work queue.
*/
- private class QueryExecutionForwardingListener implements QueryChangeLogListener {
- private final String ryaInstanceName;
+ @DefaultAnnotation(NonNull.class)
+ static class LogEventWorker implements Runnable {
/**
- * Creates a new {@link QueryExecutionForwardingListener}.
+ * A map of Rya Instance name to the Query Repository for that instance.
+ */
+ private final Map<String, QueryRepository> repos = new HashMap<>();
+
+ private final BlockingQueue<LogEvent> logWorkQueue;
+ private final BlockingQueue<QueryEvent> queryWorkQueue;
+ private final long blockingValue;
+ private final TimeUnit blockingUnits;
+ private final AtomicBoolean shutdownSignal;
+
+ /**
+ * Constructs an instance of {@link LogEventWorker}.
*
- * @param ryaInstanceName - The rya instance the query change is
- * performed on. (not null)
+ * @param logWorkQueue - A queue of {@link LogEvent}s that will be worked by this object. (not null)
+ * @param queryWorkQueue - A queue where {@link QueryEvent}s will be placed by this object. (not null)
+ * @param blockingValue - How long to wait when polling/offering new work.
+ * @param blockingUnits - The unit for the {@code blockingValue}. (not null)
+ * @param shutdownSignal - Indicates when the application has been shutdown, so the executing thread
+ * may exit the {@link #run()} method. (not null)
*/
- public QueryExecutionForwardingListener(final String ryaInstanceName) {
- this.ryaInstanceName = requireNonNull(ryaInstanceName);
+ public LogEventWorker(
+ final BlockingQueue<LogEvent> logWorkQueue,
+ final BlockingQueue<QueryEvent> queryWorkQueue,
+ final long blockingValue,
+ final TimeUnit blockingUnits,
+ final AtomicBoolean shutdownSignal) {
+ this.logWorkQueue = requireNonNull(logWorkQueue);
+ this.queryWorkQueue = requireNonNull(queryWorkQueue);
+ this.blockingValue = blockingValue;
+ this.blockingUnits = requireNonNull(blockingUnits);
+ this.shutdownSignal = requireNonNull(shutdownSignal);
}
@Override
- public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent, final Optional<StreamsQuery> newQueryState) {
- LOG.debug("New query change event.");
- final QueryChange entry = queryChangeEvent.getEntry();
+ public void run() {
+ // Run until the shutdown signal is set.
+ while(!shutdownSignal.get()) {
+ try {
+ // Pull a unit of work from the queue.
+ log.debug("LogEventWorker - Polling the work queue for a new LogEvent.");
+ final LogEvent logEvent = logWorkQueue.poll(blockingValue, blockingUnits);
+ if(logEvent == null) {
+ // Poll again if nothing was found.
+ continue;
+ }
- lock.lock();
- try {
+ log.info("LogEventWorker - handling: \n" + logEvent);
+ final String ryaInstance = logEvent.getRyaInstanceName();
- switch (entry.getChangeType()) {
- case CREATE:
- if(!newQueryState.isPresent()) {
- LOG.error("The query with ID: " + entry.getQueryId() + " must be present with the change to be created.");
- LOG.debug("newQueryState is not allowed to be absent with a CREATE QueryChange, there might be a bug in the QueryRepository.");
- } else {
- runQuery(ryaInstanceName, newQueryState.get());
- }
- break;
- case DELETE:
- stopQuery(entry.getQueryId());
- break;
- case UPDATE:
- if (!newQueryState.isPresent()) {
- LOG.error("The query with ID: " + entry.getQueryId() + " must be provided with the update, cannot perform update.");
- LOG.debug("newQueryState is not allowed to be absent with a UPDATE QueryChange, there might be a bug in the QueryRepository.");
- } else {
- final StreamsQuery updatedQuery = newQueryState.get();
- if (updatedQuery.isActive()) {
- runQuery(ryaInstanceName, updatedQuery);
- LOG.info("Starting query: " + updatedQuery.toString());
- } else {
- stopQuery(updatedQuery.getQueryId());
- LOG.info("Stopping query: " + updatedQuery.toString());
+ switch(logEvent.getEventType()) {
+ case CREATE:
+ // If we see a create message for a Rya Instance we are already maintaining,
+ // then don't do anything.
+ if(repos.containsKey(ryaInstance)) {
+ log.warn("LogEventWorker - A repository is already being managed for the Rya Instance " +
+ ryaInstance + ". This message will be ignored.");
+ continue;
}
- }
- break;
+
+ // Create and start a QueryRepository for the discovered log. Hold onto the repository
+ // so that it may be shutdown later.
+ final Scheduler scheduler = Scheduler.newFixedRateSchedule(0, blockingValue, blockingUnits);
+ final QueryRepository repo = new InMemoryQueryRepository(logEvent.getQueryChangeLog().get(), scheduler);
+ repo.startAndWait();
+ repos.put(ryaInstance, repo);
+
+ // Subscribe a worker that adds the Query Events to the queryWorkQueue queue.
+ // A count down latch is used to ensure the returned set of queries are handled
+ // prior to any notifications from the repository.
+ final CountDownLatch subscriptionWorkFinished = new CountDownLatch(1);
+ final QueryEventWorkGenerator queryWorkGenerator =
+ new QueryEventWorkGenerator(ryaInstance, subscriptionWorkFinished, queryWorkQueue,
+ blockingValue, blockingUnits, shutdownSignal);
+
+ log.debug("LogEventWorker - Setting up a QueryWorkGenerator...");
+ final Set<StreamsQuery> queries = repo.subscribe(queryWorkGenerator);
+ log.debug("LogEventWorker - Finished setting up a QueryWorkGenerator.");
+
+ // Handle the view of the queries within the repository as it existed when
+ // the subscription was registered.
+ queries.stream()
+ .forEach(query -> {
+ // Create a QueryEvent that represents the active state of the existing query.
+ final QueryEvent queryEvent = query.isActive() ?
+ QueryEvent.executing(ryaInstance, query) : QueryEvent.stopped(ryaInstance, query.getQueryId());
+ log.debug("LogEventWorker - offering: " + queryEvent);
+
+ // Offer it to the worker until there is room for it in the work queue, or we are shutting down.
+ offerUntilAcceptedOrShutdown(queryWorkQueue, queryEvent, blockingValue, blockingUnits, shutdownSignal);
+ });
+
+ // Indicate the subscription work is finished so that the registered listener may start
+ // adding work to the queue.
+ log.info("LogEventWorker - Counting down the subscription work latch.");
+ subscriptionWorkFinished.countDown();
+ break;
+
+ case DELETE:
+ if(repos.containsKey(ryaInstance)) {
+ // Shut down the query repository for the Rya instance. This ensures the listener will
+ // not receive any more work that needs to be done.
+ final QueryRepository deletedRepo = repos.remove(ryaInstance);
+ deletedRepo.stopAndWait();
+
+ // Add work that stops all of the queries related to the instance.
+ final QueryEvent stopAllEvent = QueryEvent.stopALL(ryaInstance);
+ offerUntilAcceptedOrShutdown(queryWorkQueue, stopAllEvent, blockingValue, blockingUnits, shutdownSignal);
+ }
+ break;
+ }
+ } catch (final InterruptedException e) {
+ log.debug("LogEventWorker did not see any new events over the past 5 seconds. Polling again...");
}
- } finally {
- lock.unlock();
}
+
+ log.info("LogEventWorker shutting down...");
+
+ // Shutdown all of the QueryRepositories that were started.
+ repos.values().forEach(repo -> repo.stopAndWait());
+
+ log.info("LogEventWorker shut down.");
}
}
/**
- * Listener used by the {@link QueryManager} to be notified when
- * {@link QueryChangeLog}s are created or deleted.
+ * Listens to a {@link QueryRepository} and adds observations to the provided work queue.
+ * It does so until the provided shutdown signal is set.
*/
- private class QueryManagerSourceListener implements SourceListener {
+ @DefaultAnnotation(NonNull.class)
+ static class QueryEventWorkGenerator implements QueryChangeLogListener {
+
+ private final String ryaInstance;
+ private final CountDownLatch subscriptionWorkFinished;
+ private final BlockingQueue<QueryEvent> queryWorkQueue;
+ private final long blockingValue;
+ private final TimeUnit blockingUnits;
+ private final AtomicBoolean shutdownSignal;
+
+ /**
+ * Constructs an instance of {@link QueryEventWorkGenerator}.
+ *
+ * @param ryaInstance - The rya instance whose log this objects is watching. (not null)
+ * @param subscriptionWorkFinished - Indicates when work that needs to be completed before this
+ * listener handles notifications is completed. (not null)
+ * @param queryWorkQueue - A queue where {@link QueryEvent}s will be placed by this object. (not null)
+ * @param blockingValue - How long to wait when polling/offering new work.
+ * @param blockingUnits - The unit for the {@code blockingValue}. (not null)
+ * @param shutdownSignal - Indicates to this listener that it needs to stop adding events
+ * to the work queue because the application is shutting down. (not null)
+ */
+ public QueryEventWorkGenerator(
+ final String ryaInstance,
+ final CountDownLatch subscriptionWorkFinished,
+ final BlockingQueue<QueryEvent> queryWorkQueue,
+ final long blockingValue,
+ final TimeUnit blockingUnits,
+ final AtomicBoolean shutdownSignal) {
+ this.ryaInstance = requireNonNull(ryaInstance);
+ this.subscriptionWorkFinished = requireNonNull(subscriptionWorkFinished);
+ this.queryWorkQueue = requireNonNull(queryWorkQueue);
+ this.blockingValue = blockingValue;
+ this.blockingUnits = requireNonNull(blockingUnits);
+ this.shutdownSignal = requireNonNull(shutdownSignal);
+ }
+
@Override
- public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) {
- lock.lock();
+ public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent, final Optional<StreamsQuery> newQueryState) {
+ requireNonNull(queryChangeEvent);
+ requireNonNull(newQueryState);
+
+ // Wait for the subscription work to be finished.
try {
- LOG.info("Discovered new Query Change Log for Rya Instance " + ryaInstanceName + ".");
- final QueryRepository repo = new InMemoryQueryRepository(log, scheduler);
- repo.startAndWait();
- final Set<StreamsQuery> queries = repo.subscribe(new QueryExecutionForwardingListener(ryaInstanceName));
- queries.forEach(query -> {
- if (query.isActive()) {
- try {
- queryExecutor.startQuery(ryaInstanceName, query);
- } catch (IllegalStateException | QueryExecutorException e) {
- LOG.error("Unable to start query for rya instance " + ryaInstanceName, e);
+ log.debug("Waiting for Subscription Work Finished latch to release...");
+ while(!shutdownSignal.get() && !subscriptionWorkFinished.await(blockingValue, blockingUnits)) {
+ log.debug("Still waiting...");
+ }
+ log.debug("Subscription Work Finished latch to released.");
+ } catch (final InterruptedException e) {
+ log.warn("Interrupted while waiting for the Subscription Work Finished latch to be " +
+ "released. Shutting down?", e);
+ }
+
+ // If we left the loop because of a shutdown, return immediately.
+ if(shutdownSignal.get()) {
+ log.debug("Not processing notification. Shutting down.");
+ return;
+ }
+
+ // Generate work from the notification.
+ final QueryChange change = queryChangeEvent.getEntry();
+ switch(change.getChangeType()) {
+ case CREATE:
+ if(newQueryState.isPresent()) {
+ log.info("Rya Instance " + ryaInstance + " created Rya Streams query " + newQueryState + ".");
+ final StreamsQuery newQuery = newQueryState.get();
+ if(newQuery.isActive()) {
+ final QueryEvent executeNewQuery = QueryEvent.executing(ryaInstance, newQuery);
+ offerUntilAcceptedOrShutdown(queryWorkQueue, executeNewQuery, blockingValue, blockingUnits, shutdownSignal);
}
+ } else {
+ log.error("Received a CREATE QueryChange for Rya Instance: " + ryaInstance +
+ ", Query ID: " + change.getQueryId() + ", but the QueryRepository did not supply a " +
+ "StreamsQuery representing the created query. The query will not be processed.");
}
- });
- queryRepos.put(ryaInstanceName, repo);
- } finally {
- lock.unlock();
+ break;
+
+ case DELETE:
+ final UUID deletedQueryId = change.getQueryId();
+ log.info("Rya Instance " + ryaInstance + " deleted Rya Streams query with ID " + deletedQueryId);
+ final QueryEvent stopDeletedQuery = QueryEvent.stopped(ryaInstance, deletedQueryId);
+ offerUntilAcceptedOrShutdown(queryWorkQueue, stopDeletedQuery, blockingValue, blockingUnits, shutdownSignal);
+ break;
+
+ case UPDATE:
+ if(newQueryState.isPresent()) {
+ final StreamsQuery updatedQuery = newQueryState.get();
+ if(updatedQuery.isActive()) {
+ log.info("Rya Instance " + ryaInstance + " updated Rya Streams query with ID " +
+ updatedQuery.getQueryId() + " to be active.");
+ final QueryEvent executeUpdatedQuery = QueryEvent.executing(ryaInstance, updatedQuery);
+ offerUntilAcceptedOrShutdown(queryWorkQueue, executeUpdatedQuery, blockingValue, blockingUnits, shutdownSignal);
+ } else {
+ log.info("Rya Instance " + ryaInstance + " updated Rya Streams query with ID " +
+ updatedQuery.getQueryId() + " to be inactive.");
+ final QueryEvent stopUpdatedQuery = QueryEvent.stopped(ryaInstance, updatedQuery.getQueryId());
+ offerUntilAcceptedOrShutdown(queryWorkQueue, stopUpdatedQuery, blockingValue, blockingUnits, shutdownSignal);
+ }
+ } else {
+ log.error("Received an UPDATE QueryChange for Rya Instance: " + ryaInstance +
+ ", Query ID: " + change.getQueryId() + ", but the QueryRepository did not supply a " +
+ "StreamsQuery representing the created query. The query will not be processed.");
+ }
+ break;
}
}
+ }
+
+ /**
+ * Processes a work queue of {@link QueryEvent}s.
+ * <p/>
+ * Each type of event maps to the corresponding method on {@link QueryExecutor} that is called into.
+ */
+ @DefaultAnnotation(NonNull.class)
+ static class QueryEventWorker implements Runnable {
+
+ private final BlockingQueue<QueryEvent> workQueue;
+ private final QueryExecutor queryExecutor;
+ private final long pollingValue;
+ private final TimeUnit pollingUnits;
+ private final AtomicBoolean shutdownSignal;
+
+ /**
+ * Constructs an instance of {@link QueryEventWorker}.
+ *
+ * @param workQueue - A queue of {@link QueryEvent}s that will be worked by this object. (not null)
+ * @param queryExecutor - Responsible for executing the {@link StreamsQuery}s. (not null)
+ * @param pollingValue - How long to wait when polling for new work.
+ * @param pollingUnits - The units for the {@code pollingValue}. (not null)
+ * @param shutdownSignal - Indicates when the application has been shutdown, so the executing thread
+ * may exit the {@link #run()} method. (not null)
+ */
+ public QueryEventWorker(
+ final BlockingQueue<QueryEvent> workQueue,
+ final QueryExecutor queryExecutor,
+ final long pollingValue,
+ final TimeUnit pollingUnits,
+ final AtomicBoolean shutdownSignal) {
+ this.workQueue = requireNonNull(workQueue);
+ this.queryExecutor = requireNonNull(queryExecutor);
+ this.pollingValue = pollingValue;
+ this.pollingUnits = requireNonNull(pollingUnits);
+ this.shutdownSignal = requireNonNull(shutdownSignal);
+ }
@Override
- public void notifyDelete(final String ryaInstanceName) {
- lock.lock();
- try {
- LOG.info("Notified of deleting QueryChangeLog, stopping all queries belonging to the change log for "
- + ryaInstanceName + ".");
- queryExecutor.stopAll(ryaInstanceName);
- } catch (final QueryExecutorException e) {
- LOG.error("Failed to stop all queries belonging to: " + ryaInstanceName, e);
- } finally {
- lock.unlock();
+ public void run() {
+ log.info("QueryEventWorker starting.");
+
+ // Run until the shutdown signal is set.
+ while(!shutdownSignal.get()) {
+ // Pull a unit of work from the queue.
+ try {
+ log.debug("Polling the work queue for a new QueryEvent.");
+ final QueryEvent event = workQueue.poll(pollingValue, pollingUnits);
+ if(event == null) {
+ // Poll again if nothing was found.
+ continue;
+ }
+
+ log.info("QueryEventWorker handling:\n" + event);
+
+ // Ensure the state within the executor matches the query event's state.
+ switch(event.getType()) {
+ case EXECUTING:
+ try {
+ queryExecutor.startQuery(event.getRyaInstance(), event.getStreamsQuery().get());
+ } catch (final IllegalStateException | QueryExecutorException e) {
+ log.error("Could not start a query represented by the following work: " + event, e);
+ }
+ break;
+
+ case STOPPED:
+ try {
+ queryExecutor.stopQuery(event.getQueryId().get());
+ } catch (final IllegalStateException | QueryExecutorException e) {
+ log.error("Could not stop a query represented by the following work: " + event, e);
+ }
+ break;
+
+ case STOP_ALL:
+ try {
+ queryExecutor.stopAll(event.getRyaInstance());
+ } catch (final IllegalStateException | QueryExecutorException e) {
+ log.error("Could not stop all queries represented by the following work: " + event, e);
+ }
+ break;
+ }
+ } catch (final InterruptedException e) {
+ log.debug("QueryEventWorker interrupted. Probably shutting down.");
+ }
}
+ log.info("QueryEventWorker shut down.");
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
index 515d699..04a0382 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
@@ -33,6 +33,7 @@ import org.apache.commons.daemon.DaemonContext;
import org.apache.commons.daemon.DaemonInitException;
import org.apache.rya.streams.kafka.KafkaStreamsFactory;
import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory;
+import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
import org.apache.rya.streams.querymanager.kafka.KafkaQueryChangeLogSource;
import org.apache.rya.streams.querymanager.kafka.LocalQueryExecutor;
import org.apache.rya.streams.querymanager.xml.Kafka;
@@ -91,7 +92,7 @@ public class QueryManagerDaemon implements Daemon {
// Unmarshall the configuration file into an object.
final QueryManagerConfig config;
- try(InputStream stream = Files.newInputStream(configFile)) {
+ try(final InputStream stream = Files.newInputStream(configFile)) {
config = QueryManagerConfigUnmarshaller.unmarshall(stream);
} catch(final JAXBException | SAXException e) {
throw new DaemonInitException("Unable to marshall the configuration XML file: " + configFile, e);
@@ -110,11 +111,12 @@ public class QueryManagerDaemon implements Daemon {
final QueryChangeLogSource source = new KafkaQueryChangeLogSource(kafka.getHostname(), kafka.getPort(), scheduler);
// Initialize a QueryExecutor.
+ final String zookeeperServers = config.getQueryExecutor().getLocalKafkaStreams().getZookeepers();
final KafkaStreamsFactory streamsFactory = new SingleThreadKafkaStreamsFactory(kafka.getHostname() + ":" + kafka.getPort());
- final QueryExecutor queryExecutor = new LocalQueryExecutor(streamsFactory);
+ final QueryExecutor queryExecutor = new LocalQueryExecutor(new CreateKafkaTopic(zookeeperServers), streamsFactory);
// Initialize the QueryManager using the configured resources.
- manager = new QueryManager(queryExecutor, source, scheduler);
+ manager = new QueryManager(queryExecutor, source, period, units);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
index 32305f5..e746baf 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
@@ -33,11 +33,12 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.rya.streams.api.queries.QueryChangeLog;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
import org.apache.rya.streams.querymanager.QueryChangeLogSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractScheduledService;
@@ -53,6 +54,8 @@ import edu.umd.cs.findbugs.annotations.NonNull;
@DefaultAnnotation(NonNull.class)
public class KafkaQueryChangeLogSource extends AbstractScheduledService implements QueryChangeLogSource {
+ private static final Logger log = LoggerFactory.getLogger(KafkaQueryChangeLogSource.class);
+
/**
* Ensures thread safe interactions with this object.
*/
@@ -74,10 +77,10 @@ public class KafkaQueryChangeLogSource extends AbstractScheduledService implemen
private final Set<SourceListener> listeners = new HashSet<>();
/**
- * Maps Rya instance name to a Query Change Log for that instance. This map is used to keep
- * track of how the change logs change over time within the Kafka Server.
+ * Maps Rya instance names to the Query Change Log topic name in Kafka. This map is used to
+ * keep track of how the change logs change over time within the Kafka Server.
*/
- private final HashMap<String, QueryChangeLog> knownChangeLogs = new HashMap<>();
+ private final HashMap<String, String> knownChangeLogs = new HashMap<>();
/**
* A consumer that is used to poll the Kafka Server for topics.
@@ -101,6 +104,8 @@ public class KafkaQueryChangeLogSource extends AbstractScheduledService implemen
@Override
protected void startUp() throws Exception {
+ log.info("Kafka Query Change Log Source watching " + kafkaBootstrapServer + " starting up...");
+
// Setup the consumer that is used to list topics for the source.
final Properties consumerProperties = new Properties();
consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer);
@@ -108,17 +113,23 @@ public class KafkaQueryChangeLogSource extends AbstractScheduledService implemen
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
listTopicsConsumer = new KafkaConsumer<>(consumerProperties);
+
+ log.info("Kafka Query Change Log Source watching " + kafkaBootstrapServer + " started.");
}
@Override
protected void shutDown() throws Exception {
- // Shut down the consumer that's used to list topics.
- listTopicsConsumer.close();
+ log.info("Kafka Query Change Log Source watching " + kafkaBootstrapServer + " shutting down...");
- // Shut down all of the change logs that were created within this class.
- for(final QueryChangeLog changeLog : knownChangeLogs.values()) {
- changeLog.close();
+ lock.lock();
+ try {
+ // Shut down the consumer that's used to list topics.
+ listTopicsConsumer.close();
+ } finally {
+ lock.unlock();
}
+
+ log.info("Kafka Query Change Log Source watching " + kafkaBootstrapServer + " shut down.");
}
@Override
@@ -130,8 +141,10 @@ public class KafkaQueryChangeLogSource extends AbstractScheduledService implemen
listeners.add(listener);
// Notify it with everything that already exists.
- for(final Entry<String, QueryChangeLog> entry : knownChangeLogs.entrySet()) {
- listener.notifyCreate(entry.getKey(), entry.getValue());
+ for(final Entry<String, String> entry : knownChangeLogs.entrySet()) {
+ final String changeLogTopic = entry.getValue();
+ final KafkaQueryChangeLog changeLog = KafkaQueryChangeLogFactory.make(kafkaBootstrapServer, changeLogTopic);
+ listener.notifyCreate(entry.getKey(), changeLog);
}
} finally {
lock.unlock();
@@ -174,26 +187,23 @@ public class KafkaQueryChangeLogSource extends AbstractScheduledService implemen
// Handle the deletes.
for(final String deletedRyaInstance : deletedRyaInstances) {
// Remove the change log from the set of known logs.
- final QueryChangeLog removed = knownChangeLogs.remove(deletedRyaInstance);
+ knownChangeLogs.remove(deletedRyaInstance);
- // Notify the listeners of the update.
+ // Notify the listeners of the update so that they may close the previously provided change log.
for(final SourceListener listener : listeners) {
listener.notifyDelete(deletedRyaInstance);
}
-
- // Ensure the change log is closed.
- removed.close();
}
// Handle the adds.
for(final String createdRyaInstance : createdRyaInstances) {
// Create and store the ChangeLog.
final String changeLogTopic = KafkaTopics.queryChangeLogTopic(createdRyaInstance);
- final KafkaQueryChangeLog changeLog = KafkaQueryChangeLogFactory.make(kafkaBootstrapServer, changeLogTopic);
- knownChangeLogs.put(createdRyaInstance, changeLog);
+ knownChangeLogs.put(createdRyaInstance, changeLogTopic);
// Notify the listeners of the update.
for(final SourceListener listener : listeners) {
+ final KafkaQueryChangeLog changeLog = KafkaQueryChangeLogFactory.make(kafkaBootstrapServer, changeLogTopic);
listener.notifyCreate(createdRyaInstance, changeLog);
}
}