You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/29 20:36:45 UTC
[2/2] incubator-rya git commit: RYA-440 Added commands to Rya Shell
used to interact with Rya Streams. Closes #267.
RYA-440 Added commands to Rya Shell used to interact with Rya Streams. Closes #267.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/59b20263
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/59b20263
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/59b20263
Branch: refs/heads/master
Commit: 59b20263c7c574d46d8462985fd7be126e8c4ea1
Parents: 3d4a5d0
Author: kchilton2 <ke...@gmail.com>
Authored: Tue Jan 16 17:40:08 2018 -0500
Committer: caleb <ca...@parsons.com>
Committed: Mon Jan 29 14:01:11 2018 -0500
----------------------------------------------------------------------
.../org/apache/rya/api/client/RyaClient.java | 10 +
.../api/client/SetRyaStreamsConfiguration.java | 41 ++
.../org/apache/rya/api/instance/RyaDetails.java | 104 ++++-
.../apache/rya/api/instance/RyaDetailsTest.java | 8 +-
.../mongodb/instance/MongoDetailsAdapter.java | 87 +++-
.../instance/MongoDetailsAdapterTest.java | 9 +-
.../client/SetRyaStreamsConfigurationBase.java | 83 ++++
.../accumulo/AccumuloRyaClientFactory.java | 6 +-
.../AccumuloSetRyaStreamsConfiguration.java | 59 +++
.../api/client/mongo/MongoRyaClientFactory.java | 1 +
.../mongo/MongoSetRyaStreamsConfiguration.java | 60 +++
.../AccumuloSetRyaStreamsConfigurationIT.java | 81 ++++
.../MongoSetRyaStreamsConfigurationIT.java | 85 ++++
.../rya/streams/api/RyaStreamsClient.java | 137 ++++++
.../rya/streams/api/interactor/GetQuery.java | 44 ++
.../interactor/defaults/DefaultGetQuery.java | 55 +++
.../kafka/KafkaRyaStreamsClientFactory.java | 170 +++++++
extras/shell/pom.xml | 4 +
.../apache/rya/shell/RyaConnectionCommands.java | 50 +-
.../apache/rya/shell/RyaStreamsCommands.java | 297 ++++++++++++
.../org/apache/rya/shell/SharedShellState.java | 58 ++-
.../rya/shell/util/StreamsQueryFormatter.java | 106 +++++
.../META-INF/spring/spring-shell-plugin.xml | 3 +-
.../rya/shell/RyaStreamsCommandsTest.java | 461 +++++++++++++++++++
.../apache/rya/shell/SharedShellStateTest.java | 31 ++
.../shell/util/StreamsQueryFormatterTest.java | 102 ++++
.../src/test/resources/RyaShellTest-context.xml | 1 +
27 files changed, 2101 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java b/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
index 92b18a1..c122f43 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
@@ -45,6 +45,7 @@ public class RyaClient {
private final ListInstances listInstances;
private final Optional<AddUser> addUser;
private final Optional<RemoveUser> removeUser;
+ private final SetRyaStreamsConfiguration setRyaStreamsConfig;
private final Uninstall uninstall;
private final LoadStatements loadStatements;
private final LoadStatementsFile loadStatementsFile;
@@ -65,6 +66,7 @@ public class RyaClient {
final ListInstances listInstances,
final Optional<AddUser> addUser,
final Optional<RemoveUser> removeUser,
+ final SetRyaStreamsConfiguration setRyaStreamsConfig,
final Uninstall uninstall,
final LoadStatements loadStatements,
final LoadStatementsFile loadStatementsFile,
@@ -81,6 +83,7 @@ public class RyaClient {
this.listInstances = requireNonNull(listInstances);
this.addUser = requireNonNull(addUser);
this.removeUser = requireNonNull(removeUser);
+ this.setRyaStreamsConfig = requireNonNull(setRyaStreamsConfig);
this.uninstall = requireNonNull(uninstall);
this.loadStatements = requireNonNull(loadStatements);
this.loadStatementsFile = requireNonNull(loadStatementsFile);
@@ -176,6 +179,13 @@ public class RyaClient {
}
/**
+ * @return An instance of {@link SetRyaStreamsConfiguration} that is connected to a Rya storage.
+ */
+ public SetRyaStreamsConfiguration getSetRyaStreamsConfiguration() {
+ return setRyaStreamsConfig;
+ }
+
+ /**
* @return An instance of {@link Uninstall} that is connected to a Rya storage.
*/
public Uninstall getUninstall() {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/common/rya.api/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfiguration.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfiguration.java b/common/rya.api/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfiguration.java
new file mode 100644
index 0000000..5e75f06
--- /dev/null
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfiguration.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client;
+
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Update which Rya Streams subsystem a Rya instance is connected to.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface SetRyaStreamsConfiguration {
+
+ /**
+ * Update which Rya Streams subsystem a Rya instance is connected to.
+ *
+ * @param instanceName - Indicates which Rya instance will have a Rya Streams subsystem assigned to it. (not null)
+ * @param streamsDetails - Indicates which Rya Streams subsystem the instance will use. (not null)
+ * @throws InstanceDoesNotExistException No instance of Rya exists for the provided name.
+ * @throws RyaClientException Something caused the command to fail.
+ */
+ public void setRyaStreamsConfiguration(String ryaInstance, RyaStreamsDetails streamsDetails) throws InstanceDoesNotExistException, RyaClientException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/common/rya.api/src/main/java/org/apache/rya/api/instance/RyaDetails.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/instance/RyaDetails.java b/common/rya.api/src/main/java/org/apache/rya/api/instance/RyaDetails.java
index 9d2c1e5..bda7390 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/instance/RyaDetails.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/instance/RyaDetails.java
@@ -29,25 +29,22 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
-import edu.umd.cs.findbugs.annotations.Nullable;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-import net.jcip.annotations.Immutable;
-
-import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
-import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
-
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import net.jcip.annotations.Immutable;
+
/**
* Details about how a Rya instance's state.
*/
@Immutable
@DefaultAnnotation(NonNull.class)
public class RyaDetails implements Serializable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
// General metadata about the instance.
private final String instanceName;
@@ -68,6 +65,9 @@ public class RyaDetails implements Serializable {
private final ProspectorDetails prospectorDetails;
private final JoinSelectivityDetails joinSelectivityDetails;
+ // Rya Streams Details.
+ private final Optional<RyaStreamsDetails> ryaStreamsDetails;
+
/**
* Private to prevent initialization through the constructor. To build
* instances of this class, use the {@link Builder}.
@@ -82,7 +82,8 @@ public class RyaDetails implements Serializable {
final TemporalIndexDetails temporalDetails,
final FreeTextIndexDetails freeTextDetails,
final ProspectorDetails prospectorDetails,
- final JoinSelectivityDetails joinSelectivityDetails) {
+ final JoinSelectivityDetails joinSelectivityDetails,
+ final Optional<RyaStreamsDetails> ryaStreamsDetails) {
this.instanceName = requireNonNull(instanceName);
this.version = requireNonNull(version);
this.users = requireNonNull(users);
@@ -93,6 +94,7 @@ public class RyaDetails implements Serializable {
this.freeTextDetails = requireNonNull(freeTextDetails);
this.prospectorDetails = requireNonNull(prospectorDetails);
this.joinSelectivityDetails = requireNonNull(joinSelectivityDetails);
+ this.ryaStreamsDetails = requireNonNull(ryaStreamsDetails);
}
/**
@@ -168,6 +170,13 @@ public class RyaDetails implements Serializable {
return joinSelectivityDetails;
}
+ /**
+ * @return Information about the instance's Rya Streams integration, if it was set.
+ */
+ public Optional<RyaStreamsDetails> getRyaStreamsDetails() {
+ return ryaStreamsDetails;
+ }
+
@Override
public int hashCode() {
return Objects.hash(
@@ -179,7 +188,8 @@ public class RyaDetails implements Serializable {
temporalDetails,
freeTextDetails,
prospectorDetails,
- joinSelectivityDetails);
+ joinSelectivityDetails,
+ ryaStreamsDetails);
}
@Override
@@ -197,7 +207,8 @@ public class RyaDetails implements Serializable {
Objects.equals(temporalDetails, details.temporalDetails) &&
Objects.equals(freeTextDetails, details.freeTextDetails) &&
Objects.equals(prospectorDetails, details.prospectorDetails) &&
- Objects.equals(joinSelectivityDetails, details.joinSelectivityDetails);
+ Objects.equals(joinSelectivityDetails, details.joinSelectivityDetails) &&
+ Objects.equals(ryaStreamsDetails, details.ryaStreamsDetails);
}
return false;
}
@@ -239,6 +250,9 @@ public class RyaDetails implements Serializable {
private ProspectorDetails prospectorDetails;
private JoinSelectivityDetails joinSelectivityDetails;
+ // Rya Streams Details.
+ private RyaStreamsDetails ryaStreamsDetails;
+
/**
* Construcst an empty instance of {@link Builder}.
*/
@@ -262,6 +276,7 @@ public class RyaDetails implements Serializable {
freeTextDetails = details.freeTextDetails;
prospectorDetails = details.prospectorDetails;
joinSelectivityDetails = details.joinSelectivityDetails;
+ ryaStreamsDetails = details.ryaStreamsDetails.orNull();
}
/**
@@ -375,6 +390,15 @@ public class RyaDetails implements Serializable {
}
/**
+ * @param ryaStreamsDetails - Information about the instance's Rya Streams integration.
+ * @return This {@link Builder} so that method invocations may be chained.
+ */
+ public Builder setRyaStreamsDetails(@Nullable final RyaStreamsDetails ryaStreamsDetails) {
+ this.ryaStreamsDetails = ryaStreamsDetails;
+ return this;
+ }
+
+ /**
* @return An instance of {@link RyaDetails} built using this
* builder's values.
*/
@@ -389,7 +413,8 @@ public class RyaDetails implements Serializable {
temporalDetails,
freeTextDetails,
prospectorDetails,
- joinSelectivityDetails);
+ joinSelectivityDetails,
+ Optional.fromNullable(ryaStreamsDetails));
}
}
@@ -1071,4 +1096,57 @@ public class RyaDetails implements Serializable {
return false;
}
}
+
+ /**
+ * Details about the Rya instance's Rya Streams integration.
+ */
+ public static class RyaStreamsDetails implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String hostname;
+ private final int port;
+
+ /**
+ * Constructs an instance of {@link RyaStreamsDetails}.
+ *
+ * @param hostname - The hostname used to communicate with the Rya Streams subsystem. (not null)
+ * @param port - The port used to communicate with the Rya Streams subsystem.
+ */
+ public RyaStreamsDetails(final String hostname, final int port) {
+ this.hostname = requireNonNull(hostname);
+ this.port = port;
+ }
+
+ /**
+ * @return The hostname used to communicate with the Rya Streams subsystem.
+ */
+ public String getHostname() {
+ return hostname;
+ }
+
+ /**
+ * @return The port used to communicate with the Rya Streams subsystem.
+ */
+ public int getPort() {
+ return port;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(hostname, port);
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if(this == obj) {
+ return true;
+ }
+ if(obj instanceof RyaStreamsDetails) {
+ final RyaStreamsDetails other = (RyaStreamsDetails) obj;
+ return Objects.equals(hostname, other.hostname) &&
+ port == other.port;
+ }
+ return false;
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/common/rya.api/src/test/java/org/apache/rya/api/instance/RyaDetailsTest.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/test/java/org/apache/rya/api/instance/RyaDetailsTest.java b/common/rya.api/src/test/java/org/apache/rya/api/instance/RyaDetailsTest.java
index a356877..b6e92e0 100644
--- a/common/rya.api/src/test/java/org/apache/rya/api/instance/RyaDetailsTest.java
+++ b/common/rya.api/src/test/java/org/apache/rya/api/instance/RyaDetailsTest.java
@@ -31,6 +31,7 @@ import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
import org.apache.rya.api.instance.RyaDetails.ProspectorDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
import org.apache.rya.api.instance.RyaDetails.TemporalIndexDetails;
import org.junit.Test;
@@ -65,7 +66,8 @@ public class RyaDetailsTest {
.setId("pcj 2")
.setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL)))
.setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) )
- .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) );
+ .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) )
+ .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 5));
final RyaDetails details1 = builder.build();
final RyaDetails details2 = builder.build();
@@ -96,7 +98,8 @@ public class RyaDetailsTest {
.setId("pcj 2")
.setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL)))
.setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) )
- .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) );
+ .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) )
+ .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 5));
final RyaDetails details1 = builder.build();
final RyaDetails details2 = builder.build();
@@ -127,6 +130,7 @@ public class RyaDetailsTest {
.setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL)))
.setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) )
.setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) )
+ .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 5))
.build();
// Create a new Builder using another RyaDetails object.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java
index 8010808..f86c150 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java
@@ -32,10 +32,10 @@ import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
import org.apache.rya.api.instance.RyaDetails.ProspectorDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
import org.apache.rya.api.instance.RyaDetails.TemporalIndexDetails;
import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
@@ -48,7 +48,6 @@ import edu.umd.cs.findbugs.annotations.NonNull;
* Serializes configuration details for use in Mongo.
* The {@link DBObject} will look like:
* <pre>
- * {@code
* {
* "instanceName": <string>,
* "version": <string>?,
@@ -68,6 +67,10 @@ import edu.umd.cs.findbugs.annotations.NonNull;
* "freeTextDetails": <boolean>,
* "prospectorDetails": <date>,
* "joinSelectivityDetails": <date>
+ * "ryaStreamsDetails": {
+ * "hostname": <string>
+ * "port": <int>
+ * }
* }
* </pre>
*/
@@ -91,13 +94,19 @@ public class MongoDetailsAdapter {
public static final String PROSPECTOR_DETAILS_KEY = "prospectorDetails";
public static final String JOIN_SELECTIVITY_DETAILS_KEY = "joinSelectivitiyDetails";
+ public static final String RYA_STREAMS_DETAILS_KEY = "ryaStreamsDetails";
+ public static final String RYA_STREAMS_HOSTNAME_KEY = "hostname";
+ public static final String RYA_STREAMS_PORT_KEY = "port";
+
/**
- * Serializes {@link RyaDetails} to mongo {@link DBObject}.
- * @param details - The details to be serialized.
- * @return The mongo {@link DBObject}.
+ * Converts a {@link RyaDetails} object into its MongoDB {@link DBObject} equivalent.
+ *
+ * @param details - The details to convert. (not null)
+ * @return The MongoDB {@link DBObject} equivalent.
*/
public static BasicDBObject toDBObject(final RyaDetails details) {
- Preconditions.checkNotNull(details);
+ requireNonNull(details);
+
final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start()
.add(INSTANCE_KEY, details.getRyaInstanceName())
.add(VERSION_KEY, details.getRyaVersion())
@@ -106,12 +115,29 @@ public class MongoDetailsAdapter {
.add(PCJ_DETAILS_KEY, toDBObject(details.getPCJIndexDetails()))
.add(TEMPORAL_DETAILS_KEY, details.getTemporalIndexDetails().isEnabled())
.add(FREETEXT_DETAILS_KEY, details.getFreeTextIndexDetails().isEnabled());
+
if(details.getProspectorDetails().getLastUpdated().isPresent()) {
builder.add(PROSPECTOR_DETAILS_KEY, details.getProspectorDetails().getLastUpdated().get());
}
+
if(details.getJoinSelectivityDetails().getLastUpdated().isPresent()) {
builder.add(JOIN_SELECTIVITY_DETAILS_KEY, details.getJoinSelectivityDetails().getLastUpdated().get());
}
+
+ // If the Rya Streams Details are present, then add them.
+ if(details.getRyaStreamsDetails().isPresent()) {
+ final RyaStreamsDetails ryaStreamsDetails = details.getRyaStreamsDetails().get();
+
+ // The embedded object that holds onto the fields.
+ final DBObject ryaStreamsFields = BasicDBObjectBuilder.start()
+ .add(RYA_STREAMS_HOSTNAME_KEY, ryaStreamsDetails.getHostname())
+ .add(RYA_STREAMS_PORT_KEY, ryaStreamsDetails.getPort())
+ .get();
+
+ // Add them to the main builder.
+ builder.add(RYA_STREAMS_DETAILS_KEY, ryaStreamsFields);
+ }
+
return (BasicDBObject) builder.get();
}
@@ -154,20 +180,38 @@ public class MongoDetailsAdapter {
return builder.get();
}
+ /**
+ * Converts a MongoDB {@link DBObject} into its {@link RyaDetails} equivalent.
+ *
+ * @param mongoObj - The MongoDB object to convert. (not null)
+ * @return The equivalent {@link RyaDetails} object.
+ * @throws MalformedRyaDetailsException The MongoDB object could not be converted.
+ */
public static RyaDetails toRyaDetails(final DBObject mongoObj) throws MalformedRyaDetailsException {
+ requireNonNull(mongoObj);
final BasicDBObject basicObj = (BasicDBObject) mongoObj;
try {
- return RyaDetails.builder()
- .setRyaInstanceName(basicObj.getString(INSTANCE_KEY))
- .setRyaVersion(basicObj.getString(VERSION_KEY))
- .setEntityCentricIndexDetails(new EntityCentricIndexDetails(basicObj.getBoolean(ENTITY_DETAILS_KEY)))
- //RYA-215 .setGeoIndexDetails(new GeoIndexDetails(basicObj.getBoolean(GEO_DETAILS_KEY)))
- .setPCJIndexDetails(getPCJIndexDetails(basicObj))
- .setTemporalIndexDetails(new TemporalIndexDetails(basicObj.getBoolean(TEMPORAL_DETAILS_KEY)))
- .setFreeTextDetails(new FreeTextIndexDetails(basicObj.getBoolean(FREETEXT_DETAILS_KEY)))
- .setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(basicObj.getDate(PROSPECTOR_DETAILS_KEY))))
- .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>fromNullable(basicObj.getDate(JOIN_SELECTIVITY_DETAILS_KEY))))
- .build();
+ final RyaDetails.Builder builder = RyaDetails.builder()
+ .setRyaInstanceName(basicObj.getString(INSTANCE_KEY))
+ .setRyaVersion(basicObj.getString(VERSION_KEY))
+ .setEntityCentricIndexDetails(new EntityCentricIndexDetails(basicObj.getBoolean(ENTITY_DETAILS_KEY)))
+ //RYA-215 .setGeoIndexDetails(new GeoIndexDetails(basicObj.getBoolean(GEO_DETAILS_KEY)))
+ .setPCJIndexDetails(getPCJIndexDetails(basicObj))
+ .setTemporalIndexDetails(new TemporalIndexDetails(basicObj.getBoolean(TEMPORAL_DETAILS_KEY)))
+ .setFreeTextDetails(new FreeTextIndexDetails(basicObj.getBoolean(FREETEXT_DETAILS_KEY)))
+ .setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(basicObj.getDate(PROSPECTOR_DETAILS_KEY))))
+ .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>fromNullable(basicObj.getDate(JOIN_SELECTIVITY_DETAILS_KEY))));
+
+ // If the Rya Streams Details are present, then add them.
+ if(basicObj.containsField(RYA_STREAMS_DETAILS_KEY)) {
+ final BasicDBObject streamsObject = (BasicDBObject) basicObj.get(RYA_STREAMS_DETAILS_KEY);
+ final String hostname = streamsObject.getString(RYA_STREAMS_HOSTNAME_KEY);
+ final int port = streamsObject.getInt(RYA_STREAMS_PORT_KEY);
+ builder.setRyaStreamsDetails(new RyaStreamsDetails(hostname, port));
+ }
+
+ return builder.build();
+
} catch(final Exception e) {
throw new MalformedRyaDetailsException("Failed to make RyaDetail from Mongo Object, it is malformed.", e);
}
@@ -213,14 +257,15 @@ public class MongoDetailsAdapter {
}
/**
- * Exception thrown when a MongoDB {@link DBObject} is malformed when attemptin
- * to adapt it into a {@link RyaDetails}.
+ * Indicates a MongoDB {@link DBObject} was malformed when attempting
+ * to convert it into a {@link RyaDetails} object.
*/
public static class MalformedRyaDetailsException extends Exception {
private static final long serialVersionUID = 1L;
/**
- * Creates a new {@link MalformedRyaDetailsException}
+ * Creates a new {@link MalformedRyaDetailsException}.
+ *
* @param message - The message to be displayed by the exception.
* @param e - The source cause of the exception.
*/
@@ -228,4 +273,4 @@ public class MongoDetailsAdapter {
super(message, e);
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java
index 0ea9456..f5845c2 100644
--- a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java
+++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java
@@ -32,6 +32,7 @@ import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
import org.apache.rya.api.instance.RyaDetails.ProspectorDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
import org.apache.rya.api.instance.RyaDetails.TemporalIndexDetails;
import org.apache.rya.mongodb.instance.MongoDetailsAdapter.MalformedRyaDetailsException;
import org.junit.Test;
@@ -72,6 +73,7 @@ public class MongoDetailsAdapterTest {
.setFreeTextDetails(new FreeTextIndexDetails(true))
.setProspectorDetails(new ProspectorDetails(Optional.fromNullable(new Date(0L))))
.setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.fromNullable(new Date(1L))))
+ .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 6))
.build();
final BasicDBObject actual = MongoDetailsAdapter.toDBObject(details);
@@ -100,7 +102,8 @@ public class MongoDetailsAdapterTest {
+ "temporalDetails : true,"
+ "freeTextDetails : true,"
+ "prospectorDetails : { $date : \"1970-01-01T00:00:00.000Z\"},"
- + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"}"
+ + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"},"
+ + "ryaStreamsDetails : { hostname : \"localhost\" , port : 6}"
+ "}"
);
@@ -134,7 +137,8 @@ public class MongoDetailsAdapterTest {
+ "temporalDetails : true,"
+ "freeTextDetails : true,"
+ "prospectorDetails : { $date : \"1970-01-01T00:00:00.000Z\"},"
- + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"}"
+ + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"},"
+ + "ryaStreamsDetails : { hostname : \"localhost\" , port : 6}"
+ "}"
);
@@ -163,6 +167,7 @@ public class MongoDetailsAdapterTest {
.setFreeTextDetails(new FreeTextIndexDetails(true))
.setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(new Date(0L))))
.setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>fromNullable(new Date(1L))))
+ .setRyaStreamsDetails(new RyaStreamsDetails("localhost", 6))
.build();
assertEquals(expected, actual);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/indexing/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfigurationBase.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfigurationBase.java b/extras/indexing/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfigurationBase.java
new file mode 100644
index 0000000..92a4c44
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfigurationBase.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+import org.apache.rya.api.instance.RyaDetailsRepository;
+import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+import org.apache.rya.api.instance.RyaDetailsUpdater;
+import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A base class that implements the core functionality of the {@link SetRyaStreamsConfiguration} interactor.
+ * Subclasses just need to implement {@link #getRyaDetailsRepo(String)} so that the common code may update
+ * any implementation of that repository.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class SetRyaStreamsConfigurationBase implements SetRyaStreamsConfiguration {
+
+ private final InstanceExists instanceExists;
+
+ /**
+ * Constructs an instance of {@link SetRyaStreamsConfigurationBase}.
+ *
+ * @param instanceExists - The interactor used to verify Rya instances exist. (not null)
+ */
+ public SetRyaStreamsConfigurationBase(final InstanceExists instanceExists) {
+ this.instanceExists = requireNonNull(instanceExists);
+ }
+
+ /**
+ * Get a {@link RyaDetailsRepository} that is connected to a specific instance of Rya.
+ *
+ * @param ryaInstance - The Rya instance the repository must be connected to. (not null)
+ * @return A {@link RyaDetailsRepository} connected to the specified Rya instance.
+ */
+ protected abstract RyaDetailsRepository getRyaDetailsRepo(String ryaInstance);
+
+ @Override
+ public void setRyaStreamsConfiguration(final String ryaInstance, final RyaStreamsDetails streamsDetails) throws InstanceDoesNotExistException, RyaClientException{
+ requireNonNull(ryaInstance);
+ requireNonNull(streamsDetails);
+
+ // Verify the Rya Instance exists.
+ if(!instanceExists.exists(ryaInstance)) {
+ throw new InstanceDoesNotExistException("There is no Rya instance named '" + ryaInstance + "' in this storage.");
+ }
+
+ // Update the old details object using the provided Rya Streams details.
+ final RyaDetailsRepository repo = getRyaDetailsRepo(ryaInstance);
+ try {
+ new RyaDetailsUpdater(repo).update(oldDetails -> {
+ final RyaDetails.Builder builder = RyaDetails.builder(oldDetails);
+ builder.setRyaStreamsDetails(streamsDetails);
+ return builder.build();
+ });
+ } catch (CouldNotApplyMutationException | RyaDetailsRepositoryException e) {
+ throw new RyaClientException("Unable to update which Rya Streams subsystem is used by the '" +
+ ryaInstance + "' Rya instance.", e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
index 17fddaa..fdabea9 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
@@ -23,6 +23,7 @@ import static java.util.Objects.requireNonNull;
import java.util.Optional;
import org.apache.accumulo.core.client.Connector;
+import org.apache.rya.api.client.InstanceExists;
import org.apache.rya.api.client.RyaClient;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -50,6 +51,8 @@ public class AccumuloRyaClientFactory {
requireNonNull(connector);
// Build the RyaCommands option with the initialized commands.
+ final InstanceExists instanceExists = new AccumuloInstanceExists(connectionDetails, connector);
+
return new RyaClient(
new AccumuloInstall(connectionDetails, connector),
new AccumuloCreatePCJ(connectionDetails, connector),
@@ -59,10 +62,11 @@ public class AccumuloRyaClientFactory {
Optional.of(new AccumuloListIncrementalQueries(connectionDetails, connector)),
new AccumuloBatchUpdatePCJ(connectionDetails, connector),
new AccumuloGetInstanceDetails(connectionDetails, connector),
- new AccumuloInstanceExists(connectionDetails, connector),
+ instanceExists,
new AccumuloListInstances(connectionDetails, connector),
Optional.of(new AccumuloAddUser(connectionDetails, connector)),
Optional.of(new AccumuloRemoveUser(connectionDetails, connector)),
+ new AccumuloSetRyaStreamsConfiguration(instanceExists, connector),
new AccumuloUninstall(connectionDetails, connector),
new AccumuloLoadStatements(connectionDetails, connector),
new AccumuloLoadStatementsFile(connectionDetails, connector),
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfiguration.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfiguration.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfiguration.java
new file mode 100644
index 0000000..f19d9fb
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfiguration.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
+import org.apache.rya.api.client.InstanceExists;
+import org.apache.rya.api.client.SetRyaStreamsConfiguration;
+import org.apache.rya.api.client.SetRyaStreamsConfigurationBase;
+import org.apache.rya.api.instance.RyaDetailsRepository;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * An Accumulo implementation of {@link SetRyaStreamsConfiguration}.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloSetRyaStreamsConfiguration extends SetRyaStreamsConfigurationBase {
+
+ private final Connector connector;
+
+ /**
+ * Constructs an instance of {@link AccumuloSetRyaStreamsConfiguration}.
+ *
+ * @param instanceExists - The interactor used to verify Rya instances exist. (not null)
+ * @param connector - Provides programmatic access to the instance of Accumulo that hosts Rya instance. (not null)
+ */
+ public AccumuloSetRyaStreamsConfiguration(
+ final InstanceExists instanceExists,
+ final Connector connector) {
+ super(instanceExists);
+ this.connector = requireNonNull(connector);
+ }
+
+ @Override
+ protected RyaDetailsRepository getRyaDetailsRepo(final String ryaInstance) {
+ requireNonNull(ryaInstance);
+ return new AccumuloRyaInstanceDetailsRepository(connector, ryaInstance);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoRyaClientFactory.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoRyaClientFactory.java b/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoRyaClientFactory.java
index fbbec2a..5fa4877 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoRyaClientFactory.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoRyaClientFactory.java
@@ -66,6 +66,7 @@ public class MongoRyaClientFactory {
new MongoListInstances(adminClient),
Optional.empty(),
Optional.empty(),
+ new MongoSetRyaStreamsConfiguration(instanceExists, adminClient),
new MongoUninstall(adminClient, instanceExists),
new MongoLoadStatements(connectionDetails, instanceExists),
new MongoLoadStatementsFile(connectionDetails, instanceExists),
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfiguration.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfiguration.java b/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfiguration.java
new file mode 100644
index 0000000..592e663
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfiguration.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.rya.api.client.InstanceExists;
+import org.apache.rya.api.client.SetRyaStreamsConfiguration;
+import org.apache.rya.api.client.SetRyaStreamsConfigurationBase;
+import org.apache.rya.api.instance.RyaDetailsRepository;
+import org.apache.rya.mongodb.instance.MongoRyaInstanceDetailsRepository;
+
+import com.mongodb.MongoClient;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A MongoDB implementation of {@link SetRyaStreamsConfiguration}.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoSetRyaStreamsConfiguration extends SetRyaStreamsConfigurationBase {
+
+ private final MongoClient client;
+
+ /**
+ * Constructs an instance of {@link MongoSetRyaStreamsConfiguration}.
+ *
+ * @param instanceExists - The interactor used to verify Rya instances exist. (not null)
+ * @param client - The MongoDB client used to connect to the Rya storage. (not null)
+ */
+ public MongoSetRyaStreamsConfiguration(
+ final InstanceExists instanceExists,
+ final MongoClient client) {
+ super(instanceExists);
+ this.client = requireNonNull(client);
+ }
+
+ @Override
+ protected RyaDetailsRepository getRyaDetailsRepo(final String ryaInstance) {
+ requireNonNull(ryaInstance);
+ return new MongoRyaInstanceDetailsRepository(client, ryaInstance);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfigurationIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfigurationIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfigurationIT.java
new file mode 100644
index 0000000..928a29e
--- /dev/null
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfigurationIT.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.accumulo;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import org.apache.rya.accumulo.AccumuloITBase;
+import org.apache.rya.api.client.Install;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+import org.junit.Test;
+
+/**
+ * Integration tests the methods of {@link AccumuloSetRyaStreamsConfiguration}.
+ */
+public class AccumuloSetRyaStreamsConfigurationIT extends AccumuloITBase {
+
+ @Test(expected = InstanceDoesNotExistException.class)
+ public void instanceDoesNotExist() throws Exception {
+ final String ryaInstance = getRyaInstanceName();
+ final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+ getUsername(),
+ getPassword().toCharArray(),
+ getInstanceName(),
+ getZookeepers());
+ final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector());
+
+ // Skip the install step to create error causing situation.
+ final RyaStreamsDetails details = new RyaStreamsDetails("localhost", 6);
+ ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, details);
+ }
+
+ @Test
+ public void updatesRyaDetails() throws Exception {
+ final String ryaInstance = getRyaInstanceName();
+ final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+ getUsername(),
+ getPassword().toCharArray(),
+ getInstanceName(),
+ getZookeepers());
+ final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector());
+
+ // Install an instance of Rya.
+ final Install installRya = ryaClient.getInstall();
+ final InstallConfiguration installConf = InstallConfiguration.builder()
+ .build();
+ installRya.install(ryaInstance, installConf);
+
+ // Fetch its details and show they do not have any RyaStreamsDetails.
+ com.google.common.base.Optional<RyaStreamsDetails> streamsDetails =
+ ryaClient.getGetInstanceDetails().getDetails(ryaInstance).get().getRyaStreamsDetails();
+ assertFalse(streamsDetails.isPresent());
+
+ // Set the details.
+ final RyaStreamsDetails details = new RyaStreamsDetails("localhost", 6);
+ ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, details);
+
+ // Fetch its details again and show that they are now filled in.
+ streamsDetails = ryaClient.getGetInstanceDetails().getDetails(ryaInstance).get().getRyaStreamsDetails();
+ assertEquals(details, streamsDetails.get());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/indexing/src/test/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfigurationIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfigurationIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfigurationIT.java
new file mode 100644
index 0000000..5fea578
--- /dev/null
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfigurationIT.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.mongo;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.Optional;
+
+import org.apache.rya.api.client.Install;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+import org.apache.rya.mongodb.MongoITBase;
+import org.junit.Test;
+
+/**
+ * Integration tests the methods of {@link MongoSetRyaStreamsConfiguration}.
+ */
+public class MongoSetRyaStreamsConfigurationIT extends MongoITBase {
+
+ @Test(expected = InstanceDoesNotExistException.class)
+ public void instanceDoesNotExist() throws Exception {
+ final RyaClient ryaClient = MongoRyaClientFactory.build(getConnectionDetails(), getMongoClient());
+
+ // Skip the install step to create error causing situation.
+ final String ryaInstance = conf.getRyaInstanceName();
+ final RyaStreamsDetails details = new RyaStreamsDetails("localhost", 6);
+ ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, details);
+ }
+
+ @Test
+ public void updatesRyaDetails() throws Exception {
+ final RyaClient ryaClient = MongoRyaClientFactory.build(getConnectionDetails(), getMongoClient());
+
+ // Install an instance of Rya.
+ final String ryaInstance = conf.getRyaInstanceName();
+ final Install installRya = ryaClient.getInstall();
+ final InstallConfiguration installConf = InstallConfiguration.builder()
+ .build();
+ installRya.install(ryaInstance, installConf);
+
+ // Fetch its details and show they do not have any RyaStreamsDetails.
+ com.google.common.base.Optional<RyaStreamsDetails> streamsDetails =
+ ryaClient.getGetInstanceDetails().getDetails(ryaInstance).get().getRyaStreamsDetails();
+ assertFalse(streamsDetails.isPresent());
+
+ // Set the details.
+ final RyaStreamsDetails details = new RyaStreamsDetails("localhost", 6);
+ ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, details);
+
+ // Fetch its details again and show that they are now filled in.
+ streamsDetails = ryaClient.getGetInstanceDetails().getDetails(ryaInstance).get().getRyaStreamsDetails();
+ assertEquals(details, streamsDetails.get());
+ }
+
+ private MongoConnectionDetails getConnectionDetails() {
+ final Optional<char[]> password = conf.getMongoPassword() != null ?
+ Optional.of(conf.getMongoPassword().toCharArray()) :
+ Optional.empty();
+
+ return new MongoConnectionDetails(
+ conf.getMongoHostname(),
+ Integer.parseInt(conf.getMongoPort()),
+ Optional.ofNullable(conf.getMongoUser()),
+ password);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/RyaStreamsClient.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/RyaStreamsClient.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/RyaStreamsClient.java
new file mode 100644
index 0000000..ee86e41
--- /dev/null
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/RyaStreamsClient.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.api.interactor.AddQuery;
+import org.apache.rya.streams.api.interactor.DeleteQuery;
+import org.apache.rya.streams.api.interactor.GetQuery;
+import org.apache.rya.streams.api.interactor.GetQueryResultStream;
+import org.apache.rya.streams.api.interactor.ListQueries;
+import org.apache.rya.streams.api.interactor.StartQuery;
+import org.apache.rya.streams.api.interactor.StopQuery;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Provides access to a set of Rya Streams functions.Statement
+ */
+@DefaultAnnotation(NonNull.class)
+public class RyaStreamsClient implements AutoCloseable {
+
+ private final AddQuery addQuery;
+ private final GetQuery getQuery;
+ private final DeleteQuery deleteQuery;
+ private final GetQueryResultStream<VisibilityStatement> getStatementResultStream;
+ private final GetQueryResultStream<VisibilityBindingSet> getBindingSetResultStream;
+ private final ListQueries listQueries;
+ private final StartQuery startQuery;
+ private final StopQuery stopQuery;
+
+ /**
+ * Constructs an instance of {@link RyaStreamsClient}.
+ */
+ public RyaStreamsClient(
+ final AddQuery addQuery,
+ final GetQuery getQuery,
+ final DeleteQuery deleteQuery,
+ final GetQueryResultStream<VisibilityStatement> getStatementResultStream,
+ final GetQueryResultStream<VisibilityBindingSet> getBindingSetResultStream,
+ final ListQueries listQueries,
+ final StartQuery startQuery,
+ final StopQuery stopQuery) {
+ this.addQuery = requireNonNull(addQuery);
+ this.getQuery = requireNonNull(getQuery);
+ this.deleteQuery = requireNonNull(deleteQuery);
+ this.getStatementResultStream = requireNonNull(getStatementResultStream);
+ this.getBindingSetResultStream = requireNonNull(getBindingSetResultStream);
+ this.listQueries = requireNonNull(listQueries);
+ this.startQuery = requireNonNull(startQuery);
+ this.stopQuery = requireNonNull(stopQuery);
+ }
+
+ /**
+ * @return The connected {@link AddQuery} interactor.
+ */
+ public AddQuery getAddQuery() {
+ return addQuery;
+ }
+
+ /**
+ * @return The connected {@link GetQuery} interactor.
+ */
+ public GetQuery getGetQuery() {
+ return getQuery;
+ }
+
+ /**
+ * @return The connected {@link DeleteQuery} interactor.
+ */
+ public DeleteQuery getDeleteQuery() {
+ return deleteQuery;
+ }
+
+ /**
+ * @return The connected {@link GetQueryResultStream} interactor for a query that produces
+ * {@link VisibilityStatement}s.
+ */
+ public GetQueryResultStream<VisibilityStatement> getGetStatementResultStream() {
+ return getStatementResultStream;
+ }
+
+ /**
+ * @return The connected {@link GetQueryResultStream} interactor for a query that produces
+ * {@link VisibilityBindingSet}s.
+ */
+ public GetQueryResultStream<VisibilityBindingSet> getGetBindingSetResultStream() {
+ return getBindingSetResultStream;
+ }
+
+ /**
+ * @return The connected {@link ListQueries} interactor.
+ */
+ public ListQueries getListQueries() {
+ return listQueries;
+ }
+
+ /**
+ * @return The connected {@link StartQuery} interactor.
+ */
+ public StartQuery getStartQuery() {
+ return startQuery;
+ }
+
+ /**
+ * @return The connected {@link StopQuery} interactor.
+ */
+ public StopQuery getStopQuery() {
+ return stopQuery;
+ }
+
+ /**
+ * By defualt, this client doesn't close anything. If an implementation of the client
+ * requires closing components, then override this method.
+ */
+ @Override
+ public void close() throws Exception { }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQuery.java
new file mode 100644
index 0000000..1293714
--- /dev/null
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/GetQuery.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api.interactor;
+
+import java.util.Optional;
+import java.util.UUID;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Get a {@link StreamsQuery} from Rya Streams.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface GetQuery {
+
+ /**
+ * Get a {@link StreamsQuery} from Rya Streams.
+ *
+ * @param queryId - Identifies the query to fetch. (not null)
+ * @return The {@link StreamsQuery} for the {@code queryId}; if one is stored for the ID.
+ * @throws RyaStreamsException The query could not be fetched.
+ */
+ public Optional<StreamsQuery> getQuery(UUID queryId) throws RyaStreamsException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultGetQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultGetQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultGetQuery.java
new file mode 100644
index 0000000..14b93ab
--- /dev/null
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultGetQuery.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api.interactor.defaults;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+import java.util.UUID;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+import org.apache.rya.streams.api.interactor.GetQuery;
+import org.apache.rya.streams.api.queries.QueryRepository;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Get a {@link StreamsQuery} from Rya Streams.
+ */
+@DefaultAnnotation(NonNull.class)
+public class DefaultGetQuery implements GetQuery {
+ private final QueryRepository repository;
+
+ /**
+ * Constructs an instance of {@link DefaultGetQuery}.
+ *
+ * @param repository - The {@link QueryRepository} to get queries from. (not null)
+ */
+ public DefaultGetQuery(final QueryRepository repository) {
+ this.repository = requireNonNull(repository);
+ }
+
+ @Override
+ public Optional<StreamsQuery> getQuery(final UUID queryId) throws RyaStreamsException {
+ requireNonNull(queryId);
+ return repository.get(queryId);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java
new file mode 100644
index 0000000..9250d9d
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.streams.api.RyaStreamsClient;
+import org.apache.rya.streams.api.interactor.defaults.DefaultAddQuery;
+import org.apache.rya.streams.api.interactor.defaults.DefaultDeleteQuery;
+import org.apache.rya.streams.api.interactor.defaults.DefaultGetQuery;
+import org.apache.rya.streams.api.interactor.defaults.DefaultListQueries;
+import org.apache.rya.streams.api.interactor.defaults.DefaultStartQuery;
+import org.apache.rya.streams.api.interactor.defaults.DefaultStopQuery;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Constructs instances of {@link RyaStreamsClient} that are connected to a Kafka cluster.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class KafkaRyaStreamsClientFactory {
+ private static final Logger log = LoggerFactory.getLogger(KafkaRyaStreamsClientFactory.class);
+
+ /**
+ * Initialize a {@link RyaStreamsClient} that will interact with an instance of Rya Streams
+ * that is backed by Kafka.
+ *
+ * @param ryaInstance - The name of the Rya Instance the client is connected to. (not null)
+ * @param kafkaHostname - The hostname of the Kafka Broker.
+ * @param kafkaPort - The port of the Kafka Broker.
+ * @return The initialized commands.
+ */
+ public static RyaStreamsClient make(
+ final String ryaInstance,
+ final String kafkaHostname,
+ final int kafkaPort) {
+ requireNonNull(ryaInstance);
+ requireNonNull(kafkaHostname);
+
+ // Setup Query Repository used by the Kafka Rya Streams subsystem.
+ final Producer<?, QueryChange> queryProducer =
+ makeProducer(kafkaHostname, kafkaPort, StringSerializer.class, QueryChangeSerializer.class);
+ final Consumer<?, QueryChange>queryConsumer =
+ fromStartConsumer(kafkaHostname, kafkaPort, StringDeserializer.class, QueryChangeDeserializer.class);
+ final String changeLogTopic = KafkaTopics.queryChangeLogTopic(ryaInstance);
+ final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
+ final QueryRepository queryRepo = new InMemoryQueryRepository(changeLog);
+
+ // Create the Rya Streams client that is backed by a Kafka Query Change Log.
+ return new RyaStreamsClient(
+ new DefaultAddQuery(queryRepo),
+ new DefaultGetQuery(queryRepo),
+ new DefaultDeleteQuery(queryRepo),
+ new KafkaGetQueryResultStream<>(kafkaHostname, "" + kafkaPort, VisibilityStatementDeserializer.class),
+ new KafkaGetQueryResultStream<>(kafkaHostname, "" + kafkaPort, VisibilityBindingSetDeserializer.class),
+ new DefaultListQueries(queryRepo),
+ new DefaultStartQuery(queryRepo),
+ new DefaultStopQuery(queryRepo)) {
+
+ /**
+ * Close the QueryRepository used by the returned client.
+ */
+ @Override
+ public void close() {
+ try {
+ queryRepo.close();
+ } catch (final Exception e) {
+ log.warn("Couldn't close a QueryRepository.", e);
+ }
+ }
+ };
+ }
+
+ /**
+ * Create a {@link Producer} that is able to write to a topic in Kafka.
+ *
+ * @param kafkaHostname - The Kafka broker hostname. (not null)
+ * @param kafkaPort - The Kafka broker port.
+ * @param keySerializerClass - Serializes the keys. (not null)
+ * @param valueSerializerClass - Serializes the values. (not null)
+ * @return A {@link Producer} that can be used to write records to a topic.
+ */
+ private static <K, V> Producer<K, V> makeProducer(
+ final String kafkaHostname,
+ final int kakfaPort,
+ final Class<? extends Serializer<K>> keySerializerClass,
+ final Class<? extends Serializer<V>> valueSerializerClass) {
+ requireNonNull(kafkaHostname);
+ requireNonNull(keySerializerClass);
+ requireNonNull(valueSerializerClass);
+
+ final Properties producerProps = new Properties();
+ producerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kakfaPort);
+ producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName());
+ producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName());
+ return new KafkaProducer<>(producerProps);
+ }
+
+ /**
+ * Create a {@link Consumer} that has a unique group ID and reads everything from a topic in Kafka
+ * starting at the earliest point by default.
+ *
+ * @param kafkaHostname - The Kafka broker hostname. (not null)
+ * @param kafkaPort - The Kafka broker port.
+ * @param keyDeserializerClass - Deserializes the keys. (not null)
+ * @param valueDeserializerClass - Deserializes the values. (not null)
+ * @return A {@link Consumer} that can be used to read records from a topic.
+ */
+ private static <K, V> Consumer<K, V> fromStartConsumer(
+ final String kafkaHostname,
+ final int kakfaPort,
+ final Class<? extends Deserializer<K>> keyDeserializerClass,
+ final Class<? extends Deserializer<V>> valueDeserializerClass) {
+ requireNonNull(kafkaHostname);
+ requireNonNull(keyDeserializerClass);
+ requireNonNull(valueDeserializerClass);
+
+ final Properties consumerProps = new Properties();
+ consumerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname + ":" + kakfaPort);
+ consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+ consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
+ consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass.getName());
+ consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass.getName());
+ return new KafkaConsumer<>(consumerProps);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/shell/pom.xml
----------------------------------------------------------------------
diff --git a/extras/shell/pom.xml b/extras/shell/pom.xml
index 1a07400..fcca909 100644
--- a/extras/shell/pom.xml
+++ b/extras/shell/pom.xml
@@ -59,6 +59,10 @@
<artifactId>rya.pcj.fluo.api</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.streams.kafka</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.fluo</groupId>
<artifactId>fluo-core</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/59b20263/extras/shell/src/main/java/org/apache/rya/shell/RyaConnectionCommands.java
----------------------------------------------------------------------
diff --git a/extras/shell/src/main/java/org/apache/rya/shell/RyaConnectionCommands.java b/extras/shell/src/main/java/org/apache/rya/shell/RyaConnectionCommands.java
index b6fddb0..b4168af 100644
--- a/extras/shell/src/main/java/org/apache/rya/shell/RyaConnectionCommands.java
+++ b/extras/shell/src/main/java/org/apache/rya/shell/RyaConnectionCommands.java
@@ -34,10 +34,15 @@ import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
import org.apache.rya.api.client.mongo.MongoConnectionDetails;
import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
import org.apache.rya.shell.SharedShellState.ConnectionState;
+import org.apache.rya.shell.SharedShellState.ShellState;
import org.apache.rya.shell.SharedShellState.StorageType;
import org.apache.rya.shell.util.ConnectorFactory;
import org.apache.rya.shell.util.PasswordPrompt;
+import org.apache.rya.streams.api.RyaStreamsClient;
+import org.apache.rya.streams.kafka.KafkaRyaStreamsClientFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
@@ -222,31 +227,58 @@ public class RyaConnectionCommands implements CommandMarker {
@CliCommand(value = CONNECT_INSTANCE_CMD, help = "Connect to a specific Rya instance")
public void connectToInstance(
@CliOption(key = {"instance"}, mandatory = true, help = "The name of the Rya instance the shell will interact with.")
- final String instance) {
+ final String ryaInstance) {
+ final RyaClient ryaClient = sharedState.getShellState().getConnectedCommands().get();
try {
- final InstanceExists instanceExists = sharedState.getShellState().getConnectedCommands().get().getInstanceExists();
+ final InstanceExists instanceExists = ryaClient.getInstanceExists();
// Make sure the requested instance exists.
- if(!instanceExists.exists(instance)) {
- throw new RuntimeException(String.format("'%s' does not match an existing Rya instance.", instance));
+ if(!instanceExists.exists(ryaInstance)) {
+ throw new RuntimeException(String.format("'%s' does not match an existing Rya instance.", ryaInstance));
+ }
+
+ // Store the instance name in the shared state.
+ sharedState.connectedToInstance(ryaInstance);
+
+ // If the Rya instance is configured to interact with Rya Streams, then connect the
+ // Rya Streams client to the shared state.
+ final com.google.common.base.Optional<RyaDetails> ryaDetails = ryaClient.getGetInstanceDetails().getDetails(ryaInstance);
+ if(ryaDetails.isPresent()) {
+ final com.google.common.base.Optional<RyaStreamsDetails> streamsDetails = ryaDetails.get().getRyaStreamsDetails();
+ if(streamsDetails.isPresent()) {
+ final String kafkaHostname = streamsDetails.get().getHostname();
+ final int kafkaPort = streamsDetails.get().getPort();
+ final RyaStreamsClient streamsClient = KafkaRyaStreamsClientFactory.make(ryaInstance, kafkaHostname, kafkaPort);
+ sharedState.connectedToRyaStreams(streamsClient);
+ }
}
} catch(final RyaClientException e) {
throw new RuntimeException("Could not connect to Rya instance. Reason: " + e.getMessage(), e);
}
-
- // Store the instance name in the shared state.
- sharedState.connectedToInstance(instance);
}
@CliCommand(value = DISCONNECT_COMMAND_NAME_CMD, help = "Disconnect the shell's Rya storage connection (Accumulo).")
public void disconnect() {
+ final ShellState shellState = sharedState.getShellState();
+
// If connected to Mongo, there is a client that needs to be closed.
- final com.google.common.base.Optional<MongoClient> mongoAdminClient = sharedState.getShellState().getMongoAdminClient();
+ final com.google.common.base.Optional<MongoClient> mongoAdminClient = shellState.getMongoAdminClient();
if(mongoAdminClient.isPresent()) {
mongoAdminClient.get().close();
}
+ // If connected to Rya Streams, then close the associated resources.
+ final com.google.common.base.Optional<RyaStreamsClient> streamsClient = shellState.getRyaStreamsCommands();
+ if(streamsClient.isPresent()) {
+ try {
+ streamsClient.get().close();
+ } catch (final Exception e) {
+ System.err.print("Could not close the RyaStreamsClient.");
+ e.printStackTrace();
+ }
+ }
+
// Update the shared state to disconnected.
sharedState.disconnected();
}
-}
+}
\ No newline at end of file