You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/29 20:36:45 UTC

[2/2] incubator-rya git commit: RYA-440 Added commands to Rya Shell used to interact with Rya Streams. Closes #267.

RYA-440 Added commands to Rya Shell used to interact with Rya Streams. Closes #267.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/59b20263
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/59b20263
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/59b20263

Branch: refs/heads/master
Commit: 59b20263c7c574d46d8462985fd7be126e8c4ea1
Parents: 3d4a5d0
Author: kchilton2 <ke...@gmail.com>
Authored: Tue Jan 16 17:40:08 2018 -0500
Committer: caleb <ca...@parsons.com>
Committed: Mon Jan 29 14:01:11 2018 -0500

----------------------------------------------------------------------
 .../org/apache/rya/api/client/RyaClient.java    |  10 +
 .../api/client/SetRyaStreamsConfiguration.java  |  41 ++
 .../org/apache/rya/api/instance/RyaDetails.java | 104 ++++-
 .../apache/rya/api/instance/RyaDetailsTest.java |   8 +-
 .../mongodb/instance/MongoDetailsAdapter.java   |  87 +++-
 .../instance/MongoDetailsAdapterTest.java       |   9 +-
 .../client/SetRyaStreamsConfigurationBase.java  |  83 ++++
 .../accumulo/AccumuloRyaClientFactory.java      |   6 +-
 .../AccumuloSetRyaStreamsConfiguration.java     |  59 +++
 .../api/client/mongo/MongoRyaClientFactory.java |   1 +
 .../mongo/MongoSetRyaStreamsConfiguration.java  |  60 +++
 .../AccumuloSetRyaStreamsConfigurationIT.java   |  81 ++++
 .../MongoSetRyaStreamsConfigurationIT.java      |  85 ++++
 .../rya/streams/api/RyaStreamsClient.java       | 137 ++++++
 .../rya/streams/api/interactor/GetQuery.java    |  44 ++
 .../interactor/defaults/DefaultGetQuery.java    |  55 +++
 .../kafka/KafkaRyaStreamsClientFactory.java     | 170 +++++++
 extras/shell/pom.xml                            |   4 +
 .../apache/rya/shell/RyaConnectionCommands.java |  50 +-
 .../apache/rya/shell/RyaStreamsCommands.java    | 297 ++++++++++++
 .../org/apache/rya/shell/SharedShellState.java  |  58 ++-
 .../rya/shell/util/StreamsQueryFormatter.java   | 106 +++++
 .../META-INF/spring/spring-shell-plugin.xml     |   3 +-
 .../rya/shell/RyaStreamsCommandsTest.java       | 461 +++++++++++++++++++
 .../apache/rya/shell/SharedShellStateTest.java  |  31 ++
 .../shell/util/StreamsQueryFormatterTest.java   | 102 ++++
 .../src/test/resources/RyaShellTest-context.xml |   1 +
 27 files changed, 2101 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java b/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
