You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2018/03/09 18:20:55 UTC

[09/22] incubator-rya git commit: RYA-456 Implement a Single Node implementation of QueryExecutor.

RYA-456 Implement a Single Node implementation of QueryExecutor.



Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/a11ca4a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/a11ca4a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/a11ca4a2

Branch: refs/heads/master
Commit: a11ca4a2b1fd73ba754b1c2495b061e67c06ab0e
Parents: a31e256
Author: kchilton2 <ke...@gmail.com>
Authored: Fri Jan 26 15:55:59 2018 -0500
Committer: Valiyil <Pu...@parsons.com>
Committed: Fri Mar 9 12:59:45 2018 -0500

----------------------------------------------------------------------
 .../client/command/RunQueryCommandIT.java       |   3 +-
 .../rya/streams/kafka/KafkaStreamsFactory.java  |  57 ++++
 .../kafka/SingleThreadKafkaStreamsFactory.java  |  90 ++++++
 .../rya/streams/querymanager/QueryExecutor.java |  19 +-
 .../querymanager/kafka/LocalQueryExecutor.java  | 187 ++++++++++++
 .../kafka/LocalQueryExecutorIT.java             | 148 +++++++++
 .../kafka/LocalQueryExecutorTest.java           | 299 +++++++++++++++++++
 7 files changed, 795 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a11ca4a2/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
index 7e3b8bc..5d63f32 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
@@ -91,7 +91,8 @@ public class RunQueryCommandIT {
     }
 
     @After
