You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2016/02/01 18:47:29 UTC

[37/50] [abbrv] brooklyn-library git commit: adjust synch model for controller

adjust synch model for controller

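Should not synchronize on the controller instance itself (as the previous `synchronized` methods did), because that monitor is also used for things like getExecutionContext, and so can deadlock; synchronize on the final serverPoolAddresses field instead.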


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/7742eb74
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/7742eb74
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/7742eb74

Branch: refs/heads/0.7.0-incubating
Commit: 7742eb740825e2119b084067844210a410741de1
Parents: 6d27a23
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Thu Jun 18 08:55:16 2015 -0700
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Jun 24 00:40:32 2015 -0700

----------------------------------------------------------------------
 .../entity/proxy/AbstractControllerImpl.java    | 141 ++++++++++---------
 1 file changed, 76 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/7742eb74/software/webapp/src/main/java/brooklyn/entity/proxy/AbstractControllerImpl.java
----------------------------------------------------------------------
diff --git a/software/webapp/src/main/java/brooklyn/entity/proxy/AbstractControllerImpl.java b/software/webapp/src/main/java/brooklyn/entity/proxy/AbstractControllerImpl.java
index 297364f..a79b98b 100644
--- a/software/webapp/src/main/java/brooklyn/entity/proxy/AbstractControllerImpl.java
+++ b/software/webapp/src/main/java/brooklyn/entity/proxy/AbstractControllerImpl.java
@@ -68,7 +68,7 @@ import com.google.common.net.HostAndPort;
 public abstract class AbstractControllerImpl extends SoftwareProcessImpl implements AbstractController {
     
     // TODO Should review synchronization model. Currently, all changes to the serverPoolTargets
-    // (and checking for potential changes) is done while synchronized on this. That means it 
+    // (and checking for potential changes) is done while synchronized on serverPoolAddresses. That means it 
     // will also call update/reload while holding the lock. This is "conservative", but means
     // sub-classes need to be extremely careful about any additional synchronization and of
     // their implementations of update/reconfigureService/reload.
@@ -79,7 +79,8 @@ public abstract class AbstractControllerImpl extends SoftwareProcessImpl impleme
     protected volatile boolean updateNeeded = true;
 
     protected AbstractMembershipTrackingPolicy serverPoolMemberTrackerPolicy;
-    protected Set<String> serverPoolAddresses = Sets.newLinkedHashSet();
+    // final because this is the synch target
+    final protected Set<String> serverPoolAddresses = Sets.newLinkedHashSet();
     protected Map<Entity,String> serverPoolTargets = Maps.newLinkedHashMap();
     
     public AbstractControllerImpl() {
@@ -356,17 +357,19 @@ public abstract class AbstractControllerImpl extends SoftwareProcessImpl impleme
      */
     protected abstract void reconfigureService();
     
-    public synchronized void updateNeeded() {
-        if (updateNeeded) return;
-        updateNeeded = true;
-        LOG.debug("queueing an update-needed task for "+this+"; update will occur shortly");
-        Entities.submit(this, Tasks.builder().name("update-needed").body(new Runnable() {
-            @Override
-            public void run() {
-                if (updateNeeded)
-                    AbstractControllerImpl.this.update();
-            } 
-        }).build());
+    public void updateNeeded() {
+        synchronized (serverPoolAddresses) {
+            if (updateNeeded) return;
+            updateNeeded = true;
+            LOG.debug("queueing an update-needed task for "+this+"; update will occur shortly");
+            Entities.submit(this, Tasks.builder().name("update-needed").body(new Runnable() {
+                @Override
+                public void run() {
+                    if (updateNeeded)
+                        AbstractControllerImpl.this.update();
+                } 
+            }).build());
+        }
     }
     
     @Override
@@ -381,30 +384,34 @@ public abstract class AbstractControllerImpl extends SoftwareProcessImpl impleme
         }
     }
     
-    public synchronized Task<?> updateAsync() {
-        Task<?> result = null;
-        if (!isActive()) updateNeeded = true;
-        else {
-            updateNeeded = false;
-            LOG.debug("Updating {} in response to changes", this);
-            LOG.info("Updating {}, server pool targets {}", new Object[] {this, getAttribute(SERVER_POOL_TARGETS)});
-            reconfigureService();
-            LOG.debug("Reloading {} in response to changes", this);
-            // reload should happen synchronously
-            result = invoke(RELOAD);
+    public Task<?> updateAsync() {
+        synchronized (serverPoolAddresses) {
+            Task<?> result = null;
+            if (!isActive()) updateNeeded = true;
+            else {
+                updateNeeded = false;
+                LOG.debug("Updating {} in response to changes", this);
+                LOG.info("Updating {}, server pool targets {}", new Object[] {this, getAttribute(SERVER_POOL_TARGETS)});
+                reconfigureService();
+                LOG.debug("Reloading {} in response to changes", this);
+                // reload should happen synchronously
+                result = invoke(RELOAD);
+            }
+            return result;
         }
-        return result;
     }
 
-    protected synchronized void onServerPoolMemberChanged(Entity member) {
-        if (LOG.isTraceEnabled()) LOG.trace("For {}, considering membership of {} which is in locations {}", 
+    protected void onServerPoolMemberChanged(Entity member) {
+        synchronized (serverPoolAddresses) {
+            if (LOG.isTraceEnabled()) LOG.trace("For {}, considering membership of {} which is in locations {}", 
                 new Object[] {this, member, member.getLocations()});
-        if (belongsInServerPool(member)) {
-            addServerPoolMember(member);
-        } else {
-            removeServerPoolMember(member);
+            if (belongsInServerPool(member)) {
+                addServerPoolMember(member);
+            } else {
+                removeServerPoolMember(member);
+            }
+            if (LOG.isTraceEnabled()) LOG.trace("Done {} checkEntity {}", this, member);
         }
-        if (LOG.isTraceEnabled()) LOG.trace("Done {} checkEntity {}", this, member);
     }
     
     protected boolean belongsInServerPool(Entity member) {
@@ -420,45 +427,49 @@ public abstract class AbstractControllerImpl extends SoftwareProcessImpl impleme
         return true;
     }
     
-    protected synchronized void addServerPoolMember(Entity member) {
-        String oldAddress = getAttribute(SERVER_POOL_TARGETS).get(member);
-        String newAddress = getAddressOfEntity(member);
-        if (Objects.equal(newAddress, oldAddress)) {
-            if (LOG.isTraceEnabled())
-                if (LOG.isTraceEnabled()) LOG.trace("Ignoring unchanged address {}", oldAddress);
-            return;
-        } else if (newAddress == null) {
-            LOG.info("Removing from {}, member {} with old address {}, because inferred address is now null", new Object[] {this, member, oldAddress});
-        } else {
-            if (oldAddress != null) {
-                LOG.info("Replacing in {}, member {} with old address {}, new address {}", new Object[] {this, member, oldAddress, newAddress});
+    protected void addServerPoolMember(Entity member) {
+        synchronized (serverPoolAddresses) {
+            String oldAddress = getAttribute(SERVER_POOL_TARGETS).get(member);
+            String newAddress = getAddressOfEntity(member);
+            if (Objects.equal(newAddress, oldAddress)) {
+                if (LOG.isTraceEnabled())
+                    if (LOG.isTraceEnabled()) LOG.trace("Ignoring unchanged address {}", oldAddress);
+                return;
+            } else if (newAddress == null) {
+                LOG.info("Removing from {}, member {} with old address {}, because inferred address is now null", new Object[] {this, member, oldAddress});
             } else {
-                LOG.info("Adding to {}, new member {} with address {}", new Object[] {this, member, newAddress});
+                if (oldAddress != null) {
+                    LOG.info("Replacing in {}, member {} with old address {}, new address {}", new Object[] {this, member, oldAddress, newAddress});
+                } else {
+                    LOG.info("Adding to {}, new member {} with address {}", new Object[] {this, member, newAddress});
+                }
             }
+
+            if (Objects.equal(oldAddress, newAddress)) {
+                if (LOG.isTraceEnabled()) LOG.trace("For {}, ignoring change in member {} because address still {}", new Object[] {this, member, newAddress});
+                return;
+            }
+
+            // TODO this does it synchronously; an async method leaning on `updateNeeded` and `update` might
+            // be more appropriate, especially when this is used in a listener
+            MapAttribute.put(this, SERVER_POOL_TARGETS, member, newAddress);
+            updateAsync();
         }
-        
-        if (Objects.equal(oldAddress, newAddress)) {
-            if (LOG.isTraceEnabled()) LOG.trace("For {}, ignoring change in member {} because address still {}", new Object[] {this, member, newAddress});
-            return;
-        }
-        
-        // TODO this does it synchronously; an async method leaning on `updateNeeded` and `update` might
-        // be more appropriate, especially when this is used in a listener
-        MapAttribute.put(this, SERVER_POOL_TARGETS, member, newAddress);
-        updateAsync();
     }
     
-    protected synchronized void removeServerPoolMember(Entity member) {
-        if (!getAttribute(SERVER_POOL_TARGETS).containsKey(member)) {
-            if (LOG.isTraceEnabled()) LOG.trace("For {}, not removing as don't have member {}", new Object[] {this, member});
-            return;
+    protected void removeServerPoolMember(Entity member) {
+        synchronized (serverPoolAddresses) {
+            if (!getAttribute(SERVER_POOL_TARGETS).containsKey(member)) {
+                if (LOG.isTraceEnabled()) LOG.trace("For {}, not removing as don't have member {}", new Object[] {this, member});
+                return;
+            }
+
+            String address = MapAttribute.remove(this, SERVER_POOL_TARGETS, member);
+
+            LOG.info("Removing from {}, member {} with address {}", new Object[] {this, member, address});
+
+            updateAsync();
         }
-        
-        String address = MapAttribute.remove(this, SERVER_POOL_TARGETS, member);
-        
-        LOG.info("Removing from {}, member {} with address {}", new Object[] {this, member, address});
-        
-        updateAsync();
     }
     
     protected String getAddressOfEntity(Entity member) {