You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by gr...@apache.org on 2014/12/15 17:24:14 UTC

[1/2] incubator-brooklyn git commit: Fix deadlock and rebind issue with Riak cluster

Repository: incubator-brooklyn
Updated Branches:
  refs/heads/master 3f645fb91 -> dd716b374


Fix deadlock and rebind issue with Riak cluster


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/18c298fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/18c298fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/18c298fa

Branch: refs/heads/master
Commit: 18c298fab02fae3c74bb6272e0c309cfee907059
Parents: 3f645fb
Author: Andrew Kennedy <gr...@apache.org>
Authored: Fri Dec 12 15:44:46 2014 +0000
Committer: Andrew Kennedy <gr...@apache.org>
Committed: Mon Dec 15 15:50:12 2014 +0000

----------------------------------------------------------------------
 .../entity/nosql/couchbase/CouchbaseNode.java   |   3 +-
 .../brooklyn/entity/nosql/riak/RiakCluster.java |   6 +-
 .../entity/nosql/riak/RiakClusterImpl.java      | 140 +++++++++----------
 3 files changed, 75 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/18c298fa/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java
index 01b9562..f09114d 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java
@@ -31,7 +31,6 @@ import brooklyn.entity.basic.MethodEffector;
 import brooklyn.entity.basic.SoftwareProcess;
 import brooklyn.entity.effector.Effectors;
 import brooklyn.entity.proxying.ImplementedBy;
-import brooklyn.entity.webapp.WebAppServiceConstants;
 import brooklyn.event.AttributeSensor;
 import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
 import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
