You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2016/02/01 18:47:29 UTC
[37/50] [abbrv] brooklyn-library git commit: adjust synch model for
controller
adjust synch model for controller
should not synch on class as that is used for things like getExecutionContext (and so can deadlock)
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/7742eb74
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/7742eb74
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/7742eb74
Branch: refs/heads/0.7.0-incubating
Commit: 7742eb740825e2119b084067844210a410741de1
Parents: 6d27a23
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Thu Jun 18 08:55:16 2015 -0700
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Jun 24 00:40:32 2015 -0700
----------------------------------------------------------------------
.../entity/proxy/AbstractControllerImpl.java | 141 ++++++++++---------
1 file changed, 76 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/7742eb74/software/webapp/src/main/java/brooklyn/entity/proxy/AbstractControllerImpl.java
----------------------------------------------------------------------
diff --git a/software/webapp/src/main/java/brooklyn/entity/proxy/AbstractControllerImpl.java b/software/webapp/src/main/java/brooklyn/entity/proxy/AbstractControllerImpl.java
index 297364f..a79b98b 100644
--- a/software/webapp/src/main/java/brooklyn/entity/proxy/AbstractControllerImpl.java
+++ b/software/webapp/src/main/java/brooklyn/entity/proxy/AbstractControllerImpl.java
@@ -68,7 +68,7 @@ import com.google.common.net.HostAndPort;
public abstract class AbstractControllerImpl extends SoftwareProcessImpl implements AbstractController {
// TODO Should review synchronization model. Currently, all changes to the serverPoolTargets
- // (and checking for potential changes) is done while synchronized on this. That means it
+ // (and checking for potential changes) is done while synchronized on serverPoolAddresses. That means it
// will also call update/reload while holding the lock. This is "conservative", but means
// sub-classes need to be extremely careful about any additional synchronization and of
// their implementations of update/reconfigureService/reload.
@@ -79,7 +79,8 @@ public abstract class AbstractControllerImpl extends SoftwareProcessImpl impleme
protected volatile boolean updateNeeded = true;
protected AbstractMembershipTrackingPolicy serverPoolMemberTrackerPolicy;
- protected Set<String> serverPoolAddresses = Sets.newLinkedHashSet();
+ // final because this is the synch target
+ final protected Set<String> serverPoolAddresses = Sets.newLinkedHashSet();
protected Map<Entity,String> serverPoolTargets = Maps.newLinkedHashMap();
public AbstractControllerImpl() {
@@ -356,17 +357,19 @@ public abstract class AbstractControllerImpl extends SoftwareProcessImpl impleme
*/
protected abstract void reconfigureService();
- public synchronized void updateNeeded() {
- if (updateNeeded) return;
- updateNeeded = true;
- LOG.debug("queueing an update-needed task for "+this+"; update will occur shortly");
- Entities.submit(this, Tasks.builder().name("update-needed").body(new Runnable() {
- @Override
- public void run() {
- if (updateNeeded)
- AbstractControllerImpl.this.update();
- }
- }).build());
+ public void updateNeeded() {
+ synchronized (serverPoolAddresses) {
+ if (updateNeeded) return;
+ updateNeeded = true;
+ LOG.debug("queueing an update-needed task for "+this+"; update will occur shortly");
+ Entities.submit(this, Tasks.builder().name("update-needed").body(new Runnable() {
+ @Override
+ public void run() {
+ if (updateNeeded)
+ AbstractControllerImpl.this.update();
+ }
+ }).build());
+ }
}
@Override
@@ -381,30 +384,34 @@ public abstract class AbstractControllerImpl extends SoftwareProcessImpl impleme
}
}
- public synchronized Task<?> updateAsync() {
- Task<?> result = null;
- if (!isActive()) updateNeeded = true;
- else {
- updateNeeded = false;
- LOG.debug("Updating {} in response to changes", this);
- LOG.info("Updating {}, server pool targets {}", new Object[] {this, getAttribute(SERVER_POOL_TARGETS)});
- reconfigureService();
- LOG.debug("Reloading {} in response to changes", this);
- // reload should happen synchronously
- result = invoke(RELOAD);
+ public Task<?> updateAsync() {
+ synchronized (serverPoolAddresses) {
+ Task<?> result = null;
+ if (!isActive()) updateNeeded = true;
+ else {
+ updateNeeded = false;
+ LOG.debug("Updating {} in response to changes", this);
+ LOG.info("Updating {}, server pool targets {}", new Object[] {this, getAttribute(SERVER_POOL_TARGETS)});
+ reconfigureService();
+ LOG.debug("Reloading {} in response to changes", this);
+ // reload should happen synchronously
+ result = invoke(RELOAD);
+ }
+ return result;
}
- return result;
}
- protected synchronized void onServerPoolMemberChanged(Entity member) {
- if (LOG.isTraceEnabled()) LOG.trace("For {}, considering membership of {} which is in locations {}",
+ protected void onServerPoolMemberChanged(Entity member) {
+ synchronized (serverPoolAddresses) {
+ if (LOG.isTraceEnabled()) LOG.trace("For {}, considering membership of {} which is in locations {}",
new Object[] {this, member, member.getLocations()});
- if (belongsInServerPool(member)) {
- addServerPoolMember(member);
- } else {
- removeServerPoolMember(member);
+ if (belongsInServerPool(member)) {
+ addServerPoolMember(member);
+ } else {
+ removeServerPoolMember(member);
+ }
+ if (LOG.isTraceEnabled()) LOG.trace("Done {} checkEntity {}", this, member);
}
- if (LOG.isTraceEnabled()) LOG.trace("Done {} checkEntity {}", this, member);
}
protected boolean belongsInServerPool(Entity member) {
@@ -420,45 +427,49 @@ public abstract class AbstractControllerImpl extends SoftwareProcessImpl impleme
return true;
}
- protected synchronized void addServerPoolMember(Entity member) {
- String oldAddress = getAttribute(SERVER_POOL_TARGETS).get(member);
- String newAddress = getAddressOfEntity(member);
- if (Objects.equal(newAddress, oldAddress)) {
- if (LOG.isTraceEnabled())
- if (LOG.isTraceEnabled()) LOG.trace("Ignoring unchanged address {}", oldAddress);
- return;
- } else if (newAddress == null) {
- LOG.info("Removing from {}, member {} with old address {}, because inferred address is now null", new Object[] {this, member, oldAddress});
- } else {
- if (oldAddress != null) {
- LOG.info("Replacing in {}, member {} with old address {}, new address {}", new Object[] {this, member, oldAddress, newAddress});
+ protected void addServerPoolMember(Entity member) {
+ synchronized (serverPoolAddresses) {
+ String oldAddress = getAttribute(SERVER_POOL_TARGETS).get(member);
+ String newAddress = getAddressOfEntity(member);
+ if (Objects.equal(newAddress, oldAddress)) {
+ if (LOG.isTraceEnabled())
+ if (LOG.isTraceEnabled()) LOG.trace("Ignoring unchanged address {}", oldAddress);
+ return;
+ } else if (newAddress == null) {
+ LOG.info("Removing from {}, member {} with old address {}, because inferred address is now null", new Object[] {this, member, oldAddress});
} else {
- LOG.info("Adding to {}, new member {} with address {}", new Object[] {this, member, newAddress});
+ if (oldAddress != null) {
+ LOG.info("Replacing in {}, member {} with old address {}, new address {}", new Object[] {this, member, oldAddress, newAddress});
+ } else {
+ LOG.info("Adding to {}, new member {} with address {}", new Object[] {this, member, newAddress});
+ }
}
+
+ if (Objects.equal(oldAddress, newAddress)) {
+ if (LOG.isTraceEnabled()) LOG.trace("For {}, ignoring change in member {} because address still {}", new Object[] {this, member, newAddress});
+ return;
+ }
+
+ // TODO this does it synchronously; an async method leaning on `updateNeeded` and `update` might
+ // be more appropriate, especially when this is used in a listener
+ MapAttribute.put(this, SERVER_POOL_TARGETS, member, newAddress);
+ updateAsync();
}
-
- if (Objects.equal(oldAddress, newAddress)) {
- if (LOG.isTraceEnabled()) LOG.trace("For {}, ignoring change in member {} because address still {}", new Object[] {this, member, newAddress});
- return;
- }
-
- // TODO this does it synchronously; an async method leaning on `updateNeeded` and `update` might
- // be more appropriate, especially when this is used in a listener
- MapAttribute.put(this, SERVER_POOL_TARGETS, member, newAddress);
- updateAsync();
}
- protected synchronized void removeServerPoolMember(Entity member) {
- if (!getAttribute(SERVER_POOL_TARGETS).containsKey(member)) {
- if (LOG.isTraceEnabled()) LOG.trace("For {}, not removing as don't have member {}", new Object[] {this, member});
- return;
+ protected void removeServerPoolMember(Entity member) {
+ synchronized (serverPoolAddresses) {
+ if (!getAttribute(SERVER_POOL_TARGETS).containsKey(member)) {
+ if (LOG.isTraceEnabled()) LOG.trace("For {}, not removing as don't have member {}", new Object[] {this, member});
+ return;
+ }
+
+ String address = MapAttribute.remove(this, SERVER_POOL_TARGETS, member);
+
+ LOG.info("Removing from {}, member {} with address {}", new Object[] {this, member, address});
+
+ updateAsync();
}
-
- String address = MapAttribute.remove(this, SERVER_POOL_TARGETS, member);
-
- LOG.info("Removing from {}, member {} with address {}", new Object[] {this, member, address});
-
- updateAsync();
}
protected String getAddressOfEntity(Entity member) {