You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2018/03/09 18:20:57 UTC

[11/22] incubator-rya git commit: RYA-453 Implement the Query Manager's Daemon that controls the application.

RYA-453 Implement the Query Manager's Daemon that controls the application.



Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/16202ac7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/16202ac7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/16202ac7

Branch: refs/heads/master
Commit: 16202ac7d81a37d56f9a3a54f22f79bbea3fe8d5
Parents: e355f73
Author: kchilton2 <ke...@gmail.com>
Authored: Tue Jan 30 14:44:58 2018 -0500
Committer: Valiyil <Pu...@parsons.com>
Committed: Fri Mar 9 12:59:48 2018 -0500

----------------------------------------------------------------------
 extras/rya.streams/query-manager/pom.xml        |  5 +
 .../querymanager/QueryManagerDaemon.java        | 99 ++++++++++++++++++--
 .../xml/QueryManagerConfigUnmarshaller.java     | 10 +-
 .../xml/QueryManagerConfigMarshallerTest.java   | 13 ++-
 4 files changed, 110 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/16202ac7/extras/rya.streams/query-manager/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/pom.xml b/extras/rya.streams/query-manager/pom.xml
index 2141a3a..deaccdb 100644
--- a/extras/rya.streams/query-manager/pom.xml
+++ b/extras/rya.streams/query-manager/pom.xml
@@ -47,6 +47,11 @@ under the License.
             <version>1.1.0</version>
         </dependency>
         
+        <dependency>
+            <groupId>com.beust</groupId>
+            <artifactId>jcommander</artifactId>
+        </dependency>
+        
         <!-- Test dependencies -->
         <dependency>
             <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/16202ac7/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
index 2ab0ad8..515d699 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,32 +18,117 @@
  */
 package org.apache.rya.streams.querymanager;
 
+import static java.util.Objects.requireNonNull;
+
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.concurrent.TimeUnit;
+
+import javax.xml.bind.JAXBException;
+
 import org.apache.commons.daemon.Daemon;
 import org.apache.commons.daemon.DaemonContext;
 import org.apache.commons.daemon.DaemonInitException;
+import org.apache.rya.streams.kafka.KafkaStreamsFactory;
+import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory;
+import org.apache.rya.streams.querymanager.kafka.KafkaQueryChangeLogSource;
+import org.apache.rya.streams.querymanager.kafka.LocalQueryExecutor;
+import org.apache.rya.streams.querymanager.xml.Kafka;
+import org.apache.rya.streams.querymanager.xml.QueryManagerConfig;
+import org.apache.rya.streams.querymanager.xml.QueryManagerConfig.PerformanceTunning.QueryChanngeLogDiscoveryPeriod;
+import org.apache.rya.streams.querymanager.xml.QueryManagerConfigUnmarshaller;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
 
