You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by sj...@apache.org on 2015/09/17 15:27:55 UTC

[3/7] incubator-brooklyn git commit: makes connection string available on MongoDBReplicaSet

makes connection string available on MongoDBReplicaSet


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/16b8b1e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/16b8b1e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/16b8b1e3

Branch: refs/heads/master
Commit: 16b8b1e393d525c88ba5b540ad880dab899888d6
Parents: b3ce256
Author: Robert Moss <ro...@gmail.com>
Authored: Tue Sep 15 15:06:43 2015 +0100
Committer: Robert Moss <ro...@gmail.com>
Committed: Wed Sep 16 15:35:28 2015 +0100

----------------------------------------------------------------------
 .../nosql/mongodb/AbstractMongoDBSshDriver.java |  8 ++--
 .../entity/nosql/mongodb/MongoDBReplicaSet.java |  4 +-
 .../nosql/mongodb/MongoDBReplicaSetImpl.java    | 48 ++++++++++++++-----
 .../entity/nosql/mongodb/MongoDBServerImpl.java | 50 ++++++++++----------
 4 files changed, 68 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16b8b1e3/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/AbstractMongoDBSshDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/AbstractMongoDBSshDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/AbstractMongoDBSshDriver.java
index c182355..14c495e 100644
--- a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/AbstractMongoDBSshDriver.java
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/AbstractMongoDBSshDriver.java
@@ -27,15 +27,15 @@ import org.apache.brooklyn.api.location.OsDetails;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.entity.software.base.AbstractSoftwareProcessSshDriver;
 import org.apache.brooklyn.entity.software.base.lifecycle.ScriptHelper;
-import org.apache.brooklyn.util.core.internal.ssh.SshTool;
-import org.apache.brooklyn.util.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.brooklyn.location.ssh.SshMachineLocation;
+import org.apache.brooklyn.util.core.internal.ssh.SshTool;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.net.Networking;
 import org.apache.brooklyn.util.os.Os;
 import org.apache.brooklyn.util.ssh.BashCommands;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Strings;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16b8b1e3/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSet.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSet.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSet.java
index 12bbe87..6ebc17e 100644
--- a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSet.java
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSet.java
@@ -26,6 +26,7 @@ import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.entity.database.DatastoreMixins.HasDatastoreUrl;
 import org.apache.brooklyn.entity.group.Cluster;
 import org.apache.brooklyn.entity.group.DynamicCluster;
 import org.apache.brooklyn.util.core.flags.SetFromFlag;
@@ -46,7 +47,7 @@ import com.google.common.reflect.TypeToken;
  * @see <a href="http://docs.mongodb.org/manual/replication/">http://docs.mongodb.org/manual/replication/</a>
  */
 @ImplementedBy(MongoDBReplicaSetImpl.class)