-    public void cleanup() throws Exception{
+    public void cleanup() throws Exception {
+        queryRepo.stopAndWait();
         stmtProducer.close();
         resultConsumer.close();
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a11ca4a2/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaStreamsFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaStreamsFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaStreamsFactory.java
new file mode 100644
index 0000000..bd8ff1e
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaStreamsFactory.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Builds {@link KafkaStreams} objects that are able to process a specific {@link StreamsQuery}.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface KafkaStreamsFactory {
+
+    /**
+     * Builds a {@link KafkaStreams} object  that is able to process a specific {@link StreamsQuery}.
+     *
+     * @param ryaInstance - The Rya Instance the streams job is for. (not null)
+     * @param query - Defines the query that will be executed. (not null)
+     * @return A {@link KafkaStreams} object that will process the provided query.
+     * @throws KafkaStreamsFactoryException Unable to create a {@link KafkaStreams} object from the provided values.
+     */
+    public KafkaStreams make(String ryaInstance, StreamsQuery query) throws KafkaStreamsFactoryException;
+
+    /**
+     * A {@link KafkaStreamsFactory} could not create a {@link KafkaStreams} object.
+     */
+    public static class KafkaStreamsFactoryException extends Exception {
+        private static final long serialVersionUID = 1L;
+
+        public KafkaStreamsFactoryException(final String message) {
+            super(message);
+        }
+
+        public KafkaStreamsFactoryException(final String message, final Throwable cause) {
+            super(message, cause);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a11ca4a2/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
new file mode 100644
index 0000000..7ab7e90
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory;
+import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory.TopologyBuilderException;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.openrdf.query.MalformedQueryException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Creates {@link KafkaStreams} objects that are able to process {@link StreamsQuery}s
+ * using a single thread of execution starting from the earliest point in within the
+ * input topic. The Application ID used by the client is based on the Query ID of the
+ * query that is being executed so that this job may resume where it left off if it
+ * is stopped.
+ */
+@DefaultAnnotation(NonNull.class)
+public class SingleThreadKafkaStreamsFactory implements KafkaStreamsFactory {
+
+    private final TopologyBuilderFactory topologyFactory = new TopologyFactory();
+
+    private final String bootstrapServersConfig;
+
+    /**
+     * Constructs an instance of {@link SingleThreadKafkaStreamsFactory}.
+     *
+     * @param bootstrapServersConfig - Configures which Kafka cluster the jobs will interact with. (not null)
+     */
+    public SingleThreadKafkaStreamsFactory(final String bootstrapServersConfig) {
+        this.bootstrapServersConfig = requireNonNull(bootstrapServersConfig);
+    }
+
+    @Override
+    public KafkaStreams make(final String ryaInstance, final StreamsQuery query) throws KafkaStreamsFactoryException {
+        requireNonNull(ryaInstance);
+        requireNonNull(query);
+
+        // Setup the Kafka Stream program.
+        final Properties streamsProps = new Properties();
+
+        // Configure the Kafka servers that will be talked to.
+        streamsProps.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
+
+        // Use the Query ID as the Application ID to ensure we resume where we left off the last time this command was run.
+        streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "RyaStreams-Query-" + query.getQueryId());
+
+        // Always start at the beginning of the input topic.
+        streamsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        // Setup the topology that processes the Query.
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(query.getQueryId());
+
+        try {
+            final TopologyBuilder topologyBuilder = topologyFactory.build(query.getSparql(), statementsTopic, resultsTopic, new RandomUUIDFactory());
+            return new KafkaStreams(topologyBuilder, new StreamsConfig(streamsProps));
+        } catch (MalformedQueryException | TopologyBuilderException e) {
+            throw new KafkaStreamsFactoryException("Could not create a KafkaStreams processing topology for query " + query.getQueryId(), e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a11ca4a2/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryExecutor.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryExecutor.java
index 4572f08..bcee796 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryExecutor.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryExecutor.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -35,36 +35,41 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  */
 @DefaultAnnotation(NonNull.class)
 public interface QueryExecutor extends Service {
+
     /**
      * Starts running a {@link StreamsQuery}.
      *
      * @param ryaInstanceName - The rya instance whose {@link Statement}s will be processed by the query. (not null)
      * @param query - The query to run. (not null)
      * @throws QueryExecutorException When the query fails to start.
+     * @throws IllegalStateException The service has not been started yet.
      */
-    public void startQuery(final String ryaInstanceName, final StreamsQuery query) throws QueryExecutorException;
+    public void startQuery(final String ryaInstanceName, final StreamsQuery query) throws QueryExecutorException, IllegalStateException;
 
     /**
      * Stops a {@link StreamsQuery}.
      *
      * @param queryID - The ID of the query to stop. (not null)
      * @throws QueryExecutorException When the query fails to stop.
+     * @throws IllegalStateException The service has not been started yet.
      */
-    public void stopQuery(final UUID queryID) throws QueryExecutorException;
+    public void stopQuery(final UUID queryID) throws QueryExecutorException, IllegalStateException;
 
     /**
      * Stops all {@link StreamsQuery} belonging to a specific rya instance.
      *
      * @param ryaInstanceName - The name of the rya instance to stop all queries for. (not null)
      * @throws QueryExecutorException When the queries fails to stop.
+     * @throws IllegalStateException The service has not been started yet.
      */
-    public void stopAll(final String ryaInstanceName) throws QueryExecutorException;
+    public void stopAll(final String ryaInstanceName) throws QueryExecutorException, IllegalStateException;
 
     /**
-     * @return - A set of {@link UUID}s representing the current active queries.
+     * @return A set of {@link UUID}s representing the current active queries.
      * @throws QueryExecutorException Can't discover which queries are currently running.
+     * @throws IllegalStateException The service has not been started yet.
      */
-    public Set<UUID> getRunningQueryIds() throws QueryExecutorException;
+    public Set<UUID> getRunningQueryIds() throws QueryExecutorException, IllegalStateException;
 
     /**
      * Exception to be used by {@link QueryExecutor} when queries fail to start or stop.
@@ -100,4 +105,4 @@ public interface QueryExecutor extends Service {
             super(cause);
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a11ca4a2/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
new file mode 100644
index 0000000..947a215
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager.kafka;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.kafka.KafkaStreamsFactory;
+import org.apache.rya.streams.kafka.KafkaStreamsFactory.KafkaStreamsFactoryException;
+import org.apache.rya.streams.querymanager.QueryExecutor;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import kafka.consumer.KafkaStream;
+
+/**
+ * A {@link QueryExecutor} that runs a {@link KafkaStreams} job within its own JVM every
+ * time {@link #startQuery(String, StreamsQuery)} is invoked.
+ * <p/>
+ * This executor may run out of JVM resources if it is used to execute too many queries.
+ */
+@DefaultAnnotation(NonNull.class)
+public class LocalQueryExecutor extends AbstractIdleService implements QueryExecutor {
+
+    /**
+     * Provides thread safety when interacting with this class.
+     */
+    public static ReentrantLock lock = new ReentrantLock();
+
+    /**
+     * Lookup the Rya Instance of a specific Query Id.
+     */
+    private final Map<UUID, String> ryaInstanceById = new HashMap<>();
+
+    /**
+     * Lookup the Query IDs that are running for a specific Rya Instance.
+     */
+    private final Multimap<String, UUID> idByRyaInstance = HashMultimap.create();
+
+    /**
+     * Lookup the executing {@link KafkaStreams} job for a running Query Id.
+     */
+    private final Map<UUID, KafkaStreams> byQueryId = new HashMap<>();
+
+    /**
+     * Builds the {@link KafkaStreams} objects that execute {@link KafkaStream}s.
+     */
+    private final KafkaStreamsFactory streamsFactory;
+
+    /**
+     * Constructs an instance of {@link LocalQueryExecutor}.
+     *
+     * @param streamsFactory - Builds the {@link KafkaStreams} objects that execute {@link KafkaStream}s. (not null)
+     */
+    public LocalQueryExecutor(final KafkaStreamsFactory streamsFactory) {
+        this.streamsFactory = requireNonNull(streamsFactory);
+    }
+
+    @Override
+    protected void startUp() throws Exception {
+        // Nothing to do.
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+        // Stop all of the running queries.
+        for(final KafkaStreams job : byQueryId.values()) {
+            job.close();
+        }
+    }
+
+    @Override
+    public void startQuery(final String ryaInstance, final StreamsQuery query) throws QueryExecutorException {
+        requireNonNull(ryaInstance);
+        requireNonNull(query);
+        checkState(state() == State.RUNNING, "The service must be RUNNING to execute this method.");
+
+        lock.lock();
+        try {
+            // Setup the Kafka Streams job that will execute.
+            final KafkaStreams streams = streamsFactory.make(ryaInstance, query);
+            streams.start();
+
+            // Mark which Rya Instance the Query ID is for.
+            ryaInstanceById.put(query.getQueryId(), ryaInstance);
+
+            // Add the Query ID to the collection of running queries for the Rya instance.
+            idByRyaInstance.put(ryaInstance, query.getQueryId());
+
+            // Add the running Kafka Streams job for the Query ID.
+            byQueryId.put(query.getQueryId(), streams);
+
+        } catch (final KafkaStreamsFactoryException e) {
+            throw new QueryExecutorException("Could not start query " + query.getQueryId(), e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void stopQuery(final UUID queryId) throws QueryExecutorException {
+        requireNonNull(queryId);
+        checkState(state() == State.RUNNING, "The service must be RUNNING to execute this method.");
+
+        lock.lock();
+        try {
+            if(byQueryId.containsKey(queryId)) {
+                // Stop the job from running.
+                final KafkaStreams streams = byQueryId.get(queryId);
+                streams.close();
+
+                // Remove it from the Rya Instance Name lookup.
+                final String ryaInstance = ryaInstanceById.remove(queryId);
+
+                // Remove it from the collection of running queries for the Rya Instance.
+                idByRyaInstance.remove(ryaInstance, queryId);
+
+                // Remove it from the running Kafka Streams job lookup.
+                byQueryId.remove(queryId);
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void stopAll(final String ryaInstanceName) throws QueryExecutorException {
+        requireNonNull(ryaInstanceName);
+        checkState(state() == State.RUNNING, "The service must be RUNNING to execute this method.");
+
+        lock.lock();
+        try {
+            if(idByRyaInstance.containsKey(ryaInstanceName)) {
+                // A defensive copy of the queries so that we may remove them from the maps.
+                final Set<UUID> queryIds = new HashSet<>( idByRyaInstance.get(ryaInstanceName) );
+
+                // Stop each of them.
+                for(final UUID queryId : queryIds) {
+                    stopQuery(queryId);
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public Set<UUID> getRunningQueryIds() throws QueryExecutorException {
+        lock.lock();
+        checkState(state() == State.RUNNING, "The service must be RUNNING to execute this method.");
+
+        try {
+            return new HashSet<>( byQueryId.keySet() );
+        } finally {
+            lock.unlock();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a11ca4a2/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
new file mode 100644
index 0000000..3cbe894
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager.kafka;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.interactor.LoadStatements;
+import org.apache.rya.streams.kafka.KafkaStreamsFactory;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory;
+import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
+import org.apache.rya.streams.querymanager.QueryExecutor;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Integration tests the methods of {@link LocalQueryExecutor}.
+ */
+public class LocalQueryExecutorIT {
+
+    private final String ryaInstance = UUID.randomUUID().toString();
+
+    private Producer<String, VisibilityStatement> stmtProducer = null;
+    private Consumer<String, VisibilityBindingSet> resultConsumer = null;
+
+    @Rule
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(false);
+
+    @Before
+    public void setup() {
+        // Make sure the topic that the change log uses exists.
+        final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
+        kafka.createTopic(changeLogTopic);
+
+        // Initialize the Statements Producer and the Results Consumer.
+        stmtProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class);
+        resultConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class);
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        stmtProducer.close();
+        resultConsumer.close();
+    }
+
+    @Test
+    public void runQuery() throws Exception {
+        // Test values.
+        final String ryaInstance = "rya";
+        final StreamsQuery sQuery = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?person <urn:worksAt> ?business . }", true);
+
+        // Create the statements that will be loaded.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add(new VisibilityStatement(vf.createStatement(
+                vf.createURI("urn:Alice"),
+                vf.createURI("urn:worksAt"),
+                vf.createURI("urn:BurgerJoint")), "a"));
+        statements.add(new VisibilityStatement(vf.createStatement(
+                vf.createURI("urn:Bob"),
+                vf.createURI("urn:worksAt"),
+                vf.createURI("urn:TacoShop")), "a"));
+        statements.add(new VisibilityStatement(vf.createStatement(
+                vf.createURI("urn:Charlie"),
+                vf.createURI("urn:worksAt"),
+                vf.createURI("urn:TacoShop")), "a"));
+
+        // Create the expected results.
+        final List<VisibilityBindingSet> expected = new ArrayList<>();
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
+        expected.add(new VisibilityBindingSet(bs, "a"));
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Bob"));
+        bs.addBinding("business", vf.createURI("urn:TacoShop"));
+        expected.add(new VisibilityBindingSet(bs, "a"));
+        bs = new MapBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Charlie"));
+        bs.addBinding("business", vf.createURI("urn:TacoShop"));
+        expected.add(new VisibilityBindingSet(bs, "a"));
+
+        // Start the executor that will be tested.
+        final String kafkaServers = kafka.getKafkaHostname() + ":" + kafka.getKafkaPort();
+        final KafkaStreamsFactory jobFactory = new SingleThreadKafkaStreamsFactory(kafkaServers);
+        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        executor.startAndWait();
+        try {
+            // Start the query.
+            executor.startQuery(ryaInstance, sQuery);
+
+            // Wait for the program to start.
+            Thread.sleep(5000);
+
+            // Write some statements to the program.
+            final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+            final LoadStatements loadStatements = new KafkaLoadStatements(statementsTopic, stmtProducer);
+            loadStatements.fromCollection(statements);
+
+            // Read the output of the streams program.
+            final String resultsTopic = KafkaTopics.queryResultsTopic(sQuery.getQueryId());
+            resultConsumer.subscribe( Lists.newArrayList(resultsTopic) );
+            final List<VisibilityBindingSet> results = KafkaTestUtil.pollForResults(500, 6, 3, resultConsumer);
+            assertEquals(expected, results);
+
+        } finally {
+            executor.stopAndWait();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a11ca4a2/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
new file mode 100644
index 0000000..0df5794
--- /dev/null
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.kafka.KafkaStreamsFactory;
+import org.apache.rya.streams.querymanager.QueryExecutor;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Unit tests the methods of {@link LocalQueryExecutor}.
+ */
+public class LocalQueryExecutorTest {
+
+    @Test(expected = IllegalStateException.class)
+    public void startQuery_serviceNotStarted() throws Exception {
+        final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+        executor.startQuery("rya", new StreamsQuery(UUID.randomUUID(), "query", true));
+    }
+
+    @Test
+    public void startQuery() throws Exception {
+        // Test values.
+        final String ryaInstance = "rya";
+        final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+
+        // Mock the streams factory so that we can tell if the start function is invoked by the executor.
+        final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
+        final KafkaStreams queryJob = mock(KafkaStreams.class);
+        when(jobFactory.make(eq(ryaInstance), eq(query))).thenReturn(queryJob);
+
+        // Start the executor that will be tested.
+        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        executor.startAndWait();
+        try {
+            // Tell the executor to start the query.
+            executor.startQuery(ryaInstance, query);
+
+            // Show a job was started for that query's ID.
+            verify(queryJob).start();
+        } finally {
+            executor.stopAndWait();
+        }
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void stopQuery_serviceNotStarted() throws Exception {
+        final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+        executor.stopQuery(UUID.randomUUID());
+    }
+
+    @Test
+    public void stopQuery_queryNotRunning() throws Exception {
+        // Start an executor.
+        final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+        executor.startAndWait();
+        try {
+            // Try to stop a query that was never stareted.
+            executor.stopQuery(UUID.randomUUID());
+        } finally {
+            executor.stopAndWait();
+        }
+    }
+
+    @Test
+    public void stopQuery() throws Exception {
+        // Test values.
+        final String ryaInstance = "rya";
+        final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+
+        // Mock the streams factory so that we can tell if the stop function is invoked by the executor.
+        final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
+        final KafkaStreams queryJob = mock(KafkaStreams.class);
+        when(jobFactory.make(eq(ryaInstance), eq(query))).thenReturn(queryJob);
+
+        // Start the executor that will be tested.
+        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        executor.startAndWait();
+        try {
+            // Tell the executor to start the query.
+            executor.startQuery(ryaInstance, query);
+
+            // Tell the executor to stop the query.
+            executor.stopQuery(query.getQueryId());
+
+            // Show a job was stopped for that query's ID.
+            verify(queryJob).close();
+        } finally {
+            executor.stopAndWait();
+        }
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void stopAll_serviceNotStarted() throws Exception {
+        final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+        executor.stopAll("rya");
+    }
+
+    @Test
+    public void stopAll_noneForThatRyaInstance() throws Exception {
+        // Test values.
+        final String ryaInstance = "rya";
+        final StreamsQuery query1= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+        final StreamsQuery query2= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+
+        // Mock the streams factory so that we can tell if the stop function is invoked by the executor.
+        final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
+        final KafkaStreams queryJob1 = mock(KafkaStreams.class);
+        final KafkaStreams queryJob2 = mock(KafkaStreams.class);
+        when(jobFactory.make(eq(ryaInstance), eq(query1))).thenReturn(queryJob1);
+        when(jobFactory.make(eq(ryaInstance), eq(query2))).thenReturn(queryJob2);
+
+        // Start the executor that will be tested.
+        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        executor.startAndWait();
+        try {
+            // Tell the executor to start the queries.
+            executor.startQuery(ryaInstance, query1);
+            executor.startQuery(ryaInstance, query2);
+
+            // Verify both are running.
+            verify(queryJob1).start();
+            verify(queryJob2).start();
+
+            // Tell the executor to stop queries running under rya2.
+            executor.stopAll("someOtherRyaInstance");
+
+            // Show none of the queries were stopped.
+            verify(queryJob1, never()).close();
+            verify(queryJob2, never()).close();
+
+        } finally {
+            executor.stopAndWait();
+        }
+    }
+
+    @Test
+    public void stopAll() throws Exception {
+        // Test values.
+        final String ryaInstance1 = "rya1";
+        final StreamsQuery query1= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+        final String ryaInstance2 = "rya2";
+        final StreamsQuery query2= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+
+        // Mock the streams factory so that we can tell if the stop function is invoked by the executor.
+        final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
+        final KafkaStreams queryJob1 = mock(KafkaStreams.class);
+        when(jobFactory.make(eq(ryaInstance1), eq(query1))).thenReturn(queryJob1);
+        final KafkaStreams queryJob2 = mock(KafkaStreams.class);
+        when(jobFactory.make(eq(ryaInstance2), eq(query2))).thenReturn(queryJob2);
+
+        // Start the executor that will be tested.
+        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        executor.startAndWait();
+        try {
+            // Tell the executor to start the queries.
+            executor.startQuery(ryaInstance1, query1);
+            executor.startQuery(ryaInstance2, query2);
+
+            // Verify both are running.
+            verify(queryJob1).start();
+            verify(queryJob2).start();
+
+            // Tell the executor to stop queries running under rya2.
+            executor.stopAll(ryaInstance2);
+
+            // Show the first query is still running, but the second isn't.
+            verify(queryJob1, never()).close();
+            verify(queryJob2).close();
+
+        } finally {
+            executor.stopAndWait();
+        }
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void getRunningQueryIds_serviceNotStarted() throws Exception {
+        final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+        executor.getRunningQueryIds();
+    }
+
+    @Test
+    public void getRunningQueryIds_noneStarted() throws Exception {
+        // Start an executor.
+        final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+        executor.startAndWait();
+        try {
+            // Get the list of running queries.
+            final Set<UUID> runningQueries = executor.getRunningQueryIds();
+
+            // Show no queries are reported as running.
+            assertTrue(runningQueries.isEmpty());
+        } finally {
+            executor.stopAndWait();
+        }
+    }
+
+    @Test
+    public void getRunningQueryIds_noneStopped() throws Exception {
+        // Test values.
+        final String ryaInstance = "rya";
+        final StreamsQuery query1 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+        final StreamsQuery query2 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+        final StreamsQuery query3 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+
+        // Mock the streams factory so that we can figure out what is started.
+        final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
+        when(jobFactory.make(eq(ryaInstance), eq(query1))).thenReturn(mock(KafkaStreams.class));
+        when(jobFactory.make(eq(ryaInstance), eq(query2))).thenReturn(mock(KafkaStreams.class));
+        when(jobFactory.make(eq(ryaInstance), eq(query3))).thenReturn(mock(KafkaStreams.class));
+
+        // Start the executor that will be tested.
+        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        executor.startAndWait();
+        try {
+            // Start the queries.
+            executor.startQuery(ryaInstance, query1);
+            executor.startQuery(ryaInstance, query2);
+            executor.startQuery(ryaInstance, query3);
+
+            // All of those query IDs should be reported as running.
+            final Set<UUID> expected = Sets.newHashSet(
+                    query1.getQueryId(),
+                    query2.getQueryId(),
+                    query3.getQueryId());
+            assertEquals(expected, executor.getRunningQueryIds());
+
+        } finally {
+            executor.stopAndWait();
+        }
+    }
+
+    @Test
+    public void getRunningQueryIds_stoppedNoLongerListed() throws Exception {
+        // Test values.
+        final String ryaInstance = "rya";
+        final StreamsQuery query1 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+        final StreamsQuery query2 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+        final StreamsQuery query3 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+
+        // Mock the streams factory so that we can figure out what is started.
+        final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
+        when(jobFactory.make(eq(ryaInstance), eq(query1))).thenReturn(mock(KafkaStreams.class));
+        when(jobFactory.make(eq(ryaInstance), eq(query2))).thenReturn(mock(KafkaStreams.class));
+        when(jobFactory.make(eq(ryaInstance), eq(query3))).thenReturn(mock(KafkaStreams.class));
+
+        // Start the executor that will be tested.
+        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        executor.startAndWait();
+        try {
+            // Start the queries.
+            executor.startQuery(ryaInstance, query1);
+            executor.startQuery(ryaInstance, query2);
+            executor.startQuery(ryaInstance, query3);
+
+            // Stop the second query.
+            executor.stopQuery(query2.getQueryId());
+
+            // Only the first and third queries are running.
+            final Set<UUID> expected = Sets.newHashSet(
+                    query1.getQueryId(),
+                    query3.getQueryId());
+            assertEquals(expected, executor.getRunningQueryIds());
+
+        } finally {
+            executor.stopAndWait();
+        }
+    }
+}
\ No newline at end of file