+/**
+ * JSVC integration code for a {@link QueryManager} to be used as a non-Windows daemon.
+ */
 @DefaultAnnotation(NonNull.class)
 public class QueryManagerDaemon implements Daemon {
+
+    private static final Logger log = LoggerFactory.getLogger(QueryManagerDaemon.class);
+
+    /**
+     * The default configuration file's path for the application.
+     */
+    private static final Path DEFAULT_CONFIGURATION_PATH = Paths.get("config/configuration.xml");
+
+    /**
+     * Command line parameters that are used to configure the Query Manager daemon.
+     */
+    class DaemonParameters {
+        @Parameter(names = {"--config", "-c"}, required = false, description = "The path to the application's configuration file.")
+        public String config;
+    }
+
+    private QueryManager manager = null;
+
     @Override
     public void init(final DaemonContext context) throws DaemonInitException, Exception {
-        System.out.println("Initializing Query Manager Daemon.");
+        requireNonNull(context);
+
+        // Parse the command line arguments for the configuration file to use.
+        final String[] args = context.getArguments();
+        final DaemonParameters params = new DaemonParameters();
+        try {
+            new JCommander(params).parse(args);
+        } catch(final ParameterException e) {
+            throw new DaemonInitException("Unable to parse the command line arguments.", e);
+        }
+        final Path configFile = params.config != null ? Paths.get(params.config) : DEFAULT_CONFIGURATION_PATH;
+        log.info("Loading the following configuration file: " + configFile);
+
+        // Unmarshall the configuration file into an object.
+        final QueryManagerConfig config;
+        try(InputStream stream = Files.newInputStream(configFile)) {
+            config = QueryManagerConfigUnmarshaller.unmarshall(stream);
+        } catch(final JAXBException | SAXException e) {
+            throw new DaemonInitException("Unable to marshall the configuration XML file: " + configFile, e);
+        }
+
+        // Read the source polling period from the configuration.
+        final QueryChanngeLogDiscoveryPeriod periodConfig = config.getPerformanceTunning().getQueryChanngeLogDiscoveryPeriod();
+        final long period = periodConfig.getValue().longValue();
+        final TimeUnit units = TimeUnit.valueOf( periodConfig.getUnits().toString() );
+        log.info("Query Change Log Polling Period: " + period + " " + units);
+        final Scheduler scheduler = Scheduler.newFixedRateSchedule(0, period, units);
+
+        // Initialize a QueryChangeLogSource.
+        final Kafka kafka = config.getQueryChangeLogSource().getKafka();
+        log.info("Kafka Source: " + kafka.getHostname() + ":" + kafka.getPort());
+        final QueryChangeLogSource source = new KafkaQueryChangeLogSource(kafka.getHostname(), kafka.getPort(), scheduler);
+
+        // Initialize a QueryExecutor.
+        final KafkaStreamsFactory streamsFactory = new SingleThreadKafkaStreamsFactory(kafka.getHostname() + ":" + kafka.getPort());
+        final QueryExecutor queryExecutor = new LocalQueryExecutor(streamsFactory);
+
+        // Initialize the QueryManager using the configured resources.
+        manager = new QueryManager(queryExecutor, source, scheduler);
     }
 
     @Override
     public void start() throws Exception {
-        System.out.println("Starting Query Manager Daemon.");
+        log.info("Starting the Rya Streams Query Manager Daemon.");
+        manager.startAndWait();
     }
 
     @Override
     public void stop() throws Exception {
-        System.out.println("Stopping Query Manager Daemon.");
+        log.info("Stopping the Rya Streams Query Manager Daemon.");
+        manager.stopAndWait();
     }
 
     @Override
-    public void destroy() {
-        System.out.println("Query Manager Daemon Destroyed.");
-    }
+    public void destroy() { }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/16202ac7/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigUnmarshaller.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigUnmarshaller.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigUnmarshaller.java
index 834f0b9..39de24d 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigUnmarshaller.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigUnmarshaller.java
@@ -20,7 +20,7 @@ package org.apache.rya.streams.querymanager.xml;
 
 import static java.util.Objects.requireNonNull;
 
-import java.io.Reader;
+import java.io.InputStream;
 import java.net.URL;
 
 import javax.xml.XMLConstants;
@@ -49,13 +49,13 @@ public class QueryManagerConfigUnmarshaller {
     /**
      * Validates and unmarshalls XML into a {@link QueryManagerConfig} object.
      *
-     * @param xmlReader - Reads the XML that will be unmarshalled. (not null)
+     * @param xmlStream - Reads the XML that will be unmarshalled. (not null)
      * @return A {@link QueryManagerConfig} loaded with the XMLs values.
      * @throws SAXException Could not load the schema the XML will be validated against.
      * @throws JAXBException Could not unmarshal the XML into a POJO.
      */
-    public static QueryManagerConfig unmarshall(final Reader xmlReader) throws JAXBException, SAXException {
-        requireNonNull(xmlReader);
+    public static QueryManagerConfig unmarshall(final InputStream xmlStream) throws JAXBException, SAXException {
+        requireNonNull(xmlStream);
 
         // Get an input stream to the XSD file that is packaged inside of the jar.
         final URL schemaURL = ClassLoader.getSystemResource(XSD_PATH);
@@ -73,6 +73,6 @@ public class QueryManagerConfigUnmarshaller {
         unmarshaller.setSchema(schema);
 
         // Perform the unmarshal.
-        return (QueryManagerConfig) unmarshaller.unmarshal(xmlReader);
+        return (QueryManagerConfig) unmarshaller.unmarshal(xmlStream);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/16202ac7/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
index 831c06b..f2b50ab 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
@@ -20,12 +20,15 @@ package org.apache.rya.streams.querymanager.xml;
 
 import static org.junit.Assert.assertNotNull;
 
-import java.io.StringReader;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
 
 import javax.xml.bind.UnmarshalException;
 
 import org.junit.Test;
 
+import com.google.common.base.Charsets;
+
 /**
  * Unit tests the methods of {@link QueryManagerConfigUnmarshaller}.
  */
@@ -50,8 +53,8 @@ public class QueryManagerConfigMarshallerTest {
                 "    </performanceTunning>\n" +
                 "</queryManagerConfig>";
 
-
-        final QueryManagerConfig config = QueryManagerConfigUnmarshaller.unmarshall(new StringReader(xml));
+        final InputStream xmlStream = new ByteArrayInputStream(xml.getBytes(Charsets.UTF_8));
+        final QueryManagerConfig config = QueryManagerConfigUnmarshaller.unmarshall(xmlStream);
         assertNotNull(config);
     }
 
@@ -68,7 +71,7 @@ public class QueryManagerConfigMarshallerTest {
                 "    </queryChangeLogSource>\n" +
                 "</queryManagerConfig>";
 
-
-        QueryManagerConfigUnmarshaller.unmarshall(new StringReader(xml));
+        final InputStream xmlStream = new ByteArrayInputStream(xml.getBytes(Charsets.UTF_8));
+        QueryManagerConfigUnmarshaller.unmarshall(xmlStream);
     }
 }
\ No newline at end of file