index 92b18a1..c122f43 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
@@ -45,6 +45,7 @@ public class RyaClient {
     private final ListInstances listInstances;
     private final Optional<AddUser> addUser;
     private final Optional<RemoveUser> removeUser;
+    private final SetRyaStreamsConfiguration setRyaStreamsConfig;
     private final Uninstall uninstall;
     private final LoadStatements loadStatements;
     private final LoadStatementsFile loadStatementsFile;
@@ -65,6 +66,7 @@ public class RyaClient {
             final ListInstances listInstances,
             final Optional<AddUser> addUser,
             final Optional<RemoveUser> removeUser,
+            final SetRyaStreamsConfiguration setRyaStreamsConfig,
             final Uninstall uninstall,
             final LoadStatements loadStatements,
             final LoadStatementsFile loadStatementsFile,
@@ -81,6 +83,7 @@ public class RyaClient {
         this.listInstances = requireNonNull(listInstances);
         this.addUser = requireNonNull(addUser);
         this.removeUser = requireNonNull(removeUser);
+        this.setRyaStreamsConfig = requireNonNull(setRyaStreamsConfig);
         this.uninstall = requireNonNull(uninstall);
         this.loadStatements = requireNonNull(loadStatements);
         this.loadStatementsFile = requireNonNull(loadStatementsFile);
@@ -176,6 +179,13 @@ public class RyaClient {
     }
 
     /**
+     * @return An instance of {@link SetRyaStreamsConfiguration} that is connected to a Rya storage.
+     */
+    public SetRyaStreamsConfiguration getSetRyaStreamsConfiguration() {
+        return setRyaStreamsConfig;
+    }
+
+    /**
      * @return An instance of {@link Uninstall} that is connected to a Rya storage.
      */
     public Uninstall getUninstall() {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/common/rya.api/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfiguration.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfiguration.java b/common/rya.api/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfiguration.java
new file mode 100644
index 0000000..5e75f06
--- /dev/null
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfiguration.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client;
+
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Update which Rya Streams subsystem a Rya instance is connected to.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface SetRyaStreamsConfiguration {
+
+    /**
+     * Update which Rya Streams subsystem a Rya instance is connected to.
+     *
+     * @param instanceName - Indicates which Rya instance will have a Rya Streams subsystem assigned to it. (not null)
+     * @param streamsDetails - Indicates which Rya Streams subsystem the instance will use. (not null)
+     * @throws InstanceDoesNotExistException No instance of Rya exists for the provided name.
+     * @throws RyaClientException Something caused the command to fail.
+     */
+    public void setRyaStreamsConfiguration(String ryaInstance, RyaStreamsDetails streamsDetails) throws InstanceDoesNotExistException, RyaClientException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/common/rya.api/src/main/java/org/apache/rya/api/instance/RyaDetails.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/instance/RyaDetails.java b/common/rya.api/src/main/java/org/apache/rya/api/instance/RyaDetails.java
index 9d2c1e5..bda7390 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/instance/RyaDetails.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/instance/RyaDetails.java
@@ -29,25 +29,22 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 
-import edu.umd.cs.findbugs.annotations.Nullable;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-import net.jcip.annotations.Immutable;
-
-import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
-import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
-
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import net.jcip.annotations.Immutable;
+
 /**
  * Details about how a Rya instance's state.
  */
 @Immutable
 @DefaultAnnotation(NonNull.class)
 public class RyaDetails implements Serializable {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     // General metadata about the instance.
     private final String instanceName;
@@ -68,6 +65,9 @@ public class RyaDetails implements Serializable {
     private final ProspectorDetails prospectorDetails;
     private final JoinSelectivityDetails joinSelectivityDetails;
 
+    // Rya Streams Details.
+    private final Optional<RyaStreamsDetails> ryaStreamsDetails;
+
     /**
      * Private to prevent initialization through the constructor. To build
      * instances of this class, use the {@link Builder}.
@@ -82,7 +82,8 @@ public class RyaDetails implements Serializable {
             final TemporalIndexDetails temporalDetails,
             final FreeTextIndexDetails freeTextDetails,
             final ProspectorDetails prospectorDetails,
-            final JoinSelectivityDetails joinSelectivityDetails) {
+            final JoinSelectivityDetails joinSelectivityDetails,
+            final Optional<RyaStreamsDetails> ryaStreamsDetails) {
         this.instanceName = requireNonNull(instanceName);
         this.version = requireNonNull(version);
         this.users = requireNonNull(users);
@@ -93,6 +94,7 @@ public class RyaDetails implements Serializable {
         this.freeTextDetails = requireNonNull(freeTextDetails);
         this.prospectorDetails = requireNonNull(prospectorDetails);
         this.joinSelectivityDetails = requireNonNull(joinSelectivityDetails);
+        this.ryaStreamsDetails = requireNonNull(ryaStreamsDetails);
     }
 
     /**
@@ -168,6 +170,13 @@ public class RyaDetails implements Serializable {
         return joinSelectivityDetails;
     }
 
+    /**
+     * @return Information about the instance's Rya Streams integration, if it was set.
+     */
+    public Optional<RyaStreamsDetails> getRyaStreamsDetails() {
+        return ryaStreamsDetails;
+    }
+
     @Override
     public int hashCode() {
         return Objects.hash(
@@ -179,7 +188,8 @@ public class RyaDetails implements Serializable {
                 temporalDetails,
                 freeTextDetails,
                 prospectorDetails,
-                joinSelectivityDetails);
+                joinSelectivityDetails,
+                ryaStreamsDetails);
     }
 
     @Override
@@ -197,7 +207,8 @@ public class RyaDetails implements Serializable {
                     Objects.equals(temporalDetails, details.temporalDetails) &&
                     Objects.equals(freeTextDetails, details.freeTextDetails) &&
                     Objects.equals(prospectorDetails, details.prospectorDetails) &&
-                    Objects.equals(joinSelectivityDetails, details.joinSelectivityDetails);
+                    Objects.equals(joinSelectivityDetails, details.joinSelectivityDetails) &&
+                    Objects.equals(ryaStreamsDetails, details.ryaStreamsDetails);
         }
         return false;
     }
@@ -239,6 +250,9 @@ public class RyaDetails implements Serializable {
         private ProspectorDetails prospectorDetails;
         private JoinSelectivityDetails joinSelectivityDetails;
 
+        // Rya Streams Details.
+        private RyaStreamsDetails ryaStreamsDetails;
+
         /**
          * Construcst an empty instance of {@link Builder}.
          */
@@ -262,6 +276,7 @@ public class RyaDetails implements Serializable {
             freeTextDetails = details.freeTextDetails;
             prospectorDetails = details.prospectorDetails;
             joinSelectivityDetails = details.joinSelectivityDetails;
+            ryaStreamsDetails = details.ryaStreamsDetails.orNull();
         }
 
         /**
@@ -375,6 +390,15 @@ public class RyaDetails implements Serializable {
         }
 
         /**
+         * @param ryaStreamsDetails - Information about the instance's Rya Streams integration.
+         * @return This {@link Builder} so that method invocations may be chained.
+         */
+        public Builder setRyaStreamsDetails(@Nullable final RyaStreamsDetails ryaStreamsDetails) {
+            this.ryaStreamsDetails = ryaStreamsDetails;
+            return this;
+        }
+
+        /**
          * @return An instance of {@link RyaDetails} built using this
          *   builder's values.
          */
@@ -389,7 +413,8 @@ public class RyaDetails implements Serializable {
                     temporalDetails,
                     freeTextDetails,
                     prospectorDetails,
-                    joinSelectivityDetails);
+                    joinSelectivityDetails,
+                    Optional.fromNullable(ryaStreamsDetails));
         }
     }
 
@@ -1071,4 +1096,57 @@ public class RyaDetails implements Serializable {
             return false;
         }
     }
+
+    /**
+     * Details about the Rya instance's Rya Streams integration.
+     */
+    public static class RyaStreamsDetails implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private final String hostname;
+        private final int port;
+
+        /**
+         * Constructs an instance of {@link RyaStreamsDetails}.
+         *
+         * @param hostname - The hostname used to communicate with the Rya Streams subsystem. (not null)
+         * @param port - The port used to communicate with the Rya Streams subsystem.
+         */
+        public RyaStreamsDetails(final String hostname, final int port) {
+            this.hostname = requireNonNull(hostname);
+            this.port = port;
+        }
+
+        /**
+         * @return The hostname used to communicate with the Rya Streams subsystem.
+         */
+        public String getHostname() {
+            return hostname;
+        }
+
+        /**
+         * @return The port used to communicate with the Rya Streams subsystem.
+         */
+        public int getPort() {
+            return port;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(hostname, port);
+        }
+
+        @Override
+        public boolean equals(final Object obj) {
+            if(this == obj) {
+                return true;
+            }
+            if(obj instanceof RyaStreamsDetails) {
+                final RyaStreamsDetails other = (RyaStreamsDetails) obj;
+                return Objects.equals(hostname, other.hostname) &&
+                        port == other.port;
+            }
+            return false;
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/common/rya.api/src/test/java/org/apache/rya/api/instance/RyaDetailsTest.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/test/java/org/apache/rya/api/instance/RyaDetailsTest.java b/common/rya.api/src/test/java/org/apache/rya/api/instance/RyaDetailsTest.java
index a356877..b6e92e0 100644
--- a/common/rya.api/src/test/java/org/apache/rya/api/instance/RyaDetailsTest.java
+++ b/common/rya.api/src/test/java/org/apache/rya/api/instance/RyaDetailsTest.java
@@ -31,6 +31,7 @@ import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
 import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
 import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
 import org.apache.rya.api.instance.RyaDetails.ProspectorDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
 import org.apache.rya.api.instance.RyaDetails.TemporalIndexDetails;
 import org.junit.Test;
 
@@ -65,7 +66,8 @@ public class RyaDetailsTest {
                                     .setId("pcj 2")
                                     .setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL)))
             .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) )
-            .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) );
+            .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) )
+            .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 5));
 
         final RyaDetails details1 = builder.build();
         final RyaDetails details2 = builder.build();
