You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/09 21:48:40 UTC

[23/50] [abbrv] incubator-rya git commit: RYA-377 Updated the Rya Streams client to be able to stream VisibilityStatement results to the console.

RYA-377 Updated the Rya Streams client to be able to stream VisibilityStatement results to the console.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/bd36443d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/bd36443d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/bd36443d

Branch: refs/heads/master
Commit: bd36443de7e824c9fe8f0a97d3ef7a75c223271c
Parents: 95df37a
Author: kchilton2 <ke...@gmail.com>
Authored: Thu Nov 30 17:25:31 2017 -0500
Committer: caleb <ca...@parsons.com>
Committed: Tue Jan 9 15:13:01 2018 -0500

----------------------------------------------------------------------
 extras/rya.streams/api/pom.xml                  |  5 ++
 .../streams/api/entity/QueryResultStream.java   | 13 ++--
 .../api/interactor/GetQueryResultStream.java    |  8 ++-
 .../interactor/defaults/DefaultAddQuery.java    | 16 ++++-
 .../defaults/DefaultAddQueryTest.java           | 64 ++++++++++++++++++++
 .../client/command/StreamResultsCommand.java    | 54 +++++++++++++++--
 extras/rya.streams/kafka/pom.xml                |  2 +-
 .../kafka/entity/KafkaQueryResultStream.java    | 11 ++--
 .../interactor/KafkaGetQueryResultStream.java   | 27 ++++++---
 .../interactor/KafkaGetQueryResultStreamIT.java | 59 ++++++++++++++----
 10 files changed, 216 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/api/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/pom.xml b/extras/rya.streams/api/pom.xml
index 2a1f51c..250028f 100644
--- a/extras/rya.streams/api/pom.xml
+++ b/extras/rya.streams/api/pom.xml
@@ -41,6 +41,11 @@ under the License.
         </dependency>
         
         <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-queryparser-sparql</artifactId>
+        </dependency>
+        
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java
index aa5dcfd..8f1e589 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java
@@ -22,17 +22,18 @@ import static java.util.Objects.requireNonNull;
 
 import java.util.UUID;
 
-import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.streams.api.exception.RyaStreamsException;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
 
 /**
- * An infinite stream of {@link VisibilityBindingSet}s that are the results of a query within Rya Streams.
+ * An infinite stream of values that are the results of a query within Rya Streams.
+ *
+ * @param <V> - The query results' value type.
  */
 @DefaultAnnotation(NonNull.class)