@@ -82,7 +81,7 @@ public interface CouchbaseNode extends SoftwareProcess {
 
     AttributeSensor<Boolean> IS_PRIMARY_NODE = Sensors.newBooleanSensor("couchbase.isPrimaryNode", "flag to determine if the current couchbase node is the primary node for the cluster");
     AttributeSensor<Boolean> IS_IN_CLUSTER = Sensors.newBooleanSensor("couchbase.isInCluster", "flag to determine if the current couchbase node has been added to a cluster");
-    public static final AttributeSensor<URI> COUCHBASE_WEB_ADMIN_URL = Attributes.MAIN_URI;
+    AttributeSensor<URI> COUCHBASE_WEB_ADMIN_URL = Attributes.MAIN_URI;
     
     // Interesting stats
     AttributeSensor<Double> OPS = Sensors.newDoubleSensor("couchbase.stats.ops", 

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/18c298fa/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
index 72b464e..b5543ee 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
@@ -46,7 +46,9 @@ public interface RiakCluster extends DynamicCluster {
     @SetFromFlag("delayBeforeAdvertisingCluster")
     ConfigKey<Duration> DELAY_BEFORE_ADVERTISING_CLUSTER = ConfigKeys.newConfigKey(Duration.class, "riak.cluster.delayBeforeAdvertisingCluster", "Delay after cluster is started before checking and advertising its availability", Duration.seconds(2 * 60));
 
-    AttributeSensor<Boolean> IS_CLUSTER_INIT = Sensors.newBooleanSensor("riak.cluster.isClusterInit", "flag to determine if the cluster was already initialized");
-    
+    AttributeSensor<Boolean> IS_CLUSTER_INIT = Sensors.newBooleanSensor("riak.cluster.isClusterInit", "Flag to determine if the cluster was already initialized");
+
+    AttributeSensor<Boolean> IS_FIRST_NODE_SET = Sensors.newBooleanSensor("riak.cluster.isFirstNodeSet", "Flag to determine if the first node has been set");
+
     AttributeSensor<String> NODE_LIST = Sensors.newStringSensor("riak.cluster.nodeList", "List of nodes (including ports), comma separated");
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/18c298fa/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
index f16932b..ea1f1b9 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
@@ -21,8 +21,8 @@ package brooklyn.entity.nosql.riak;
 import static brooklyn.util.JavaGroovyEquivalents.groovyTruth;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.Nullable;
 
@@ -44,16 +44,20 @@ import brooklyn.location.Location;
 import brooklyn.policy.PolicySpec;
 import brooklyn.util.time.Time;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
+
     private static final Logger log = LoggerFactory.getLogger(RiakClusterImpl.class);
-    private AtomicBoolean isFirstNodeSet = new AtomicBoolean();
+
+    private transient Object mutex = new Object[0];
 
     public void init() {
         super.init();
@@ -99,97 +103,93 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
                 .configure("group", this));
     }
 
-    protected synchronized void onServerPoolMemberChanged(Entity member) {
-        if (log.isTraceEnabled()) log.trace("For {}, considering membership of {} which is in locations {}",
-                new Object[]{this, member, member.getLocations()});
+    protected void onServerPoolMemberChanged(Entity member) {
+        synchronized (mutex) {
+            log.trace("For {}, considering membership of {} which is in locations {}", new Object[]{ this, member, member.getLocations() });
 
-        Map<Entity, String> nodes = getAttribute(RIAK_CLUSTER_NODES);
-        if (belongsInServerPool(member)) {
-            // TODO can we discover the nodes by asking the riak cluster, rather than assuming what we add will be in there?
-            // TODO and can we do join as part of node starting?
+            Map<Entity, String> nodes = getAttribute(RIAK_CLUSTER_NODES);
+            if (belongsInServerPool(member)) {
+                // TODO can we discover the nodes by asking the riak cluster, rather than assuming what we add will be in there?
+                // TODO and can we do join as part of node starting?
 
-            if (nodes == null) {
-                nodes = Maps.newLinkedHashMap();
-            }
-            
-            String riakName = getRiakName(member);
-            Preconditions.checkNotNull(riakName);
-
-            // flag a first node to be the first node in the riak cluster.
-            if (!isFirstNodeSet.getAndSet(true)) {
-                nodes.put(member, riakName);
-                setAttribute(RIAK_CLUSTER_NODES, nodes);
+                if (nodes == null) {
+                    nodes = Maps.newLinkedHashMap();
+                }
+                String riakName = getRiakName(member);
+                Preconditions.checkNotNull(riakName);
 
-                ((EntityInternal) member).setAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Boolean.TRUE);
+                // flag a first node to be the first node in the riak cluster.
+                Boolean firstNode = getAttribute(IS_FIRST_NODE_SET);
+                if (!Boolean.TRUE.equals(firstNode)) {
+                    setAttribute(IS_FIRST_NODE_SET, Boolean.TRUE);
 
-                log.info("Adding riak node {}: {}; {} to cluster", new Object[] { this, member, getRiakName(member) });
-            } else {
-                // TODO: be wary of erroneous nodes but are still flagged 'in cluster'
-                // add the new node to be part of the riak cluster.
-                Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), new Predicate<Entity>() {
-                    @Override
-                    public boolean apply(@Nullable Entity node) {
-                        return (node instanceof RiakNode && hasMemberJoinedCluster(node));
-                    }
-                });
+                    nodes.put(member, riakName);
+                    setAttribute(RIAK_CLUSTER_NODES, nodes);
 
-                if (anyNodeInCluster.isPresent()) {
-                    if (!nodes.containsKey(member) && !hasMemberJoinedCluster(member)) {
+                    ((EntityInternal) member).setAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Boolean.TRUE);
 
-                        String anyNodeName = anyNodeInCluster.get().getAttribute(RiakNode.RIAK_NODE_NAME);
-                        Entities.invokeEffectorWithArgs(this, member, RiakNode.JOIN_RIAK_CLUSTER, anyNodeName);
-                        if (getAttribute(IS_CLUSTER_INIT)) {
-                            Entities.invokeEffector(RiakClusterImpl.this, anyNodeInCluster.get(), RiakNode.COMMIT_RIAK_CLUSTER);
+                    log.info("Adding riak node {}: {}; {} to cluster", new Object[] { this, member, getRiakName(member) });
+                } else {
+                    // TODO: be wary of erroneous nodes but are still flagged 'in cluster'
+                    // add the new node to be part of the riak cluster.
+                    Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), new Predicate<Entity>() {
+                        @Override
+                        public boolean apply(@Nullable Entity node) {
+                            return (node instanceof RiakNode && hasMemberJoinedCluster(node));
                         }
-                        nodes.put(member, riakName);
-                        setAttribute(RIAK_CLUSTER_NODES, nodes);
-                        log.info("Adding riak node {}: {}; {} to cluster", new Object[] { this, member, getRiakName(member) });
+                    });
+
+                    if (anyNodeInCluster.isPresent()) {
+                        if (!nodes.containsKey(member) && !hasMemberJoinedCluster(member)) {
+                            String anyNodeName = anyNodeInCluster.get().getAttribute(RiakNode.RIAK_NODE_NAME);
+                            Entities.invokeEffectorWithArgs(this, member, RiakNode.JOIN_RIAK_CLUSTER, anyNodeName);
+                            if (getAttribute(IS_CLUSTER_INIT)) {
+                                Entities.invokeEffector(RiakClusterImpl.this, anyNodeInCluster.get(), RiakNode.COMMIT_RIAK_CLUSTER);
+                            }
+                            nodes.put(member, riakName);
+                            setAttribute(RIAK_CLUSTER_NODES, nodes);
+                            log.info("Adding riak node {}: {}; {} to cluster", new Object[] { this, member, getRiakName(member) });
+                        }
+                    } else {
+                        log.error("isFirstNodeSet , but no cluster members found to add {}", member.getId());
                     }
-                } else {
-                    log.error("isFirstNodeSet , but no cluster members found to add {}", member.getId());
                 }
-            }
-        } else {
-            if (nodes != null && nodes.containsKey(member)) {
-                final Entity memberToBeRemoved = member;
-
-                Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), new Predicate<Entity>() {
+            } else {
+                if (nodes != null && nodes.containsKey(member)) {
+                    final Entity memberToBeRemoved = member;
 
-                    @Override
-                    public boolean apply(@Nullable Entity node) {
-                        return (node instanceof RiakNode && hasMemberJoinedCluster(node) && !node.equals(memberToBeRemoved));
+                    Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), new Predicate<Entity>() {
+                        @Override
+                        public boolean apply(@Nullable Entity node) {
+                            return (node instanceof RiakNode && hasMemberJoinedCluster(node) && !node.equals(memberToBeRemoved));
+                        }
+                    });
+                    if (anyNodeInCluster.isPresent()) {
+                        Entities.invokeEffectorWithArgs(this, anyNodeInCluster.get(), RiakNode.LEAVE_RIAK_CLUSTER, getRiakName(memberToBeRemoved));
                     }
-                });
-                if (anyNodeInCluster.isPresent()) {
-                    Entities.invokeEffectorWithArgs(this, anyNodeInCluster.get(), RiakNode.LEAVE_RIAK_CLUSTER, getRiakName(memberToBeRemoved));
+
+                    nodes.remove(member);
+                    setAttribute(RIAK_CLUSTER_NODES, nodes);
+                    log.info("Removing riak node {}: {}; {} from cluster", new Object[]{this, member, getRiakName(member)});
                 }
+            }
 
-                nodes.remove(member);
-                setAttribute(RIAK_CLUSTER_NODES, nodes);
-                log.info("Removing riak node {}: {}; {} from cluster", new Object[]{this, member, getRiakName(member)});
+            ServiceNotUpLogic.updateNotUpIndicatorRequiringNonEmptyMap(this, RIAK_CLUSTER_NODES);
+            if (log.isTraceEnabled()) log.trace("Done {} checkEntity {}", this, member);
 
-            }
+            calculateClusterAddresses();
         }
