You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by sj...@apache.org on 2015/09/17 15:27:55 UTC
[3/7] incubator-brooklyn git commit: makes connection string available on MongoDBReplicaSet
makes connection string available on MongoDBReplicaSet
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/16b8b1e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/16b8b1e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/16b8b1e3
Branch: refs/heads/master
Commit: 16b8b1e393d525c88ba5b540ad880dab899888d6
Parents: b3ce256
Author: Robert Moss <ro...@gmail.com>
Authored: Tue Sep 15 15:06:43 2015 +0100
Committer: Robert Moss <ro...@gmail.com>
Committed: Wed Sep 16 15:35:28 2015 +0100
----------------------------------------------------------------------
.../nosql/mongodb/AbstractMongoDBSshDriver.java | 8 ++--
.../entity/nosql/mongodb/MongoDBReplicaSet.java | 4 +-
.../nosql/mongodb/MongoDBReplicaSetImpl.java | 48 ++++++++++++++-----
.../entity/nosql/mongodb/MongoDBServerImpl.java | 50 ++++++++++----------
4 files changed, 68 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16b8b1e3/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/AbstractMongoDBSshDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/AbstractMongoDBSshDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/AbstractMongoDBSshDriver.java
index c182355..14c495e 100644
--- a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/AbstractMongoDBSshDriver.java
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/AbstractMongoDBSshDriver.java
@@ -27,15 +27,15 @@ import org.apache.brooklyn.api.location.OsDetails;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.entity.software.base.AbstractSoftwareProcessSshDriver;
import org.apache.brooklyn.entity.software.base.lifecycle.ScriptHelper;
-import org.apache.brooklyn.util.core.internal.ssh.SshTool;
-import org.apache.brooklyn.util.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.brooklyn.location.ssh.SshMachineLocation;
+import org.apache.brooklyn.util.core.internal.ssh.SshTool;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.net.Networking;
import org.apache.brooklyn.util.os.Os;
import org.apache.brooklyn.util.ssh.BashCommands;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16b8b1e3/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSet.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSet.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSet.java
index 12bbe87..6ebc17e 100644
--- a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSet.java
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSet.java
@@ -26,6 +26,7 @@ import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.entity.database.DatastoreMixins.HasDatastoreUrl;
import org.apache.brooklyn.entity.group.Cluster;
import org.apache.brooklyn.entity.group.DynamicCluster;
import org.apache.brooklyn.util.core.flags.SetFromFlag;
@@ -46,7 +47,7 @@ import com.google.common.reflect.TypeToken;
* @see <a href="http://docs.mongodb.org/manual/replication/">http://docs.mongodb.org/manual/replication/</a>
*/
@ImplementedBy(MongoDBReplicaSetImpl.class)
-public interface MongoDBReplicaSet extends DynamicCluster, MongoDBAuthenticationMixins {
+public interface MongoDBReplicaSet extends DynamicCluster, MongoDBAuthenticationMixins, HasDatastoreUrl {
@SetFromFlag("replicaSetName")
ConfigKey<String> REPLICA_SET_NAME = ConfigKeys.newStringConfigKey(
@@ -60,6 +61,7 @@ public interface MongoDBReplicaSet extends DynamicCluster, MongoDBAuthentication
@SuppressWarnings("serial")
AttributeSensor<List<String>> REPLICA_SET_ENDPOINTS = Sensors.newSensor(new TypeToken<List<String>>() {},
"mongodb.replicaSet.endpoints", "Endpoints active for this replica set");
+
/**
* The name of the replica set.
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16b8b1e3/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java
index f96a56a..c4d675d 100644
--- a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java
@@ -54,6 +54,7 @@ import org.apache.brooklyn.util.text.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.api.client.util.Sets;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
@@ -108,7 +109,7 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB
@Override public boolean apply(@Nullable Entity input) {
return input != null
&& input instanceof MongoDBServer
- && ReplicaSetMemberStatus.PRIMARY.equals(input.getAttribute(MongoDBServer.REPLICA_SET_MEMBER_STATUS));
+ && ReplicaSetMemberStatus.PRIMARY.equals(input.sensors().get(MongoDBServer.REPLICA_SET_MEMBER_STATUS));
}
};
@@ -118,7 +119,7 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB
// getSecondaries relies on instanceof check
return input != null
&& input instanceof MongoDBServer
- && ReplicaSetMemberStatus.SECONDARY.equals(input.getAttribute(MongoDBServer.REPLICA_SET_MEMBER_STATUS));
+ && ReplicaSetMemberStatus.SECONDARY.equals(input.sensors().get(MongoDBServer.REPLICA_SET_MEMBER_STATUS));
}
};
@@ -165,7 +166,7 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB
@Override
public String getName() {
// FIXME: Names must be unique if the replica sets are used in a sharded cluster
- return getConfig(REPLICA_SET_NAME) + this.getId();
+ return config().get(REPLICA_SET_NAME) + this.getId();
}
@Override
@@ -197,7 +198,7 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB
*/
private void serverAdded(MongoDBServer server) {
try {
- LOG.debug("Server added: {}. SERVICE_UP: {}", server, server.getAttribute(MongoDBServer.SERVICE_UP));
+ LOG.debug("Server added: {}. SERVICE_UP: {}", server, server.sensors().get(MongoDBServer.SERVICE_UP));
// Set the primary if the replica set hasn't been initialised.
if (mustInitialise.compareAndSet(true, false)) {
@@ -205,8 +206,8 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB
LOG.info("First server up in {} is: {}", getName(), server);
boolean replicaSetInitialised = server.initializeReplicaSet(getName(), nextMemberId.getAndIncrement());
if (replicaSetInitialised) {
- setAttribute(PRIMARY_ENTITY, server);
- setAttribute(Startable.SERVICE_UP, true);
+ sensors().set(PRIMARY_ENTITY, server);
+ sensors().set(Startable.SERVICE_UP, true);
} else {
ServiceStateLogic.ServiceNotUpLogic.updateNotUpIndicator(this, "initialization", "replicaset failed to initialize");
ServiceStateLogic.setExpectedState(this, Lifecycle.ON_FIRE);
@@ -234,7 +235,7 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB
@Override
public void run() {
// SERVICE_UP is not guaranteed when additional members are added to the set.
- Boolean isAvailable = secondary.getAttribute(MongoDBServer.SERVICE_UP);
+ Boolean isAvailable = secondary.sensors().get(MongoDBServer.SERVICE_UP);
MongoDBServer primary = getPrimary();
boolean reschedule;
if (Boolean.TRUE.equals(isAvailable) && primary != null) {
@@ -278,14 +279,14 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB
if (LOG.isDebugEnabled())
LOG.debug("Scheduling removal of member from {}: {}", getName(), member);
// FIXME is there a chance of race here?
- if (member.equals(getAttribute(PRIMARY_ENTITY)))
- setAttribute(PRIMARY_ENTITY, null);
+ if (member.equals(sensors().get(PRIMARY_ENTITY)))
+ sensors().set(PRIMARY_ENTITY, null);
executor.submit(new Runnable() {
@Override
public void run() {
// Wait until the server has been stopped before reconfiguring the set. Quoth the MongoDB doc:
// for best results always shut down the mongod instance before removing it from a replica set.
- Boolean isAvailable = member.getAttribute(MongoDBServer.SERVICE_UP);
+ Boolean isAvailable = member.sensors().get(MongoDBServer.SERVICE_UP);
// Wait for the replica set to elect a new primary if the set is reconfiguring itself.
MongoDBServer primary = getPrimary();
boolean reschedule;
@@ -377,11 +378,34 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB
return MutableList.copyOf(endpoints);
}})
.build());
+
+ addEnricher(Enrichers.builder()
+ .aggregating(MongoDBServer.MONGO_SERVER_ENDPOINT)
+ .publishing(DATASTORE_URL)
+ .fromMembers()
+ .valueToReportIfNoSensors(null)
+ .computing(new Function<Collection<String>, String>() {
+ @Override
+ public String apply(Collection<String> input) {
+ Set<String> endpoints = Sets.newHashSet();
+ for (String endpoint: input) {
+ if (!Strings.isBlank(endpoint)) {
+
+ endpoints.add(endpoint);
+ }
+ }
+ String credentials = MongoDBAuthenticationUtils.usesAuthentication(MongoDBReplicaSetImpl.this) ?
+ String.format("%s:%s@",
+ config().get(MongoDBAuthenticationMixins.ROOT_USERNAME),
+ config().get(MongoDBAuthenticationMixins.ROOT_PASSWORD)) : "";
+ return String.format("mongodb://%s%s", credentials, Strings.join(endpoints, ","));
+ }})
+ .build());
subscribeToMembers(this, MongoDBServer.IS_PRIMARY_FOR_REPLICA_SET, new SensorEventListener<Boolean>() {
@Override public void onEvent(SensorEvent<Boolean> event) {
if (Boolean.TRUE == event.getValue())
- setAttribute(PRIMARY_ENTITY, (MongoDBServer)event.getSource());
+ sensors().set(PRIMARY_ENTITY, (MongoDBServer)event.getSource());
}
});
@@ -396,7 +420,7 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB
// TODO Note that after this the executor will not run if the set is restarted.
executor.shutdownNow();
super.stop();
- setAttribute(Startable.SERVICE_UP, false);
+ sensors().set(Startable.SERVICE_UP, false);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16b8b1e3/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServerImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServerImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServerImpl.java
index 941dd8e..040199b 100644
--- a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServerImpl.java
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServerImpl.java
@@ -62,13 +62,13 @@ public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBSer
super.connectSensors();
connectServiceUpIsRunning();
- int port = getAttribute(MongoDBServer.PORT);
+ int port = sensors().get(MongoDBServer.PORT);
HostAndPort accessibleAddress = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, port);
- setAttribute(MONGO_SERVER_ENDPOINT, String.format("http://%s:%d",
+ sensors().set(MONGO_SERVER_ENDPOINT, String.format("%s:%d",
accessibleAddress.getHostText(), accessibleAddress.getPort()));
- int httpConsolePort = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, getAttribute(HTTP_PORT)).getPort();
- setAttribute(HTTP_INTERFACE_URL, String.format("http://%s:%d",
+ int httpConsolePort = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, sensors().get(HTTP_PORT)).getPort();
+ sensors().set(HTTP_INTERFACE_URL, String.format("http://%s:%d",
accessibleAddress.getHostText(), httpConsolePort));
try {
@@ -85,7 +85,7 @@ public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBSer
.callable(new Callable<BasicBSONObject>() {
@Override
public BasicBSONObject call() throws Exception {
- return MongoDBServerImpl.this.getAttribute(SERVICE_UP)
+ return MongoDBServerImpl.this.sensors().get(SERVICE_UP)
? client.getServerStatus()
: null;
}
@@ -117,8 +117,8 @@ public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBSer
.suppressDuplicates(true))
.build();
} else {
- setAttribute(IS_PRIMARY_FOR_REPLICA_SET, false);
- setAttribute(IS_SECONDARY_FOR_REPLICA_SET, false);
+ sensors().set(IS_PRIMARY_FOR_REPLICA_SET, false);
+ sensors().set(IS_SECONDARY_FOR_REPLICA_SET, false);
}
// Take interesting details from STATUS.
@@ -126,29 +126,29 @@ public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBSer
@Override public void onEvent(SensorEvent<BasicBSONObject> event) {
BasicBSONObject map = event.getValue();
if (map != null && !map.isEmpty()) {
- setAttribute(UPTIME_SECONDS, map.getDouble("uptime", 0));
+ sensors().set(UPTIME_SECONDS, map.getDouble("uptime", 0));
// Operations
BasicBSONObject opcounters = (BasicBSONObject) map.get("opcounters");
- setAttribute(OPCOUNTERS_INSERTS, opcounters.getLong("insert", 0));
- setAttribute(OPCOUNTERS_QUERIES, opcounters.getLong("query", 0));
- setAttribute(OPCOUNTERS_UPDATES, opcounters.getLong("update", 0));
- setAttribute(OPCOUNTERS_DELETES, opcounters.getLong("delete", 0));
- setAttribute(OPCOUNTERS_GETMORE, opcounters.getLong("getmore", 0));
- setAttribute(OPCOUNTERS_COMMAND, opcounters.getLong("command", 0));
+ sensors().set(OPCOUNTERS_INSERTS, opcounters.getLong("insert", 0));
+ sensors().set(OPCOUNTERS_QUERIES, opcounters.getLong("query", 0));
+ sensors().set(OPCOUNTERS_UPDATES, opcounters.getLong("update", 0));
+ sensors().set(OPCOUNTERS_DELETES, opcounters.getLong("delete", 0));
+ sensors().set(OPCOUNTERS_GETMORE, opcounters.getLong("getmore", 0));
+ sensors().set(OPCOUNTERS_COMMAND, opcounters.getLong("command", 0));
// Network stats
BasicBSONObject network = (BasicBSONObject) map.get("network");
- setAttribute(NETWORK_BYTES_IN, network.getLong("bytesIn", 0));
- setAttribute(NETWORK_BYTES_OUT, network.getLong("bytesOut", 0));
- setAttribute(NETWORK_NUM_REQUESTS, network.getLong("numRequests", 0));
+ sensors().set(NETWORK_BYTES_IN, network.getLong("bytesIn", 0));
+ sensors().set(NETWORK_BYTES_OUT, network.getLong("bytesOut", 0));
+ sensors().set(NETWORK_NUM_REQUESTS, network.getLong("numRequests", 0));
// Replica set stats
BasicBSONObject repl = (BasicBSONObject) map.get("repl");
if (isReplicaSetMember() && repl != null) {
- setAttribute(IS_PRIMARY_FOR_REPLICA_SET, repl.getBoolean("ismaster"));
- setAttribute(IS_SECONDARY_FOR_REPLICA_SET, repl.getBoolean("secondary"));
- setAttribute(REPLICA_SET_PRIMARY_ENDPOINT, repl.getString("primary"));
+ sensors().set(IS_PRIMARY_FOR_REPLICA_SET, repl.getBoolean("ismaster"));
+ sensors().set(IS_SECONDARY_FOR_REPLICA_SET, repl.getBoolean("secondary"));
+ sensors().set(REPLICA_SET_PRIMARY_ENDPOINT, repl.getString("primary"));
}
}
}
@@ -165,7 +165,7 @@ public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBSer
@Override
public MongoDBReplicaSet getReplicaSet() {
- return getConfig(MongoDBServer.REPLICA_SET);
+ return config().get(MongoDBServer.REPLICA_SET);
}
@Override
@@ -186,7 +186,7 @@ public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBSer
// The ReplicaSet uses REPLICA_SET_MEMBER_STATUS to determine which node to call.
//
// Relying on caller to respect the `false` result, to retry.
- if (!getAttribute(IS_PRIMARY_FOR_REPLICA_SET)) {
+ if (!sensors().get(IS_PRIMARY_FOR_REPLICA_SET)) {
LOG.warn("Attempted to add {} to replica set at server that is not primary: {}", secondary, this);
return false;
}
@@ -195,7 +195,7 @@ public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBSer
@Override
public boolean removeMemberFromReplicaSet(MongoDBServer server) {
- if (!getAttribute(IS_PRIMARY_FOR_REPLICA_SET)) {
+ if (!sensors().get(IS_PRIMARY_FOR_REPLICA_SET)) {
LOG.warn("Attempted to remove {} from replica set at server that is not primary: {}", server, this);
return false;
}
@@ -206,8 +206,8 @@ public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBSer
public String toString() {
return Objects.toStringHelper(this)
.add("id", getId())
- .add("hostname", getAttribute(HOSTNAME))
- .add("port", getAttribute(PORT))
+ .add("hostname", sensors().get(HOSTNAME))
+ .add("port", sensors().get(PORT))
.toString();
}
}