-public abstract class QueryResultStream implements AutoCloseable {
+public abstract class QueryResultStream<V> implements AutoCloseable {
 
     private final UUID queryId;
 
@@ -57,10 +58,10 @@ public abstract class QueryResultStream implements AutoCloseable {
      * Wait at most {@code timeoutMs} milliseconds for the next collection of results.
      *
      * @param timeoutMs - The number of milliseconds to at most wait for the next collection of results. (not null)
-     * @return The next collection of {@link VisibilityBindingSet}s that are the result of the query. Empty if
-     *   there where no new results within the timout period.
+     * @return The next collection of values that are the result of the query. Empty if there where no new results
+     *   within the timeout period.
      * @throws IllegalStateException If the stream has been closed.
      * @throws RyaStreamsException Could not fetch the next set of results.
      */
-    public abstract Iterable<VisibilityBindingSet> poll(long timeoutMs) throws IllegalStateException, RyaStreamsException;
+    public abstract Iterable<V> poll(long timeoutMs) throws IllegalStateException, RyaStreamsException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java
index 9ca577c..951d060 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQueryResultStream.java
@@ -28,9 +28,11 @@ import edu.umd.cs.findbugs.annotations.NonNull;
 
 /**
  * Get a {@link QueryResultStream} over the results of a query that is being managed by Rya Streams.
+ *
+ * @param <T> - The type of results that are in the result stream.
  */
 @DefaultAnnotation(NonNull.class)
-public interface GetQueryResultStream {
+public interface GetQueryResultStream<T> {
 
     /**
      * Stream all of the results that have been produced by a query.
@@ -39,7 +41,7 @@ public interface GetQueryResultStream {
      * @return A {@link QueryResultStream} that starts with the first result that was ever produced.
      * @throws RyaStreamsException Could not create the result stream.
      */
-    public QueryResultStream fromStart(UUID queryId) throws RyaStreamsException;
+    public QueryResultStream<T> fromStart(UUID queryId) throws RyaStreamsException;
 
     /**
      * Stream results that have been produced by a query after this method was invoked.
@@ -48,5 +50,5 @@ public interface GetQueryResultStream {
      * @return A {@link QueryResultStream} that only returns results that were produced after this method is invoked.
      * @throws RyaStreamsException Could not create the result stream.
      */
-    public QueryResultStream fromNow(UUID queryId) throws RyaStreamsException;
+    public QueryResultStream<T> fromNow(UUID queryId) throws RyaStreamsException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java
index 9704322..f94835c 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java
@@ -24,6 +24,8 @@ import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.api.exception.RyaStreamsException;
 import org.apache.rya.streams.api.interactor.AddQuery;
 import org.apache.rya.streams.api.queries.QueryRepository;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.parser.sparql.SPARQLParser;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -35,6 +37,8 @@ import edu.umd.cs.findbugs.annotations.NonNull;
 public class DefaultAddQuery implements AddQuery {
     private final QueryRepository repository;
 
+    private final SPARQLParser parser = new SPARQLParser();
+
     /**
      * Creates a new {@link DefaultAddQuery}.
      *
@@ -46,6 +50,16 @@ public class DefaultAddQuery implements AddQuery {
 
     @Override
     public StreamsQuery addQuery(final String query) throws RyaStreamsException {
+        requireNonNull(query);
+
+        // Make sure the SPARQL is valid.
+        try {
+            parser.parseQuery(query, null);
+        } catch (final MalformedQueryException e) {
+            throw new RyaStreamsException("Could not add the query because the SPARQL is invalid.", e);
+        }
+
+        // If it is, then store it in the repository.
         return repository.add(query);
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java
new file mode 100644
index 0000000..88be6e7
--- /dev/null
+++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api.interactor.defaults;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+import org.apache.rya.streams.api.interactor.AddQuery;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link DefaultAddQuery}.
+ */
+public class DefaultAddQueryTest {
+
+    @Test
+    public void addQuery_validSparql() throws Exception {
+        // Valid SPARQL.
+        final String sparql = "SELECT * WHERE { ?person <urn:worksAt> ?business }";
+
+        // Setup the interactor.
+        final QueryRepository repo = mock(QueryRepository.class);
+        final AddQuery addQuery = new DefaultAddQuery(repo);
+
+        // Add the query.
+        addQuery.addQuery(sparql);
+
+        // Verify the call was forwarded to the repository.
+        verify(repo, times(1)).add(eq(sparql));
+    }
+
+    @Test(expected = RyaStreamsException.class)
+    public void addQuery_invalidSparql() throws Exception {
+        // Inalid SPARQL.
+        final String sparql = "This is not sparql.";
+
+        // Setup the interactor.
+        final QueryRepository repo = mock(QueryRepository.class);
+        final AddQuery addQuery = new DefaultAddQuery(repo);
+
+        // Add the query.
+        addQuery.addQuery(sparql);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
index 64f78a3..7c548f1 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
@@ -20,14 +20,26 @@ package org.apache.rya.streams.client.command;
 
 import static java.util.Objects.requireNonNull;
 
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.streams.api.entity.QueryResultStream;
+import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.api.interactor.GetQueryResultStream;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryRepository;
 import org.apache.rya.streams.client.RyaStreamsCommand;
+import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.Reduced;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.parser.sparql.SPARQLParser;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
@@ -107,6 +119,12 @@ public class StreamResultsCommand implements RyaStreamsCommand {
             throw new ArgumentsException("Could not stream the query's results because of invalid command line parameters.", e);
         }
 
+        // Create the Kafka backed QueryChangeLog.
+        final String bootstrapServers = params.kafkaIP + ":" + params.kafkaPort;
+        final String topic = KafkaTopics.queryChangeLogTopic(params.ryaInstance);
+        final QueryChangeLog queryChangeLog = KafkaQueryChangeLogFactory.make(bootstrapServers, topic);
+
+        // Parse the Query ID from the command line parameters.
         final UUID queryId;
         try {
             queryId = UUID.fromString( params.queryId );
@@ -114,6 +132,19 @@ public class StreamResultsCommand implements RyaStreamsCommand {
             throw new ArgumentsException("Invalid Query ID " + params.queryId);
         }
 
+        // Fetch the SPARQL of the query whose results will be streamed.
+        final String sparql;
+        try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) {
+            final Optional<StreamsQuery> sQuery = queryRepo.get(queryId);
+            if(!sQuery.isPresent()) {
+                throw new ExecutionException("Could not read the results for query with ID " + queryId +
+                        " because no such query exists.");
+            }
+            sparql = sQuery.get().getSparql();
+        } catch (final Exception e) {
+            throw new ExecutionException("Problem encountered while closing the QueryRepository.", e);
+        }
+
         // This command executes until the application is killed, so create a kill boolean.
         final AtomicBoolean finished = new AtomicBoolean(false);
         Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -123,13 +154,24 @@ public class StreamResultsCommand implements RyaStreamsCommand {
             }
         });
 
-        // Execute the command.
-        final GetQueryResultStream getQueryResultStream = new KafkaGetQueryResultStream(params.kafkaIP, params.kafkaPort);
+        // Build the interactor based on the type of result the query produces.
+        final GetQueryResultStream<?> getQueryResultStream;
+        try {
+            final TupleExpr tupleExpr = new SPARQLParser().parseQuery(sparql, null).getTupleExpr();
+            if(tupleExpr instanceof Reduced) {
+                getQueryResultStream = new KafkaGetQueryResultStream<>(params.kafkaIP, params.kafkaPort, VisibilityStatementDeserializer.class);
+            } else {
+                getQueryResultStream = new KafkaGetQueryResultStream<>(params.kafkaIP, params.kafkaPort, VisibilityBindingSetDeserializer.class);
+            }
+        } catch (final MalformedQueryException e) {
+            throw new ExecutionException("Could not parse the SPARQL for the query: " + sparql, e);
+        }
 
-        try (final QueryResultStream stream = getQueryResultStream.fromStart(queryId)) {
+        // Iterate through the results and print them to the console until the program or the stream ends.
+        try (final QueryResultStream<?> stream = getQueryResultStream.fromStart(queryId)) {
             while(!finished.get()) {
-                for(final VisibilityBindingSet visBs : stream.poll(1000)) {
-                    System.out.println(visBs);
+                for(final Object result : stream.poll(1000)) {
+                    System.out.println(result);
                 }
             }
         } catch (final Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/pom.xml b/extras/rya.streams/kafka/pom.xml
index 2d33f32..8926870 100644
--- a/extras/rya.streams/kafka/pom.xml
+++ b/extras/rya.streams/kafka/pom.xml
@@ -41,7 +41,7 @@ under the License.
                     <dependency>
                         <groupId>org.apache.rya</groupId>
                         <artifactId>rya.pcj.functions.geo</artifactId>
-                        <version>3.2.12-incubating-SNAPSHOT</version>
+                        <version>${project.version}</version>
                     </dependency>
                     <dependency>
                         <groupId>org.apache.rya</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/entity/KafkaQueryResultStream.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/entity/KafkaQueryResultStream.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/entity/KafkaQueryResultStream.java
index 360aaa2..02a3812 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/entity/KafkaQueryResultStream.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/entity/KafkaQueryResultStream.java
@@ -26,7 +26,6 @@ import java.util.UUID;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.streams.api.entity.QueryResultStream;
 import org.apache.rya.streams.api.exception.RyaStreamsException;
 
@@ -37,11 +36,13 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  * A Kafka implementation of {@link QueryResultStream}. It delegates the {@link #poll(long)} method to
  * a {@link Consumer}. As a result, the starting point of this stream is whatever position the consumer
  * starts at within the Kafka topic.
+ *
+ * @param <V> - The type of the consumed records' value.
  */
 @DefaultAnnotation(NonNull.class)
-public class KafkaQueryResultStream extends QueryResultStream {
+public class KafkaQueryResultStream<V> extends QueryResultStream<V> {
 
-    private final Consumer<?, VisibilityBindingSet> consumer;
+    private final Consumer<?, V> consumer;
 
     /**
      * Constructs an instance of {@link KafkaQueryResultStream}.
@@ -49,13 +50,13 @@ public class KafkaQueryResultStream extends QueryResultStream {
      * @param queryId - The query the results are for. (not null)
      * @param consumer - The consumer that will be polled by this class. (not null)
      */
-    public KafkaQueryResultStream(final UUID queryId, final Consumer<?, VisibilityBindingSet> consumer) {
+    public KafkaQueryResultStream(final UUID queryId, final Consumer<?, V> consumer) {
         super(queryId);
         this.consumer = requireNonNull(consumer);
     }
 
     @Override
-    public Iterable<VisibilityBindingSet> poll(final long timeoutMs) throws RyaStreamsException {
+    public Iterable<V> poll(final long timeoutMs) throws RyaStreamsException {
         return new RecordEntryIterable<>( consumer.poll(timeoutMs) );
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
index b3c3fea..529b493 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java
@@ -26,40 +26,47 @@ import java.util.UUID;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.streams.api.entity.QueryResultStream;
 import org.apache.rya.streams.api.exception.RyaStreamsException;
 import org.apache.rya.streams.api.interactor.GetQueryResultStream;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.entity.KafkaQueryResultStream;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
 
 /**
  * A Kafka topic implementation of {@link GetQueryResultStream}.
+ *
+ * @param <T> - The type of results that are in the result stream.
  */
 @DefaultAnnotation(NonNull.class)
-public class KafkaGetQueryResultStream implements GetQueryResultStream {
+public class KafkaGetQueryResultStream<T> implements GetQueryResultStream<T> {
 
     private final String bootstrapServers;
+    private final Class<? extends Deserializer<T>> deserializerClass;
 
     /**
      * Constructs an instance of {@link KafkaGetQueryResultStream}.
      *
      * @param kafkaHostname - The hostname of the Kafka Broker to connect to. (not null)
      * @param kafkaPort - The port of the Kafka Broker to connect to. (not null)
+     * @param deserializerClass - The value deserializer to use when reading from the Kafka topic. (not null)
      */
-    public KafkaGetQueryResultStream(final String kafkaHostname, final String kafkaPort) {
+    public KafkaGetQueryResultStream(
+            final String kafkaHostname,
+            final String kafkaPort,
+            final Class<? extends Deserializer<T>> deserializerClass) {
         requireNonNull(kafkaHostname);
         requireNonNull(kafkaPort);
         bootstrapServers = kafkaHostname + ":" + kafkaPort;
+        this.deserializerClass = requireNonNull(deserializerClass);
     }
 
     @Override
-    public QueryResultStream fromStart(final UUID queryId) throws RyaStreamsException {
+    public QueryResultStream<T> fromStart(final UUID queryId) throws RyaStreamsException {
         requireNonNull(queryId);
 
         // Always start at the earliest point within the topic.
@@ -67,21 +74,21 @@ public class KafkaGetQueryResultStream implements GetQueryResultStream {
     }
 
     @Override
-    public QueryResultStream fromNow(final UUID queryId) throws RyaStreamsException {
+    public QueryResultStream<T> fromNow(final UUID queryId) throws RyaStreamsException {
         requireNonNull(queryId);
 
         // Always start at the latest point within the topic.
         return makeStream(queryId, "latest");
     }
 
-    private QueryResultStream makeStream(final UUID queryId, final String autoOffsetResetConfig) {
+    private QueryResultStream<T> makeStream(final UUID queryId, final String autoOffsetResetConfig) {
         // Configure which instance of Kafka to connect to.
         final Properties props = new Properties();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
 
         // Nothing meaningful is in the key and the value is a VisibilityBindingSet.
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VisibilityBindingSetDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializerClass);
 
         // Use a UUID for the Group Id so that we never register as part of the same group as another consumer.
         props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
@@ -96,13 +103,13 @@ public class KafkaGetQueryResultStream implements GetQueryResultStream {
 
         // We are not closing the consumer here because the returned QueryResultStream is responsible for closing the
         // underlying resources required to process it.
-        final KafkaConsumer<Object, VisibilityBindingSet> consumer = new KafkaConsumer<>(props);
+        final KafkaConsumer<String, T> consumer = new KafkaConsumer<>(props);
 
         // Register the consumer for the query's results.
         final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
         consumer.subscribe(Arrays.asList(resultTopic));
 
         // Return the result stream.
-        return new KafkaQueryResultStream(queryId, consumer);
+        return new KafkaQueryResultStream<>(queryId, consumer);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd36443d/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
index c740ba2..8882753 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
@@ -29,10 +29,14 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
 import org.apache.rya.streams.api.entity.QueryResultStream;
 import org.apache.rya.streams.api.interactor.GetQueryResultStream;
 import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
 import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 import org.apache.rya.test.kafka.KafkaTestUtil;
 import org.junit.Rule;
@@ -54,23 +58,23 @@ public class KafkaGetQueryResultStreamIT {
      * the target number of results, or it hits the target number of results.
      *
      * @param pollMs - How long each poll could take.
-     * @param pollIterations - The maximum nubmer of polls that will be attempted.
+     * @param pollIterations - The maximum number of polls that will be attempted.
      * @param targetSize - The number of results to read before stopping.
      * @param stream - The stream that will be polled.
      * @return The results that were read from the stream.
      * @throws Exception If the poll failed.
      */
-    private List<VisibilityBindingSet> pollForResults(
+    private <T> List<T> pollForResults(
             final int pollMs,
             final int pollIterations,
             final int targetSize,
-            final QueryResultStream stream)  throws Exception{
-        final List<VisibilityBindingSet> read = new ArrayList<>();
+            final QueryResultStream<T> stream)  throws Exception{
+        final List<T> read = new ArrayList<>();
 
         int i = 0;
         while(read.size() < targetSize && i < pollIterations) {
-            for(final VisibilityBindingSet visBs : stream.poll(pollMs)) {
-                read.add( visBs );
+            for(final T result : stream.poll(pollMs)) {
+                read.add( result );
             }
             i++;
         }
@@ -109,7 +113,8 @@ public class KafkaGetQueryResultStreamIT {
         }
 
         // Use the interactor that is being tested to read all of the visibility binding sets.
-        final GetQueryResultStream interactor = new KafkaGetQueryResultStream(kafka.getKafkaHostname(), kafka.getKafkaPort());
+        final GetQueryResultStream<VisibilityBindingSet> interactor =
+                new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class);
         final List<VisibilityBindingSet> read = pollForResults(500, 3, 3, interactor.fromStart(queryId));
 
         // Show the fetched binding sets match the original, as well as their order.
@@ -133,8 +138,9 @@ public class KafkaGetQueryResultStreamIT {
             producer.flush();
 
             // Use the interactor that is being tested to read all of the visibility binding sets that appear after this point.
-            final GetQueryResultStream interactor = new KafkaGetQueryResultStream(kafka.getKafkaHostname(), kafka.getKafkaPort());
-            try(QueryResultStream results = interactor.fromNow(queryId)) {
+            final GetQueryResultStream<VisibilityBindingSet> interactor =
+                    new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class);
+            try(QueryResultStream<VisibilityBindingSet> results = interactor.fromNow(queryId)) {
                 // Read results from the stream.
                 List<VisibilityBindingSet> read = new ArrayList<>();
                 for(final VisibilityBindingSet visBs : results.poll(500)) {
@@ -175,11 +181,42 @@ public class KafkaGetQueryResultStreamIT {
         final UUID queryId = UUID.randomUUID();
 
         // Use the interactor that is being tested to create a result stream and immediately close it.
-        final GetQueryResultStream interactor = new KafkaGetQueryResultStream(kafka.getKafkaHostname(), kafka.getKafkaPort());
-        final QueryResultStream results = interactor.fromStart(queryId);
+        final GetQueryResultStream<VisibilityBindingSet> interactor =
+                new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class);
+        final QueryResultStream<VisibilityBindingSet> results = interactor.fromStart(queryId);
         results.close();
 
         // Try to poll the closed stream.
         results.poll(1);
     }
+
+    @Test
+    public void fromStart_visibilityStatements() throws Exception {
+        // Create an ID for the query.
+        final UUID queryId = UUID.randomUUID();
+
+        // Create some statements that will be written to the result topic.
+        final List<VisibilityStatement> original = new ArrayList<>();
+        final ValueFactory vf = new ValueFactoryImpl();
+        original.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+        original.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(63)), "b") );
+        original.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral("urn:34")), "") );
+
+        // Write the entries to the query result topic in Kafka.
+        try(final Producer<?, VisibilityStatement> producer =
+                KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class)) {
+            final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
+            for(final VisibilityStatement visStmt : original) {
+                producer.send(new ProducerRecord<>(resultTopic, visStmt));
+            }
+        }
+
+        // Use the interactor that is being tested to read all of the visibility binding sets.
+        final GetQueryResultStream<VisibilityStatement> interactor =
+                new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityStatementDeserializer.class);
+        final List<VisibilityStatement> read = pollForResults(500, 3, 3, interactor.fromStart(queryId));
+
+        // Show the fetched binding sets match the original, as well as their order.
+        assertEquals(original, read);
+    }
 }
\ No newline at end of file