You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/09 21:48:40 UTC
[23/50] [abbrv] incubator-rya git commit: RYA-377 Updated the Rya
Streams client to be able to stream VisibilityStatement results to the
console.
RYA-377 Updated the Rya Streams client to be able to stream VisibilityStatement results to the console.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/bd36443d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/bd36443d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/bd36443d
Branch: refs/heads/master
Commit: bd36443de7e824c9fe8f0a97d3ef7a75c223271c
Parents: 95df37a
Author: kchilton2 <ke...@gmail.com>
Authored: Thu Nov 30 17:25:31 2017 -0500
Committer: caleb <ca...@parsons.com>
Committed: Tue Jan 9 15:13:01 2018 -0500
----------------------------------------------------------------------
extras/rya.streams/api/pom.xml | 5 ++
.../streams/api/entity/QueryResultStream.java | 13 ++--
.../api/interactor/GetQueryResultStream.java | 8 ++-
.../interactor/defaults/DefaultAddQuery.java | 16 ++++-
.../defaults/DefaultAddQueryTest.java | 64 ++++++++++++++++++++
.../client/command/StreamResultsCommand.java | 54 +++++++++++++++--
extras/rya.streams/kafka/pom.xml | 2 +-
.../kafka/entity/KafkaQueryResultStream.java | 11 ++--
.../interactor/KafkaGetQueryResultStream.java | 27 ++++++---
.../interactor/KafkaGetQueryResultStreamIT.java | 59 ++++++++++++++----
10 files changed, 216 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/api/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/pom.xml b/extras/rya.streams/api/pom.xml
index 2a1f51c..250028f 100644
--- a/extras/rya.streams/api/pom.xml
+++ b/extras/rya.streams/api/pom.xml
@@ -41,6 +41,11 @@ under the License.
</dependency>
<dependency>
+ <groupId>org.openrdf.sesame</groupId>
+ <artifactId>sesame-queryparser-sparql</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java
index aa5dcfd..8f1e589 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java
@@ -22,17 +22,18 @@ import static java.util.Objects.requireNonNull;
import java.util.UUID;
-import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.streams.api.exception.RyaStreamsException;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
/**
- * An infinite stream of {@link VisibilityBindingSet}s that are the results of a query within Rya Streams.
+ * An infinite stream of values that are the results of a query within Rya Streams.
+ *
+ * @param <V> - The query results' value type.
*/
@DefaultAnnotation(NonNull.class)
-public abstract class QueryResultStream implements AutoCloseable {
+public abstract class QueryResultStream<V> implements AutoCloseable {
private final UUID queryId;
@@ -57,10 +58,10 @@ public abstract class QueryResultStream implements AutoCloseable {
* Wait at most {@code timeoutMs} milliseconds for the next collection of results.
*
* @param timeoutMs - The number of milliseconds to at most wait for the next collection of results. (not null)
- * @return The next collection of {@link VisibilityBindingSet}s that are the result of the query. Empty if
- * there where no new results within the timout period.
+ * @return The next collection of values that are the result of the query. Empty if there were no new results
+ * within the timeout period.
* @throws IllegalStateException If the stream has been closed.
* @throws RyaStreamsException Could not fetch the next set of results.
*/
- public abstract Iterable<VisibilityBindingSet> poll(long timeoutMs) throws IllegalStateException, RyaStreamsException;
+ public abstract Iterable<V> poll(long timeoutMs) throws IllegalStateException, RyaStreamsException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java
index 9ca577c..951d060 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java
@@ -28,9 +28,11 @@ import edu.umd.cs.findbugs.annotations.NonNull;
/**
* Get a {@link QueryResultStream} over the results of a query that is being managed by Rya Streams.
+ *
+ * @param <T> - The type of results that are in the result stream.
*/
@DefaultAnnotation(NonNull.class)
-public interface GetQueryResultStream {
+public interface GetQueryResultStream<T> {
/**
* Stream all of the results that have been produced by a query.
@@ -39,7 +41,7 @@ public interface GetQueryResultStream {
* @return A {@link QueryResultStream} that starts with the first result that was ever produced.
* @throws RyaStreamsException Could not create the result stream.
*/
- public QueryResultStream fromStart(UUID queryId) throws RyaStreamsException;
+ public QueryResultStream<T> fromStart(UUID queryId) throws RyaStreamsException;
/**
* Stream results that have been produced by a query after this method was invoked.
@@ -48,5 +50,5 @@ public interface GetQueryResultStream {
* @return A {@link QueryResultStream} that only returns results that were produced after this method is invoked.
* @throws RyaStreamsException Could not create the result stream.
*/
- public QueryResultStream fromNow(UUID queryId) throws RyaStreamsException;
+ public QueryResultStream<T> fromNow(UUID queryId) throws RyaStreamsException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java
index 9704322..f94835c 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java
@@ -24,6 +24,8 @@ import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.exception.RyaStreamsException;
import org.apache.rya.streams.api.interactor.AddQuery;
import org.apache.rya.streams.api.queries.QueryRepository;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.parser.sparql.SPARQLParser;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -35,6 +37,8 @@ import edu.umd.cs.findbugs.annotations.NonNull;
public class DefaultAddQuery implements AddQuery {
private final QueryRepository repository;
+ private final SPARQLParser parser = new SPARQLParser();
+
/**
* Creates a new {@link DefaultAddQuery}.
*
@@ -46,6 +50,16 @@ public class DefaultAddQuery implements AddQuery {
@Override
public StreamsQuery addQuery(final String query) throws RyaStreamsException {
+ requireNonNull(query);
+
+ // Make sure the SPARQL is valid.
+ try {
+ parser.parseQuery(query, null);
+ } catch (final MalformedQueryException e) {
+ throw new RyaStreamsException("Could not add the query because the SPARQL is invalid.", e);
+ }
+
+ // If it is, then store it in the repository.
return repository.add(query);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java
new file mode 100644
index 0000000..88be6e7
--- /dev/null
+++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api.interactor.defaults;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+import org.apache.rya.streams.api.interactor.AddQuery;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link DefaultAddQuery}.
+ */
+public class DefaultAddQueryTest {
+
+ @Test
+ public void addQuery_validSparql() throws Exception {
+ // Valid SPARQL.
+ final String sparql = "SELECT * WHERE { ?person <urn:worksAt> ?business }";
+
+ // Setup the interactor.
+ final QueryRepository repo = mock(QueryRepository.class);
+ final AddQuery addQuery = new DefaultAddQuery(repo);
+
+ // Add the query.
+ addQuery.addQuery(sparql);
+
+ // Verify the call was forwarded to the repository.
+ verify(repo, times(1)).add(eq(sparql));
+ }
+
+ @Test(expected = RyaStreamsException.class)
+ public void addQuery_invalidSparql() throws Exception {
+ // Invalid SPARQL.
+ final String sparql = "This is not sparql.";
+
+ // Setup the interactor.
+ final QueryRepository repo = mock(QueryRepository.class);
+ final AddQuery addQuery = new DefaultAddQuery(repo);
+
+ // Add the query.
+ addQuery.addQuery(sparql);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
index 64f78a3..7c548f1 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
@@ -20,14 +20,26 @@ package org.apache.rya.streams.client.command;
import static java.util.Objects.requireNonNull;
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.streams.api.entity.QueryResultStream;
+import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.interactor.GetQueryResultStream;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryRepository;
import org.apache.rya.streams.client.RyaStreamsCommand;
+import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.Reduced;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.parser.sparql.SPARQLParser;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
@@ -107,6 +119,12 @@ public class StreamResultsCommand implements RyaStreamsCommand {
throw new ArgumentsException("Could not stream the query's results because of invalid command line parameters.", e);
}
+ // Create the Kafka backed QueryChangeLog.
+ final String bootstrapServers = params.kafkaIP + ":" + params.kafkaPort;
+ final String topic = KafkaTopics.queryChangeLogTopic(params.ryaInstance);
+ final QueryChangeLog queryChangeLog = KafkaQueryChangeLogFactory.make(bootstrapServers, topic);
+
+ // Parse the Query ID from the command line parameters.
final UUID queryId;
try {
queryId = UUID.fromString( params.queryId );
@@ -114,6 +132,19 @@ public class StreamResultsCommand implements RyaStreamsCommand {
throw new ArgumentsException("Invalid Query ID " + params.queryId);
}
+ // Fetch the SPARQL of the query whose results will be streamed.
+ final String sparql;
+ try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) {
+ final Optional<StreamsQuery> sQuery = queryRepo.get(queryId);
+ if(!sQuery.isPresent()) {
+ throw new ExecutionException("Could not read the results for query with ID " + queryId +
+ " because no such query exists.");
+ }
+ sparql = sQuery.get().getSparql();
+ } catch (final Exception e) {
+ throw new ExecutionException("Problem encountered while closing the QueryRepository.", e);
+ }
+
// This command executes until the application is killed, so create a kill boolean.
final AtomicBoolean finished = new AtomicBoolean(false);
Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -123,13 +154,24 @@ public class StreamResultsCommand implements RyaStreamsCommand {
}
});
- // Execute the command.
- final GetQueryResultStream getQueryResultStream = new KafkaGetQueryResultStream(params.kafkaIP, params.kafkaPort);
+ // Build the interactor based on the type of result the query produces.
+ final GetQueryResultStream<?> getQueryResultStream;
+ try {
+ final TupleExpr tupleExpr = new SPARQLParser().parseQuery(sparql, null).getTupleExpr();
+ if(tupleExpr instanceof Reduced) {
+ getQueryResultStream = new KafkaGetQueryResultStream<>(params.kafkaIP, params.kafkaPort, VisibilityStatementDeserializer.class);
+ } else {
+ getQueryResultStream = new KafkaGetQueryResultStream<>(params.kafkaIP, params.kafkaPort, VisibilityBindingSetDeserializer.class);
+ }
+ } catch (final MalformedQueryException e) {
+ throw new ExecutionException("Could not parse the SPARQL for the query: " + sparql, e);
+ }
- try (final QueryResultStream stream = getQueryResultStream.fromStart(queryId)) {
+ // Iterate through the results and print them to the console until the program or the stream ends.
+ try (final QueryResultStream<?> stream = getQueryResultStream.fromStart(queryId)) {
while(!finished.get()) {
- for(final VisibilityBindingSet visBs : stream.poll(1000)) {
- System.out.println(visBs);
+ for(final Object result : stream.poll(1000)) {
+ System.out.println(result);
}
}
} catch (final Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/pom.xml b/extras/rya.streams/kafka/pom.xml
index 2d33f32..8926870 100644
--- a/extras/rya.streams/kafka/pom.xml
+++ b/extras/rya.streams/kafka/pom.xml
@@ -41,7 +41,7 @@ under the License.
<dependency>
<groupId>org.apache.rya</groupId>
<artifactId>rya.pcj.functions.geo</artifactId>
- <version>3.2.12-incubating-SNAPSHOT</version>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rya</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/entity/KafkaQueryResultStream.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/entity/KafkaQueryResultStream.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/entity/KafkaQueryResultStream.java
index 360aaa2..02a3812 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/entity/KafkaQueryResultStream.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/entity/KafkaQueryResultStream.java
@@ -26,7 +26,6 @@ import java.util.UUID;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.streams.api.entity.QueryResultStream;
import org.apache.rya.streams.api.exception.RyaStreamsException;
@@ -37,11 +36,13 @@ import edu.umd.cs.findbugs.annotations.NonNull;
* A Kafka implementation of {@link QueryResultStream}. It delegates the {@link #poll(long)} method to
* a {@link Consumer}. As a result, the starting point of this stream is whatever position the consumer
* starts at within the Kafka topic.
+ *
+ * @param <V> - The type of the consumed records' value.
*/
@DefaultAnnotation(NonNull.class)
-public class KafkaQueryResultStream extends QueryResultStream {
+public class KafkaQueryResultStream<V> extends QueryResultStream<V> {
- private final Consumer<?, VisibilityBindingSet> consumer;
+ private final Consumer<?, V> consumer;
/**
* Constructs an instance of {@link KafkaQueryResultStream}.
@@ -49,13 +50,13 @@ public class KafkaQueryResultStream extends QueryResultStream {
* @param queryId - The query the results are for. (not null)
* @param consumer - The consumer that will be polled by this class. (not null)
*/
- public KafkaQueryResultStream(final UUID queryId, final Consumer<?, VisibilityBindingSet> consumer) {
+ public KafkaQueryResultStream(final UUID queryId, final Consumer<?, V> consumer) {
super(queryId);
this.consumer = requireNonNull(consumer);
}
@Override
- public Iterable<VisibilityBindingSet> poll(final long timeoutMs) throws RyaStreamsException {
+ public Iterable<V> poll(final long timeoutMs) throws RyaStreamsException {
return new RecordEntryIterable<>( consumer.poll(timeoutMs) );
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
index b3c3fea..529b493 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
@@ -26,40 +26,47 @@ import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.streams.api.entity.QueryResultStream;
import org.apache.rya.streams.api.exception.RyaStreamsException;
import org.apache.rya.streams.api.interactor.GetQueryResultStream;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.entity.KafkaQueryResultStream;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
/**
* A Kafka topic implementation of {@link GetQueryResultStream}.
+ *
+ * @param <T> - The type of results that are in the result stream.
*/
@DefaultAnnotation(NonNull.class)
-public class KafkaGetQueryResultStream implements GetQueryResultStream {
+public class KafkaGetQueryResultStream<T> implements GetQueryResultStream<T> {
private final String bootstrapServers;
+ private final Class<? extends Deserializer<T>> deserializerClass;
/**
* Constructs an instance of {@link KafkaGetQueryResultStream}.
*
* @param kafkaHostname - The hostname of the Kafka Broker to connect to. (not null)
* @param kafkaPort - The port of the Kafka Broker to connect to. (not null)
+ * @param deserializerClass - The value deserializer to use when reading from the Kafka topic. (not null)
*/
- public KafkaGetQueryResultStream(final String kafkaHostname, final String kafkaPort) {
+ public KafkaGetQueryResultStream(
+ final String kafkaHostname,
+ final String kafkaPort,
+ final Class<? extends Deserializer<T>> deserializerClass) {
requireNonNull(kafkaHostname);
requireNonNull(kafkaPort);
bootstrapServers = kafkaHostname + ":" + kafkaPort;
+ this.deserializerClass = requireNonNull(deserializerClass);
}
@Override
- public QueryResultStream fromStart(final UUID queryId) throws RyaStreamsException {
+ public QueryResultStream<T> fromStart(final UUID queryId) throws RyaStreamsException {
requireNonNull(queryId);
// Always start at the earliest point within the topic.
@@ -67,21 +74,21 @@ public class KafkaGetQueryResultStream implements GetQueryResultStream {
}
@Override
- public QueryResultStream fromNow(final UUID queryId) throws RyaStreamsException {
+ public QueryResultStream<T> fromNow(final UUID queryId) throws RyaStreamsException {
requireNonNull(queryId);
// Always start at the latest point within the topic.
return makeStream(queryId, "latest");
}
- private QueryResultStream makeStream(final UUID queryId, final String autoOffsetResetConfig) {
+ private QueryResultStream<T> makeStream(final UUID queryId, final String autoOffsetResetConfig) {
// Configure which instance of Kafka to connect to.
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// Nothing meaningful is in the key and the value is a VisibilityBindingSet.
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VisibilityBindingSetDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializerClass);
// Use a UUID for the Group Id so that we never register as part of the same group as another consumer.
props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
@@ -96,13 +103,13 @@ public class KafkaGetQueryResultStream implements GetQueryResultStream {
// We are not closing the consumer here because the returned QueryResultStream is responsible for closing the
// underlying resources required to process it.
- final KafkaConsumer<Object, VisibilityBindingSet> consumer = new KafkaConsumer<>(props);
+ final KafkaConsumer<String, T> consumer = new KafkaConsumer<>(props);
// Register the consumer for the query's results.
final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
consumer.subscribe(Arrays.asList(resultTopic));
// Return the result stream.
- return new KafkaQueryResultStream(queryId, consumer);
+ return new KafkaQueryResultStream<>(queryId, consumer);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
index c740ba2..8882753 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
@@ -29,10 +29,14 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
import org.apache.rya.streams.api.entity.QueryResultStream;
import org.apache.rya.streams.api.interactor.GetQueryResultStream;
import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
import org.apache.rya.test.kafka.KafkaTestUtil;
import org.junit.Rule;
@@ -54,23 +58,23 @@ public class KafkaGetQueryResultStreamIT {
* the target number of results, or it hits the target number of results.
*
* @param pollMs - How long each poll could take.
- * @param pollIterations - The maximum nubmer of polls that will be attempted.
+ * @param pollIterations - The maximum number of polls that will be attempted.
* @param targetSize - The number of results to read before stopping.
* @param stream - The stream that will be polled.
* @return The results that were read from the stream.
* @throws Exception If the poll failed.
*/
- private List<VisibilityBindingSet> pollForResults(
+ private <T> List<T> pollForResults(
final int pollMs,
final int pollIterations,
final int targetSize,
- final QueryResultStream stream) throws Exception{
- final List<VisibilityBindingSet> read = new ArrayList<>();
+ final QueryResultStream<T> stream) throws Exception{
+ final List<T> read = new ArrayList<>();
int i = 0;
while(read.size() < targetSize && i < pollIterations) {
- for(final VisibilityBindingSet visBs : stream.poll(pollMs)) {
- read.add( visBs );
+ for(final T result : stream.poll(pollMs)) {
+ read.add( result );
}
i++;
}
@@ -109,7 +113,8 @@ public class KafkaGetQueryResultStreamIT {
}
// Use the interactor that is being tested to read all of the visibility binding sets.
- final GetQueryResultStream interactor = new KafkaGetQueryResultStream(kafka.getKafkaHostname(), kafka.getKafkaPort());
+ final GetQueryResultStream<VisibilityBindingSet> interactor =
+ new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class);
final List<VisibilityBindingSet> read = pollForResults(500, 3, 3, interactor.fromStart(queryId));
// Show the fetched binding sets match the original, as well as their order.
@@ -133,8 +138,9 @@ public class KafkaGetQueryResultStreamIT {
producer.flush();
// Use the interactor that is being tested to read all of the visibility binding sets that appear after this point.
- final GetQueryResultStream interactor = new KafkaGetQueryResultStream(kafka.getKafkaHostname(), kafka.getKafkaPort());
- try(QueryResultStream results = interactor.fromNow(queryId)) {
+ final GetQueryResultStream<VisibilityBindingSet> interactor =
+ new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class);
+ try(QueryResultStream<VisibilityBindingSet> results = interactor.fromNow(queryId)) {
// Read results from the stream.
List<VisibilityBindingSet> read = new ArrayList<>();
for(final VisibilityBindingSet visBs : results.poll(500)) {
@@ -175,11 +181,42 @@ public class KafkaGetQueryResultStreamIT {
final UUID queryId = UUID.randomUUID();
// Use the interactor that is being tested to create a result stream and immediately close it.
- final GetQueryResultStream interactor = new KafkaGetQueryResultStream(kafka.getKafkaHostname(), kafka.getKafkaPort());
- final QueryResultStream results = interactor.fromStart(queryId);
+ final GetQueryResultStream<VisibilityBindingSet> interactor =
+ new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class);
+ final QueryResultStream<VisibilityBindingSet> results = interactor.fromStart(queryId);
results.close();
// Try to poll the closed stream.
results.poll(1);
}
+
+ @Test
+ public void fromStart_visibilityStatements() throws Exception {
+ // Create an ID for the query.
+ final UUID queryId = UUID.randomUUID();
+
+ // Create some statements that will be written to the result topic.
+ final List<VisibilityStatement> original = new ArrayList<>();
+ final ValueFactory vf = new ValueFactoryImpl();
+ original.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+ original.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(63)), "b") );
+ original.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral("urn:34")), "") );
+
+ // Write the entries to the query result topic in Kafka.
+ try(final Producer<?, VisibilityStatement> producer =
+ KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class)) {
+ final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
+ for(final VisibilityStatement visStmt : original) {
+ producer.send(new ProducerRecord<>(resultTopic, visStmt));
+ }
+ }
+
+ // Use the interactor that is being tested to read all of the visibility statements.
+ final GetQueryResultStream<VisibilityStatement> interactor =
+ new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityStatementDeserializer.class);
+ final List<VisibilityStatement> read = pollForResults(500, 3, 3, interactor.fromStart(queryId));
+
+ // Show the fetched statements match the original, as well as their order.
+ assertEquals(original, read);
+ }
}
\ No newline at end of file