@@ -96,7 +98,8 @@ public class RyaDetailsTest {
                                     .setId("pcj 2")
                                     .setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL)))
             .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) )
-            .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) );
+            .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) )
+            .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 5));
 
         final RyaDetails details1 = builder.build();
         final RyaDetails details2 = builder.build();
@@ -127,6 +130,7 @@ public class RyaDetailsTest {
                                     .setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL)))
             .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) )
             .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) )
+            .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 5))
             .build();
 
         // Create a new Builder using another RyaDetails object.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java
index 8010808..f86c150 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java
@@ -32,10 +32,10 @@ import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
 import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
 import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
 import org.apache.rya.api.instance.RyaDetails.ProspectorDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
 import org.apache.rya.api.instance.RyaDetails.TemporalIndexDetails;
 
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
 import com.mongodb.BasicDBList;
 import com.mongodb.BasicDBObject;
 import com.mongodb.BasicDBObjectBuilder;
@@ -48,7 +48,6 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  * Serializes configuration details for use in Mongo.
  * The {@link DBObject} will look like:
  * <pre>
- * {@code
  * {
  *   "instanceName": &lt;string&gt;,
  *   "version": &lt;string&gt;?,
@@ -68,6 +67,10 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  *   "freeTextDetails": &lt;boolean&gt;,
  *   "prospectorDetails": &lt;date&gt;,
  *   "joinSelectivityDetails": &lt;date&gt;
+ *   "ryaStreamsDetails": {
+ *       "hostname": &lt;string&gt;
+ *       "port": &lt;int&gt;
+ *   }
  * }
  * </pre>
  */
@@ -91,13 +94,19 @@ public class MongoDetailsAdapter {
     public static final String PROSPECTOR_DETAILS_KEY = "prospectorDetails";
     public static final String JOIN_SELECTIVITY_DETAILS_KEY = "joinSelectivitiyDetails";
 
+    public static final String RYA_STREAMS_DETAILS_KEY = "ryaStreamsDetails";
+    public static final String RYA_STREAMS_HOSTNAME_KEY = "hostname";
+    public static final String RYA_STREAMS_PORT_KEY = "port";
+
     /**
-     * Serializes {@link RyaDetails} to mongo {@link DBObject}.
-     * @param details - The details to be serialized.
-     * @return The mongo {@link DBObject}.
+     * Converts a {@link RyaDetails} object into its MongoDB {@link DBObject} equivalent.
+     *
+     * @param details - The details to convert. (not null)
+     * @return The MongoDB {@link DBObject} equivalent.
      */
     public static BasicDBObject toDBObject(final RyaDetails details) {
-        Preconditions.checkNotNull(details);
+        requireNonNull(details);
+
         final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start()
                 .add(INSTANCE_KEY, details.getRyaInstanceName())
                 .add(VERSION_KEY, details.getRyaVersion())
@@ -106,12 +115,29 @@ public class MongoDetailsAdapter {
                 .add(PCJ_DETAILS_KEY, toDBObject(details.getPCJIndexDetails()))
                 .add(TEMPORAL_DETAILS_KEY, details.getTemporalIndexDetails().isEnabled())
                 .add(FREETEXT_DETAILS_KEY, details.getFreeTextIndexDetails().isEnabled());
+
         if(details.getProspectorDetails().getLastUpdated().isPresent()) {
             builder.add(PROSPECTOR_DETAILS_KEY, details.getProspectorDetails().getLastUpdated().get());
         }
+
         if(details.getJoinSelectivityDetails().getLastUpdated().isPresent()) {
             builder.add(JOIN_SELECTIVITY_DETAILS_KEY, details.getJoinSelectivityDetails().getLastUpdated().get());
         }
+
+        // If the Rya Streams Details are present, then add them.
+        if(details.getRyaStreamsDetails().isPresent()) {
+            final RyaStreamsDetails ryaStreamsDetails = details.getRyaStreamsDetails().get();
+
+            // The embedded object that holds onto the fields.
+            final DBObject ryaStreamsFields = BasicDBObjectBuilder.start()
+                    .add(RYA_STREAMS_HOSTNAME_KEY, ryaStreamsDetails.getHostname())
+                    .add(RYA_STREAMS_PORT_KEY, ryaStreamsDetails.getPort())
+                    .get();
+
+            // Add them to the main builder.
+            builder.add(RYA_STREAMS_DETAILS_KEY, ryaStreamsFields);
+        }
+
         return (BasicDBObject) builder.get();
     }
 
@@ -154,20 +180,38 @@ public class MongoDetailsAdapter {
         return builder.get();
     }
 
+    /**
+     * Converts a MongoDB {@link DBObject} into its {@link RyaDetails} equivalent.
+     *
+     * @param mongoObj - The MongoDB object to convert. (not null)
+     * @return The equivalent {@link RyaDetails} object.
+     * @throws MalformedRyaDetailsException The MongoDB object could not be converted.
+     */
     public static RyaDetails toRyaDetails(final DBObject mongoObj) throws MalformedRyaDetailsException {
+        requireNonNull(mongoObj);
         final BasicDBObject basicObj = (BasicDBObject) mongoObj;
         try {
-            return RyaDetails.builder()
-                    .setRyaInstanceName(basicObj.getString(INSTANCE_KEY))
-                    .setRyaVersion(basicObj.getString(VERSION_KEY))
-                    .setEntityCentricIndexDetails(new EntityCentricIndexDetails(basicObj.getBoolean(ENTITY_DETAILS_KEY)))
-                    //RYA-215            .setGeoIndexDetails(new GeoIndexDetails(basicObj.getBoolean(GEO_DETAILS_KEY)))
-                    .setPCJIndexDetails(getPCJIndexDetails(basicObj))
-                    .setTemporalIndexDetails(new TemporalIndexDetails(basicObj.getBoolean(TEMPORAL_DETAILS_KEY)))
-                    .setFreeTextDetails(new FreeTextIndexDetails(basicObj.getBoolean(FREETEXT_DETAILS_KEY)))
-                    .setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(basicObj.getDate(PROSPECTOR_DETAILS_KEY))))
-                    .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>fromNullable(basicObj.getDate(JOIN_SELECTIVITY_DETAILS_KEY))))
-                    .build();
+            final RyaDetails.Builder builder = RyaDetails.builder()
+                .setRyaInstanceName(basicObj.getString(INSTANCE_KEY))
+                .setRyaVersion(basicObj.getString(VERSION_KEY))
+                .setEntityCentricIndexDetails(new EntityCentricIndexDetails(basicObj.getBoolean(ENTITY_DETAILS_KEY)))
+                //RYA-215            .setGeoIndexDetails(new GeoIndexDetails(basicObj.getBoolean(GEO_DETAILS_KEY)))
+                .setPCJIndexDetails(getPCJIndexDetails(basicObj))
+                .setTemporalIndexDetails(new TemporalIndexDetails(basicObj.getBoolean(TEMPORAL_DETAILS_KEY)))
+                .setFreeTextDetails(new FreeTextIndexDetails(basicObj.getBoolean(FREETEXT_DETAILS_KEY)))
+                .setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(basicObj.getDate(PROSPECTOR_DETAILS_KEY))))
+                .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>fromNullable(basicObj.getDate(JOIN_SELECTIVITY_DETAILS_KEY))));
+
+            // If the Rya Streams Details are present, then add them.
+            if(basicObj.containsField(RYA_STREAMS_DETAILS_KEY)) {
+                final BasicDBObject streamsObject = (BasicDBObject) basicObj.get(RYA_STREAMS_DETAILS_KEY);
+                final String hostname = streamsObject.getString(RYA_STREAMS_HOSTNAME_KEY);
+                final int port = streamsObject.getInt(RYA_STREAMS_PORT_KEY);
+                builder.setRyaStreamsDetails(new RyaStreamsDetails(hostname, port));
+            }
+
+            return builder.build();
+
         } catch(final Exception e) {
             throw new MalformedRyaDetailsException("Failed to make RyaDetail from Mongo Object, it is malformed.", e);
         }
