You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by gr...@apache.org on 2014/12/15 17:24:14 UTC
[1/2] incubator-brooklyn git commit: Fix deadlock and rebind issue with Riak cluster
Repository: incubator-brooklyn
Updated Branches:
refs/heads/master 3f645fb91 -> dd716b374
Fix deadlock and rebind issue with Riak cluster
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/18c298fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/18c298fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/18c298fa
Branch: refs/heads/master
Commit: 18c298fab02fae3c74bb6272e0c309cfee907059
Parents: 3f645fb
Author: Andrew Kennedy <gr...@apache.org>
Authored: Fri Dec 12 15:44:46 2014 +0000
Committer: Andrew Kennedy <gr...@apache.org>
Committed: Mon Dec 15 15:50:12 2014 +0000
----------------------------------------------------------------------
.../entity/nosql/couchbase/CouchbaseNode.java | 3 +-
.../brooklyn/entity/nosql/riak/RiakCluster.java | 6 +-
.../entity/nosql/riak/RiakClusterImpl.java | 140 +++++++++----------
3 files changed, 75 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/18c298fa/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java
index 01b9562..f09114d 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java
@@ -31,7 +31,6 @@ import brooklyn.entity.basic.MethodEffector;
import brooklyn.entity.basic.SoftwareProcess;
import brooklyn.entity.effector.Effectors;
import brooklyn.entity.proxying.ImplementedBy;
-import brooklyn.entity.webapp.WebAppServiceConstants;
import brooklyn.event.AttributeSensor;
import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
@@ -82,7 +81,7 @@ public interface CouchbaseNode extends SoftwareProcess {
AttributeSensor<Boolean> IS_PRIMARY_NODE = Sensors.newBooleanSensor("couchbase.isPrimaryNode", "flag to determine if the current couchbase node is the primary node for the cluster");
AttributeSensor<Boolean> IS_IN_CLUSTER = Sensors.newBooleanSensor("couchbase.isInCluster", "flag to determine if the current couchbase node has been added to a cluster");
- public static final AttributeSensor<URI> COUCHBASE_WEB_ADMIN_URL = Attributes.MAIN_URI;
+ AttributeSensor<URI> COUCHBASE_WEB_ADMIN_URL = Attributes.MAIN_URI;
// Interesting stats
AttributeSensor<Double> OPS = Sensors.newDoubleSensor("couchbase.stats.ops",
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/18c298fa/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
index 72b464e..b5543ee 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
@@ -46,7 +46,9 @@ public interface RiakCluster extends DynamicCluster {
@SetFromFlag("delayBeforeAdvertisingCluster")
ConfigKey<Duration> DELAY_BEFORE_ADVERTISING_CLUSTER = ConfigKeys.newConfigKey(Duration.class, "riak.cluster.delayBeforeAdvertisingCluster", "Delay after cluster is started before checking and advertising its availability", Duration.seconds(2 * 60));
- AttributeSensor<Boolean> IS_CLUSTER_INIT = Sensors.newBooleanSensor("riak.cluster.isClusterInit", "flag to determine if the cluster was already initialized");
-
+ AttributeSensor<Boolean> IS_CLUSTER_INIT = Sensors.newBooleanSensor("riak.cluster.isClusterInit", "Flag to determine if the cluster was already initialized");
+
+ AttributeSensor<Boolean> IS_FIRST_NODE_SET = Sensors.newBooleanSensor("riak.cluster.isFirstNodeSet", "Flag to determine if the first node has been set");
+
AttributeSensor<String> NODE_LIST = Sensors.newStringSensor("riak.cluster.nodeList", "List of nodes (including ports), comma separated");
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/18c298fa/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
index f16932b..ea1f1b9 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
@@ -21,8 +21,8 @@ package brooklyn.entity.nosql.riak;
import static brooklyn.util.JavaGroovyEquivalents.groovyTruth;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
@@ -44,16 +44,20 @@ import brooklyn.location.Location;
import brooklyn.policy.PolicySpec;
import brooklyn.util.time.Time;
+import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
+
private static final Logger log = LoggerFactory.getLogger(RiakClusterImpl.class);
- private AtomicBoolean isFirstNodeSet = new AtomicBoolean();
+
+ private transient Object mutex = new Object[0];
public void init() {
super.init();
@@ -99,97 +103,93 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
.configure("group", this));
}
- protected synchronized void onServerPoolMemberChanged(Entity member) {
- if (log.isTraceEnabled()) log.trace("For {}, considering membership of {} which is in locations {}",
- new Object[]{this, member, member.getLocations()});
+ protected void onServerPoolMemberChanged(Entity member) {
+ synchronized (mutex) {
+ log.trace("For {}, considering membership of {} which is in locations {}", new Object[]{ this, member, member.getLocations() });
- Map<Entity, String> nodes = getAttribute(RIAK_CLUSTER_NODES);
- if (belongsInServerPool(member)) {
- // TODO can we discover the nodes by asking the riak cluster, rather than assuming what we add will be in there?
- // TODO and can we do join as part of node starting?
+ Map<Entity, String> nodes = getAttribute(RIAK_CLUSTER_NODES);
+ if (belongsInServerPool(member)) {
+ // TODO can we discover the nodes by asking the riak cluster, rather than assuming what we add will be in there?
+ // TODO and can we do join as part of node starting?
- if (nodes == null) {
- nodes = Maps.newLinkedHashMap();
- }
-
- String riakName = getRiakName(member);
- Preconditions.checkNotNull(riakName);
-
- // flag a first node to be the first node in the riak cluster.
- if (!isFirstNodeSet.getAndSet(true)) {
- nodes.put(member, riakName);
- setAttribute(RIAK_CLUSTER_NODES, nodes);
+ if (nodes == null) {
+ nodes = Maps.newLinkedHashMap();
+ }
+ String riakName = getRiakName(member);
+ Preconditions.checkNotNull(riakName);
- ((EntityInternal) member).setAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Boolean.TRUE);
+ // flag a first node to be the first node in the riak cluster.
+ Boolean firstNode = getAttribute(IS_FIRST_NODE_SET);
+ if (!Boolean.TRUE.equals(firstNode)) {
+ setAttribute(IS_FIRST_NODE_SET, Boolean.TRUE);
- log.info("Adding riak node {}: {}; {} to cluster", new Object[] { this, member, getRiakName(member) });
- } else {
- // TODO: be wary of erroneous nodes but are still flagged 'in cluster'
- // add the new node to be part of the riak cluster.
- Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), new Predicate<Entity>() {
- @Override
- public boolean apply(@Nullable Entity node) {
- return (node instanceof RiakNode && hasMemberJoinedCluster(node));
- }
- });
+ nodes.put(member, riakName);
+ setAttribute(RIAK_CLUSTER_NODES, nodes);
- if (anyNodeInCluster.isPresent()) {
- if (!nodes.containsKey(member) && !hasMemberJoinedCluster(member)) {
+ ((EntityInternal) member).setAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Boolean.TRUE);
- String anyNodeName = anyNodeInCluster.get().getAttribute(RiakNode.RIAK_NODE_NAME);
- Entities.invokeEffectorWithArgs(this, member, RiakNode.JOIN_RIAK_CLUSTER, anyNodeName);
- if (getAttribute(IS_CLUSTER_INIT)) {
- Entities.invokeEffector(RiakClusterImpl.this, anyNodeInCluster.get(), RiakNode.COMMIT_RIAK_CLUSTER);
+ log.info("Adding riak node {}: {}; {} to cluster", new Object[] { this, member, getRiakName(member) });
+ } else {
+ // TODO: be wary of erroneous nodes but are still flagged 'in cluster'
+ // add the new node to be part of the riak cluster.
+ Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), new Predicate<Entity>() {
+ @Override
+ public boolean apply(@Nullable Entity node) {
+ return (node instanceof RiakNode && hasMemberJoinedCluster(node));
}
- nodes.put(member, riakName);
- setAttribute(RIAK_CLUSTER_NODES, nodes);
- log.info("Adding riak node {}: {}; {} to cluster", new Object[] { this, member, getRiakName(member) });
+ });
+
+ if (anyNodeInCluster.isPresent()) {
+ if (!nodes.containsKey(member) && !hasMemberJoinedCluster(member)) {
+ String anyNodeName = anyNodeInCluster.get().getAttribute(RiakNode.RIAK_NODE_NAME);
+ Entities.invokeEffectorWithArgs(this, member, RiakNode.JOIN_RIAK_CLUSTER, anyNodeName);
+ if (getAttribute(IS_CLUSTER_INIT)) {
+ Entities.invokeEffector(RiakClusterImpl.this, anyNodeInCluster.get(), RiakNode.COMMIT_RIAK_CLUSTER);
+ }
+ nodes.put(member, riakName);
+ setAttribute(RIAK_CLUSTER_NODES, nodes);
+ log.info("Adding riak node {}: {}; {} to cluster", new Object[] { this, member, getRiakName(member) });
+ }
+ } else {
+ log.error("isFirstNodeSet , but no cluster members found to add {}", member.getId());
}
- } else {
- log.error("isFirstNodeSet , but no cluster members found to add {}", member.getId());
}
- }
- } else {
- if (nodes != null && nodes.containsKey(member)) {
- final Entity memberToBeRemoved = member;
-
- Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), new Predicate<Entity>() {
+ } else {
+ if (nodes != null && nodes.containsKey(member)) {
+ final Entity memberToBeRemoved = member;
- @Override
- public boolean apply(@Nullable Entity node) {
- return (node instanceof RiakNode && hasMemberJoinedCluster(node) && !node.equals(memberToBeRemoved));
+ Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), new Predicate<Entity>() {
+ @Override
+ public boolean apply(@Nullable Entity node) {
+ return (node instanceof RiakNode && hasMemberJoinedCluster(node) && !node.equals(memberToBeRemoved));
+ }
+ });
+ if (anyNodeInCluster.isPresent()) {
+ Entities.invokeEffectorWithArgs(this, anyNodeInCluster.get(), RiakNode.LEAVE_RIAK_CLUSTER, getRiakName(memberToBeRemoved));
}
- });
- if (anyNodeInCluster.isPresent()) {
- Entities.invokeEffectorWithArgs(this, anyNodeInCluster.get(), RiakNode.LEAVE_RIAK_CLUSTER, getRiakName(memberToBeRemoved));
+
+ nodes.remove(member);
+ setAttribute(RIAK_CLUSTER_NODES, nodes);
+ log.info("Removing riak node {}: {}; {} from cluster", new Object[]{this, member, getRiakName(member)});
}
+ }
- nodes.remove(member);
- setAttribute(RIAK_CLUSTER_NODES, nodes);
- log.info("Removing riak node {}: {}; {} from cluster", new Object[]{this, member, getRiakName(member)});
+ ServiceNotUpLogic.updateNotUpIndicatorRequiringNonEmptyMap(this, RIAK_CLUSTER_NODES);
+ if (log.isTraceEnabled()) log.trace("Done {} checkEntity {}", this, member);
- }
+ calculateClusterAddresses();
}
-
- ServiceNotUpLogic.updateNotUpIndicatorRequiringNonEmptyMap(this, RIAK_CLUSTER_NODES);
- if (log.isTraceEnabled()) log.trace("Done {} checkEntity {}", this, member);
-
- calculateClusterAddresses();
}
private void calculateClusterAddresses() {
- String addresses = "";
+ List<String> addresses = Lists.newArrayList();
for (Entity entity : this.getMembers()) {
if (entity instanceof RiakNode && entity.getAttribute(Attributes.SERVICE_UP)) {
RiakNode riakNode = (RiakNode) entity;
- addresses += riakNode.getAttribute(Attributes.SUBNET_HOSTNAME) + ":" + riakNode.getAttribute(RiakNode.RIAK_WEB_PORT) + ",";
+ addresses.add(riakNode.getAttribute(Attributes.SUBNET_HOSTNAME) + ":" + riakNode.getAttribute(RiakNode.RIAK_WEB_PORT));
}
}
- if (addresses.length() > 0) {
- setAttribute(RiakCluster.NODE_LIST, addresses.substring(0, addresses.length() -1));
- } else {
- setAttribute(RiakCluster.NODE_LIST, null);
- }
+ setAttribute(RiakCluster.NODE_LIST, Joiner.on(",").join(addresses));
}
protected boolean belongsInServerPool(Entity member) {
[2/2] incubator-brooklyn git commit: This closes #393
Posted by gr...@apache.org.
This closes #393
* github/pr/393:
Fix deadlock and rebind issue with Riak cluster
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/dd716b37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/dd716b37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/dd716b37
Branch: refs/heads/master
Commit: dd716b374090e0f1b7e574f8fa4df0aa3d554592
Parents: 3f645fb 18c298f
Author: Andrew Kennedy <gr...@apache.org>
Authored: Mon Dec 15 16:24:10 2014 +0000
Committer: Andrew Kennedy <gr...@apache.org>
Committed: Mon Dec 15 16:24:10 2014 +0000
----------------------------------------------------------------------
.../entity/nosql/couchbase/CouchbaseNode.java | 3 +-
.../brooklyn/entity/nosql/riak/RiakCluster.java | 6 +-
.../entity/nosql/riak/RiakClusterImpl.java | 140 +++++++++----------
3 files changed, 75 insertions(+), 74 deletions(-)
----------------------------------------------------------------------