You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/09 21:49:01 UTC
[44/50] [abbrv] incubator-rya git commit: RYA-377 Implement a command
for running a Rya Streams query out of the command line client.
RYA-377 Implement a command for running a Rya Streams query out of the command line client.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/94423229
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/94423229
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/94423229
Branch: refs/heads/master
Commit: 94423229ebe7b34e0fb6c17fbe022e080cfe79d9
Parents: a5e3618
Author: kchilton2 <ke...@gmail.com>
Authored: Tue Nov 14 18:32:53 2017 -0500
Committer: caleb <ca...@parsons.com>
Committed: Tue Jan 9 15:13:01 2018 -0500
----------------------------------------------------------------------
.../rya/streams/api/interactor/RunQuery.java | 41 ++++
.../api/queries/InMemoryQueryRepository.java | 21 +-
.../streams/api/queries/QueryRepository.java | 14 +-
.../queries/InMemoryQueryRepositoryTest.java | 121 ++++++-----
.../apache/rya/streams/client/CLIDriver.java | 2 +
.../client/command/LoadStatementsCommand.java | 1 +
.../streams/client/command/RunQueryCommand.java | 155 ++++++++++++++
.../client/command/StreamResultsCommand.java | 2 +-
.../client/command/AddQueryCommandIT.java | 55 ++---
.../client/command/DeleteQueryCommandIT.java | 183 +++++++---------
.../client/command/ListQueryCommandIT.java | 56 ++---
.../client/command/LoadStatementsCommandIT.java | 78 ++-----
.../client/command/RunQueryCommandIT.java | 196 +++++++++++++++++
extras/rya.streams/kafka/pom.xml | 10 +
.../apache/rya/streams/kafka/KafkaTopics.java | 39 ++++
.../kafka/interactor/KafkaLoadStatements.java | 5 +-
.../streams/kafka/interactor/KafkaRunQuery.java | 136 ++++++++++++
.../processors/join/KeyValueJoinStateStore.java | 5 +-
.../apache/rya/streams/kafka/KafkaTestUtil.java | 211 -------------------
.../rya/streams/kafka/RyaStreamsTestUtil.java | 124 +++++++++++
.../interactor/KafkaGetQueryResultStreamIT.java | 2 +-
.../kafka/interactor/KafkaLoadStatementsIT.java | 5 +-
.../kafka/interactor/KafkaRunQueryIT.java | 170 +++++++++++++++
.../processors/StatementPatternProcessorIT.java | 10 +-
.../processors/filter/FilterProcessorIT.java | 4 +-
.../kafka/processors/join/JoinProcessorIT.java | 12 +-
.../projection/MultiProjectionProcessorIT.java | 4 +-
.../projection/ProjectionProcessorIT.java | 4 +-
.../kafka/queries/KafkaQueryChangeLogIT.java | 2 +-
.../VisibilityBindingSetKafkaIT.java | 2 +-
.../VisibilityStatementKafkaIT.java | 2 +-
test/kafka/pom.xml | 5 +
.../rya/test/kafka/KafkaTestInstanceRule.java | 7 +
.../apache/rya/test/kafka/KafkaTestUtil.java | 126 +++++++++++
34 files changed, 1269 insertions(+), 541 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/RunQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/RunQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/RunQuery.java
new file mode 100644
index 0000000..7f47095
--- /dev/null
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/RunQuery.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api.interactor;
+
+import java.util.UUID;
+
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Runs a Rya Streams processing topology on the machine this class is invoked on.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface RunQuery {
+
+ /**
+ * Runs the specified query on the machine this method was invoked on.
+ *
+ * @param queryId - The id of the query that will be processed. (not null)
+ * @throws RyaStreamsException The query could not be processed.
+ */
+ public void run(UUID queryId) throws RyaStreamsException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
index c1048fc..80678de 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
@@ -22,6 +22,7 @@ import static java.util.Objects.requireNonNull;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;
@@ -63,13 +64,7 @@ public class InMemoryQueryRepository implements QueryRepository {
this.changeLog = requireNonNull(changeLog);
// Lazily initialize the queries cache the first time you try to use it.
- queriesCache = Suppliers.memoize(new Supplier<Map<UUID, StreamsQuery>>() {
- @Override
- public Map<UUID, StreamsQuery> get() {
- // Initialize the queries cache using the current state of the change log.
- return initializeCache(changeLog);
- }
- });
+ queriesCache = Suppliers.memoize(() -> initializeCache(changeLog));
}
@Override
@@ -98,6 +93,18 @@ public class InMemoryQueryRepository implements QueryRepository {
}
@Override
+ public Optional<StreamsQuery> get(final UUID queryId) throws QueryRepositoryException {
+ requireNonNull(queryId);
+
+ lock.lock();
+ try {
+ return Optional.ofNullable( queriesCache.get().get(queryId) );
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
public void delete(final UUID queryId) throws QueryRepositoryException {
requireNonNull(queryId);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
index 850b2bc..7269588 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
@@ -18,6 +18,7 @@
*/
package org.apache.rya.streams.api.queries;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
@@ -32,6 +33,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
*/
@DefaultAnnotation(NonNull.class)
public interface QueryRepository extends AutoCloseable {
+
/**
* Adds a new query to Rya Streams.
*
@@ -42,6 +44,15 @@ public interface QueryRepository extends AutoCloseable {
public StreamsQuery add(final String query) throws QueryRepositoryException;
/**
+ * Get an existing query from Rya Streams.
+ *
+ * @param queryId - Identifies which query will be fetched.
+ * @return the {@link StreamsQuery} for the id if one exists; otherwise empty.
+ * @throws QueryRepositoryException The query could not be fetched.
+ */
+ public Optional<StreamsQuery> get(UUID queryId) throws QueryRepositoryException;
+
+ /**
* Removes an existing query from Rya Streams.
*
* @param queryID - The {@link UUID} of the query to remove. (not null)
@@ -53,8 +64,7 @@ public interface QueryRepository extends AutoCloseable {
* Lists all existing queries in Rya Streams.
*
* @return - A List of the current {@link StreamsQuery}s
- * @throws QueryRepositoryException The {@link StreamsQuery}s could not be
- * listed.
+ * @throws QueryRepositoryException The {@link StreamsQuery}s could not be listed.
*/
public Set<StreamsQuery> list() throws QueryRepositoryException;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
index 25cbab2..92193ca 100644
--- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
+++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
@@ -19,16 +19,17 @@
package org.apache.rya.streams.api.queries;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.HashSet;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException;
-import org.apache.rya.streams.api.queries.QueryRepository.QueryRepositoryException;
import org.junit.Test;
/**
@@ -37,69 +38,95 @@ import org.junit.Test;
public class InMemoryQueryRepositoryTest {
@Test
- public void canReadAddedQueries() throws QueryRepositoryException {
+ public void canReadAddedQueries() throws Exception {
// Setup a totally in memory QueryRepository.
- final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() );
-
- // Add some queries to it.
- final Set<StreamsQuery> expected = new HashSet<>();
- expected.add( queries.add("query 1") );
- expected.add( queries.add("query 2") );
- expected.add( queries.add("query 3") );
-
- // Show they are in the list of all queries.
- final Set<StreamsQuery> stored = queries.list();
- assertEquals(expected, stored);
+ try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
+ // Add some queries to it.
+ final Set<StreamsQuery> expected = new HashSet<>();
+ expected.add( queries.add("query 1") );
+ expected.add( queries.add("query 2") );
+ expected.add( queries.add("query 3") );
+
+ // Show they are in the list of all queries.
+ final Set<StreamsQuery> stored = queries.list();
+ assertEquals(expected, stored);
+ }
}
@Test
- public void deletedQueriesDisappear() throws QueryRepositoryException {
+ public void deletedQueriesDisappear() throws Exception {
// Setup a totally in memory QueryRepository.
- final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() );
-
- // Add some queries to it. The second one we will delete.
- final Set<StreamsQuery> expected = new HashSet<>();
- expected.add( queries.add("query 1") );
- final UUID deletedMeId = queries.add("query 2").getQueryId();
- expected.add( queries.add("query 3") );
-
- // Delete the second query.
- queries.delete( deletedMeId );
-
- // Show only queries 1 and 3 are in the list.
- final Set<StreamsQuery> stored = queries.list();
- assertEquals(expected, stored);
+ try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
+ // Add some queries to it. The second one we will delete.
+ final Set<StreamsQuery> expected = new HashSet<>();
+ expected.add( queries.add("query 1") );
+ final UUID deletedMeId = queries.add("query 2").getQueryId();
+ expected.add( queries.add("query 3") );
+
+ // Delete the second query.
+ queries.delete( deletedMeId );
+
+ // Show only queries 1 and 3 are in the list.
+ final Set<StreamsQuery> stored = queries.list();
+ assertEquals(expected, stored);
+ }
}
@Test
- public void initializedWithPopulatedChnageLog() throws QueryRepositoryException {
+ public void initializedWithPopulatedChnageLog() throws Exception {
// Setup a totally in memory QueryRepository. Hold onto the change log so that we can use it again later.
final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
- final QueryRepository queries = new InMemoryQueryRepository( changeLog );
-
- // Add some queries and deletes to it.
- final Set<StreamsQuery> expected = new HashSet<>();
- expected.add( queries.add("query 1") );
- final UUID deletedMeId = queries.add("query 2").getQueryId();
- expected.add( queries.add("query 3") );
- queries.delete( deletedMeId );
-
- // Create a new totally in memory QueryRepository.
- final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog );
-
- // Listing the queries should work using an initialized change log.
- final Set<StreamsQuery> stored = initializedQueries.list();
- assertEquals(expected, stored);
+ try(final QueryRepository queries = new InMemoryQueryRepository( changeLog )) {
+ // Add some queries and deletes to it.
+ final Set<StreamsQuery> expected = new HashSet<>();
+ expected.add( queries.add("query 1") );
+ final UUID deletedMeId = queries.add("query 2").getQueryId();
+ expected.add( queries.add("query 3") );
+ queries.delete( deletedMeId );
+
+ // Create a new totally in memory QueryRepository.
+ try(final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog )) {
+ // Listing the queries should work using an initialized change log.
+ final Set<StreamsQuery> stored = initializedQueries.list();
+ assertEquals(expected, stored);
+ }
+ }
}
@Test(expected = RuntimeException.class)
- public void changeLogThrowsExceptions() throws QueryChangeLogException, QueryRepositoryException {
+ public void changeLogThrowsExceptions() throws Exception {
// Create a mock change log that throws an exception when you try to list what is in it.
final QueryChangeLog changeLog = mock(QueryChangeLog.class);
when(changeLog.readFromStart()).thenThrow(new QueryChangeLogException("Mocked exception."));
// Create the QueryRepository and invoke one of the methods.
- final QueryRepository queries = new InMemoryQueryRepository( changeLog );
- queries.list();
+ try(final QueryRepository queries = new InMemoryQueryRepository( changeLog )) {
+ queries.list();
+ }
+ }
+
+ @Test
+ public void get_present() throws Exception {
+ // Setup a totally in memory QueryRepository.
+ try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
+ // Add a query to it.
+ final StreamsQuery query = queries.add("query 1");
+
+ // Show the fetched query matches the expected ones.
+ final Optional<StreamsQuery> fetched = queries.get(query.getQueryId());
+ assertEquals(query, fetched.get());
+ }
+ }
+
+ @Test
+ public void get_notPresent() throws Exception {
+ // Setup a totally in memory QueryRepository.
+ try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
+ // Fetch a query that was never added to the repository.
+ final Optional<StreamsQuery> query = queries.get(UUID.randomUUID());
+
+ // Show it could not be found.
+ assertFalse(query.isPresent());
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java
index 5c0816f..05e75d9 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/CLIDriver.java
@@ -30,6 +30,7 @@ import org.apache.rya.streams.client.command.AddQueryCommand;
import org.apache.rya.streams.client.command.DeleteQueryCommand;
import org.apache.rya.streams.client.command.ListQueriesCommand;
import org.apache.rya.streams.client.command.LoadStatementsCommand;
+import org.apache.rya.streams.client.command.RunQueryCommand;
import org.apache.rya.streams.client.command.StreamResultsCommand;
import com.google.common.collect.ImmutableMap;
@@ -63,6 +64,7 @@ public class CLIDriver {
commandClasses.add(DeleteQueryCommand.class);
commandClasses.add(ListQueriesCommand.class);
commandClasses.add(LoadStatementsCommand.class);
+ commandClasses.add(RunQueryCommand.class);
commandClasses.add(StreamResultsCommand.class);
final ImmutableMap.Builder<String, RyaStreamsCommand> builder = ImmutableMap.builder();
for(final Class<? extends RyaStreamsCommand> commandClass : commandClasses) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
index 9414b28..42020b3 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
@@ -127,6 +127,7 @@ public class LoadStatementsCommand implements RyaStreamsCommand {
final Properties producerProps = buildProperties(params);
try (final Producer<Object, VisibilityStatement> producer = new KafkaProducer<>(producerProps)) {
final LoadStatements statements = new KafkaLoadStatements(KafkaTopics.statementsTopic(params.ryaInstance), producer);
+ System.out.printf("Loading statements from file `%s` using visibilities `%s`.\n", statementsPath, params.visibilities);
statements.fromFile(statementsPath, params.visibilities);
} catch (final Exception e) {
System.err.println("Unable to parse statements file: " + statementsPath.toString());
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
new file mode 100644
index 0000000..8f7f162
--- /dev/null
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.client.command;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.client.RyaStreamsCommand;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.interactor.KafkaRunQuery;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.google.common.base.Strings;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A command that runs a Rya Streams processing topology on the node the client is executed on until it has finished.
+ */
+@DefaultAnnotation(NonNull.class)
+public class RunQueryCommand implements RyaStreamsCommand {
+
+ private class RunParameters extends RyaStreamsCommand.KafkaParameters {
+ @Parameter(names = { "--queryID", "-q" }, required = true, description = "The ID of the query to run.")
+ private String queryId;
+
+ @Parameter(names = {"--zookeepers", "-z"}, required = true, description = "The servers that Zookeeper runs on.")
+ private String zookeeperServers;
+
+ @Override
+ public String toString() {
+ final StringBuilder parameters = new StringBuilder();
+ parameters.append(super.toString());
+
+ if (!Strings.isNullOrEmpty(queryId)) {
+ parameters.append("\tQueryID: " + queryId);
+ parameters.append("\n");
+ }
+ return parameters.toString();
+ }
+ }
+
+ @Override
+ public String getCommand() {
+ return "run-query";
+ }
+
+ @Override
+ public String getDescription() {
+ return "Runs a Rya Streams query until the command is killed. This command also creates the input and output " +
+ "topics required to execute the query.";
+ }
+
+ @Override
+ public String getUsage() {
+ final JCommander parser = new JCommander(new RunParameters());
+
+ final StringBuilder usage = new StringBuilder();
+ parser.usage(usage);
+ return usage.toString();
+ }
+
+ @Override
+ public boolean validArguments(final String[] args) {
+ boolean valid = true;
+ try {
+ new JCommander(new RunParameters(), args);
+ } catch(final ParameterException e) {
+ valid = false;
+ }
+ return valid;
+ }
+
+ @Override
+ public void execute(final String[] args) throws ArgumentsException, ExecutionException {
+ requireNonNull(args);
+
+ // Parse the command line arguments.
+ final RunParameters params = new RunParameters();
+ try {
+ new JCommander(params, args);
+ } catch(final ParameterException e) {
+ throw new ArgumentsException("Could not add a new query because of invalid command line parameters.", e);
+ }
+
+ // Create the Kafka backed QueryChangeLog.
+ final String bootstrapServers = params.kafkaIP + ":" + params.kafkaPort;
+ final String topic = KafkaTopics.queryChangeLogTopic(params.ryaInstance);
+ final QueryChangeLog queryChangeLog = KafkaQueryChangeLogFactory.make(bootstrapServers, topic);
+
+ // Look up the query to be executed from the change log.
+ try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) {
+ try {
+ final UUID queryId = UUID.fromString( params.queryId );
+ final Optional<StreamsQuery> query = queryRepo.get(queryId);
+
+ if(!query.isPresent()) {
+ throw new ArgumentsException("There is no registered query for queryId " + params.queryId);
+ }
+
+ // Make sure the topics required by the application exist for the specified Rya instance.
+ final Set<String> topics = new HashSet<>();
+ topics.add( KafkaTopics.statementsTopic(params.ryaInstance) );
+ topics.add( KafkaTopics.queryResultsTopic(queryId) );
+ KafkaTopics.createTopic(params.zookeeperServers, topics, 1, 1);
+
+ // Run the query that uses those topics.
+ final KafkaRunQuery runQuery = new KafkaRunQuery(
+ params.kafkaIP,
+ params.kafkaPort,
+ KafkaTopics.statementsTopic(params.ryaInstance),
+ KafkaTopics.queryResultsTopic(queryId),
+ queryRepo,
+ new TopologyFactory());
+ runQuery.run(queryId);
+ } catch(final Exception e) {
+ throw new ExecutionException("Could not execute the Run Query command.", e);
+ }
+ } catch(final ArgumentsException | ExecutionException e) {
+ // Rethrow the exceptions that are advertised by execute.
+ throw e;
+ } catch (final Exception e) {
+ throw new ExecutionException("Problem encountered while closing the QueryRepository.", e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
index 9de978b..64f78a3 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
@@ -126,7 +126,7 @@ public class StreamResultsCommand implements RyaStreamsCommand {
// Execute the command.
final GetQueryResultStream getQueryResultStream = new KafkaGetQueryResultStream(params.kafkaIP, params.kafkaPort);
- try (final QueryResultStream stream = getQueryResultStream.fromNow(queryId)) {
+ try (final QueryResultStream stream = getQueryResultStream.fromStart(queryId)) {
while(!finished.get()) {
for(final VisibilityBindingSet visBs : stream.poll(1000)) {
System.out.println(visBs);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
index ee4378e..3a412d2 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
@@ -20,17 +20,11 @@ package org.apache.rya.streams.client.command;
import static org.junit.Assert.assertEquals;
-import java.util.Properties;
import java.util.Set;
import java.util.UUID;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.streams.api.entity.StreamsQuery;
@@ -43,6 +37,7 @@ import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -54,49 +49,27 @@ import org.junit.Test;
public class AddQueryCommandIT {
private final String ryaInstance = UUID.randomUUID().toString();
-
- private String kafkaIp;
- private String kafkaPort;
private QueryRepository queryRepo;
- private Producer<?, QueryChange> queryProducer = null;
- private Consumer<?, QueryChange> queryConsumer = null;
-
@Rule
- public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true);
+ public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
@Before
public void setup() {
- final Properties props = rule.createBootstrapServerConfig();
- final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
- final String[] tokens = location.split(":");
-
- kafkaIp = tokens[0];
- kafkaPort = tokens[1];
-
- // Initialize the QueryRepository.
- final Properties producerProperties = new Properties();
- producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
- producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName());
-
- final Properties consumerProperties = new Properties();
- consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
- consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName());
-
- queryProducer = new KafkaProducer<>(producerProperties);
- queryConsumer = new KafkaConsumer<>(consumerProperties);
-
+ // Make sure the topic that the change log uses exists.
final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
+ kafka.createTopic(changeLogTopic);
+
+ // Setup the QueryRepository used by the test.
+ final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
+ final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
queryRepo = new InMemoryQueryRepository(changeLog);
}
@After
- public void cleanup() {
- queryProducer.close();
- queryConsumer.close();
+ public void cleanup() throws Exception {
+ queryRepo.close();
}
@Test
@@ -105,8 +78,8 @@ public class AddQueryCommandIT {
final String query = "SELECT * WHERE { ?person <urn:name> ?name }";
final String[] args = new String[] {
"-r", "" + ryaInstance,
- "-i", kafkaIp,
- "-p", kafkaPort,
+ "-i", kafka.getKafkaHostname(),
+ "-p", kafka.getKafkaPort(),
"-q", query
};
@@ -126,8 +99,8 @@ public class AddQueryCommandIT {
final String query = "SELECT * WHERE { ?person <urn:name> ?name }";
final String[] args = new String[] {
"--ryaInstance", "" + ryaInstance,
- "--kafkaHostname", kafkaIp,
- "--kafkaPort", kafkaPort,
+ "--kafkaHostname", kafka.getKafkaHostname(),
+ "--kafkaPort", kafka.getKafkaPort(),
"--query", query
};
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
index c5dad3d..91647f2 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
@@ -18,20 +18,15 @@
*/
package org.apache.rya.streams.client.command;
+import static java.util.Objects.requireNonNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
-import java.util.Properties;
import java.util.Set;
import java.util.UUID;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.streams.api.entity.StreamsQuery;
@@ -44,8 +39,7 @@ import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.rya.test.kafka.KafkaTestUtil;
import org.junit.Rule;
import org.junit.Test;
@@ -54,124 +48,103 @@ import org.junit.Test;
*/
public class DeleteQueryCommandIT {
- private final String ryaInstance = UUID.randomUUID().toString();
-
- private String kafkaIp;
- private String kafkaPort;
-
- private Producer<?, QueryChange> queryProducer = null;
- private Consumer<?, QueryChange> queryConsumer = null;
-
@Rule
- public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true);
-
- @Before
- public void setup() {
- final Properties props = rule.createBootstrapServerConfig();
- final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
- final String[] tokens = location.split(":");
-
- kafkaIp = tokens[0];
- kafkaPort = tokens[1];
- }
+ public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
/**
* This test simulates executing many commands and each of them use their own InMemoryQueryRepository. We need
* to re-create the repo outside of the command to ensure it has the most up to date values inside of it.
+ *
+ * @param ryaInstance - The rya instance the repository is connected to. (not null)
+ * @param createTopic - Set this to true if the topic doesn't exist yet.
*/
- private QueryRepository makeQueryRepository() {
- final Properties producerProperties = new Properties();
- producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
- producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName());
-
- final Properties consumerProperties = new Properties();
- consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
- consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName());
-
- cleanup();
- queryProducer = new KafkaProducer<>(producerProperties);
- queryConsumer = new KafkaConsumer<>(consumerProperties);
+ private QueryRepository makeQueryRepository(final String ryaInstance, final boolean createTopic) {
+ requireNonNull(ryaInstance);
+ // Make sure the topic that the change log uses exists.
final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
+ if(createTopic) {
+ kafka.createTopic(changeLogTopic);
+ }
+
+ // Setup the QueryRepository used by the test.
+ final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
+ final Consumer<?, QueryChange> queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
return new InMemoryQueryRepository(changeLog);
}
- @After
- public void cleanup() {
- if(queryProducer != null) {
- queryProducer.close();
- }
- if(queryConsumer != null) {
- queryConsumer.close();
- }
- }
-
@Test
public void shortParams() throws Exception {
+ final String ryaInstance = UUID.randomUUID().toString();
+
// Add a few queries to Rya Streams.
- QueryRepository repo = makeQueryRepository();
- repo.add("query1");
- final UUID query2Id = repo.add("query2").getQueryId();
- repo.add("query3");
-
- // Show that all three of the queries were added.
- Set<StreamsQuery> queries = repo.list();
- assertEquals(3, queries.size());
-
- // Delete query 2 using the delete query command.
- final String[] deleteArgs = new String[] {
- "-r", "" + ryaInstance,
- "-i", kafkaIp,
- "-p", kafkaPort,
- "-q", query2Id.toString()
- };
-
- final DeleteQueryCommand deleteCommand = new DeleteQueryCommand();
- deleteCommand.execute(deleteArgs);
-
- // Show query2 was deleted.
- repo = makeQueryRepository();
- queries = repo.list();
- assertEquals(2, queries.size());
-
- for(final StreamsQuery query : queries) {
- assertNotEquals(query2Id, query.getQueryId());
+ try(QueryRepository repo = makeQueryRepository(ryaInstance, true)) {
+ repo.add("query1");
+ final UUID query2Id = repo.add("query2").getQueryId();
+ repo.add("query3");
+
+ // Show that all three of the queries were added.
+ Set<StreamsQuery> queries = repo.list();
+ assertEquals(3, queries.size());
+
+ // Delete query 2 using the delete query command.
+ final String[] deleteArgs = new String[] {
+ "-r", "" + ryaInstance,
+ "-i", kafka.getKafkaHostname(),
+ "-p", kafka.getKafkaPort(),
+ "-q", query2Id.toString()
+ };
+
+ final DeleteQueryCommand deleteCommand = new DeleteQueryCommand();
+ deleteCommand.execute(deleteArgs);
+
+ // Show query2 was deleted.
+ try(QueryRepository repo2 = makeQueryRepository(ryaInstance, false)) {
+ queries = repo2.list();
+ assertEquals(2, queries.size());
+
+ for(final StreamsQuery query : queries) {
+ assertNotEquals(query2Id, query.getQueryId());
+ }
+ }
}
}
@Test
public void longParams() throws Exception {
+ final String ryaInstance = UUID.randomUUID().toString();
+
// Add a few queries to Rya Streams.
- QueryRepository repo = makeQueryRepository();
- repo.add("query1");
- final UUID query2Id = repo.add("query2").getQueryId();
- repo.add("query3");
-
- // Show that all three of the queries were added.
- Set<StreamsQuery> queries = repo.list();
- assertEquals(3, queries.size());
-
- // Delete query 2 using the delete query command.
- final String[] deleteArgs = new String[] {
- "--ryaInstance", "" + ryaInstance,
- "--kafkaHostname", kafkaIp,
- "--kafkaPort", kafkaPort,
- "--queryID", query2Id.toString()
- };
-
- final DeleteQueryCommand deleteCommand = new DeleteQueryCommand();
- deleteCommand.execute(deleteArgs);
-
- // Show query2 was deleted.
- repo = makeQueryRepository();
- queries = repo.list();
- assertEquals(2, queries.size());
-
- for(final StreamsQuery query : queries) {
- assertNotEquals(query2Id, query.getQueryId());
+ try(QueryRepository repo = makeQueryRepository(ryaInstance, true)) {
+ repo.add("query1");
+ final UUID query2Id = repo.add("query2").getQueryId();
+ repo.add("query3");
+
+ // Show that all three of the queries were added.
+ Set<StreamsQuery> queries = repo.list();
+ assertEquals(3, queries.size());
+
+ // Delete query 2 using the delete query command.
+ final String[] deleteArgs = new String[] {
+ "--ryaInstance", "" + ryaInstance,
+ "--kafkaHostname", kafka.getKafkaHostname(),
+ "--kafkaPort", kafka.getKafkaPort(),
+ "--queryID", query2Id.toString()
+ };
+
+ final DeleteQueryCommand deleteCommand = new DeleteQueryCommand();
+ deleteCommand.execute(deleteArgs);
+
+ // Show query2 was deleted.
+ try(QueryRepository repo2 = makeQueryRepository(ryaInstance, false)) {
+ queries = repo2.list();
+ assertEquals(2, queries.size());
+
+ for(final StreamsQuery query : queries) {
+ assertNotEquals(query2Id, query.getQueryId());
+ }
+ }
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
index b32967e..00b4ce0 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
@@ -18,16 +18,10 @@
*/
package org.apache.rya.streams.client.command;
-import java.util.Properties;
import java.util.UUID;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
@@ -39,6 +33,7 @@ import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -50,52 +45,29 @@ import org.junit.Test;
public class ListQueryCommandIT {
private final String ryaInstance = UUID.randomUUID().toString();
-
- private String kafkaIp;
- private String kafkaPort;
private QueryRepository queryRepo;
- private Producer<?, QueryChange> queryProducer = null;
- private Consumer<?, QueryChange> queryConsumer = null;
-
@Rule
- public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true);
+ public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
@Before
public void setup() {
- final Properties props = rule.createBootstrapServerConfig();
- final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
- final String[] tokens = location.split(":");
-
- kafkaIp = tokens[0];
- kafkaPort = tokens[1];
-
- // Initialize the QueryRepository.
- final Properties producerProperties = new Properties();
- producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
- producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName());
-
- final Properties consumerProperties = new Properties();
- consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
- consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName());
-
- queryProducer = new KafkaProducer<>(producerProperties);
- queryConsumer = new KafkaConsumer<>(consumerProperties);
-
+ // Make sure the topic that the change log uses exists.
final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
+ kafka.createTopic(changeLogTopic);
+
+ // Setup the QueryRepository used by the test.
+ final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
+ final Consumer<?, QueryChange> queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
queryRepo = new InMemoryQueryRepository(changeLog);
}
@After
- public void cleanup() {
- queryProducer.close();
- queryConsumer.close();
+ public void cleanup() throws Exception {
+ queryRepo.close();
}
-
@Test
public void shortParams() throws Exception {
// Add a few queries to Rya Streams.
@@ -106,8 +78,8 @@ public class ListQueryCommandIT {
// Execute the List Queries command.
final String[] args = new String[] {
"-r", "" + ryaInstance,
- "-i", kafkaIp,
- "-p", kafkaPort
+ "-i", kafka.getKafkaHostname(),
+ "-p", kafka.getKafkaPort()
};
final ListQueriesCommand command = new ListQueriesCommand();
@@ -124,8 +96,8 @@ public class ListQueryCommandIT {
// Execute the List Queries command.
final String[] args = new String[] {
"--ryaInstance", "" + ryaInstance,
- "--kafkaHostname", kafkaIp,
- "--kafkaPort", kafkaPort
+ "--kafkaHostname", kafka.getKafkaHostname(),
+ "--kafkaPort", kafka.getKafkaPort()
};
final ListQueriesCommand command = new ListQueriesCommand();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java
index 95a4876..03c31b4 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/LoadStatementsCommandIT.java
@@ -26,21 +26,16 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-import java.util.Properties;
import java.util.UUID;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.rya.api.model.VisibilityStatement;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
-import org.junit.Before;
+import org.apache.rya.test.kafka.KafkaTestUtil;
import org.junit.Rule;
import org.junit.Test;
import org.openrdf.model.ValueFactory;
@@ -55,21 +50,8 @@ public class LoadStatementsCommandIT {
private final String ryaInstance = UUID.randomUUID().toString();
- private String kafkaIp;
- private String kafkaPort;
-
@Rule
- public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true);
-
- @Before
- public void setup() {
- final Properties props = rule.createBootstrapServerConfig();
- final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
- final String[] tokens = location.split(":");
-
- kafkaIp = tokens[0];
- kafkaPort = tokens[1];
- }
+ public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
@Test
public void shortParams() throws Exception {
@@ -77,35 +59,27 @@ public class LoadStatementsCommandIT {
final String visibilities = "a|b|c";
final String[] args = new String[] {
"-r", "" + ryaInstance,
- "-i", kafkaIp,
- "-p", kafkaPort,
+ "-i", kafka.getKafkaHostname(),
+ "-p", kafka.getKafkaPort(),
"-f", TURTLE_FILE.toString(),
"-v", visibilities
};
+ // Load the file of statements into the Statements topic.
new LoadStatementsCommand().execute(args);
// Show that the statements were loaded into the topic.
- // Read a VisibilityBindingSet from the test topic.
final List<VisibilityStatement> read = new ArrayList<>();
- final Properties consumerProps = new Properties();
- consumerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
- consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
- consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VisibilityStatementDeserializer.class.getName());
-
- try(final Consumer<String, VisibilityStatement> consumer = new KafkaConsumer<>(consumerProps)) {
- final String topic = KafkaTopics.statementsTopic(ryaInstance);
- consumer.subscribe(Arrays.asList(topic));
- final ConsumerRecords<String, VisibilityStatement> records = consumer.poll(3000);
+ try(final Consumer<String, VisibilityStatement> consumer =
+ KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, VisibilityStatementDeserializer.class)) {
+ // Subscribe for messages.
+ consumer.subscribe( Arrays.asList(KafkaTopics.statementsTopic(ryaInstance)) );
- assertEquals(3, records.count());
- final Iterator<ConsumerRecord<String, VisibilityStatement>> iter = records.iterator();
+ // Read the messages and extract their values.
+ final Iterator<ConsumerRecord<String, VisibilityStatement>> iter = consumer.poll(3000).iterator();
while(iter.hasNext()) {
- final VisibilityStatement visiSet = iter.next().value();
- read.add(visiSet);
+ read.add( iter.next().value() );
}
}
@@ -131,35 +105,27 @@ public class LoadStatementsCommandIT {
final String visibilities = "a|b|c";
final String[] args = new String[] {
"--ryaInstance", "" + ryaInstance,
- "--kafkaHostname", kafkaIp,
- "--kafkaPort", kafkaPort,
+ "--kafkaHostname", kafka.getKafkaHostname(),
+ "--kafkaPort", kafka.getKafkaPort(),
"--statementsFile", TURTLE_FILE.toString(),
"--visibilities", visibilities
};
+ // Load the file of statements into the Statements topic.
new LoadStatementsCommand().execute(args);
// Show that the statements were loaded into the topic.
- // Read a VisibilityBindingSet from the test topic.
final List<VisibilityStatement> read = new ArrayList<>();
- final Properties consumerProps = new Properties();
- consumerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaIp + ":" + kafkaPort);
- consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
- consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VisibilityStatementDeserializer.class.getName());
-
- try(final Consumer<String, VisibilityStatement> consumer = new KafkaConsumer<>(consumerProps)) {
- final String topic = KafkaTopics.statementsTopic(ryaInstance);
- consumer.subscribe(Arrays.asList(topic));
- final ConsumerRecords<String, VisibilityStatement> records = consumer.poll(3000);
+ try(final Consumer<String, VisibilityStatement> consumer =
+ KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, VisibilityStatementDeserializer.class)) {
+ // Subscribe for messages.
+ consumer.subscribe( Arrays.asList(KafkaTopics.statementsTopic(ryaInstance)) );
- assertEquals(3, records.count());
- final Iterator<ConsumerRecord<String, VisibilityStatement>> iter = records.iterator();
+ // Read the messages and extract their values.
+ final Iterator<ConsumerRecord<String, VisibilityStatement>> iter = consumer.poll(3000).iterator();
while(iter.hasNext()) {
- final VisibilityStatement visiSet = iter.next().value();
- read.add(visiSet);
+ read.add( iter.next().value() );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
new file mode 100644
index 0000000..788b41f
--- /dev/null
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.client.command;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.interactor.LoadStatements;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.client.RyaStreamsCommand.ArgumentsException;
+import org.apache.rya.streams.client.RyaStreamsCommand.ExecutionException;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Integration tests the methods of {@link RunQueryCommand}.
+ */
+public class RunQueryCommandIT {
+
+ private final String ryaInstance = UUID.randomUUID().toString();
+
+ private QueryRepository queryRepo;
+ private Producer<String, VisibilityStatement> stmtProducer = null;
+ private Consumer<String, VisibilityBindingSet> resultConsumer = null;
+
+ @Rule
+ public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+ @Before
+ public void setup() {
+ // Make sure the topic that the change log uses exists.
+ final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
+ kafka.createTopic(changeLogTopic);
+
+ // Setup the QueryRepository used by the test.
+ final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
+ final Consumer<?, QueryChange> queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
+ final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
+ queryRepo = new InMemoryQueryRepository(changeLog);
+
+ // Initialize the Statements Producer and the Results Consumer.
+ stmtProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class);
+ resultConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class);
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ stmtProducer.close();
+ resultConsumer.close();
+ queryRepo.close();
+ }
+
+ @Test(expected = ExecutionException.class)
+ public void runUnregisteredQuery() throws Exception {
+ // Arguments that run a query that is not registered with Rya Streams.
+ final String[] args = new String[] {
+ "--ryaInstance", "" + ryaInstance,
+ "--kafkaHostname", kafka.getKafkaHostname(),
+ "--kafkaPort", kafka.getKafkaPort(),
+ "--queryID", UUID.randomUUID().toString()
+ };
+
+ // Run the test. This will throw an exception.
+ final RunQueryCommand command = new RunQueryCommand();
+ command.execute(args);
+ }
+
+ @Test
+ public void runQuery() throws Exception {
+ // Register a query with the Query Repository.
+ final StreamsQuery sQuery = queryRepo.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }");
+
+ // Arguments that run the query we just registered with Rya Streams.
+ final String[] args = new String[] {
+ "--ryaInstance", "" + ryaInstance,
+ "--kafkaHostname", kafka.getKafkaHostname(),
+ "--kafkaPort", kafka.getKafkaPort(),
+ "--queryID", sQuery.getQueryId().toString(),
+ "--zookeepers", kafka.getZookeeperServers()
+ };
+
+ // Create a new Thread that runs the command.
+ final Thread commandThread = new Thread() {
+ @Override
+ public void run() {
+ final RunQueryCommand command = new RunQueryCommand();
+ try {
+ command.execute(args);
+ } catch (ArgumentsException | ExecutionException e) {
+ // Do nothing. Test will still fail because the expected results will be missing.
+ }
+ }
+ };
+
+ // Create the statements that will be loaded.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final List<VisibilityStatement> statements = new ArrayList<>();
+ statements.add(new VisibilityStatement(vf.createStatement(
+ vf.createURI("urn:Alice"),
+ vf.createURI("urn:worksAt"),
+ vf.createURI("urn:BurgerJoint")), "a"));
+ statements.add(new VisibilityStatement(vf.createStatement(
+ vf.createURI("urn:Bob"),
+ vf.createURI("urn:worksAt"),
+ vf.createURI("urn:TacoShop")), "a"));
+ statements.add(new VisibilityStatement(vf.createStatement(
+ vf.createURI("urn:Charlie"),
+ vf.createURI("urn:worksAt"),
+ vf.createURI("urn:TacoShop")), "a"));
+
+ // Create the expected results.
+ final List<VisibilityBindingSet> expected = new ArrayList<>();
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Alice"));
+ bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
+ expected.add(new VisibilityBindingSet(bs, "a"));
+ bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Bob"));
+ bs.addBinding("business", vf.createURI("urn:TacoShop"));
+ expected.add(new VisibilityBindingSet(bs, "a"));
+ bs = new MapBindingSet();
+ bs.addBinding("person", vf.createURI("urn:Charlie"));
+ bs.addBinding("business", vf.createURI("urn:TacoShop"));
+ expected.add(new VisibilityBindingSet(bs, "a"));
+
+ // Execute the test. This will result in a set of results that were read from the results topic.
+ final List<VisibilityBindingSet> results;
+ try {
+ // Wait for the program to start.
+ commandThread.start();
+ Thread.sleep(5000);
+
+ // Write some statements to the program.
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+ final LoadStatements loadStatements = new KafkaLoadStatements(statementsTopic, stmtProducer);
+ loadStatements.fromCollection(statements);
+
+ // Read the output of the streams program.
+ final String resultsTopic = KafkaTopics.queryResultsTopic(sQuery.getQueryId());
+ resultConsumer.subscribe( Lists.newArrayList(resultsTopic) );
+ results = KafkaTestUtil.pollForResults(500, 6, 3, resultConsumer);
+ } finally {
+ // Tear down the test.
+ commandThread.interrupt();
+ commandThread.join(3000);
+ }
+
+ // Show the read results matched the expected ones.
+ assertEquals(expected, results);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/pom.xml b/extras/rya.streams/kafka/pom.xml
index 16a8b8e..0ccbb6e 100644
--- a/extras/rya.streams/kafka/pom.xml
+++ b/extras/rya.streams/kafka/pom.xml
@@ -55,6 +55,16 @@ under the License.
<!-- Kafka dependencies -->
<dependency>
<groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
index a8fbf23..3e0df50 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
@@ -20,12 +20,19 @@ package org.apache.rya.streams.kafka;
import static java.util.Objects.requireNonNull;
+import java.util.Properties;
+import java.util.Set;
import java.util.UUID;
+import org.I0Itec.zkclient.ZkClient;
import org.apache.rya.streams.api.queries.QueryChangeLog;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
/**
* Creates the Kafka topic names that are used for Rya Streams systems.
@@ -66,4 +73,36 @@ public class KafkaTopics {
requireNonNull(queryId);
return "QueryResults-" + queryId.toString();
}
+
+ /**
+ * Creates each of the provided Kafka topics, skipping any topic that already exists.
+ *
+ * @param zookeeperServers - The Zookeeper servers that are used by the Kafka Streams program. (not null)
+ * @param topicNames - The topics that will be created. (not null)
+ * @param partitions - The number of partitions that each of the topics will have.
+ * @param replicationFactor - The replication factor of the topics that are created.
+ */
+ public static void createTopic(
+ final String zookeeperServers,
+ final Set<String> topicNames,
+ final int partitions,
+ final int replicationFactor) {
+ requireNonNull(zookeeperServers);
+ requireNonNull(topicNames);
+
+ ZkUtils zkUtils = null;
+ try {
+ zkUtils = ZkUtils.apply(new ZkClient(zookeeperServers, 30000, 30000, ZKStringSerializer$.MODULE$), false);
+ for(final String topicName : topicNames) {
+ if(!AdminUtils.topicExists(zkUtils, topicName)) {
+ AdminUtils.createTopic(zkUtils, topicName, partitions, replicationFactor, new Properties(), RackAwareMode.Disabled$.MODULE$);
+ }
+ }
+ }
+ finally {
+ if(zkUtils != null) {
+ zkUtils.close();
+ }
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java
index 8ab3ab6..d3ec650 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java
@@ -64,7 +64,6 @@ public class KafkaLoadStatements implements LoadStatements {
this.producer = requireNonNull(producer);
}
-
@Override
public void fromFile(final Path statementsPath, final String visibilities) throws RyaStreamsException {
requireNonNull(statementsPath);
@@ -77,7 +76,7 @@ public class KafkaLoadStatements implements LoadStatements {
parser.setRDFHandler(new RDFHandlerBase() {
@Override
public void startRDF() throws RDFHandlerException {
- log.trace("starting loading statements.");
+ log.trace("Starting loading statements.");
}
@Override
@@ -89,7 +88,7 @@ public class KafkaLoadStatements implements LoadStatements {
@Override
public void endRDF() throws RDFHandlerException {
producer.flush();
- log.trace("done.");
+ log.trace("Done.");
}
});
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java
new file mode 100644
index 0000000..e587998
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.interactor;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+import org.apache.rya.streams.api.interactor.RunQuery;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka Streams implementation of {@link RunQuery}.
+ */
+@DefaultAnnotation(NonNull.class)
+public class KafkaRunQuery implements RunQuery {
+ private static final Logger log = LoggerFactory.getLogger(KafkaRunQuery.class);
+
+ private final String kafkaHostname;
+ private final String kafkaPort;
+ private final String statementsTopic;
+ private final String resultsTopic;
+ private final TopologyBuilderFactory topologyFactory;
+ private final QueryRepository queryRepo;
+
+ /**
+ * Constructs an instance of {@link KafkaRunQuery}.
+ *
+ * @param kafkaHostname - The hostname of the Kafka Broker to connect to. (not null)
+ * @param kafkaPort - The port of the Kafka Broker to connect to. (not null)
+ * @param statementsTopic - The name of the topic that statements will be read from. (not null)
+ * @param resultsTopic - The name of the topic that query results will be written to. (not null)
+ * @param queryRepo - The query repository that holds queries that are registered. (not null)
+ * @param topologyFactory - Builds Kafka Stream processing topologies from SPARQL. (not null)
+ */
+ public KafkaRunQuery(
+ final String kafkaHostname,
+ final String kafkaPort,
+ final String statementsTopic,
+ final String resultsTopic,
+ final QueryRepository queryRepo,
+ final TopologyBuilderFactory topologyFactory) {
+ this.kafkaHostname = requireNonNull( kafkaHostname );
+ this.kafkaPort = requireNonNull( kafkaPort );
+ this.statementsTopic = requireNonNull(statementsTopic );
+ this.resultsTopic = requireNonNull( resultsTopic );
+ this.topologyFactory = requireNonNull( topologyFactory );
+ this.queryRepo = requireNonNull( queryRepo );
+ }
+
+ @Override
+ public void run(final UUID queryId) throws RyaStreamsException {
+ requireNonNull(queryId);
+
+ // Fetch the query from the repository. Throw an exception if it isn't present.
+ final Optional<StreamsQuery> query = queryRepo.get(queryId);
+ if(!query.isPresent()) {
+ throw new RyaStreamsException("Could not run the Query with ID " + queryId + " because no such query " +
+ "is currently registered.");
+ }
+
+ // Build a processing topology using the SPARQL, provided statements topic, and provided results topic.
+ final String sparql = query.get().getSparql();
+ final TopologyBuilder topologyBuilder;
+ try {
+ topologyBuilder = topologyFactory.build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
+ } catch (final Exception e) {
+ throw new RyaStreamsException("Could not run the Query with ID " + queryId + " because a processing " +
+ "topology could not be built for the SPARQL " + sparql, e);
+ }
+
+ // Setup the Kafka Stream program.
+ final Properties streamsProps = new Properties();
+ streamsProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kafkaPort);
+ streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "KafkaRunQuery-" + UUID.randomUUID());
+
+ final KafkaStreams streams = new KafkaStreams(topologyBuilder, new StreamsConfig(streamsProps));
+
+ // If an unhandled exception is thrown, rethrow it.
+ streams.setUncaughtExceptionHandler((t, e) -> {
+ // Log the problem and kill the program.
+ log.error("Unhandled exception while processing the Rya Streams query. Shutting down.", e);
+ System.exit(1);
+ });
+
+ // Setup a shutdown hook that kills the streams program at shutdown.
+ final CountDownLatch awaitTermination = new CountDownLatch(1);
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ awaitTermination.countDown();
+ }
+ });
+
+ // Run the streams program and wait for termination.
+ streams.start();
+ try {
+ awaitTermination.await();
+ } catch (final InterruptedException e) {
+ log.warn("Interrupted while waiting for termination. Shutting down.");
+ }
+ streams.close();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
index d73b40e..d12957a 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
@@ -34,6 +34,7 @@ import org.openrdf.query.impl.MapBindingSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -70,12 +71,12 @@ public class KeyValueJoinStateStore implements JoinStateStore {
/**
* This is the minimum value in UTF-8 character.
*/
- private static final String START_RANGE_SUFFIX = new String(new byte[] { 0x00 } );
+ private static final String START_RANGE_SUFFIX = new String(new byte[] { 0x00 }, Charsets.UTF_8);
/**
* This is the maximum value of a UTF-8 character.
*/
- private static final String END_RANGE_SUFFIX = new String(new byte[] { (byte) 0XFF } );
+ private static final String END_RANGE_SUFFIX = new String(new byte[] { (byte) 0XFF }, Charsets.UTF_8);
/**
* A default empty value that is stored for a start of range or end of range marker.