-public interface MongoDBReplicaSet extends DynamicCluster, MongoDBAuthenticationMixins {
+public interface MongoDBReplicaSet extends DynamicCluster, MongoDBAuthenticationMixins, HasDatastoreUrl {
 
     @SetFromFlag("replicaSetName")
     ConfigKey<String> REPLICA_SET_NAME = ConfigKeys.newStringConfigKey(
@@ -60,6 +61,7 @@ public interface MongoDBReplicaSet extends DynamicCluster, MongoDBAuthentication
     @SuppressWarnings("serial")
     AttributeSensor<List<String>> REPLICA_SET_ENDPOINTS = Sensors.newSensor(new TypeToken<List<String>>() {}, 
         "mongodb.replicaSet.endpoints", "Endpoints active for this replica set");
+    
 
     /**
      * The name of the replica set.

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16b8b1e3/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java
index f96a56a..c4d675d 100644
--- a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java
@@ -54,6 +54,7 @@ import org.apache.brooklyn.util.text.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.api.client.util.Sets;
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
@@ -108,7 +109,7 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB
         @Override public boolean apply(@Nullable Entity input) {
             return input != null
                     && input instanceof MongoDBServer
-                    && ReplicaSetMemberStatus.PRIMARY.equals(input.getAttribute(MongoDBServer.REPLICA_SET_MEMBER_STATUS));
+                    && ReplicaSetMemberStatus.PRIMARY.equals(input.sensors().get(MongoDBServer.REPLICA_SET_MEMBER_STATUS));
         }
     };
 
@@ -118,7 +119,7 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB
             // getSecondaries relies on instanceof check
             return input != null
                     && input instanceof MongoDBServer
-                    && ReplicaSetMemberStatus.SECONDARY.equals(input.getAttribute(MongoDBServer.REPLICA_SET_MEMBER_STATUS));
+                    && ReplicaSetMemberStatus.SECONDARY.equals(input.sensors().get(MongoDBServer.REPLICA_SET_MEMBER_STATUS));
         }
     };
 
@@ -165,7 +166,7 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB
     @Override
     public String getName() {
         // FIXME: Names must be unique if the replica sets are used in a sharded cluster
-        return getConfig(REPLICA_SET_NAME) + this.getId();
+        return config().get(REPLICA_SET_NAME) + this.getId();
     }
 
     @Override
@@ -197,7 +198,7 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB
      */
     private void serverAdded(MongoDBServer server) {
         try {
-            LOG.debug("Server added: {}. SERVICE_UP: {}", server, server.getAttribute(MongoDBServer.SERVICE_UP));
+            LOG.debug("Server added: {}. SERVICE_UP: {}", server, server.sensors().get(MongoDBServer.SERVICE_UP));
 
             // Set the primary if the replica set hasn't been initialised.
             if (mustInitialise.compareAndSet(true, false)) {
@@ -205,8 +206,8 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB
                     LOG.info("First server up in {} is: {}", getName(), server);
                 boolean replicaSetInitialised = server.initializeReplicaSet(getName(), nextMemberId.getAndIncrement());
                 if (replicaSetInitialised) {
-                    setAttribute(PRIMARY_ENTITY, server);
-                    setAttribute(Startable.SERVICE_UP, true);
+                    sensors().set(PRIMARY_ENTITY, server);
+                    sensors().set(Startable.SERVICE_UP, true);
                 } else {
                     ServiceStateLogic.ServiceNotUpLogic.updateNotUpIndicator(this, "initialization", "replicaset failed to initialize");
                     ServiceStateLogic.setExpectedState(this, Lifecycle.ON_FIRE);
@@ -234,7 +235,7 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB
             @Override
             public void run() {
                 // SERVICE_UP is not guaranteed when additional members are added to the set.
-                Boolean isAvailable = secondary.getAttribute(MongoDBServer.SERVICE_UP);
+                Boolean isAvailable = secondary.sensors().get(MongoDBServer.SERVICE_UP);
                 MongoDBServer primary = getPrimary();
                 boolean reschedule;
                 if (Boolean.TRUE.equals(isAvailable) && primary != null) {
@@ -278,14 +279,14 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB
             if (LOG.isDebugEnabled())
                 LOG.debug("Scheduling removal of member from {}: {}", getName(), member);
             // FIXME is there a chance of race here?
-            if (member.equals(getAttribute(PRIMARY_ENTITY)))
-                setAttribute(PRIMARY_ENTITY, null);
+            if (member.equals(sensors().get(PRIMARY_ENTITY)))
+                sensors().set(PRIMARY_ENTITY, null);
             executor.submit(new Runnable() {
                 @Override
                 public void run() {
                     // Wait until the server has been stopped before reconfiguring the set. Quoth the MongoDB doc:
                     // for best results always shut down the mongod instance before removing it from a replica set.
-                    Boolean isAvailable = member.getAttribute(MongoDBServer.SERVICE_UP);
+                    Boolean isAvailable = member.sensors().get(MongoDBServer.SERVICE_UP);
                     // Wait for the replica set to elect a new primary if the set is reconfiguring itself.
                     MongoDBServer primary = getPrimary();
                     boolean reschedule;
@@ -377,11 +378,34 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB
                             return MutableList.copyOf(endpoints);
                         }})
                 .build());
+        
+        addEnricher(Enrichers.builder()
+                .aggregating(MongoDBServer.MONGO_SERVER_ENDPOINT)
+                .publishing(DATASTORE_URL)
+                .fromMembers()
+                .valueToReportIfNoSensors(null)
+                .computing(new Function<Collection<String>, String>() {
+                        @Override
+                        public String apply(Collection<String> input) {
+                            Set<String> endpoints = Sets.newHashSet();
+                            for (String endpoint: input) {
+                                if (!Strings.isBlank(endpoint)) {
+                                    
+                                    endpoints.add(endpoint);
+                                }
+                            }
+                            String credentials = MongoDBAuthenticationUtils.usesAuthentication(MongoDBReplicaSetImpl.this) ? 
+                                    String.format("%s:%s@", 
+                                            config().get(MongoDBAuthenticationMixins.ROOT_USERNAME), 
+                                            config().get(MongoDBAuthenticationMixins.ROOT_PASSWORD)) : "";
+                            return String.format("mongodb://%s%s", credentials, Strings.join(endpoints, ","));
+                        }})
+                .build());
 
         subscribeToMembers(this, MongoDBServer.IS_PRIMARY_FOR_REPLICA_SET, new SensorEventListener<Boolean>() {
             @Override public void onEvent(SensorEvent<Boolean> event) {
                 if (Boolean.TRUE == event.getValue())
-                    setAttribute(PRIMARY_ENTITY, (MongoDBServer)event.getSource());
+                    sensors().set(PRIMARY_ENTITY, (MongoDBServer)event.getSource());
             }
         });
 
@@ -396,7 +420,7 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB
         // TODO Note that after this the executor will not run if the set is restarted.
         executor.shutdownNow();
         super.stop();
-        setAttribute(Startable.SERVICE_UP, false);
+        sensors().set(Startable.SERVICE_UP, false);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16b8b1e3/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServerImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServerImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServerImpl.java
index 941dd8e..040199b 100644
--- a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServerImpl.java
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServerImpl.java
@@ -62,13 +62,13 @@ public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBSer
         super.connectSensors();
         connectServiceUpIsRunning();
 
-        int port = getAttribute(MongoDBServer.PORT);
+        int port = sensors().get(MongoDBServer.PORT);
         HostAndPort accessibleAddress = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, port);
-        setAttribute(MONGO_SERVER_ENDPOINT, String.format("http://%s:%d",
+        sensors().set(MONGO_SERVER_ENDPOINT, String.format("%s:%d",
                 accessibleAddress.getHostText(), accessibleAddress.getPort()));
 
-        int httpConsolePort = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, getAttribute(HTTP_PORT)).getPort();
-        setAttribute(HTTP_INTERFACE_URL, String.format("http://%s:%d",
+        int httpConsolePort = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, sensors().get(HTTP_PORT)).getPort();
+        sensors().set(HTTP_INTERFACE_URL, String.format("http://%s:%d",
                 accessibleAddress.getHostText(), httpConsolePort));
 
         try {
@@ -85,7 +85,7 @@ public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBSer
                         .callable(new Callable<BasicBSONObject>() {
                             @Override
                             public BasicBSONObject call() throws Exception {
-                                return MongoDBServerImpl.this.getAttribute(SERVICE_UP)
+                                return MongoDBServerImpl.this.sensors().get(SERVICE_UP)
                                     ? client.getServerStatus()
                                     : null;
                             }
@@ -117,8 +117,8 @@ public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBSer
                             .suppressDuplicates(true))
                     .build();
         } else {
-            setAttribute(IS_PRIMARY_FOR_REPLICA_SET, false);
-            setAttribute(IS_SECONDARY_FOR_REPLICA_SET, false);
+            sensors().set(IS_PRIMARY_FOR_REPLICA_SET, false);
+            sensors().set(IS_SECONDARY_FOR_REPLICA_SET, false);
         }
 
         // Take interesting details from STATUS.
@@ -126,29 +126,29 @@ public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBSer
                 @Override public void onEvent(SensorEvent<BasicBSONObject> event) {
                     BasicBSONObject map = event.getValue();
                     if (map != null && !map.isEmpty()) {
-                        setAttribute(UPTIME_SECONDS, map.getDouble("uptime", 0));
+                        sensors().set(UPTIME_SECONDS, map.getDouble("uptime", 0));
 
                         // Operations
                         BasicBSONObject opcounters = (BasicBSONObject) map.get("opcounters");
-                        setAttribute(OPCOUNTERS_INSERTS, opcounters.getLong("insert", 0));
-                        setAttribute(OPCOUNTERS_QUERIES, opcounters.getLong("query", 0));
-                        setAttribute(OPCOUNTERS_UPDATES, opcounters.getLong("update", 0));
-                        setAttribute(OPCOUNTERS_DELETES, opcounters.getLong("delete", 0));
-                        setAttribute(OPCOUNTERS_GETMORE, opcounters.getLong("getmore", 0));
-                        setAttribute(OPCOUNTERS_COMMAND, opcounters.getLong("command", 0));
+                        sensors().set(OPCOUNTERS_INSERTS, opcounters.getLong("insert", 0));
+                        sensors().set(OPCOUNTERS_QUERIES, opcounters.getLong("query", 0));
+                        sensors().set(OPCOUNTERS_UPDATES, opcounters.getLong("update", 0));
+                        sensors().set(OPCOUNTERS_DELETES, opcounters.getLong("delete", 0));
+                        sensors().set(OPCOUNTERS_GETMORE, opcounters.getLong("getmore", 0));
+                        sensors().set(OPCOUNTERS_COMMAND, opcounters.getLong("command", 0));
 
                         // Network stats
                         BasicBSONObject network = (BasicBSONObject) map.get("network");
-                        setAttribute(NETWORK_BYTES_IN, network.getLong("bytesIn", 0));
-                        setAttribute(NETWORK_BYTES_OUT, network.getLong("bytesOut", 0));
-                        setAttribute(NETWORK_NUM_REQUESTS, network.getLong("numRequests", 0));
+                        sensors().set(NETWORK_BYTES_IN, network.getLong("bytesIn", 0));
+                        sensors().set(NETWORK_BYTES_OUT, network.getLong("bytesOut", 0));
+                        sensors().set(NETWORK_NUM_REQUESTS, network.getLong("numRequests", 0));
 
                         // Replica set stats
                         BasicBSONObject repl = (BasicBSONObject) map.get("repl");
                         if (isReplicaSetMember() && repl != null) {
-                            setAttribute(IS_PRIMARY_FOR_REPLICA_SET, repl.getBoolean("ismaster"));
-                            setAttribute(IS_SECONDARY_FOR_REPLICA_SET, repl.getBoolean("secondary"));
-                            setAttribute(REPLICA_SET_PRIMARY_ENDPOINT, repl.getString("primary"));
+                            sensors().set(IS_PRIMARY_FOR_REPLICA_SET, repl.getBoolean("ismaster"));
+                            sensors().set(IS_SECONDARY_FOR_REPLICA_SET, repl.getBoolean("secondary"));
+                            sensors().set(REPLICA_SET_PRIMARY_ENDPOINT, repl.getString("primary"));
                         }
                     }
                 }
@@ -165,7 +165,7 @@ public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBSer
 
     @Override
     public MongoDBReplicaSet getReplicaSet() {
-        return getConfig(MongoDBServer.REPLICA_SET);
+        return config().get(MongoDBServer.REPLICA_SET);
     }
 
     @Override
@@ -186,7 +186,7 @@ public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBSer
         // The ReplicaSet uses REPLICA_SET_MEMBER_STATUS to determine which node to call.
         // 
         // Relying on caller to respect the `false` result, to retry.
-        if (!getAttribute(IS_PRIMARY_FOR_REPLICA_SET)) {
+        if (!sensors().get(IS_PRIMARY_FOR_REPLICA_SET)) {
             LOG.warn("Attempted to add {} to replica set at server that is not primary: {}", secondary, this);
             return false;
         }
@@ -195,7 +195,7 @@ public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBSer
 
     @Override
     public boolean removeMemberFromReplicaSet(MongoDBServer server) {
-        if (!getAttribute(IS_PRIMARY_FOR_REPLICA_SET)) {
+        if (!sensors().get(IS_PRIMARY_FOR_REPLICA_SET)) {
             LOG.warn("Attempted to remove {} from replica set at server that is not primary: {}", server, this);
             return false;
         }
@@ -206,8 +206,8 @@ public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBSer
     public String toString() {
         return Objects.toStringHelper(this)
                 .add("id", getId())
-                .add("hostname", getAttribute(HOSTNAME))
-                .add("port", getAttribute(PORT))
+                .add("hostname", sensors().get(HOSTNAME))
+                .add("port", sensors().get(PORT))
                 .toString();
     }
 }