You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2018/03/09 18:20:57 UTC
[11/22] incubator-rya git commit: RYA-453 Implement the Query
Manager's Daemon that controls the application.
RYA-453 Implement the Query Manager's Daemon that controls the application.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/16202ac7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/16202ac7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/16202ac7
Branch: refs/heads/master
Commit: 16202ac7d81a37d56f9a3a54f22f79bbea3fe8d5
Parents: e355f73
Author: kchilton2 <ke...@gmail.com>
Authored: Tue Jan 30 14:44:58 2018 -0500
Committer: Valiyil <Pu...@parsons.com>
Committed: Fri Mar 9 12:59:48 2018 -0500
----------------------------------------------------------------------
extras/rya.streams/query-manager/pom.xml | 5 +
.../querymanager/QueryManagerDaemon.java | 99 ++++++++++++++++++--
.../xml/QueryManagerConfigUnmarshaller.java | 10 +-
.../xml/QueryManagerConfigMarshallerTest.java | 13 ++-
4 files changed, 110 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/16202ac7/extras/rya.streams/query-manager/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/pom.xml b/extras/rya.streams/query-manager/pom.xml
index 2141a3a..deaccdb 100644
--- a/extras/rya.streams/query-manager/pom.xml
+++ b/extras/rya.streams/query-manager/pom.xml
@@ -47,6 +47,11 @@ under the License.
<version>1.1.0</version>
</dependency>
+ <dependency>
+ <groupId>com.beust</groupId>
+ <artifactId>jcommander</artifactId>
+ </dependency>
+
<!-- Test dependencies -->
<dependency>
<groupId>junit</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/16202ac7/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
index 2ab0ad8..515d699 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,32 +18,117 @@
*/
package org.apache.rya.streams.querymanager;
+import static java.util.Objects.requireNonNull;
+
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.concurrent.TimeUnit;
+
+import javax.xml.bind.JAXBException;
+
import org.apache.commons.daemon.Daemon;
import org.apache.commons.daemon.DaemonContext;
import org.apache.commons.daemon.DaemonInitException;
+import org.apache.rya.streams.kafka.KafkaStreamsFactory;
+import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory;
+import org.apache.rya.streams.querymanager.kafka.KafkaQueryChangeLogSource;
+import org.apache.rya.streams.querymanager.kafka.LocalQueryExecutor;
+import org.apache.rya.streams.querymanager.xml.Kafka;
+import org.apache.rya.streams.querymanager.xml.QueryManagerConfig;
+import org.apache.rya.streams.querymanager.xml.QueryManagerConfig.PerformanceTunning.QueryChanngeLogDiscoveryPeriod;
+import org.apache.rya.streams.querymanager.xml.QueryManagerConfigUnmarshaller;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
+/**
+ * JSVC integration code for a {@link QueryManager} to be used as a non-Windows daemon.
+ */
@DefaultAnnotation(NonNull.class)
public class QueryManagerDaemon implements Daemon {
+
+ private static final Logger log = LoggerFactory.getLogger(QueryManagerDaemon.class);
+
+ /**
+ * The default configuration file's path for the application.
+ */
+ private static final Path DEFAULT_CONFIGURATION_PATH = Paths.get("config/configuration.xml");
+
+ /**
+ * Command line parameters that are used by all commands that interact with Kafka.
+ */
+ class DaemonParameters {
+ @Parameter(names = {"--config", "-c"}, required = false, description = "The path to the application's configuration file.")
+ public String config;
+ }
+
+ private QueryManager manager = null;
+
@Override
public void init(final DaemonContext context) throws DaemonInitException, Exception {
- System.out.println("Initializing Query Manager Daemon.");
+ requireNonNull(context);
+
+ // Parse the command line arguments for the configuration file to use.
+ final String[] args = context.getArguments();
+ final DaemonParameters params = new DaemonParameters();
+ try {
+ new JCommander(params).parse(args);
+ } catch(final ParameterException e) {
+ throw new DaemonInitException("Unable to parse the command line arguments.", e);
+ }
+ final Path configFile = params.config != null ? Paths.get(params.config) : DEFAULT_CONFIGURATION_PATH;
+ log.info("Loading the following configuration file: " + configFile);
+
+ // Unmarshall the configuration file into an object.
+ final QueryManagerConfig config;
+ try(InputStream stream = Files.newInputStream(configFile)) {
+ config = QueryManagerConfigUnmarshaller.unmarshall(stream);
+ } catch(final JAXBException | SAXException e) {
+ throw new DaemonInitException("Unable to marshall the configuration XML file: " + configFile, e);
+ }
+
+ // Read the source polling period from the configuration.
+ final QueryChanngeLogDiscoveryPeriod periodConfig = config.getPerformanceTunning().getQueryChanngeLogDiscoveryPeriod();
+ final long period = periodConfig.getValue().longValue();
+ final TimeUnit units = TimeUnit.valueOf( periodConfig.getUnits().toString() );
+ log.info("Query Change Log Polling Period: " + period + " " + units);
+ final Scheduler scheduler = Scheduler.newFixedRateSchedule(0, period, units);
+
+ // Initialize a QueryChangeLogSource.
+ final Kafka kafka = config.getQueryChangeLogSource().getKafka();
+ log.info("Kafka Source: " + kafka.getHostname() + ":" + kafka.getPort());
+ final QueryChangeLogSource source = new KafkaQueryChangeLogSource(kafka.getHostname(), kafka.getPort(), scheduler);
+
+ // Initialize a QueryExecutor.
+ final KafkaStreamsFactory streamsFactory = new SingleThreadKafkaStreamsFactory(kafka.getHostname() + ":" + kafka.getPort());
+ final QueryExecutor queryExecutor = new LocalQueryExecutor(streamsFactory);
+
+ // Initialize the QueryManager using the configured resources.
+ manager = new QueryManager(queryExecutor, source, scheduler);
}
@Override
public void start() throws Exception {
- System.out.println("Starting Query Manager Daemon.");
+ log.info("Starting the Rya Streams Query Manager Daemon.");
+ manager.startAndWait();
}
@Override
public void stop() throws Exception {
- System.out.println("Stopping Query Manager Daemon.");
+ log.info("Stopping the Rya Streams Query Manager Daemon.");
+ manager.stopAndWait();
}
@Override
- public void destroy() {
- System.out.println("Query Manager Daemon Destroyed.");
- }
+ public void destroy() { }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/16202ac7/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigUnmarshaller.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigUnmarshaller.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigUnmarshaller.java
index 834f0b9..39de24d 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigUnmarshaller.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigUnmarshaller.java
@@ -20,7 +20,7 @@ package org.apache.rya.streams.querymanager.xml;
import static java.util.Objects.requireNonNull;
-import java.io.Reader;
+import java.io.InputStream;
import java.net.URL;
import javax.xml.XMLConstants;
@@ -49,13 +49,13 @@ public class QueryManagerConfigUnmarshaller {
/**
* Validates and unmarshalls XML into a {@link QueryManagerConfig} object.
*
- * @param xmlReader - Reads the XML that will be unmarshalled. (not null)
+ * @param xmlStream - Reads the XML that will be unmarshalled. (not null)
* @return A {@link QueryManagerConfig} loaded with the XMLs values.
* @throws SAXException Could not load the schema the XML will be validated against.
* @throws JAXBException Could not unmarshal the XML into a POJO.
*/
- public static QueryManagerConfig unmarshall(final Reader xmlReader) throws JAXBException, SAXException {
- requireNonNull(xmlReader);
+ public static QueryManagerConfig unmarshall(final InputStream xmlStream) throws JAXBException, SAXException {
+ requireNonNull(xmlStream);
// Get an input stream to the XSD file that is packaged inside of the jar.
final URL schemaURL = ClassLoader.getSystemResource(XSD_PATH);
@@ -73,6 +73,6 @@ public class QueryManagerConfigUnmarshaller {
unmarshaller.setSchema(schema);
// Perform the unmarshal.
- return (QueryManagerConfig) unmarshaller.unmarshal(xmlReader);
+ return (QueryManagerConfig) unmarshaller.unmarshal(xmlStream);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/16202ac7/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
index 831c06b..f2b50ab 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
@@ -20,12 +20,15 @@ package org.apache.rya.streams.querymanager.xml;
import static org.junit.Assert.assertNotNull;
-import java.io.StringReader;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
import javax.xml.bind.UnmarshalException;
import org.junit.Test;
+import com.google.common.base.Charsets;
+
/**
* Unit tests the methods of {@link QueryManagerConfigUnmarshaller}.
*/
@@ -50,8 +53,8 @@ public class QueryManagerConfigMarshallerTest {
" </performanceTunning>\n" +
"</queryManagerConfig>";
-
- final QueryManagerConfig config = QueryManagerConfigUnmarshaller.unmarshall(new StringReader(xml));
+ final InputStream xmlStream = new ByteArrayInputStream(xml.getBytes(Charsets.UTF_8));
+ final QueryManagerConfig config = QueryManagerConfigUnmarshaller.unmarshall(xmlStream);
assertNotNull(config);
}
@@ -68,7 +71,7 @@ public class QueryManagerConfigMarshallerTest {
" </queryChangeLogSource>\n" +
"</queryManagerConfig>";
-
- QueryManagerConfigUnmarshaller.unmarshall(new StringReader(xml));
+ final InputStream xmlStream = new ByteArrayInputStream(xml.getBytes(Charsets.UTF_8));
+ QueryManagerConfigUnmarshaller.unmarshall(xmlStream);
}
}
\ No newline at end of file