@@ -213,14 +257,15 @@ public class MongoDetailsAdapter {
     }
 
     /**
-     * Exception thrown when a MongoDB {@link DBObject} is malformed when attemptin
-     * to adapt it into a {@link RyaDetails}.
+     * Indicates a MongoDB {@link DBObject} was malformed when attempting
+     * to convert it into a {@link RyaDetails} object.
      */
     public static class MalformedRyaDetailsException extends Exception {
         private static final long serialVersionUID = 1L;
 
         /**
-         * Creates a new {@link MalformedRyaDetailsException}
+         * Creates a new {@link MalformedRyaDetailsException}.
+         *
          * @param message - The message to be displayed by the exception.
          * @param e - The source cause of the exception.
          */
@@ -228,4 +273,4 @@ public class MongoDetailsAdapter {
             super(message, e);
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java
index 0ea9456..f5845c2 100644
--- a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java
+++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java
@@ -32,6 +32,7 @@ import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
 import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
 import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
 import org.apache.rya.api.instance.RyaDetails.ProspectorDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
 import org.apache.rya.api.instance.RyaDetails.TemporalIndexDetails;
 import org.apache.rya.mongodb.instance.MongoDetailsAdapter.MalformedRyaDetailsException;
 import org.junit.Test;
@@ -72,6 +73,7 @@ public class MongoDetailsAdapterTest {
                 .setFreeTextDetails(new FreeTextIndexDetails(true))
                 .setProspectorDetails(new ProspectorDetails(Optional.fromNullable(new Date(0L))))
                 .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.fromNullable(new Date(1L))))
+                .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 6))
                 .build();
 
         final BasicDBObject actual = MongoDetailsAdapter.toDBObject(details);
@@ -100,7 +102,8 @@ public class MongoDetailsAdapterTest {
                         + "temporalDetails : true,"
                         + "freeTextDetails : true,"
                         + "prospectorDetails : { $date : \"1970-01-01T00:00:00.000Z\"},"
-                        + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"}"
+                        + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"},"
+                        + "ryaStreamsDetails : { hostname : \"localhost\" , port : 6}"
                         + "}"
                 );
 
@@ -134,7 +137,8 @@ public class MongoDetailsAdapterTest {
                         + "temporalDetails : true,"
                         + "freeTextDetails : true,"
                         + "prospectorDetails : { $date : \"1970-01-01T00:00:00.000Z\"},"
-                        + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"}"
+                        + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"},"
+                        + "ryaStreamsDetails : { hostname : \"localhost\" , port : 6}"
                         + "}"
                 );
 
@@ -163,6 +167,7 @@ public class MongoDetailsAdapterTest {
                 .setFreeTextDetails(new FreeTextIndexDetails(true))
                 .setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(new Date(0L))))
                 .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>fromNullable(new Date(1L))))
+                .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 6))
                 .build();
 
         assertEquals(expected, actual);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/indexing/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfigurationBase.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfigurationBase.java b/extras/indexing/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfigurationBase.java