-        
-        ServiceNotUpLogic.updateNotUpIndicatorRequiringNonEmptyMap(this, RIAK_CLUSTER_NODES);
-        if (log.isTraceEnabled()) log.trace("Done {} checkEntity {}", this, member);
-        
-        calculateClusterAddresses();
     }
 
     private void calculateClusterAddresses() {
-        String addresses = "";
+        List<String> addresses = Lists.newArrayList();
         for (Entity entity : this.getMembers()) {
             if (entity instanceof RiakNode && entity.getAttribute(Attributes.SERVICE_UP)) {
                 RiakNode riakNode = (RiakNode) entity;
-                addresses += riakNode.getAttribute(Attributes.SUBNET_HOSTNAME) + ":" + riakNode.getAttribute(RiakNode.RIAK_WEB_PORT) + ",";
+                addresses.add(riakNode.getAttribute(Attributes.SUBNET_HOSTNAME) + ":" + riakNode.getAttribute(RiakNode.RIAK_WEB_PORT));
             }
         }
-        if (addresses.length() > 0) {
-            setAttribute(RiakCluster.NODE_LIST, addresses.substring(0, addresses.length() -1));
-        } else {
-            setAttribute(RiakCluster.NODE_LIST, null);
-        }
+        setAttribute(RiakCluster.NODE_LIST, Joiner.on(",").join(addresses));
     }
 
     protected boolean belongsInServerPool(Entity member) {


[2/2] incubator-brooklyn git commit: This closes #393

Posted by gr...@apache.org.
This closes #393

* github/pr/393:
  Fix deadlock and rebind issue with Riak cluster


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/dd716b37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/dd716b37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/dd716b37

Branch: refs/heads/master
Commit: dd716b374090e0f1b7e574f8fa4df0aa3d554592
Parents: 3f645fb 18c298f
Author: Andrew Kennedy <gr...@apache.org>
Authored: Mon Dec 15 16:24:10 2014 +0000
Committer: Andrew Kennedy <gr...@apache.org>
Committed: Mon Dec 15 16:24:10 2014 +0000

----------------------------------------------------------------------
 .../entity/nosql/couchbase/CouchbaseNode.java   |   3 +-
 .../brooklyn/entity/nosql/riak/RiakCluster.java |   6 +-
 .../entity/nosql/riak/RiakClusterImpl.java      | 140 +++++++++----------
 3 files changed, 75 insertions(+), 74 deletions(-)
----------------------------------------------------------------------