new file mode 100644
index 0000000..92a4c44
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfigurationBase.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+import org.apache.rya.api.instance.RyaDetailsRepository;
+import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+import org.apache.rya.api.instance.RyaDetailsUpdater;
+import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A base class that implements the core functionality of the {@link SetRyaStreamsConfiguration} interactor.
+ * Subclasses just need to implement {@link #getRyaDetailsRepo(String)} so that the common code may update
+ * any implementation of that repository.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class SetRyaStreamsConfigurationBase implements SetRyaStreamsConfiguration {
+
+    private final InstanceExists instanceExists;
+
+    /**
+     * Constructs an instance of {@link SetRyaStreamsConfigurationBase}.
+     *
+     * @param instanceExists - The interactor used to verify Rya instances exist. (not null)
+     */
+    public SetRyaStreamsConfigurationBase(final InstanceExists instanceExists) {
+        this.instanceExists = requireNonNull(instanceExists);
+    }
+
+    /**
+     * Get a {@link RyaDetailsRepository} that is connected to a specific instance of Rya.
+     *
+     * @param ryaInstance - The Rya instance the repository must be connected to. (not null)
+     * @return A {@link RyaDetailsRepository} connected to the specified Rya instance.
+     */
+    protected abstract RyaDetailsRepository getRyaDetailsRepo(String ryaInstance);
+
+    @Override
+    public void setRyaStreamsConfiguration(final String ryaInstance, final RyaStreamsDetails streamsDetails) throws InstanceDoesNotExistException, RyaClientException{
+        requireNonNull(ryaInstance);
+        requireNonNull(streamsDetails);
+
+        // Verify the Rya Instance exists.
+        if(!instanceExists.exists(ryaInstance)) {
+            throw new InstanceDoesNotExistException("There is no Rya instance named '" + ryaInstance + "' in this storage.");
+        }
+
+        // Update the old details object using the provided Rya Streams details.
+        final RyaDetailsRepository repo = getRyaDetailsRepo(ryaInstance);
+        try {
+            new RyaDetailsUpdater(repo).update(oldDetails -> {
+                final RyaDetails.Builder builder = RyaDetails.builder(oldDetails);
+                builder.setRyaStreamsDetails(streamsDetails);
+                return builder.build();
+            });
+        } catch (CouldNotApplyMutationException | RyaDetailsRepositoryException e) {
+            throw new RyaClientException("Unable to update which Rya Streams subsystem is used by the '" +
+                    ryaInstance + "' Rya instance.", e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
index 17fddaa..fdabea9 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
@@ -23,6 +23,7 @@ import static java.util.Objects.requireNonNull;
 import java.util.Optional;
 
 import org.apache.accumulo.core.client.Connector;
+import org.apache.rya.api.client.InstanceExists;
 import org.apache.rya.api.client.RyaClient;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -50,6 +51,8 @@ public class AccumuloRyaClientFactory {
         requireNonNull(connector);
 
         // Build the RyaCommands option with the initialized commands.
+        final InstanceExists instanceExists = new AccumuloInstanceExists(connectionDetails, connector);
+
         return new RyaClient(
                 new AccumuloInstall(connectionDetails, connector),
                 new AccumuloCreatePCJ(connectionDetails, connector),
@@ -59,10 +62,11 @@ public class AccumuloRyaClientFactory {
                 Optional.of(new AccumuloListIncrementalQueries(connectionDetails, connector)),
                 new AccumuloBatchUpdatePCJ(connectionDetails, connector),
                 new AccumuloGetInstanceDetails(connectionDetails, connector),
-                new AccumuloInstanceExists(connectionDetails, connector),
+                instanceExists,
                 new AccumuloListInstances(connectionDetails, connector),
                 Optional.of(new AccumuloAddUser(connectionDetails, connector)),
                 Optional.of(new AccumuloRemoveUser(connectionDetails, connector)),
+                new AccumuloSetRyaStreamsConfiguration(instanceExists, connector),
                 new AccumuloUninstall(connectionDetails, connector),
                 new AccumuloLoadStatements(connectionDetails, connector),
                 new AccumuloLoadStatementsFile(connectionDetails, connector),

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfiguration.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfiguration.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfiguration.java
new file mode 100644
index 0000000..f19d9fb
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfiguration.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
+import org.apache.rya.api.client.InstanceExists;
+import org.apache.rya.api.client.SetRyaStreamsConfiguration;
+import org.apache.rya.api.client.SetRyaStreamsConfigurationBase;
+import org.apache.rya.api.instance.RyaDetailsRepository;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * An Accumulo implementation of {@link SetRyaStreamsConfiguration}.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloSetRyaStreamsConfiguration extends SetRyaStreamsConfigurationBase {
+
+    private final Connector connector;
+
+    /**
+     * Constructs an instance of {@link AccumuloSetRyaStreamsConfiguration}.
+     *
+     * @param instanceExists - The interactor used to verify Rya instances exist. (not null)
+     * @param connector - Provides programmatic access to the instance of Accumulo that hosts Rya instance. (not null)
+     */
+    public AccumuloSetRyaStreamsConfiguration(
+            final InstanceExists instanceExists,
+            final Connector connector) {
+        super(instanceExists);
+        this.connector = requireNonNull(connector);
+    }
+
+    @Override
+    protected RyaDetailsRepository getRyaDetailsRepo(final String ryaInstance) {
+        requireNonNull(ryaInstance);
+        return new AccumuloRyaInstanceDetailsRepository(connector, ryaInstance);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoRyaClientFactory.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoRyaClientFactory.java b/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoRyaClientFactory.java
index fbbec2a..5fa4877 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoRyaClientFactory.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoRyaClientFactory.java
@@ -66,6 +66,7 @@ public class MongoRyaClientFactory {
                 new MongoListInstances(adminClient),
                 Optional.empty(),
                 Optional.empty(),
+                new MongoSetRyaStreamsConfiguration(instanceExists, adminClient),
                 new MongoUninstall(adminClient, instanceExists),
                 new MongoLoadStatements(connectionDetails, instanceExists),
                 new MongoLoadStatementsFile(connectionDetails, instanceExists),

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfiguration.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfiguration.java b/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfiguration.java
new file mode 100644
index 0000000..592e663
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfiguration.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.rya.api.client.InstanceExists;
+import org.apache.rya.api.client.SetRyaStreamsConfiguration;
+import org.apache.rya.api.client.SetRyaStreamsConfigurationBase;
+import org.apache.rya.api.instance.RyaDetailsRepository;
+import org.apache.rya.mongodb.instance.MongoRyaInstanceDetailsRepository;
+
+import com.mongodb.MongoClient;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A MongoDB implementation of {@link SetRyaStreamsConfiguration}.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoSetRyaStreamsConfiguration extends SetRyaStreamsConfigurationBase {
+
+    private final MongoClient client;
+
+    /**
+     * Constructs an instance of {@link MongoSetRyaStreamsConfiguration}.
+     *
+     * @param instanceExists - The interactor used to verify Rya instances exist. (not null)
+     * @param client - The MongoDB client used to connect to the Rya storage. (not null)
+     */
+    public MongoSetRyaStreamsConfiguration(
+            final InstanceExists instanceExists,
+            final MongoClient client) {
+        super(instanceExists);
+        this.client = requireNonNull(client);
+    }
+
+    @Override
+    protected RyaDetailsRepository getRyaDetailsRepo(final String ryaInstance) {
+        requireNonNull(ryaInstance);
+        return new MongoRyaInstanceDetailsRepository(client, ryaInstance);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfigurationIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfigurationIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfigurationIT.java
new file mode 100644
index 0000000..928a29e
--- /dev/null
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfigurationIT.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.accumulo;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import org.apache.rya.accumulo.AccumuloITBase;
+import org.apache.rya.api.client.Install;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+import org.junit.Test;
+
+/**
+ * Integration tests the methods of {@link AccumuloSetRyaStreamsConfiguration}.
+ */
+public class AccumuloSetRyaStreamsConfigurationIT extends AccumuloITBase {
+
+    @Test(expected = InstanceDoesNotExistException.class)
+    public void instanceDoesNotExist() throws Exception {
+        final String ryaInstance = getRyaInstanceName();
+        final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+                getUsername(),
+                getPassword().toCharArray(),
+                getInstanceName(),
+                getZookeepers());
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector());
+
+        // Skip the install step to create error causing situation.
+        final RyaStreamsDetails details = new RyaStreamsDetails("localhost", 6);
+        ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, details);
+    }
+
+    @Test
+    public void updatesRyaDetails() throws Exception {
+        final String ryaInstance = getRyaInstanceName();
+        final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+                getUsername(),
+                getPassword().toCharArray(),
+                getInstanceName(),
+                getZookeepers());
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector());
+
+        // Install an instance of Rya.
+        final Install installRya = ryaClient.getInstall();
+        final InstallConfiguration installConf = InstallConfiguration.builder()
+                .build();
+        installRya.install(ryaInstance, installConf);
+
+        // Fetch its details and show they do not have any RyaStreamsDetails.
+        com.google.common.base.Optional<RyaStreamsDetails> streamsDetails =
+                ryaClient.getGetInstanceDetails().getDetails(ryaInstance).get().getRyaStreamsDetails();
+        assertFalse(streamsDetails.isPresent());
+
+        // Set the details.
+        final RyaStreamsDetails details = new RyaStreamsDetails("localhost", 6);
+        ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, details);
+
+        // Fetch its details again and show that they are now filled in.
+        streamsDetails = ryaClient.getGetInstanceDetails().getDetails(ryaInstance).get().getRyaStreamsDetails();
+        assertEquals(details, streamsDetails.get());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/indexing/src/test/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfigurationIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfigurationIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfigurationIT.java
new file mode 100644
index 0000000..5fea578
--- /dev/null
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfigurationIT.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.mongo;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.Optional;
+
+import org.apache.rya.api.client.Install;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+import org.apache.rya.mongodb.MongoITBase;
+import org.junit.Test;
+
+/**
+ * Integration tests the methods of {@link MongoSetRyaStreamsConfiguration}.
+ */
+public class MongoSetRyaStreamsConfigurationIT extends MongoITBase {
+
+    @Test(expected = InstanceDoesNotExistException.class)
+    public void instanceDoesNotExist() throws Exception {
+        final RyaClient ryaClient = MongoRyaClientFactory.build(getConnectionDetails(), getMongoClient());
+
+        // Skip the install step to create error causing situation.
+        final String ryaInstance = conf.getRyaInstanceName();
+        final RyaStreamsDetails details = new RyaStreamsDetails("localhost", 6);
+        ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, details);
+    }
+
+    @Test
+    public void updatesRyaDetails() throws Exception {
+        final RyaClient ryaClient = MongoRyaClientFactory.build(getConnectionDetails(), getMongoClient());
+
+        // Install an instance of Rya.
+        final String ryaInstance = conf.getRyaInstanceName();
+        final Install installRya = ryaClient.getInstall();
+        final InstallConfiguration installConf = InstallConfiguration.builder()
+                .build();
+        installRya.install(ryaInstance, installConf);
+
+        // Fetch its details and show they do not have any RyaStreamsDetails.
+        com.google.common.base.Optional<RyaStreamsDetails> streamsDetails =
+                ryaClient.getGetInstanceDetails().getDetails(ryaInstance).get().getRyaStreamsDetails();
+        assertFalse(streamsDetails.isPresent());
+
+        // Set the details.
+        final RyaStreamsDetails details = new RyaStreamsDetails("localhost", 6);
+        ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, details);
+
+        // Fetch its details again and show that they are now filled in.
+        streamsDetails = ryaClient.getGetInstanceDetails().getDetails(ryaInstance).get().getRyaStreamsDetails();
+        assertEquals(details, streamsDetails.get());
+    }
+
+    private MongoConnectionDetails getConnectionDetails() {
+        final Optional<char[]> password = conf.getMongoPassword() != null ?
+                Optional.of(conf.getMongoPassword().toCharArray()) :
+                Optional.empty();
+
+        return new MongoConnectionDetails(
+                conf.getMongoHostname(),
+                Integer.parseInt(conf.getMongoPort()),
+                Optional.ofNullable(conf.getMongoUser()),
+                password);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/RyaStreamsClient.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/RyaStreamsClient.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/RyaStreamsClient.java
new file mode 100644
index 0000000..ee86e41
--- /dev/null
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/RyaStreamsClient.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.api.interactor.AddQuery;
+import org.apache.rya.streams.api.interactor.DeleteQuery;
+import org.apache.rya.streams.api.interactor.GetQuery;
+import org.apache.rya.streams.api.interactor.GetQueryResultStream;
+import org.apache.rya.streams.api.interactor.ListQueries;
+import org.apache.rya.streams.api.interactor.StartQuery;
+import org.apache.rya.streams.api.interactor.StopQuery;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Provides access to a set of Rya Streams functions.Statement
+ */
+@DefaultAnnotation(NonNull.class)
+public class RyaStreamsClient implements AutoCloseable {
+
+    private final AddQuery addQuery;
+    private final GetQuery getQuery;
+    private final DeleteQuery deleteQuery;
+    private final GetQueryResultStream<VisibilityStatement> getStatementResultStream;
+    private final GetQueryResultStream<VisibilityBindingSet> getBindingSetResultStream;
+    private final ListQueries listQueries;
+    private final StartQuery startQuery;
+    private final StopQuery stopQuery;
+
+    /**
+     * Constructs an instance of {@link RyaStreamsClient}.
+     */
+    public RyaStreamsClient(
+            final AddQuery addQuery,
+            final GetQuery getQuery,
+            final DeleteQuery deleteQuery,
+            final GetQueryResultStream<VisibilityStatement> getStatementResultStream,
+            final GetQueryResultStream<VisibilityBindingSet> getBindingSetResultStream,
+            final ListQueries listQueries,
+            final StartQuery startQuery,
+            final StopQuery stopQuery) {
+        this.addQuery = requireNonNull(addQuery);
+        this.getQuery = requireNonNull(getQuery);
+        this.deleteQuery = requireNonNull(deleteQuery);
+        this.getStatementResultStream = requireNonNull(getStatementResultStream);
+        this.getBindingSetResultStream = requireNonNull(getBindingSetResultStream);
+        this.listQueries = requireNonNull(listQueries);
+        this.startQuery = requireNonNull(startQuery);
+        this.stopQuery = requireNonNull(stopQuery);
+    }
+
+    /**
+     * @return The connected {@link AddQuery} interactor.
+     */
+    public AddQuery getAddQuery() {
+        return addQuery;
+    }
+
+    /**
+     * @return The connected {@link GetQuery} interactor.
+     */
+    public GetQuery getGetQuery() {
+        return getQuery;
+    }
+
+    /**
+     * @return The connected {@link DeleteQuery} interactor.
+     */
+    public DeleteQuery getDeleteQuery() {
+        return deleteQuery;
+    }
+
+    /**
+     * @return The connected {@link GetQueryResultStream} interactor for a query that produces
+     *   {@link VisibilityStatement}s.
+     */
+    public GetQueryResultStream<VisibilityStatement> getGetStatementResultStream() {
+        return getStatementResultStream;
+    }
+
+    /**
+     * @return The connected {@link GetQueryResultStream} interactor for a query that produces
+     *   {@link VisibilityBindingSet}s.
+     */
+    public GetQueryResultStream<VisibilityBindingSet>  getGetBindingSetResultStream() {
+        return getBindingSetResultStream;
+    }
+
+    /**
+     * @return The connected {@link ListQueries} interactor.
+     */
+    public ListQueries getListQueries() {
+        return listQueries;
+    }
+
+    /**
+     * @return The connected {@link StartQuery} interactor.
+     */
+    public StartQuery getStartQuery() {
+        return startQuery;
+    }
+
+    /**
+     * @return The connected {@link StopQuery} interactor.
+     */
+    public StopQuery getStopQuery() {
+        return stopQuery;
+    }
+
+    /**
+     * By defualt, this client doesn't close anything. If an implementation of the client
+     * requires closing components, then override this method.
+     */
+    @Override
+    public void close() throws Exception { }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQuery.java
new file mode 100644
index 0000000..1293714
--- /dev/null
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQuery.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api.interactor;
+
+import java.util.Optional;
+import java.util.UUID;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Get a {@link StreamsQuery} from Rya Streams.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface GetQuery {
+
+    /**
+     * Get a {@link StreamsQuery} from Rya Streams.
+     *
+     * @param queryId - Identifies the query to fetch. (not null)
+     * @return The {@link StreamsQuery} for the {@code queryId}; if one is stored for the ID.
+     * @throws RyaStreamsException The query could not be fetched.
+     */
+    public Optional<StreamsQuery> getQuery(UUID queryId) throws RyaStreamsException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultGetQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultGetQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultGetQuery.java
new file mode 100644
index 0000000..14b93ab
--- /dev/null
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultGetQuery.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api.interactor.defaults;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+import java.util.UUID;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+import org.apache.rya.streams.api.interactor.GetQuery;
+import org.apache.rya.streams.api.queries.QueryRepository;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Get a {@link StreamsQuery} from Rya Streams.
+ */
+@DefaultAnnotation(NonNull.class)
+public class DefaultGetQuery implements GetQuery {
+    private final QueryRepository repository;
+
+    /**
+     * Constructs an instance of {@link DefaultGetQuery}.
+     *
+     * @param repository - The {@link QueryRepository} to get queries from. (not null)
+     */
+    public DefaultGetQuery(final QueryRepository repository) {
+        this.repository = requireNonNull(repository);
+    }
+
+    @Override
+    public Optional<StreamsQuery> getQuery(final UUID queryId) throws RyaStreamsException {
+        requireNonNull(queryId);
+        return repository.get(queryId);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java
new file mode 100644
index 0000000..9250d9d
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.streams.api.RyaStreamsClient;
+import org.apache.rya.streams.api.interactor.defaults.DefaultAddQuery;
+import org.apache.rya.streams.api.interactor.defaults.DefaultDeleteQuery;
+import org.apache.rya.streams.api.interactor.defaults.DefaultGetQuery;
+import org.apache.rya.streams.api.interactor.defaults.DefaultListQueries;
+import org.apache.rya.streams.api.interactor.defaults.DefaultStartQuery;
+import org.apache.rya.streams.api.interactor.defaults.DefaultStopQuery;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Constructs instances of {@link RyaStreamsClient} that are connected to a Kafka cluster.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class KafkaRyaStreamsClientFactory {
+    private static final Logger log = LoggerFactory.getLogger(KafkaRyaStreamsClientFactory.class);
+
+    /**
+     * Initialize a {@link RyaStreamsClient} that will interact with an instance of Rya Streams
+     * that is backed by Kafka.
+     *
+     * @param ryaInstance - The name of the Rya Instance the client is connected to. (not null)
+     * @param kafkaHostname - The hostname of the Kafka Broker.
+     * @param kafkaPort - The port of the Kafka Broker.
+     * @return The initialized commands.
+     */
+    public static RyaStreamsClient make(
+            final String ryaInstance,
+            final String kafkaHostname,
+            final int kafkaPort) {
+        requireNonNull(ryaInstance);
+        requireNonNull(kafkaHostname);
+
+        // Setup Query Repository used by the Kafka Rya Streams subsystem.
+        final Producer<?, QueryChange> queryProducer =
+                makeProducer(kafkaHostname, kafkaPort, StringSerializer.class, QueryChangeSerializer.class);
+        final Consumer<?, QueryChange>queryConsumer =
+                fromStartConsumer(kafkaHostname, kafkaPort, StringDeserializer.class, QueryChangeDeserializer.class);
+        final String changeLogTopic = KafkaTopics.queryChangeLogTopic(ryaInstance);
+        final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
+        final QueryRepository queryRepo = new InMemoryQueryRepository(changeLog);
+
+        // Create the Rya Streams client that is backed by a Kafka Query Change Log.
+        return new RyaStreamsClient(
+                new DefaultAddQuery(queryRepo),
+                new DefaultGetQuery(queryRepo),
+                new DefaultDeleteQuery(queryRepo),
+                new KafkaGetQueryResultStream<>(kafkaHostname, "" + kafkaPort, VisibilityStatementDeserializer.class),
+                new KafkaGetQueryResultStream<>(kafkaHostname, "" + kafkaPort, VisibilityBindingSetDeserializer.class),
+                new DefaultListQueries(queryRepo),
+                new DefaultStartQuery(queryRepo),
+                new DefaultStopQuery(queryRepo)) {
+
+            /**
+             * Close the QueryRepository used by the returned client.
+             */
+            @Override
+            public void close() {
+                try {
+                    queryRepo.close();
+                } catch (final Exception e) {
+                    log.warn("Couldn't close a QueryRepository.", e);
+                }
+            }
+        };
+    }
+
+    /**
+     * Create a {@link Producer} that is able to write to a topic in Kafka.
+     *
+     * @param kafkaHostname - The Kafka broker hostname. (not null)
+     * @param kafkaPort - The Kafka broker port.
+     * @param keySerializerClass - Serializes the keys. (not null)
+     * @param valueSerializerClass - Serializes the values. (not null)
+     * @return A {@link Producer} that can be used to write records to a topic.
+     */
+    private static <K, V> Producer<K, V> makeProducer(
+            final String kafkaHostname,
+            final int kakfaPort,
+            final Class<? extends Serializer<K>> keySerializerClass,
+            final Class<? extends Serializer<V>> valueSerializerClass) {
+        requireNonNull(kafkaHostname);
+        requireNonNull(keySerializerClass);
+        requireNonNull(valueSerializerClass);
+
+        final Properties producerProps = new Properties();
+        producerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kakfaPort);
+        producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName());
+        producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName());
+        return new KafkaProducer<>(producerProps);
+    }
+
+    /**
+     * Create a {@link Consumer} that has a unique group ID and reads everything from a topic in Kafka
+     * starting at the earliest point by default.
+     *
+     * @param kafkaHostname - The Kafka broker hostname. (not null)
+     * @param kafkaPort - The Kafka broker port.
+     * @param keyDeserializerClass - Deserializes the keys. (not null)
+     * @param valueDeserializerClass - Deserializes the values. (not null)
+     * @return A {@link Consumer} that can be used to read records from a topic.
+     */
+    private static <K, V> Consumer<K, V> fromStartConsumer(
+            final String kafkaHostname,
+            final int kakfaPort,
+            final Class<? extends Deserializer<K>> keyDeserializerClass,
+            final Class<? extends Deserializer<V>> valueDeserializerClass) {
+        requireNonNull(kafkaHostname);
+        requireNonNull(keyDeserializerClass);
+        requireNonNull(valueDeserializerClass);
+
+        final Properties consumerProps = new Properties();
+        consumerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kakfaPort);
+        consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+        consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
+        consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass.getName());
+        consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass.getName());
+        return new KafkaConsumer<>(consumerProps);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/shell/pom.xml
----------------------------------------------------------------------
diff --git a/extras/shell/pom.xml b/extras/shell/pom.xml
index 1a07400..fcca909 100644
--- a/extras/shell/pom.xml
+++ b/extras/shell/pom.xml
@@ -59,6 +59,10 @@
             <artifactId>rya.pcj.fluo.api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.streams.kafka</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.fluo</groupId>
             <artifactId>fluo-core</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/shell/src/main/java/org/apache/rya/shell/RyaConnectionCommands.java
----------------------------------------------------------------------
diff --git a/extras/shell/src/main/java/org/apache/rya/shell/RyaConnectionCommands.java b/extras/shell/src/main/java/org/apache/rya/shell/RyaConnectionCommands.java
index b6fddb0..b4168af 100644
--- a/extras/shell/src/main/java/org/apache/rya/shell/RyaConnectionCommands.java
+++ b/extras/shell/src/main/java/org/apache/rya/shell/RyaConnectionCommands.java
@@ -34,10 +34,15 @@ import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
 import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
 import org.apache.rya.api.client.mongo.MongoConnectionDetails;
 import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
 import org.apache.rya.shell.SharedShellState.ConnectionState;
+import org.apache.rya.shell.SharedShellState.ShellState;
 import org.apache.rya.shell.SharedShellState.StorageType;
 import org.apache.rya.shell.util.ConnectorFactory;
 import org.apache.rya.shell.util.PasswordPrompt;
+import org.apache.rya.streams.api.RyaStreamsClient;
+import org.apache.rya.streams.kafka.KafkaRyaStreamsClientFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.shell.core.CommandMarker;
 import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
@@ -222,31 +227,58 @@ public class RyaConnectionCommands implements CommandMarker {
     @CliCommand(value = CONNECT_INSTANCE_CMD, help = "Connect to a specific Rya instance")
     public void connectToInstance(
             @CliOption(key = {"instance"}, mandatory = true, help = "The name of the Rya instance the shell will interact with.")
-            final String instance) {
+            final String ryaInstance) {
+        final RyaClient ryaClient = sharedState.getShellState().getConnectedCommands().get();
         try {
-            final InstanceExists instanceExists = sharedState.getShellState().getConnectedCommands().get().getInstanceExists();
+            final InstanceExists instanceExists = ryaClient.getInstanceExists();
 
             // Make sure the requested instance exists.
-            if(!instanceExists.exists(instance)) {
-                throw new RuntimeException(String.format("'%s' does not match an existing Rya instance.", instance));
+            if(!instanceExists.exists(ryaInstance)) {
+                throw new RuntimeException(String.format("'%s' does not match an existing Rya instance.", ryaInstance));
+            }
+
+            // Store the instance name in the shared state.
+            sharedState.connectedToInstance(ryaInstance);
+
+            // If the Rya instance is configured to interact with Rya Streams, then connect the
+            // Rya Streams client to the shared state.
+            final com.google.common.base.Optional<RyaDetails> ryaDetails = ryaClient.getGetInstanceDetails().getDetails(ryaInstance);
+            if(ryaDetails.isPresent()) {
+                final com.google.common.base.Optional<RyaStreamsDetails> streamsDetails = ryaDetails.get().getRyaStreamsDetails();
+                if(streamsDetails.isPresent()) {
+                    final String kafkaHostname = streamsDetails.get().getHostname();
+                    final int kafkaPort = streamsDetails.get().getPort();
+                    final RyaStreamsClient streamsClient = KafkaRyaStreamsClientFactory.make(ryaInstance, kafkaHostname, kafkaPort);
+                    sharedState.connectedToRyaStreams(streamsClient);
+                }
             }
         } catch(final RyaClientException e) {
             throw new RuntimeException("Could not connect to Rya instance. Reason: " + e.getMessage(), e);
         }
-
-        // Store the instance name in the shared state.
-        sharedState.connectedToInstance(instance);
     }
 
     @CliCommand(value = DISCONNECT_COMMAND_NAME_CMD, help = "Disconnect the shell's Rya storage connection (Accumulo).")
     public void disconnect() {
+        final ShellState shellState = sharedState.getShellState();
+
         // If connected to Mongo, there is a client that needs to be closed.
-        final com.google.common.base.Optional<MongoClient> mongoAdminClient = sharedState.getShellState().getMongoAdminClient();
+        final com.google.common.base.Optional<MongoClient> mongoAdminClient = shellState.getMongoAdminClient();
         if(mongoAdminClient.isPresent()) {
             mongoAdminClient.get().close();
         }
 
+        // If connected to Rya Streams, then close the associated resources.
+        final com.google.common.base.Optional<RyaStreamsClient> streamsClient = shellState.getRyaStreamsCommands();
+        if(streamsClient.isPresent()) {
+            try {
+                streamsClient.get().close();
+            } catch (final Exception e) {
+                System.err.print("Could not close the RyaStreamsClient.");
+                e.printStackTrace();
+            }
+        }
+
         // Update the shared state to disconnected.
         sharedState.disconnected();
     }
-}
+}
\ No newline at end of file