You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2017/10/06 08:06:25 UTC

[01/23] brooklyn-server git commit: task optimization: some queued-or-submitted tasks use foreground for executing

Repository: brooklyn-server
Updated Branches:
  refs/heads/master 54f9c708a -> a73ee1728


task optimization: some queued-or-submitted tasks use foreground for executing


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/4430f769
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/4430f769
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/4430f769

Branch: refs/heads/master
Commit: 4430f769077210bf253a8d70a69482c1c2119d39
Parents: 7f4d7bd
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Fri Sep 15 11:07:27 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Mon Sep 18 17:10:54 2017 +0100

----------------------------------------------------------------------
 .../brooklyn/util/core/task/DynamicTasks.java       | 16 +++++++---------
 1 file changed, 7 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4430f769/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
index 15b062a..5426baf 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
@@ -107,7 +107,7 @@ public class DynamicTasks {
             this.execContext = ((EntityInternal)entity).getExecutionContext();
             return this;
         }
-        private boolean orSubmitInternal() {
+        private boolean orSubmitInternal(boolean samethread) {
             if (!wasQueued()) {
                 if (isQueuedOrSubmitted()) {
                     log.warn("Redundant call to execute "+getTask()+"; skipping");
@@ -118,7 +118,8 @@ public class DynamicTasks {
                         ec = BasicExecutionContext.getCurrentExecutionContext();
                     if (ec==null)
                         throw new IllegalStateException("Cannot execute "+getTask()+" without an execution context; ensure caller is in an ExecutionContext");
-                    ec.submit(getTask());
+                    if (samethread) ec.get(getTask());
+                    else ec.submit(getTask());
                     return true;
                 }
             } else {
@@ -128,7 +129,7 @@ public class DynamicTasks {
         /** causes the task to be submitted (asynchronously) if it hasn't already been,
          * requiring an entity execution context (will try to find a default if not set) */
         public TaskQueueingResult<T> orSubmitAsync() {
-            orSubmitInternal();
+            orSubmitInternal(false);
             return this;
         }
         /** convenience for setting {@link #executionContext(ExecutionContext)} then submitting async */
@@ -141,7 +142,7 @@ public class DynamicTasks {
          * (which assumes all commands complete immediately);
          * requiring an entity execution context (will try to find a default if not set) */
         public TaskQueueingResult<T> orSubmitAndBlock() {
-            if (orSubmitInternal()) task.getUnchecked();
+            orSubmitInternal(true);
             return this;
         }
         /** convenience for setting {@link #executionContext(ExecutionContext)} then submitting blocking */
@@ -288,12 +289,9 @@ public class DynamicTasks {
         return task;
     }
     
-    /** submits/queues the given task if needed, and gets the result (unchecked) 
-     * only permitted in a queueing context (ie a DST main job) if the task is not yet submitted */
-    // things get really confusing if you try to queueInTaskHierarchy -- easy to cause deadlocks!
+    /** submits/queues the given task if needed, and gets the result (unchecked) */
     public static <T> T get(TaskAdaptable<T> t) {
-        // TODO do in foreground?
-        return queueIfNeeded(t).asTask().getUnchecked();
+        return queueIfPossible(t).orSubmitAndBlock().andWaitForSuccess();
     }
 
     /** As {@link #drain(Duration, boolean)} waiting forever and throwing the first error 


[07/23] brooklyn-server git commit: fix deadlock in initial publication of sensors on setting up a subscription

Posted by he...@apache.org.
fix deadlock in initial publication of sensors on setting up a subscription


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/0a1acecb
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/0a1acecb
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/0a1acecb

Branch: refs/heads/master
Commit: 0a1acecbe7b70ca752e0636605b34317f24a5a8e
Parents: 84d24d1
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Tue Sep 19 13:09:59 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Tue Sep 19 13:16:36 2017 +0100

----------------------------------------------------------------------
 .../mgmt/internal/LocalSubscriptionManager.java  | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0a1acecb/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index 983b307..a9fb70b 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -42,6 +42,7 @@ import org.apache.brooklyn.api.sensor.Sensor;
 import org.apache.brooklyn.api.sensor.SensorEvent;
 import org.apache.brooklyn.api.sensor.SensorEventListener;
 import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.sensor.BasicSensorEvent;
 import org.apache.brooklyn.util.collections.MutableList;
@@ -260,6 +261,8 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
             "displayName", name.toString(),
             "description", description.toString());
         
+        boolean isEntityStarting = s.subscriber instanceof Entity && isInitial;
+        // will have entity (and adjunct) execution context from tags, so can skip getting exec context
         em.submit(execFlags, new Runnable() {
             @Override
             public String toString() {
@@ -272,6 +275,22 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
             @Override
             public void run() {
                 try {
+                    if (isEntityStarting) {
+                        /* don't let sub deliveries start until this is completed;
+                         * this is a pragmatic way to ensure the publish events 
+                         * if submitted during management starting, aren't executed
+                         * until after management is starting.
+                         *   without this we can get deadlocks as this goes to publish,
+                         * has the attribute sensors lock, and waits on the publish lock
+                         * (any of management support, local subs, queueing subs).
+                         * meanwhile the management startup has those three locks,
+                         * then goes to publish and in the process looks up a sensor value.
+                         *   usually this is not an issue because some other task
+                         * does something (eg entity.getExecutionContext()) which
+                         * also has a wait-on-management-support semantics.
+                         */
+                        synchronized (((EntityInternal)s.subscriber).getManagementSupport()) {}
+                    }
                     int count = s.eventCount.incrementAndGet();
                     if (count > 0 && count % 1000 == 0) LOG.debug("{} events for subscriber {}", count, s);
                     


[09/23] brooklyn-server git commit: task GC and visibility: tidy GC code, don't delete some things eg subscriptions quite so aggressively

Posted by he...@apache.org.
task GC and visibility: tidy GC code, don't delete some things eg subscriptions quite so aggressively


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/aeecd3e9
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/aeecd3e9
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/aeecd3e9

Branch: refs/heads/master
Commit: aeecd3e90e049064e7c4cdfdaf9b40404032815a
Parents: b0556de
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Wed Sep 20 09:09:08 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Sep 20 10:10:25 2017 +0100

----------------------------------------------------------------------
 .../mgmt/internal/BrooklynGarbageCollector.java | 93 +++++++++++++-------
 .../util/core/task/BasicExecutionManager.java   |  2 +
 2 files changed, 65 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/aeecd3e9/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java
index 7995768..e0e6dec 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java
@@ -305,27 +305,20 @@ public class BrooklynGarbageCollector {
         if (tags.contains(ManagementContextInternal.EFFECTOR_TAG) || tags.contains(ManagementContextInternal.NON_TRANSIENT_TASK_TAG))
             return false;
         
-        if (task.getSubmittedByTask()!=null) {
-            Task<?> parent = task.getSubmittedByTask();
-            if (executionManager.getTask(parent.getId())==null) {
-                // parent is already cleaned up
-                return true;
-            }
-            if (parent instanceof HasTaskChildren && Iterables.contains(((HasTaskChildren)parent).getChildren(), task)) {
-                // it is a child, let the parent manage this task's death
-                return false;
-            }
-            Entity associatedEntity = BrooklynTaskTags.getTargetOrContextEntity(task);
-            if (associatedEntity!=null) {
-                // this is associated to an entity; destroy only if the entity is unmanaged
-                return !Entities.isManaged(associatedEntity);
-            }
-            // if not associated to an entity, then delete immediately
-            return true;
+        if (!isSubmitterExpired(task)) {
+            return false;
+        }
+        if (isChild(task)) {
+            // parent should manage this task's death; but above already kicks in if parent is not expired, so probably shouldn't come here?
+            LOG.warn("Unexpected expiry candidacy for "+task);
+            return false;
+        }
+        if (isAssociatedToActiveEntity(task)) {
+            return false;
         }
         
         // e.g. scheduled tasks, sensor events, etc
-        // TODO (in future may keep some of these with another limit, based on a new TagCategory)
+        // (in future may keep some of these with another limit, based on a new TagCategory)
         // there may also be a server association for server-side tasks which should be kept
         // (but be careful not to keep too many subscriptions!)
         
@@ -337,7 +330,7 @@ public class BrooklynGarbageCollector {
      * {@link #maxTasksPerTag} and {@link #maxTaskAge}.
      */
     protected synchronized int gcTasks() {
-        // TODO Must be careful with memory usage here: have seen OOME if we get crazy lots of tasks.
+        // NB: be careful with memory usage here: have seen OOME if we get crazy lots of tasks.
         // hopefully the use new limits, filters, and use of live lists in some places (added Sep 2014) will help.
         // 
         // An option is for getTasksWithTag(tag) to return an ArrayList rather than a LinkedHashSet. That
@@ -397,11 +390,21 @@ public class BrooklynGarbageCollector {
         int deletedCount = 0;
         deletedCount += expireOverCapacityTagsInCategory(taskNonEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.NON_ENTITY_NORMAL, false);
         deletedCount += expireOverCapacityTagsInCategory(taskEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.ENTITY, true);
-        deletedCount += expireSubTasksWhoseSubmitterIsExpired();
         
-        int deletedGlobally = expireIfOverCapacityGlobally();
-        deletedCount += deletedGlobally;
-        if (deletedGlobally>0) deletedCount += expireSubTasksWhoseSubmitterIsExpired();
+        // if expensive we could optimize task GC here to avoid repeated lookups by
+        // counting all expired above (not just prev two lines) and skipping if none
+        // but that seems unlikely
+        int deletedHere = 0;
+        while ((deletedHere = expireHistoricTasksNowReadyForImmediateDeletion()) > 0) {
+            // delete in loop so we don't have descendants sticking around until deleted in later cycles
+            deletedCount += deletedHere; 
+        }
+        
+        deletedHere = expireIfOverCapacityGlobally();
+        deletedCount += deletedHere;
+        while (deletedHere > 0) {
+            deletedCount += (deletedHere = expireHistoricTasksNowReadyForImmediateDeletion()); 
+        }
         
         return deletedCount;
     }
@@ -471,7 +474,9 @@ public class BrooklynGarbageCollector {
         }
     }
     
-    protected int expireSubTasksWhoseSubmitterIsExpired() {
+    protected int expireHistoricTasksNowReadyForImmediateDeletion() {
+        // find tasks which weren't ready for immediate deletion, but which now are 
+        
         // ideally we wouldn't have this; see comments on CHECK_SUBTASK_SUBMITTERS
         if (!brooklynProperties.getConfig(CHECK_SUBTASK_SUBMITTERS))
             return 0;
@@ -480,13 +485,15 @@ public class BrooklynGarbageCollector {
         Collection<Task<?>> tasksToDelete = MutableList.of();
         try {
             for (Task<?> task: allTasks) {
-                if (!task.isDone()) continue;
-                Task<?> submitter = task.getSubmittedByTask();
-                // if we've leaked, ie a subtask which is not a child task, 
-                // and the submitter is GC'd, then delete this also
-                if (submitter!=null && submitter.isDone() && executionManager.getTask(submitter.getId())==null) {
-                    tasksToDelete.add(task);
+                if (!shouldDeleteTaskImmediately(task)) {
+                    // 2017-09 previously we only checked done and submitter expired, and deleted if both were true
+                    // so could pick up even things that were non_transient -- now much stricter
+                    continue;
+                } else {
+                    if (LOG.isTraceEnabled()) LOG.trace("Deleting task which really is no longer wanted: "+task+" (submitted by "+task.getSubmittedByTask()+")");
                 }
+                
+                tasksToDelete.add(task);
             }
             
         } catch (ConcurrentModificationException e) {
@@ -500,6 +507,32 @@ public class BrooklynGarbageCollector {
         return tasksToDelete.size();
     }
     
+    private boolean isAssociatedToActiveEntity(Task<?> task) {
+        Entity associatedEntity = BrooklynTaskTags.getTargetOrContextEntity(task);
+        if (associatedEntity==null) {
+            return false;
+        }
+        // this is associated to an entity; destroy only if the entity is unmanaged
+        return Entities.isManaged(associatedEntity);
+    }
+    
+    private boolean isChild(Task<?> task) {
+        Task<?> parent = task.getSubmittedByTask();
+        return (parent instanceof HasTaskChildren && Iterables.contains(((HasTaskChildren)parent).getChildren(), task));
+    }
+    
+    private boolean isSubmitterExpired(Task<?> task) {
+        if (Strings.isBlank(task.getSubmittedByTaskId())) {
+            return false;
+        }
+        Task<?> submitter = task.getSubmittedByTask();
+        if (submitter!=null && (!submitter.isDone() || executionManager.getTask(submitter.getId())!=null)) {
+            return false;
+        }
+        // submitter task is GC'd
+        return true;
+    }
+
     protected enum TagCategory { 
         ENTITY, NON_ENTITY_NORMAL;
         

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/aeecd3e9/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
index 4fcfadb..46e501e8 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
@@ -57,6 +57,7 @@ import org.apache.brooklyn.core.config.Sanitizer;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
@@ -796,6 +797,7 @@ public class BasicExecutionManager implements ExecutionManager {
                 .description("Details of the original task have been forgotten.")
                 .body(Callables.returning((T)null)).build();
             ((BasicTask<T>)t).ignoreIfNotRun();
+            ((BasicTask<T>)t).cancelled = true;
             return t;
         }
     }


[18/23] brooklyn-server git commit: switch queue-or-submit-blocking-then-get invocations to new simpler DynamicTasks.get. but note some things fail getImmediately if they are running a queueing context eg EffectorSayHiTest, until fixed in the next PR.

Posted by he...@apache.org.
switch queue-or-submit-blocking-then-get invocations to new simpler DynamicTasks.get.
but note some things fail getImmediately if they are running a queueing context eg EffectorSayHiTest,
until fixed in the next PR.


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/98888708
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/98888708
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/98888708

Branch: refs/heads/master
Commit: 98888708957d0cea0f6bf219c5ea30b243094138
Parents: 0c2e1f6
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Mon Oct 2 15:43:02 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Oct 4 14:45:18 2017 +0100

----------------------------------------------------------------------
 .../brooklyn/api/mgmt/ExecutionContext.java     | 16 +++---
 .../core/entity/trait/StartableMethods.java     |  6 +--
 .../core/location/BasicMachineDetails.java      |  5 +-
 .../location/access/BrooklynAccessUtils.java    |  2 +-
 .../core/objs/proxy/EntityProxyImpl.java        |  2 +-
 .../entity/group/DynamicClusterImpl.java        |  7 +--
 .../brooklyn/util/core/task/DynamicTasks.java   | 53 +++++++++++++++-----
 .../apache/brooklyn/core/feed/PollerTest.java   |  2 +-
 .../core/mgmt/rebind/RebindManagerTest.java     | 13 +----
 .../entity/group/DynamicClusterTest.java        |  8 +--
 .../SameServerDriverLifecycleEffectorTasks.java |  5 +-
 11 files changed, 63 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
index 3ef3470..2d8fe23 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
@@ -101,16 +101,18 @@ public interface ExecutionContext extends Executor {
     <T> Maybe<T> getImmediately(Task<T> callableOrSupplierOrTask);
 
     /**
-     * Efficient shortcut for {@link #submit(TaskAdaptable)} followed by an immediate {@link Task#get()}.
+     * Efficient implementation of common case when {@link #submit(TaskAdaptable)} is followed by an immediate {@link Task#get()}.
      * <p>
-     * Implementations will typically attempt to execute in the current thread, with appropriate
-     * configuration to make it look like it is in a sub-thread, 
-     * ie registering this as a task and allowing
-     * context methods on tasks to see the given sub-task.
+     * This is efficient in that it may typically attempt to execute in the current thread, 
+     * with appropriate configuration to make it look like it is in a sub-thread, 
+     * ie registering this as a task and allowing context methods on tasks to see the given sub-task.
+     * However it will normally be non-blocking which reduces overhead and 
+     * is permissible within a {@link #getImmediately(Object)} task
      * <p>
-     * If the argument has already been submitted it simply blocks on it.
+     * If the argument has already been submitted it simply blocks on it 
+     * (i.e. no additional execution, and in that case would fail within a {@link #getImmediately(Object)}).
      * 
-     * @param task
+     * @param task the task whose result is being sought
      * @return result of the task execution
      */
     @Beta

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/main/java/org/apache/brooklyn/core/entity/trait/StartableMethods.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/trait/StartableMethods.java b/core/src/main/java/org/apache/brooklyn/core/entity/trait/StartableMethods.java
index 1883166..30b2348 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/trait/StartableMethods.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/trait/StartableMethods.java
@@ -49,20 +49,20 @@ public class StartableMethods {
     /** Common implementation for start in parent nodes; just invokes start on all children of the entity */
     public static void start(Entity e, Collection<? extends Location> locations) {
         log.debug("Starting entity "+e+" at "+locations);
-        DynamicTasks.queueIfPossible(startingChildren(e, locations)).orSubmitAsync(e).getTask().getUnchecked();
+        DynamicTasks.get(startingChildren(e, locations), e);
     }
     
     /** Common implementation for stop in parent nodes; just invokes stop on all children of the entity */
     public static void stop(Entity e) {
         log.debug("Stopping entity "+e);
-        DynamicTasks.queueIfPossible(stoppingChildren(e)).orSubmitAsync(e).getTask().getUnchecked();
+        DynamicTasks.get(stoppingChildren(e), e);
         if (log.isDebugEnabled()) log.debug("Stopped entity "+e);
     }
 
     /** Common implementation for restart in parent nodes; just invokes restart on all children of the entity */
     public static void restart(Entity e) {
         log.debug("Restarting entity "+e);
-        DynamicTasks.queueIfPossible(restartingChildren(e)).orSubmitAsync(e).getTask().getUnchecked();
+        DynamicTasks.get(restartingChildren(e), e);
         if (log.isDebugEnabled()) log.debug("Restarted entity "+e);
     }
     

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/main/java/org/apache/brooklyn/core/location/BasicMachineDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/location/BasicMachineDetails.java b/core/src/main/java/org/apache/brooklyn/core/location/BasicMachineDetails.java
index f8a7cce..7906616 100644
--- a/core/src/main/java/org/apache/brooklyn/core/location/BasicMachineDetails.java
+++ b/core/src/main/java/org/apache/brooklyn/core/location/BasicMachineDetails.java
@@ -96,10 +96,7 @@ public class BasicMachineDetails implements MachineDetails {
      */
     @Beta
     public static BasicMachineDetails forSshMachineLocationLive(SshMachineLocation location) {
-        return TaskTags.markInessential(DynamicTasks.queueIfPossible(taskForSshMachineLocation(location))
-                .orSubmitAsync()
-                .asTask())
-                .getUnchecked();
+        return DynamicTasks.get(TaskTags.markInessential(taskForSshMachineLocation(location)));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/main/java/org/apache/brooklyn/core/location/access/BrooklynAccessUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/location/access/BrooklynAccessUtils.java b/core/src/main/java/org/apache/brooklyn/core/location/access/BrooklynAccessUtils.java
index 40f71e2..35bc2f8 100644
--- a/core/src/main/java/org/apache/brooklyn/core/location/access/BrooklynAccessUtils.java
+++ b/core/src/main/java/org/apache/brooklyn/core/location/access/BrooklynAccessUtils.java
@@ -119,7 +119,7 @@ public class BrooklynAccessUtils {
     public static String getResolvedAddress(Entity entity, SshMachineLocation origin, String hostnameTarget) {
         ProcessTaskWrapper<Integer> task = SshTasks.newSshExecTaskFactory(origin, "ping -c 1 -t 1 "+hostnameTarget)
             .summary("checking resolution of "+hostnameTarget).allowingNonZeroExitCode().newTask();
-        DynamicTasks.queueIfPossible(task).orSubmitAndBlock(entity).asTask().blockUntilEnded();
+        DynamicTasks.queueIfPossible(task).orSubmitAndBlock(entity).getTask().blockUntilEnded();
         if (task.asTask().isError()) {
             log.warn("ping could not be run, at "+entity+" / "+origin+": "+Tasks.getError(task.asTask()));
             return "";

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/main/java/org/apache/brooklyn/core/objs/proxy/EntityProxyImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/EntityProxyImpl.java b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/EntityProxyImpl.java
index daaf18b..4b6e704 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/EntityProxyImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/EntityProxyImpl.java
@@ -209,7 +209,7 @@ public class EntityProxyImpl implements java.lang.reflect.InvocationHandler {
                     TaskAdaptable<?> task = ((EffectorWithBody)eff).getBody().newTask(delegate, eff, ConfigBag.newInstance(parameters));
                     // as per LocalManagementContext.runAtEntity(Entity entity, TaskAdaptable<T> task) 
                     TaskTags.markInessential(task);
-                    result = DynamicTasks.queueIfPossible(task.asTask()).orSubmitAsync(delegate).andWaitForSuccess();
+                    result = DynamicTasks.get(task.asTask(), delegate);
                 } else {
                     result = m.invoke(delegate, nonNullArgs);
                 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
index 23b48f0..b9a9041 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
@@ -827,10 +827,8 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
         for (Entity member : removedStartables) {
             tasks.add(newThrottledEffectorTask(member, Startable.STOP, Collections.emptyMap()));
         }
-        Task<?> invoke = Tasks.parallel(tasks.build());
-        DynamicTasks.queueIfPossible(invoke).orSubmitAsync();
         try {
-            invoke.get();
+            DynamicTasks.get( Tasks.parallel(tasks.build()) );
             return removedEntities;
         } catch (Exception e) {
             throw Exceptions.propagate(e);
@@ -1075,8 +1073,7 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
         try {
             if (member instanceof Startable) {
                 Task<?> task = newThrottledEffectorTask(member, Startable.STOP, Collections.<String, Object>emptyMap());
-                DynamicTasks.queueIfPossible(task).orSubmitAsync();
-                task.getUnchecked();
+                DynamicTasks.get(task);
             }
         } finally {
             Entities.unmanage(member);

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
index 5426baf..20b80ab 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
@@ -41,6 +41,16 @@ import com.google.common.collect.Iterables;
 
 /** 
  * Contains static methods which detect and use the current {@link TaskQueueingContext} to execute tasks.
+ * <p>
+ * Queueing is supported by some task contexts (eg {@link DynamicSequentialTask}) to let that task
+ * build up a complex sequence of tasks and logic. This utility class gives conveniences to allow:
+ * <p>
+ * <li> "queue-if-possible-else-submit-async", so that it is backgrounded, using queueing semantics if available;
+ * <li> "queue-if-possible-else-submit-blocking", so that it is in the queue if there is one, else it will complete synchronously;
+ * <li> "queue-if-possible-else-submit-and-in-both-cases-block", so that it is returned immediately, but waits in its queue if there is one.
+ * <p>
+ * Over time the last mode has been the most prevalent and {@link #get(TaskAdaptable)} is introduced here
+ * as a convenience.  If a timeout is desired then the first should be used.
  * 
  * @since 0.6.0
  */
@@ -126,36 +136,50 @@ public class DynamicTasks {
                 return false;
             }
         }
-        /** causes the task to be submitted (asynchronously) if it hasn't already been,
-         * requiring an entity execution context (will try to find a default if not set) */
+        /** Causes the task to be submitted (asynchronously) if it hasn't already been,
+         * such as if a previous {@link DynamicTasks#queueIfPossible(TaskAdaptable)} did not have a queueing context.
+         * <p>
+         * An {@link #executionContext(ExecutionContext)} should typically have been set
+         * (or use {@link #orSubmitAsync(Entity)}).
+         */
         public TaskQueueingResult<T> orSubmitAsync() {
             orSubmitInternal(false);
             return this;
         }
-        /** convenience for setting {@link #executionContext(ExecutionContext)} then submitting async */
+        /** Convenience for setting {@link #executionContext(Entity)} then {@link #orSubmitAsync()}. */
         public TaskQueueingResult<T> orSubmitAsync(Entity entity) {
             executionContext(entity);
             return orSubmitAsync();
         }
-        /** causes the task to be submitted *synchronously* if it hasn't already been submitted;
-         * useful in contexts such as libraries where callers may be either on a legacy call path 
-         * (which assumes all commands complete immediately);
-         * requiring an entity execution context (will try to find a default if not set) */
+        /** Alternative to {@link #orSubmitAsync()} but where, if the submission is needed
+         * (usually because a previous {@link DynamicTasks#queueIfPossible(TaskAdaptable)} did not have a queueing context)
+         * it will wait until execution completes (and in fact will execute the task in this thread,
+         * as per {@link ExecutionContext#get(TaskAdaptable)}. 
+         * <p>
+         * If the task is already queued, this method does nothing, not even blocks,
+         * to permit cases where a caller is building up a set of tasks to be executed sequentially:
+         * with a queueing context the caller can line them all up, but without that the caller needs this task
+         * finished before submitting subsequent tasks. 
+         * <p>
+         * If blocking is desired in all cases and this call should fail on task failure, invoke {@link #andWaitForSuccess()} on the result,
+         * or consider using {@link DynamicTasks#get(TaskAdaptable)} instead of this method,
+         * or {@link DynamicTasks#get(TaskAdaptable, Entity)} if an execuiton context a la {@link #orSubmitAndBlock(Entity)} is needed. */
         public TaskQueueingResult<T> orSubmitAndBlock() {
             orSubmitInternal(true);
             return this;
         }
-        /** convenience for setting {@link #executionContext(ExecutionContext)} then submitting blocking */
+        /** Variant of {@link #orSubmitAndBlock()} doing what {@link #orSubmitAsync(Entity)} does for {@link #orSubmitAsync()}. */
         public TaskQueueingResult<T> orSubmitAndBlock(Entity entity) {
             executionContext(entity);
             return orSubmitAndBlock();
         }
-        /** blocks for the task to be completed
+        /** Blocks for the task to be completed, throwing if there are any errors
+         * and otherwise returning the value.
          * <p>
-         * needed in any context where subsequent commands assume the task has completed.
+         * In addition to cases where a result is wanted, this is needed in any context where subsequent commands assume the task has completed.
          * not needed in a context where the task is simply being built up and queued.
          * <p>
-         * throws if there are any errors
+         * 
          */
         public T andWaitForSuccess() {
             return task.getUnchecked();
@@ -170,7 +194,7 @@ public class DynamicTasks {
     /**
      * Tries to add the task to the current addition context if there is one, otherwise does nothing.
      * <p/>
-     * Call {@link TaskQueueingResult#orSubmitAsync() orSubmitAsync()} on the returned
+     * Call {@link TaskQueueingResult#orSubmitAsync()} on the returned
      * {@link TaskQueueingResult TaskQueueingResult} to handle execution of tasks in a
      * {@link BasicExecutionContext}.
      */
@@ -332,6 +356,11 @@ public class DynamicTasks {
     public static <T> Task<T> submit(TaskAdaptable<T> task, Entity entity) {
         return queueIfPossible(task).orSubmitAsync(entity).asTask();
     }
+    
+    /** queues the task if possible and waits for the result, otherwise executes synchronously as per {@link ExecutionContext#get(TaskAdaptable)} */
+    public static <T> T get(TaskAdaptable<T> task, Entity e) {
+        return queueIfPossible(task).orSubmitAndBlock(e).andWaitForSuccess();
+    }
 
     /** Breaks the parent-child relation between Tasks.current() and the task passed,
      *  making the new task a top-level one at the target entity.

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java b/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
index 2251153..f270e54 100644
--- a/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java
@@ -133,7 +133,7 @@ public class PollerTest extends BrooklynAppUnitTestSupport {
                         }
                     })
                     .build();
-            return DynamicTasks.queueIfPossible(t).orSubmitAsync().asTask().getUnchecked();
+            return DynamicTasks.get(t);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerTest.java
index 45589b1..25eb00f 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerTest.java
@@ -20,11 +20,8 @@ package org.apache.brooklyn.core.mgmt.rebind;
 
 import static org.testng.Assert.assertEquals;
 
-import java.util.concurrent.Callable;
-
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.core.test.entity.TestEntity;
 import org.apache.brooklyn.core.test.entity.TestEntityImpl;
 import org.apache.brooklyn.util.core.task.BasicTask;
@@ -48,15 +45,7 @@ public class RebindManagerTest extends RebindTestFixtureWithApp {
         @Override
         public void rebind() {
             super.rebind();
-            Task<String> task = new BasicTask<String>(new Callable<String>() {
-                @Override public String call() {
-                    return "abc";
-                }});
-            String val = DynamicTasks.queueIfPossible(task)
-                    .orSubmitAsync()
-                    .asTask()
-                    .getUnchecked();
-            sensors().set(TestEntity.NAME, val);
+            sensors().set(TestEntity.NAME, DynamicTasks.get(new BasicTask<String>(() -> "abc")));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
index 80f1be6..0827664 100644
--- a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
+++ b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
@@ -1325,12 +1325,8 @@ public class DynamicClusterTest extends AbstractDynamicClusterOrFabricTest {
                 .body(new Callable<Boolean>() {
                     @Override
                     public Boolean call() throws Exception {
-                        Task<Entity> first = DependentConfiguration.attributeWhenReady(cluster, DynamicCluster.FIRST);
-                        DynamicTasks.queueIfPossible(first).orSubmitAsync();
-                        final Entity source = first.get();
-                        final Task<Boolean> booleanTask = DependentConfiguration.attributeWhenReady(source, Attributes.SERVICE_UP);
-                        DynamicTasks.queueIfPossible(booleanTask).orSubmitAsync();
-                        return booleanTask.get();
+                        final Entity source = DynamicTasks.get( DependentConfiguration.attributeWhenReady(cluster, DynamicCluster.FIRST) );
+                        return DynamicTasks.get( DependentConfiguration.attributeWhenReady(source, Attributes.SERVICE_UP) );
                     }
                 })
                 .build();

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/98888708/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SameServerDriverLifecycleEffectorTasks.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SameServerDriverLifecycleEffectorTasks.java b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SameServerDriverLifecycleEffectorTasks.java
index faed3db..52ce82f 100644
--- a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SameServerDriverLifecycleEffectorTasks.java
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SameServerDriverLifecycleEffectorTasks.java
@@ -111,10 +111,7 @@ public class SameServerDriverLifecycleEffectorTasks extends MachineLifecycleEffe
 
     @Override
     protected String startProcessesAtMachine(Supplier<MachineLocation> machineS) {
-        DynamicTasks.queueIfPossible(StartableMethods.startingChildren(entity(), machineS.get()))
-                .orSubmitAsync(entity())
-                .getTask()
-                .getUnchecked();
+        DynamicTasks.get(StartableMethods.startingChildren(entity(), machineS.get()), entity());
         DynamicTasks.waitForLast();
         return "children started";
     }


[14/23] brooklyn-server git commit: deprecated since is now 0.13.0 not 0.12.0

Posted by he...@apache.org.
deprecated since is now 0.13.0 not 0.12.0


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/6dfe4987
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/6dfe4987
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/6dfe4987

Branch: refs/heads/master
Commit: 6dfe49875e5e06396a7ff23a2e90cff5741e830a
Parents: 80446ab
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Mon Sep 25 10:41:37 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Mon Sep 25 10:41:37 2017 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java | 4 ++--
 .../main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java | 4 ++--
 .../java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java     | 4 ++--
 .../org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java     | 2 +-
 .../apache/brooklyn/util/core/task/BasicExecutionContext.java    | 2 +-
 5 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/6dfe4987/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
index c940ca0..3bf7c73 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
@@ -53,12 +53,12 @@ public interface ExecutionContext extends Executor {
     <T> Task<T> submit(Map<?,?> properties, Callable<T> callable);
 
     /** {@link ExecutionManager#submit(Runnable) 
-     * @deprecated since 0.12.0 pass a display name or a more detailed map */
+     * @deprecated since 0.13.0 pass a display name or a more detailed map */
     @Deprecated
     Task<?> submit(Runnable runnable);
  
     /** {@link ExecutionManager#submit(Callable)
-     * @deprecated since 0.12.0 pass a display name or a more detailed map */
+     * @deprecated since 0.13.0 pass a display name or a more detailed map */
     @Deprecated
     <T> Task<T> submit(Callable<T> callable);
 

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/6dfe4987/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java
index d4d1db4..347a127 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java
@@ -80,12 +80,12 @@ public interface ExecutionManager {
 //    public Set<Task<?>> getAllTasks();
 
     /** see {@link #submit(Map, TaskAdaptable)} 
-     * @deprecated since 0.12.0 pass displayName or map */
+     * @deprecated since 0.13.0 pass displayName or map */
     @Deprecated
     public Task<?> submit(Runnable r);
 
     /** see {@link #submit(Map, TaskAdaptable)} 
-     * @deprecated since 0.12.0 pass displayName or map */
+     * @deprecated since 0.13.0 pass displayName or map */
     @Deprecated
     public <T> Task<T> submit(Callable<T> r);
 

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/6dfe4987/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
index a68df32..7d35ac4 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
@@ -88,7 +88,7 @@ public class BrooklynTaskTags extends TaskTags {
     // ------------- entity tags -------------------------
     
     public abstract static class WrappedItem<T> {
-        /** @deprecated since 0.12.0 going private; use {@link #getWrappingType()} */
+        /** @deprecated since 0.13.0 going private; use {@link #getWrappingType()} */
         @Deprecated
         public final String wrappingType;
         protected WrappedItem(String wrappingType) {
@@ -117,7 +117,7 @@ public class BrooklynTaskTags extends TaskTags {
         }
     }
     public static class WrappedEntity extends WrappedItem<Entity> {
-        /** @deprecated since 0.12.0 going private; use {@link #unwrap()} */
+        /** @deprecated since 0.13.0 going private; use {@link #unwrap()} */
         @Deprecated
         public final Entity entity;
         protected WrappedEntity(String wrappingType, Entity entity) {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/6dfe4987/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
index 9018d5d..c6cb74c 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
@@ -89,7 +89,7 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
     @Deprecated
     protected Map<String,Object> leftoverProperties = Maps.newLinkedHashMap();
 
-    /** @deprecated since 0.12.0, going private, use {@link #getExecutionContext()} */
+    /** @deprecated since 0.13.0, going private, use {@link #getExecutionContext()} */
     @Deprecated
     protected transient ExecutionContext execution;
 

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/6dfe4987/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
index 1236219..33412d8 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
@@ -84,7 +84,7 @@ public class BasicExecutionContext extends AbstractExecutionContext {
      * Supported flags are {@code tag} and {@code tags}
      * 
      * @see ExecutionManager#submit(Map, TaskAdaptable)
-     * @deprecated since 0.12.0 use {@link #BasicExecutionContext(ExecutionManager, Iterable)}
+     * @deprecated since 0.13.0 use {@link #BasicExecutionContext(ExecutionManager, Iterable)}
      */
     @Deprecated
     public BasicExecutionContext(Map<?, ?> flags, ExecutionManager executionManager) {


[15/23] brooklyn-server git commit: Merge branch 'tasks-better' into tasks-better-tree

Posted by he...@apache.org.
Merge branch 'tasks-better' into tasks-better-tree


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/d7b086b7
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/d7b086b7
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/d7b086b7

Branch: refs/heads/master
Commit: d7b086b731922dcce0f2d9a367d924e842c22047
Parents: 6dfe498 afc17ee
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Thu Sep 28 17:28:45 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Thu Sep 28 17:28:45 2017 +0100

----------------------------------------------------------------------
 .../brooklyn/api/mgmt/ExecutionContext.java     |  8 ++++-
 .../util/core/task/BasicExecutionContext.java   | 28 +++++++++++------
 .../util/core/task/BasicExecutionManager.java   | 11 ++++---
 .../brooklyn/util/core/task/BasicTask.java      | 23 +++++---------
 .../brooklyn/util/core/task/ForwardingTask.java |  4 +--
 .../brooklyn/util/core/task/TaskInternal.java   | 10 +++---
 .../task/TaskInternalCancellableWithMode.java   | 32 ++++++++++++++++++++
 .../brooklyn/util/core/task/ValueResolver.java  |  8 +++--
 .../core/effector/EffectorSayHiTest.java        |  1 +
 .../util/core/task/BasicTasksFutureTest.java    | 23 +++++++-------
 .../core/task/NonBasicTaskExecutionTest.java    |  8 +----
 .../org/apache/brooklyn/util/guava/Maybe.java   |  2 +-
 .../apache/brooklyn/util/repeat/Repeater.java   |  7 +++--
 13 files changed, 102 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d7b086b7/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d7b086b7/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d7b086b7/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
----------------------------------------------------------------------


[16/23] brooklyn-server git commit: change when cancellation is done for getImmediate - means effector invocation now works

Posted by he...@apache.org.
change when cancellation is done for getImmediate - means effector invocation now works


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/bb26d320
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/bb26d320
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/bb26d320

Branch: refs/heads/master
Commit: bb26d320906ad86888c28cc32b2fa36709e70d1a
Parents: d7b086b
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Thu Sep 28 16:55:33 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Thu Sep 28 17:29:59 2017 +0100

----------------------------------------------------------------------
 .../util/core/task/BasicExecutionContext.java     | 18 +++++++-----------
 .../brooklyn/core/effector/EffectorSayHiTest.java |  6 +++---
 2 files changed, 10 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/bb26d320/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
index f9f431a..2c8ddab 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
@@ -287,17 +287,6 @@ public class BasicExecutionContext extends AbstractExecutionContext {
         try {
             return runInSameThread(fakeTaskForContext, new Callable<Maybe<T>>() {
                 public Maybe<T> call() {
-                    // could try to make this work for more types of tasks by not cancelling, just interrupting;
-                    // however there is a danger that immediate-submission tasks are leaked if we don't cancel.
-                    // for instance with DSTs the thread interrupt may apply only to the main job queue.andWait blocking,
-                    // leaving other tasks leaked.
-                    //
-                    // this method is best-effort so fine if it doesn't succeed.  good if we can expand
-                    // coverage but NOT at the expense of major leaks of course!
-                    //
-                    // see WIP test in EffectorSayHiTest
-                    fakeTaskForContext.cancel();
-                    
                     boolean wasAlreadyInterrupted = Thread.interrupted();
                     try {
                         return job.getImmediately();
@@ -305,6 +294,13 @@ public class BasicExecutionContext extends AbstractExecutionContext {
                         if (wasAlreadyInterrupted) {
                             Thread.currentThread().interrupt();
                         }
+                        // we've acknowledged that getImmediate may wreck (cancel) the task,
+                        // their first priority is to prevent them from leaking;
+                        // however previously we did the cancel before running, 
+                        // doing it after means more tasks successfully execute 
+                        // (the interrupt is sufficient to prevent them blocking); 
+                        // see test EffectorSayHiTest.testInvocationGetImmediately
+                        fakeTaskForContext.cancel();
                     }
                 } });
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/bb26d320/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java b/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
index bc41d45..e7dc626 100644
--- a/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
@@ -61,6 +61,7 @@ public class EffectorSayHiTest extends BrooklynAppUnitTestSupport {
     //TODO test edge/error conditions
     //(missing parameters, wrong number of params, etc)
 
+    @SuppressWarnings("unused")
     private static final Logger log = LoggerFactory.getLogger(EffectorSayHiTest.class);
 
     private MyEntity e;
@@ -109,11 +110,10 @@ public class EffectorSayHiTest extends BrooklynAppUnitTestSupport {
             .get( Effectors.invocation(e, MyEntity.SAY_HI_1, ImmutableMap.of("name", "Bob", "greeting", "hi")) ), "hi Bob");
     }
     
-    @Test(groups="WIP")  // see comments at BasicExecutionContext.getImmediately
-    // TODO this will be fixed soon by #835
+    @Test(invocationCount=100)
     public void testInvocationGetImmediately() throws Exception {
         assertEquals(((EntityInternal)e).getExecutionContext()
-            .getImmediately( Effectors.invocation(e, MyEntity.SAY_HI_1, ImmutableMap.of("name", "Bob", "greeting", "hi")) ), "hi Bob");
+            .getImmediately( Effectors.invocation(e, MyEntity.SAY_HI_1, ImmutableMap.of("name", "Bob", "greeting", "hi")) ).get(), "hi Bob");
     }
 
     @Test


[19/23] brooklyn-server git commit: Tasks.tryQueueing won't queue if calling thread is interrupted

Posted by he...@apache.org.
Tasks.tryQueueing won't queue if calling thread is interrupted

means more things work in immediate mode.
could be un-done if DynamicSequentialTask removes the DST manager thread.
fixes a few failing tests in the previous commit.


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/8b727697
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/8b727697
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/8b727697

Branch: refs/heads/master
Commit: 8b7276976721ef440c858ee8253227169c32f5e0
Parents: 9888870
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Wed Oct 4 10:57:17 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Oct 4 14:45:19 2017 +0100

----------------------------------------------------------------------
 .../brooklyn/util/core/task/DynamicTasks.java   | 21 ++++++--------------
 .../apache/brooklyn/util/core/task/Tasks.java   |  6 +++---
 2 files changed, 9 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/8b727697/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
index 20b80ab..9026798 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java
@@ -199,11 +199,7 @@ public class DynamicTasks {
      * {@link BasicExecutionContext}.
      */
     public static <T> TaskQueueingResult<T> queueIfPossible(TaskAdaptable<T> task) {
-        TaskQueueingContext adder = getTaskQueuingContext();
-        boolean result = false;
-        if (adder!=null)
-            result = Tasks.tryQueueing(adder, task);
-        return new TaskQueueingResult<T>(task, result);
+        return new TaskQueueingResult<T>(task, Tasks.tryQueueing(getTaskQueuingContext(), task));
     }
 
     /** @see #queueIfPossible(TaskAdaptable) */
@@ -214,22 +210,17 @@ public class DynamicTasks {
     /** adds the given task to the nearest task addition context,
      * either set as a thread-local, or in the current task, or the submitter of the task, etc
      * <p>
-     * throws if it cannot add */
+     * throws if it cannot add or addition/execution would fail including if calling thread is interrupted */
     public static <T> Task<T> queueInTaskHierarchy(Task<T> task) {
         Preconditions.checkNotNull(task, "Task to queue cannot be null");
         Preconditions.checkState(!Tasks.isQueuedOrSubmitted(task), "Task to queue must not yet be submitted: {}", task);
         
-        TaskQueueingContext adder = getTaskQueuingContext();
-        if (adder!=null) { 
-            if (Tasks.tryQueueing(adder, task)) {
-                log.debug("Queued task {} at context {} (no hierarchy)", task, adder);
-                return task;
-            }
+        if (Tasks.tryQueueing(getTaskQueuingContext(), task)) {
+            log.debug("Queued task {} at context {} (no hierarchy)", task, getTaskQueuingContext());
+            return task;
         }
         
-        Task<?> t = Tasks.current();
-        Preconditions.checkState(t!=null || adder!=null, "No task addition context available for queueing task "+task);
-        
+        Task<?> t = Tasks.current();        
         while (t!=null) {
             if (t instanceof TaskQueueingContext) {
                 if (Tasks.tryQueueing((TaskQueueingContext)t, task)) {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/8b727697/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java b/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java
index 12247c4..90a6bdc 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java
@@ -278,11 +278,11 @@ public class Tasks {
     }
     
     /**
-     * Adds the given task to the given context. Does not throw an exception if the addition fails.
-     * @return true if the task was added, false otherwise.
+     * Adds the given task to the given context. Does not throw an exception if the addition fails or would fail.
+     * @return true if the task was added, false otherwise including if context is null or thread is interrupted.
      */
     public static boolean tryQueueing(TaskQueueingContext adder, TaskAdaptable<?> task) {
-        if (task==null || isQueued(task))
+        if (task==null || adder==null || isQueued(task) || Thread.currentThread().isInterrupted())
             return false;
         try {
             adder.queue(task.asTask());


[20/23] brooklyn-server git commit: more assertion routines, map equality and unordered iterable equality

Posted by he...@apache.org.
more assertion routines, map equality and unordered iterable equality


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/508183b1
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/508183b1
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/508183b1

Branch: refs/heads/master
Commit: 508183b1b85f7ee118d948739e4cc72121623a85
Parents: 8b72769
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Wed Oct 4 11:08:51 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Oct 4 14:45:39 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/brooklyn/test/Asserts.java  | 20 ++++++++++++++++++--
 .../org/apache/brooklyn/test/AssertsTest.java   | 17 +++++++++++++++++
 2 files changed, 35 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/508183b1/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
----------------------------------------------------------------------
diff --git a/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java b/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
index dc672b5..3cd70b1 100644
--- a/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
+++ b/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
@@ -1452,8 +1452,24 @@ public class Asserts {
     }
 
     public static void assertSize(Iterable<?> list, int expectedSize) {
-        if (list==null) fail("List is null");
-        if (Iterables.size(list)!=expectedSize) fail("List has wrong size "+Iterables.size(list)+" (expected "+expectedSize+"): "+list);
+        if (list==null) fail("Collection is null");
+        if (Iterables.size(list)!=expectedSize) fail("Collection has wrong size "+Iterables.size(list)+" (expected "+expectedSize+"): "+list);
+    }
+
+    public static void assertSize(Map<?,?> map, int expectedSize) {
+        if (map==null) fail("Map is null");
+        if (Iterables.size(map.keySet())!=expectedSize) fail("Map has wrong size "+map.size()+" (expected "+expectedSize+"): "+map);
+    }
+
+    /** Ignores duplicates and order */
+    public static void assertSameUnorderedContents(Iterable<?> i1, Iterable<?> i2) {
+        if (i1==null || i2==null) {
+            if (i1==null && i2==null) {
+                return ;
+            }
+            fail("Collections differ in that one is null: "+i1+" and "+i2);
+        }
+        assertEquals(MutableSet.copyOf(i1), MutableSet.copyOf(i2));
     }
 
     public static void assertInstanceOf(Object obj, Class<?> type) {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/508183b1/utils/common/src/test/java/org/apache/brooklyn/test/AssertsTest.java
----------------------------------------------------------------------
diff --git a/utils/common/src/test/java/org/apache/brooklyn/test/AssertsTest.java b/utils/common/src/test/java/org/apache/brooklyn/test/AssertsTest.java
index bd411b7..1105acd 100644
--- a/utils/common/src/test/java/org/apache/brooklyn/test/AssertsTest.java
+++ b/utils/common/src/test/java/org/apache/brooklyn/test/AssertsTest.java
@@ -25,7 +25,9 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.brooklyn.test.Asserts.ShouldHaveFailedPreviouslyAssertionError;
+import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.time.Duration;
 import org.testng.Assert;
@@ -170,4 +172,19 @@ public class AssertsTest {
         // check code flowed the way we expected
         Asserts.assertEquals(reached, 3);
     }
+    
+    @Test
+    public void testAssertSize() {
+        Asserts.assertSize(MutableList.of("x", "x", "y"), 3);
+        Asserts.assertSize(MutableSet.of("x", "x", "y"), 2);
+        Asserts.assertSize(MutableMap.of("x", "x", "y", "y"), 2);
+    }
+    
+    @Test
+    public void testAssertSetListEqualityAndSameUnoderderedContents() {
+        Assert.assertEquals(MutableSet.of("x", "x", "y"), MutableSet.of("x", "y", "x"));
+        Assert.assertNotEquals(MutableList.of("x", "x", "y"), MutableList.of("x", "y", "x"));
+        // above are baseline checks
+        Asserts.assertSameUnorderedContents(MutableList.of("x", "x", "y"), MutableList.of("y", "x"));
+    }
 }


[21/23] brooklyn-server git commit: fix message publish synching to guarantee in-order delivery

Posted by he...@apache.org.
fix message publish synching to guarantee in-order delivery

both for initial subscriptions and in-life - two changes, synching in AttributesInternal for the in-life delivery, and in subscribe/publish for the initial


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/9213f0e2
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/9213f0e2
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/9213f0e2

Branch: refs/heads/master
Commit: 9213f0e25288114718f2a06a50ee206bf0120561
Parents: 508183b
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Wed Oct 4 13:49:42 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Oct 4 14:45:39 2017 +0100

----------------------------------------------------------------------
 .../brooklyn/api/mgmt/SubscriptionManager.java  |   6 +-
 .../brooklyn/core/entity/AbstractEntity.java    |   1 +
 .../mgmt/internal/LocalSubscriptionManager.java | 108 +++++++++++++------
 .../brooklyn/core/sensor/AttributeMap.java      |  15 ++-
 .../core/effector/EffectorSayHiTest.java        |   2 +-
 .../core/entity/EntitySubscriptionTest.java     |   9 +-
 .../internal/LocalSubscriptionManagerTest.java  |  91 ++++++++++++++++
 .../policy/basic/PolicySubscriptionTest.java    |  35 ++++--
 8 files changed, 224 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionManager.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionManager.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionManager.java
index 1fa327e..8302ba8 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionManager.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionManager.java
@@ -42,8 +42,10 @@ public interface SubscriptionManager {
      * 
      * The method returns an id which can be used to {@link #unsubscribe(SubscriptionHandle)} later.
      * <p>
-     * The listener callback is in-order single-threaded and synchronized on this object. The flags
-     * parameters can include the following:
+     * The listener callback is in-order single-threaded and synchronized on this object.
+     * In other words message delivery from a producer to a given subscriber is in publish order
+     * (or in the case of a late subscriber getting initial values, in subscribe order). 
+     * The flags parameters can include the following:
      * <ul>
      * <li>subscriber - object to identify the subscriber (e.g. entity, or console session uid) 
      * <li><i>in future</i> - control parameters for the subscription (period, minimum delta for updates, etc)

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
index bd7df06..dfbbc8f 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
@@ -529,6 +529,7 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E
      * through this method. Internally, all attribute updates synch on this object. Code wishing to
      * update attributes or publish while holding some other lock should acquire the monitor on this
      * object first to prevent deadlock. */
+    @Beta
     protected Object getAttributesSynchObjectInternal() {
         return attributesInternal.getSynchObjectInternal();
     }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index a927a89..a726059 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -54,6 +54,7 @@ import org.apache.brooklyn.util.text.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Objects;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashMultimap;
@@ -129,15 +130,9 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
         
         if (LOG.isDebugEnabled()) LOG.debug("Creating subscription {} for {} on {} {} in {}", new Object[] {s.id, s.subscriber, producer, sensor, this});
         allSubscriptions.put(s.id, s);
-        addToMapOfSets(subscriptionsByToken, makeEntitySensorToken(s.producer, s.sensor), s);
-        if (s.subscriber!=null) {
-            addToMapOfSets(subscriptionsBySubscriber, s.subscriber, s);
-        }
-        if (!s.subscriberExecutionManagerTagSupplied && s.subscriberExecutionManagerTag!=null) {
-            ((BasicExecutionManager) em).setTaskSchedulerForTag(s.subscriberExecutionManagerTag, SingleThreadedScheduler.class);
-        }
-
+        T lastVal;
         if (notifyOfInitialValue) {
+            notifyOfInitialValue = false;
             if (producer == null) {
                 LOG.warn("Cannot notifyOfInitialValue for subscription with wildcard producer: "+s);
             } else if (sensor == null) {
@@ -145,16 +140,58 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
             } else if (!(sensor instanceof AttributeSensor)) {
                 LOG.warn("Cannot notifyOfInitialValue for subscription with non-attribute sensor: "+s);
             } else {
-                if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {} to {}", new Object[] {s.producer, s.sensor, s});
-                em.submit(
-                    MutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(s.producer), BrooklynTaskTags.SENSOR_TAG),
-                        "displayName", "Initial publication of "+s.sensor.getName()),
-                    () -> {
-                        T val = (T) s.producer.getAttribute((AttributeSensor<?>) s.sensor);
-                        submitPublishEvent(s, new BasicSensorEvent<T>(s.sensor, s.producer, val), true);
-                    });
+                notifyOfInitialValue = true;
             }
         }
+        if (notifyOfInitialValue) {
+            lastVal = (T) s.producer.sensors().get((AttributeSensor<?>) s.sensor);
+        } else {
+            lastVal = null;  // won't be used
+        }
+        addToMapOfSets(subscriptionsByToken, makeEntitySensorToken(s.producer, s.sensor), s);
+        if (s.subscriber!=null) {
+            addToMapOfSets(subscriptionsBySubscriber, s.subscriber, s);
+        }
+        if (!s.subscriberExecutionManagerTagSupplied && s.subscriberExecutionManagerTag!=null) {
+            ((BasicExecutionManager) em).setTaskSchedulerForTag(s.subscriberExecutionManagerTag, SingleThreadedScheduler.class);
+        }
+
+        if (notifyOfInitialValue) {
+            if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {} to {}", new Object[] {s.producer, s.sensor, s});
+            // this is run asynchronously to prevent deadlock when trying to get attribute and publish;
+            // however we want it:
+            // (a) to run in the same order as subscriptions are made, so use the manager tag scheduler
+            // (b) ideally to use the last value that was not published to this target, and
+            // (c) to deliver before any subsequent value notification
+            // but we can't guarantee either (b) or (c) without taking a lock from before we added 
+            // the subscriber above, mutexing other sets and publications, which feels heavy and dangerous.
+            // so the compromise is to skip this delivery in cases where the last value has obviously changed -
+            // because a more recent notification is guaranteed to be sent.
+            // we may occasionally still send a duplicate, if delivery got sent in the tiny
+            // window between adding the subscription and taking the last value,
+            // we will think the last value hasn't changed.  but we will never send a
+            // wrong value as this backs out if there is any confusion over the last value.
+            em.submit(
+                MutableMap.of("tags", getPublishTags(s, s.producer),
+                    "displayName", "Initial value publication on subscription to "+s.sensor.getName()),
+                () -> {
+                    T val = (T) s.producer.sensors().get((AttributeSensor<?>) s.sensor);
+                    if (!Objects.equal(lastVal, val)) {
+                        // bail out - value has been changed;
+                        // this might be a duplicate if value changed in small window earlier,
+                        // but it won't be delivering an old value later than a newer value
+                        if (LOG.isDebugEnabled()) LOG.debug("skipping initial value delivery of {} -> {} to {} as value changed from {} to {}", new Object[] {s.producer, s.sensor, s, lastVal, val});
+                        return;
+                    }
+                    // guard against case where other thread changes the val and publish
+                    // while we are publishing, and our older val is then delivered after theirs.
+                    // 2017-10 previously we did not do this, then looked at doing it with the attribute lock object
+                    // synchronized (((AbstractEntity)s.producer).getAttributesSynchObjectInternal()) {
+                    // but realized a better thing is to have initial delivery _done_, not just submitted, 
+                    // by ourselves, as we are already in the right thread now and can prevent interleaving this way
+                    submitPublishEvent(s, new BasicSensorEvent<T>(s.sensor, s.producer, val), true);
+                });
+        }
         
         return s;
     }
@@ -210,7 +247,6 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
         // (recommend exactly one per subscription to prevent deadlock)
         // this is done with:
         // em.setTaskSchedulerForTag(subscriberId, SingleThreadedScheduler.class);
-        
         //note, generating the notifications must be done in the calling thread to preserve order
         //e.g. emit(A); emit(B); should cause onEvent(A); onEvent(B) in that order
         if (LOG.isTraceEnabled()) LOG.trace("{} got event {}", this, event);
@@ -228,18 +264,11 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
     }
     
     @SuppressWarnings({ "unchecked", "rawtypes" })
-    private void submitPublishEvent(final Subscription s, final SensorEvent<?> event, final boolean isInitial) {
+    private void submitPublishEvent(final Subscription s, final SensorEvent<?> event, final boolean isInitialPublicationOfOldValueInCorrectScheduledThread) {
         if (s.eventFilter!=null && !s.eventFilter.apply(event))
             return;
         
-        List<Object> tags = MutableList.builder()
-            .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags)
-            .add(s.subscriberExecutionManagerTag)
-            .add(BrooklynTaskTags.SENSOR_TAG)
-            // associate the publish event with the publisher (though on init it might be triggered by subscriber)
-            .addIfNotNull(event.getSource()!=null ? BrooklynTaskTags.tagForTargetEntity(event.getSource()) : null)
-            .build()
-            .asUnmodifiable();
+        List<Object> tags = getPublishTags(s, event.getSource()).asUnmodifiable();
         
         StringBuilder name = new StringBuilder("sensor ");
         StringBuilder description = new StringBuilder("Sensor ");
@@ -271,12 +300,12 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
             "displayName", name.toString(),
             "description", description.toString());
         
-        boolean isEntityStarting = s.subscriber instanceof Entity && isInitial;
+        boolean isEntityStarting = s.subscriber instanceof Entity && isInitialPublicationOfOldValueInCorrectScheduledThread;
         // will have entity (and adjunct) execution context from tags, so can skip getting exec context
-        em.submit(execFlags, new Runnable() {
+        Runnable deliverer = new Runnable() {
             @Override
             public String toString() {
-                if (isInitial) {
+                if (isInitialPublicationOfOldValueInCorrectScheduledThread) {
                     return "LSM.publishInitial("+event+")";
                 } else {
                     return "LSM.publish("+event+")";
@@ -312,7 +341,26 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
                         LOG.warn("Error processing subscriptions to "+this+": "+t, t);
                     }
                 }
-            }});
+            }};
+        if (!isInitialPublicationOfOldValueInCorrectScheduledThread) {
+            em.submit(execFlags, deliverer);
+        } else {
+            // for initial, caller guarantees he is running in the right thread/context
+            // where the above submission would take place, typically the
+            // subscriber single threaded executor with the entity context;
+            // this allows caller to do extra assertions and bailout steps at the right time
+            deliverer.run();
+        }
+    }
+
+    private MutableList<Object> getPublishTags(final Subscription<?> s, final Entity source) {
+        return MutableList.builder()
+            .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags)
+            .add(s.subscriberExecutionManagerTag)
+            .add(BrooklynTaskTags.SENSOR_TAG)
+            // associate the publish event with the publisher (though on init it might be triggered by subscriber)
+            .addIfNotNull(source!=null ? BrooklynTaskTags.tagForTargetEntity(source) : null)
+            .build();
     }
     
     protected boolean includeDescriptionForSensorTask(SensorEvent<?> event) {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java b/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java
index dee0700..d9da6ae 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java
@@ -139,9 +139,18 @@ public final class AttributeMap {
     }
 
     public <T> T update(AttributeSensor<T> attribute, T newValue) {
-        T oldValue = updateWithoutPublishing(attribute, newValue);
-        entity.emitInternal(attribute, newValue);
-        return oldValue;
+        // 2017-10 this was unsynched which meant if two threads updated
+        // the last publication would not correspond to the last value.
+        // could introduce deadlock but emit internal and publish should
+        // not seek any locks. _subscribe_ and _delivery_ might, but they
+        // won't be in this block. an issue with _subscribe-and-get-initial_
+        // should be resolved by initial subscription queueing the publication
+        // to a context where locks are not held.
+        synchronized (values) {
+            T oldValue = updateWithoutPublishing(attribute, newValue);
+            entity.emitInternal(attribute, newValue);
+            return oldValue;
+        }
     }
     
     public <T> T updateWithoutPublishing(AttributeSensor<T> attribute, T newValue) {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java b/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
index e7dc626..4b991ad 100644
--- a/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
@@ -110,7 +110,7 @@ public class EffectorSayHiTest extends BrooklynAppUnitTestSupport {
             .get( Effectors.invocation(e, MyEntity.SAY_HI_1, ImmutableMap.of("name", "Bob", "greeting", "hi")) ), "hi Bob");
     }
     
-    @Test(invocationCount=100)
+    @Test
     public void testInvocationGetImmediately() throws Exception {
         assertEquals(((EntityInternal)e).getExecutionContext()
             .getImmediately( Effectors.invocation(e, MyEntity.SAY_HI_1, ImmutableMap.of("name", "Bob", "greeting", "hi")) ).get(), "hi Bob");

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
index ea91f36..8ec2794 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
@@ -34,6 +34,8 @@ import org.apache.brooklyn.core.test.policy.TestEnricher;
 import org.apache.brooklyn.core.test.policy.TestPolicy;
 import org.apache.brooklyn.entity.group.BasicGroup;
 import org.apache.brooklyn.test.Asserts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -44,7 +46,9 @@ import com.google.common.collect.Iterables;
 
 public class EntitySubscriptionTest extends BrooklynAppUnitTestSupport {
 
-    // TODO Duplication between this and PolicySubscriptionTest
+    // TODO Duplication between this and PolicySubscriptionTest and LocalSubscriptionManagerTest
+
+    private static final Logger log = LoggerFactory.getLogger(EntitySubscriptionTest.class);
     
     private static final long SHORT_WAIT_MS = 100;
 
@@ -221,6 +225,7 @@ public class EntitySubscriptionTest extends BrooklynAppUnitTestSupport {
     }
     
     @Test
+    @SuppressWarnings("unused")
     public void testUnsubscribeUsingHandleStopsEvents() {
         SubscriptionHandle handle1 = entity.subscriptions().subscribe(observedEntity, TestEntity.SEQUENCE, listener);
         SubscriptionHandle handle2 = entity.subscriptions().subscribe(observedEntity, TestEntity.NAME, listener);
@@ -300,6 +305,8 @@ public class EntitySubscriptionTest extends BrooklynAppUnitTestSupport {
     
     @Test
     public void testContextEntityOnSubscriptionCallbackTask() {
+        log.info("Observing "+observedEntity+" from "+entity);
+        
         observedEntity.sensors().set(TestEntity.NAME, "myval");
         entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), observedEntity, TestEntity.NAME, listener);
         

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
index 4e47090..1373545 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.brooklyn.api.entity.EntitySpec;
@@ -32,12 +33,22 @@ import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
 import org.apache.brooklyn.api.mgmt.SubscriptionManager;
 import org.apache.brooklyn.api.sensor.SensorEvent;
 import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.core.entity.RecordingSensorEventListener;
+import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
+import org.apache.brooklyn.core.sensor.BasicSensorEvent;
 import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.core.test.entity.TestEntity;
 import org.apache.brooklyn.entity.group.BasicGroup;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.time.Duration;
+import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
 /**
  * testing the {@link SubscriptionManager} and associated classes.
  */
@@ -170,4 +181,84 @@ public class LocalSubscriptionManagerTest extends BrooklynAppUnitTestSupport {
         if (threadException.get() != null) throw threadException.get();
     }
 
+    @Test
+    // same test as in PolicySubscriptionTest, but for entities / simpler
+    public void testSubscriptionReceivesInitialValueEventsInOrder() {
+        RecordingSensorEventListener<Object> listener = new RecordingSensorEventListener<>();
+        
+        entity.sensors().set(TestEntity.NAME, "myname");
+        entity.sensors().set(TestEntity.SEQUENCE, 123);
+        entity.sensors().emit(TestEntity.MY_NOTIF, -1);
+
+        // delivery should be in subscription order, so 123 then 456
+        entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SEQUENCE, listener);
+        // wait for the above delivery - otherwise it might get dropped
+        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), () -> { 
+            Asserts.assertSize(listener.getEvents(), 1); });
+        entity.sensors().set(TestEntity.SEQUENCE, 456);
+        
+        // notifications don't have "initial value" so don't get -1
+        entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.MY_NOTIF, listener);
+        // but do get 1, after 456
+        entity.sensors().emit(TestEntity.MY_NOTIF, 1);
+        
+        // STOPPING and myname received, in subscription order, after everything else
+        entity.sensors().set(TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.STOPPING);
+        entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SERVICE_STATE_ACTUAL, listener);
+        entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.NAME, listener);
+        
+        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), new Runnable() {
+            @Override public void run() {
+                Asserts.assertEquals(listener.getEvents(), ImmutableList.of(
+                        new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity, 123),
+                        new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity, 456),
+                        new BasicSensorEvent<Integer>(TestEntity.MY_NOTIF, entity, 1),
+                        new BasicSensorEvent<Lifecycle>(TestEntity.SERVICE_STATE_ACTUAL, entity, Lifecycle.STOPPING),
+                        new BasicSensorEvent<String>(TestEntity.NAME, entity, "myname")),
+                    "actually got: "+listener.getEvents());
+            }});
+    }
+    
+    @Test
+    public void testNotificationOrderMatchesSetValueOrderWhenSynched() {
+        RecordingSensorEventListener<Object> listener = new RecordingSensorEventListener<>();
+        
+        AtomicInteger count = new AtomicInteger();
+        Runnable set = () -> { 
+            synchronized (count) { 
+                entity.sensors().set(TestEntity.SEQUENCE, count.incrementAndGet()); 
+            } 
+        };
+        entity.subscriptions().subscribe(ImmutableMap.of(), entity, TestEntity.SEQUENCE, listener);
+        for (int i=0; i<10; i++) {
+            new Thread(set).start();
+        }
+        
+        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), () -> { 
+            Asserts.assertSize(listener.getEvents(), 10); });
+        for (int i=0; i<10; i++) {
+            Assert.assertEquals(listener.getEvents().get(i).getValue(), i+1);
+        }
+    }
+
+    @Test
+    public void testNotificationOrderMatchesSetValueOrderWhenNotSynched() {
+        RecordingSensorEventListener<Object> listener = new RecordingSensorEventListener<>();
+        
+        AtomicInteger count = new AtomicInteger();
+        Runnable set = () -> { 
+            // as this is not synched, the sets may interleave
+            entity.sensors().set(TestEntity.SEQUENCE, count.incrementAndGet()); 
+        };
+        entity.subscriptions().subscribe(ImmutableMap.of(), entity, TestEntity.SEQUENCE, listener);
+        for (int i=0; i<10; i++) {
+            new Thread(set).start();
+        }
+        
+        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), () -> { 
+            Asserts.assertSize(listener.getEvents(), 10); });
+        // all we expect for sure is that the last value is whatever the sensor is at the end - internal update and publish is mutexed
+        Assert.assertEquals(listener.getEvents().get(9).getValue(), entity.sensors().get(TestEntity.SEQUENCE));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java b/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
index 0f3310e..4d733e6 100644
--- a/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
@@ -24,12 +24,15 @@ import org.apache.brooklyn.api.entity.EntitySpec;
 import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
 import org.apache.brooklyn.api.policy.PolicySpec;
 import org.apache.brooklyn.core.entity.RecordingSensorEventListener;
+import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
 import org.apache.brooklyn.core.location.SimulatedLocation;
 import org.apache.brooklyn.core.policy.AbstractPolicy;
 import org.apache.brooklyn.core.sensor.BasicSensorEvent;
 import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.core.test.entity.TestEntity;
 import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.time.Duration;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -102,6 +105,7 @@ public class PolicySubscriptionTest extends BrooklynAppUnitTestSupport {
     }
     
     @Test
+    @SuppressWarnings("unused")
     public void testUnsubscribeUsingHandleStopsEvents() throws Exception {
         SubscriptionHandle handle1 = policy.subscriptions().subscribe(entity, TestEntity.SEQUENCE, listener);
         SubscriptionHandle handle2 = policy.subscriptions().subscribe(entity, TestEntity.NAME, listener);
@@ -122,18 +126,37 @@ public class PolicySubscriptionTest extends BrooklynAppUnitTestSupport {
     }
 
     @Test
-    public void testSubscriptionReceivesInitialValueEvents() {
-        entity.sensors().set(TestEntity.SEQUENCE, 123);
+    public void testSubscriptionReceivesInitialValueEventsInOrder() {
         entity.sensors().set(TestEntity.NAME, "myname");
-        
+        entity.sensors().set(TestEntity.SEQUENCE, 123);
+        entity.sensors().emit(TestEntity.MY_NOTIF, -1);
+
+        // delivery should be in subscription order, so 123 then 456
         policy.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SEQUENCE, listener);
+        // wait for the above delivery - otherwise it might get dropped
+        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), () -> { 
+            Asserts.assertSize(listener.getEvents(), 1); });
+        entity.sensors().set(TestEntity.SEQUENCE, 456);
+        
+        // notifications don't have "initial value" so don't get -1
+        policy.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.MY_NOTIF, listener);
+        // but do get 1, after 456
+        entity.sensors().emit(TestEntity.MY_NOTIF, 1);
+        
+        // STOPPING and myname received, in subscription order, after everything else
+        entity.sensors().set(TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.STOPPING);
+        policy.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SERVICE_STATE_ACTUAL, listener);
         policy.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.NAME, listener);
         
-        Asserts.succeedsEventually(new Runnable() {
+        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), new Runnable() {
             @Override public void run() {
                 assertEquals(listener.getEvents(), ImmutableList.of(
                         new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity, 123),
-                        new BasicSensorEvent<String>(TestEntity.NAME, entity, "myname")));
+                        new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity, 456),
+                        new BasicSensorEvent<Integer>(TestEntity.MY_NOTIF, entity, 1),
+                        new BasicSensorEvent<Lifecycle>(TestEntity.SERVICE_STATE_ACTUAL, entity, Lifecycle.STOPPING),
+                        new BasicSensorEvent<String>(TestEntity.NAME, entity, "myname")),
+                    "actually got: "+listener.getEvents());
             }});
     }
     
@@ -147,7 +170,7 @@ public class PolicySubscriptionTest extends BrooklynAppUnitTestSupport {
         
         Asserts.succeedsContinually(ImmutableMap.of("timeout", SHORT_WAIT_MS), new Runnable() {
             @Override public void run() {
-                assertEquals(listener.getEvents(), ImmutableList.of());
+                Asserts.assertSize(listener.getEvents(), 0);
             }});
     }
     


[03/23] brooklyn-server git commit: task visibility: validation of config

Posted by he...@apache.org.
task visibility: validation of config


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/37b6b114
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/37b6b114
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/37b6b114

Branch: refs/heads/master
Commit: 37b6b11452b9619decde71376f95b73eab0836b4
Parents: 2bdcf1a
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Tue Sep 12 11:49:51 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Mon Sep 18 17:10:54 2017 +0100

----------------------------------------------------------------------
 .../apache/brooklyn/core/config/ConfigConstraints.java | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/37b6b114/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java b/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java
index 2d0cf7d..b891c2b 100644
--- a/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java
+++ b/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java
@@ -28,9 +28,11 @@ import org.apache.brooklyn.api.location.Location;
 import org.apache.brooklyn.api.objs.BrooklynObject;
 import org.apache.brooklyn.api.objs.EntityAdjunct;
 import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.objs.AbstractEntityAdjunct;
 import org.apache.brooklyn.core.objs.BrooklynObjectInternal;
 import org.apache.brooklyn.core.objs.BrooklynObjectPredicate;
+import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.guava.Maybe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -112,12 +114,17 @@ public abstract class ConfigConstraints<T extends BrooklynObject> {
     abstract Iterable<ConfigKey<?>> getBrooklynObjectTypeConfigKeys();
 
     public Iterable<ConfigKey<?>> getViolations() {
-        // TODO in new task
-        return validateAll();
+        if (getBrooklynObject() instanceof EntityInternal) {
+            return ((EntityInternal) getBrooklynObject()).getExecutionContext().get(
+                Tasks.<Iterable<ConfigKey<?>>>builder().dynamic(false).displayName("Validating config").body(
+                    () -> validateAll() ).build() );
+        } else {
+            return validateAll();
+        }
     }
 
     @SuppressWarnings("unchecked")
-    private Iterable<ConfigKey<?>> validateAll() {
+    protected Iterable<ConfigKey<?>> validateAll() {
         List<ConfigKey<?>> violating = Lists.newLinkedList();
         Iterable<ConfigKey<?>> configKeys = getBrooklynObjectTypeConfigKeys();
         LOG.trace("Checking config keys on {}: {}", getBrooklynObject(), configKeys);


[23/23] brooklyn-server git commit: This closes #835

Posted by he...@apache.org.
This closes #835


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/a73ee172
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/a73ee172
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/a73ee172

Branch: refs/heads/master
Commit: a73ee172847766be69c9ffd6128e2d9679bc3a61
Parents: 54f9c70 2dcb0a0
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Fri Oct 6 09:06:06 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Fri Oct 6 09:06:06 2017 +0100

----------------------------------------------------------------------
 .../brooklyn/api/mgmt/ExecutionContext.java     |  33 +++--
 .../brooklyn/api/mgmt/ExecutionManager.java     |  14 ++-
 .../brooklyn/api/mgmt/ManagementContext.java    |   7 ++
 .../brooklyn/api/mgmt/SubscriptionManager.java  |   6 +-
 .../camp/brooklyn/ConfigParametersYamlTest.java |  13 +-
 .../brooklyn/camp/brooklyn/spi/dsl/DslTest.java |   4 +-
 .../brooklyn/core/config/ConfigConstraints.java |  18 ++-
 .../brooklyn/core/entity/AbstractEntity.java    |   1 +
 .../core/entity/trait/StartableMethods.java     |   6 +-
 .../org/apache/brooklyn/core/feed/Poller.java   |   2 +-
 .../core/location/BasicMachineDetails.java      |   5 +-
 .../location/access/BrooklynAccessUtils.java    |   2 +-
 .../brooklyn/core/mgmt/BrooklynTaskTags.java    |  64 ++++++++--
 .../core/mgmt/EntityManagementUtils.java        |   8 +-
 .../internal/AbstractManagementContext.java     |  25 ++++
 .../internal/AbstractSubscriptionManager.java   |   5 +
 .../mgmt/internal/BrooklynGarbageCollector.java |  93 +++++++++-----
 .../mgmt/internal/EntityManagementSupport.java  |  41 ++++---
 .../mgmt/internal/LocalSubscriptionManager.java | 123 +++++++++++++++----
 .../NonDeploymentManagementContext.java         |  29 ++++-
 .../internal/QueueingSubscriptionManager.java   |   5 +-
 .../core/mgmt/internal/Subscription.java        |   1 +
 .../core/mgmt/rebind/RebindManagerImpl.java     |  11 +-
 .../AbstractConfigurationSupportInternal.java   |  12 +-
 .../core/objs/AbstractEntityAdjunct.java        |  28 ++++-
 .../brooklyn/core/objs/AdjunctConfigMap.java    |   4 +-
 .../core/objs/BrooklynObjectInternal.java       |   2 +
 .../core/objs/proxy/EntityProxyImpl.java        |   2 +-
 .../core/objs/proxy/InternalEntityFactory.java  |   4 +-
 .../brooklyn/core/sensor/AttributeMap.java      |  15 ++-
 .../entity/group/DynamicClusterImpl.java        |   7 +-
 .../SshCommandMembershipTrackingPolicy.java     |   2 +-
 .../apache/brooklyn/feed/shell/ShellFeed.java   |   2 +-
 .../core/task/AbstractExecutionContext.java     |  10 +-
 .../util/core/task/BasicExecutionContext.java   |  30 ++---
 .../util/core/task/BasicExecutionManager.java   |  13 +-
 .../brooklyn/util/core/task/DynamicTasks.java   |  90 ++++++++------
 .../apache/brooklyn/util/core/task/Tasks.java   |   6 +-
 .../core/config/DeferredConfigTest.java         |   8 +-
 .../core/effector/EffectorSayHiTest.java        |   6 +-
 .../entity/ApplicationLifecycleStateTest.java   |  18 +--
 .../brooklyn/core/entity/EntityAssertsTest.java |  93 +++-----------
 .../core/entity/EntitySubscriptionTest.java     |   9 +-
 .../core/entity/hello/LocalEntitiesTest.java    |  20 ++-
 .../apache/brooklyn/core/feed/PollerTest.java   |   2 +-
 .../internal/EntityExecutionManagerTest.java    |  48 ++++++--
 .../internal/LocalSubscriptionManagerTest.java  |  89 ++++++++++++++
 .../mgmt/persist/XmlMementoSerializerTest.java  |   2 +-
 .../core/mgmt/rebind/RebindFeedWithHaTest.java  |   8 +-
 .../core/mgmt/rebind/RebindManagerTest.java     |  13 +-
 .../policy/basic/PolicySubscriptionTest.java    |  35 +++++-
 .../qa/performance/TaskPerformanceTest.java     |   6 +-
 .../entity/group/DynamicClusterTest.java        |   8 +-
 .../ssh/SshMachineLocationIntegrationTest.java  |   6 +-
 .../location/ssh/SshMachineLocationTest.java    |  12 +-
 .../util/core/task/TaskPredicatesTest.java      |   5 +-
 .../brooklyn/util/core/task/TasksTest.java      |   4 +-
 .../util/core/task/ValueResolverTest.java       |  93 +++++++++++---
 .../policy/jclouds/os/CreateUserPolicy.java     |   6 +-
 .../policy/ha/AbstractFailureDetector.java      |  16 ++-
 .../policy/ha/ServiceFailureDetector.java       |   7 +-
 .../brooklyn/policy/ha/ServiceReplacer.java     |  15 +--
 .../brooklyn/policy/ha/ServiceRestarter.java    |   7 +-
 .../rest/resources/SensorResourceTest.java      |   2 +-
 .../entity/brooklynnode/BrooklynNodeImpl.java   |  12 +-
 .../SameServerDriverLifecycleEffectorTasks.java |   5 +-
 .../windows/WindowsPerformanceCounterFeed.java  |  21 ++--
 .../java/org/apache/brooklyn/test/Asserts.java  |  20 ++-
 .../org/apache/brooklyn/test/AssertsTest.java   |  17 +++
 69 files changed, 892 insertions(+), 464 deletions(-)
----------------------------------------------------------------------



[12/23] brooklyn-server git commit: fix tests that asserted specific tasks (as there are now more)

Posted by he...@apache.org.
fix tests that asserted specific tasks (as there are now more)


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/130a29b9
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/130a29b9
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/130a29b9

Branch: refs/heads/master
Commit: 130a29b906da83f108ae1ea6cebef94c0c5f6a6f
Parents: 79cc9bc
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Wed Sep 20 13:37:21 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Sep 20 13:37:21 2017 +0100

----------------------------------------------------------------------
 .../mgmt/internal/LocalSubscriptionManager.java |  2 +-
 .../core/entity/hello/LocalEntitiesTest.java    | 20 +++++++++-----------
 .../internal/EntityExecutionManagerTest.java    | 15 ++++++++++++---
 .../core/mgmt/rebind/RebindFeedWithHaTest.java  |  9 +++++++--
 4 files changed, 29 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/130a29b9/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index 6b1a5d9..a927a89 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -147,7 +147,7 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
             } else {
                 if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {} to {}", new Object[] {s.producer, s.sensor, s});
                 em.submit(
-                    MutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(s.producer)),
+                    MutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(s.producer), BrooklynTaskTags.SENSOR_TAG),
                         "displayName", "Initial publication of "+s.sensor.getName()),
                     () -> {
                         T val = (T) s.producer.getAttribute((AttributeSensor<?>) s.sensor);

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/130a29b9/core/src/test/java/org/apache/brooklyn/core/entity/hello/LocalEntitiesTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/hello/LocalEntitiesTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/hello/LocalEntitiesTest.java
index 985997e..6951a97 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/hello/LocalEntitiesTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/hello/LocalEntitiesTest.java
@@ -33,20 +33,19 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.mgmt.EntityManager;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.sensor.SensorEvent;
 import org.apache.brooklyn.api.sensor.SensorEventListener;
 import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.location.SimulatedLocation;
 import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-import org.apache.brooklyn.test.Asserts;
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.time.Time;
 
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
@@ -63,14 +62,12 @@ public class LocalEntitiesTest extends BrooklynAppUnitTestSupport {
     public static final Logger log = LoggerFactory.getLogger(LocalEntitiesTest.class);
     
     private SimulatedLocation loc;
-    private EntityManager entityManager;
             
     @BeforeMethod(alwaysRun=true)
     @Override
     public void setUp() throws Exception {
         super.setUp();
         loc = new SimulatedLocation();
-        entityManager = mgmt.getEntityManager();
     }
 
     @Test
@@ -165,9 +162,10 @@ public class LocalEntitiesTest extends BrooklynAppUnitTestSupport {
         h.setAge(6);
         long totalTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
         
-        // TODO guava util for (1..5)
-        Asserts.continually(MutableMap.of("timeout", 50), Suppliers.ofInstance(data), Predicates.<Object>equalTo(ImmutableList.of(1,2,3,4,5)));
-        assertTrue(totalTime < 2000, "totalTime="+totalTime);  //shouldn't have blocked for anywhere close to 2s (Aled says TODO: too time sensitive for BuildHive?)
+        Asserts.continually(
+            Suppliers.ofInstance(data), Predicates.<Object>equalTo(ImmutableList.of(1,2,3,4,5)),
+            Duration.millis(50), null, null);
+        assertTrue(totalTime < 2000, "totalTime="+totalTime);  //shouldn't have blocked for anywhere close to 2s (unless build machine v v slow eg BuildHive)
     }
 
     @Test
@@ -224,8 +222,8 @@ public class LocalEntitiesTest extends BrooklynAppUnitTestSupport {
         assertTrue(System.currentTimeMillis() - startTime < 1500);
         synchronized (sonsConfig) {
             assertEquals(null, sonsConfig[0]);
-            for (Task tt : ((EntityInternal)dad).getExecutionContext().getTasks()) { log.info("task at dad:  {}, {}", tt, tt.getStatusDetail(false)); }
-            for (Task tt : ((EntityInternal)son).getExecutionContext().getTasks()) { log.info("task at son:  {}, {}", tt, tt.getStatusDetail(false)); }
+            for (Task<?> tt : ((EntityInternal)dad).getExecutionContext().getTasks()) { log.info("task at dad:  {}, {}", tt, tt.getStatusDetail(false)); }
+            for (Task<?> tt : ((EntityInternal)son).getExecutionContext().getTasks()) { log.info("task at son:  {}, {}", tt, tt.getStatusDetail(false)); }
             dad.sensors().set(HelloEntity.FAVOURITE_NAME, "Dan");
             if (!s1.tryAcquire(2, TimeUnit.SECONDS)) fail("race mismatch, missing permits");
         }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/130a29b9/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
index 97f02b7..4f581f5 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntitySpec;
@@ -46,6 +47,7 @@ import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.core.test.entity.LocalManagementContextForTests;
 import org.apache.brooklyn.core.test.entity.TestEntity;
 import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.core.task.BasicExecutionManager;
@@ -149,6 +151,7 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
         for (Task<?> t: tasks) {
             if (t instanceof ScheduledTask) continue;
             if (t.getTags().contains(BrooklynTaskTags.SENSOR_TAG)) continue;
+            if (t.getDisplayName().contains("Validating")) continue;
             result.add(t);
         }
         return result;
@@ -294,8 +297,6 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
         forceGc();
         stopCondition.set(true);
 
-        // might need an eventually here, if the internal job completion and GC is done in the background
-        // (if there are no test failures for a few months, since Sept 2014, then we can remove this comment)
         assertTaskMaxCountForEntityEventually(e, 2);
     }
 
@@ -343,7 +344,15 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
         forceGc();
         Collection<Task<?>> t2 = em.getAllTasks();
 
-        Assert.assertEquals(t1.size(), t2.size(), "lists are different:\n"+t1+"\n"+t2+"\n");
+        // no tasks from first batch were GC'd
+        Asserts.assertSize(MutableList.builder().addAll(t1).removeAll(t2).build(), 0);
+
+        // and we expect just the add/remove cycle at parent, and service problems
+        Set<String> newOnes = MutableList.<Task<?>>builder().addAll(t2).removeAll(t1).build().stream().map(
+            (t) -> t.getDisplayName()).collect(Collectors.toSet());
+        Function<String,String> prefix = (s) -> "sensor "+app.getId()+":"+s;
+        Assert.assertEquals(newOnes, MutableSet.of(
+            prefix.apply("entity.children.removed"), prefix.apply("entity.children.added"), prefix.apply("service.problems"))); 
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/130a29b9/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedWithHaTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedWithHaTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedWithHaTest.java
index a38a873..319311a 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedWithHaTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedWithHaTest.java
@@ -24,6 +24,7 @@ import java.net.URL;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
 
 import org.apache.brooklyn.api.entity.EntitySpec;
 import org.apache.brooklyn.api.mgmt.Task;
@@ -31,7 +32,6 @@ import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityMode;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.api.sensor.Feed;
 import org.apache.brooklyn.core.entity.EntityAsserts;
-import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
 import org.apache.brooklyn.core.test.entity.TestApplication;
 import org.apache.brooklyn.core.test.entity.TestEntity;
 import org.apache.brooklyn.util.core.http.BetterMockWebServer;
@@ -102,10 +102,11 @@ public class RebindFeedWithHaTest extends RebindTestFixtureWithApp {
             @Override
             public Boolean call() throws Exception {
                 origManagementContext.getGarbageCollector().gcIteration();
-                List<Task<?>> tasksAfter = ((BasicExecutionManager)origManagementContext.getExecutionManager()).getAllTasks();
+                List<Task<?>> tasksAfter = removeSystemTasks( ((BasicExecutionManager)origManagementContext.getExecutionManager()).getAllTasks() );
                 log.info("tasks after disabling HA, "+tasksAfter.size()+": "+tasksAfter);
                 return tasksAfter.isEmpty();
             }
+
         }).runRequiringTrue();
         
         newManagementContext = createNewManagementContext();
@@ -123,6 +124,10 @@ public class RebindFeedWithHaTest extends RebindTestFixtureWithApp {
         EntityAsserts.assertAttributeEqualsEventually(newEntity, SENSOR_STRING, "{\"foo\":\"myfoo\"}");
     }
 
+    static List<Task<?>> removeSystemTasks(List<Task<?>> tasks) {
+        return tasks.stream().filter(t -> !("rebind".equals(t.getDisplayName()) || t.getDisplayName().contains("Validating"))).collect(Collectors.toList());
+    }
+
     @Test(groups="Integration", invocationCount=50)
     public void testHttpFeedCleansUpAfterHaDisabledAndRunsAtFailoverManyTimes() throws Exception {
         testHttpFeedCleansUpAfterHaDisabledAndRunsAtFailover();


[13/23] brooklyn-server git commit: Merge branch 'master' into tasks-better-tree

Posted by he...@apache.org.
Merge branch 'master' into tasks-better-tree


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/80446ab1
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/80446ab1
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/80446ab1

Branch: refs/heads/master
Commit: 80446ab1f28d35d430899f19f0f46e6ccab777d4
Parents: 130a29b df83d44
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Mon Sep 25 10:38:50 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Mon Sep 25 10:38:50 2017 +0100

----------------------------------------------------------------------
 api/pom.xml                                     |    2 +-
 .../brooklyn/api/catalog/BrooklynCatalog.java   |    3 +
 .../brooklyn/api/typereg/RegisteredType.java    |    5 +-
 .../typereg/RegisteredTypeLoadingContext.java   |    2 +-
 camp/camp-base/pom.xml                          |    2 +-
 camp/camp-brooklyn/pom.xml                      |    2 +-
 .../camp/brooklyn/AbstractYamlTest.java         |   31 +-
 .../brooklyn/EmptySoftwareProcessYamlTest.java  |   94 +-
 .../camp/brooklyn/FunctionSensorYamlTest.java   |  123 ++
 .../VanillaSoftwareProcessYamlTest.java         |  299 +++++
 .../CatalogOsgiVersionMoreEntityRebindTest.java |   10 +-
 .../CatalogOsgiVersionMoreEntityTest.java       |    3 +
 camp/camp-server/pom.xml                        |    2 +-
 camp/pom.xml                                    |    2 +-
 core/pom.xml                                    |    2 +-
 .../apache/brooklyn/core/BrooklynVersion.java   |    2 +-
 .../catalog/internal/BasicBrooklynCatalog.java  |  106 +-
 .../core/mgmt/entitlement/Entitlements.java     |    1 +
 .../core/mgmt/ha/OsgiArchiveInstaller.java      |   18 +-
 .../mgmt/ha/OsgiBundleInstallationResult.java   |   21 +-
 .../brooklyn/core/mgmt/ha/OsgiManager.java      |   89 +-
 .../core/sensor/function/FunctionSensor.java    |   89 ++
 .../core/typereg/BasicBrooklynTypeRegistry.java |   20 +-
 .../typereg/BasicTypeImplementationPlan.java    |    4 +-
 .../typereg/RegisteredTypeLoadingContexts.java  |    6 +-
 .../core/typereg/RegisteredTypePredicates.java  |   98 +-
 .../brooklyn/core/typereg/RegisteredTypes.java  |   43 +-
 core/src/main/resources/catalog.bom             |    2 +-
 .../entity/RecordingSensorEventListener.java    |   12 +
 .../core/mgmt/osgi/OsgiStandaloneTest.java      |    4 +-
 .../core/policy/basic/EnricherTypeTest.java     |    6 -
 .../sensor/function/FunctionSensorTest.java     |   77 ++
 .../core/test/entity/TestEntityImpl.java        |    2 +-
 .../util/core/internal/ssh/ExecCmdAsserts.java  |   12 +-
 core/src/test/resources/catalog.bom             |    2 +-
 karaf/commands/pom.xml                          |    2 +-
 karaf/features/pom.xml                          |    2 +-
 karaf/httpcomponent-extension/pom.xml           |    2 +-
 karaf/init/pom.xml                              |    2 +-
 .../init/src/main/resources/catalog-classes.bom |   14 +-
 karaf/jetty-config/pom.xml                      |    2 +-
 karaf/pom.xml                                   |    2 +-
 karaf/start/pom.xml                             |    2 +-
 launcher-common/pom.xml                         |    2 +-
 launcher/pom.xml                                |    2 +-
 locations/container/pom.xml                     |    2 +-
 .../resources/generic-application.tests.bom     |    2 +-
 .../src/test/resources/generic.tests.bom        |    2 +-
 locations/jclouds/pom.xml                       |    2 +-
 .../location/jclouds/JcloudsLocation.java       |   73 +-
 .../api/JcloudsLocationConfigPublic.java        |    6 +
 .../JcloudsMaxConcurrencyStubbedTest.java       |  225 ++++
 logging/logback-includes/pom.xml                |    2 +-
 logging/logback-xml/pom.xml                     |    2 +-
 parent/pom.xml                                  |    2 +-
 policy/pom.xml                                  |    2 +-
 .../action/AbstractScheduledEffectorPolicy.java |  292 +++++
 .../policy/action/PeriodicEffectorPolicy.java   |  116 ++
 .../policy/action/ScheduledEffectorPolicy.java  |  100 ++
 .../policy/autoscaling/AutoScalerPolicy.java    |    4 +-
 policy/src/main/resources/catalog.bom           |   25 +-
 .../action/AbstractEffectorPolicyTest.java      |   81 ++
 .../action/PeriodicEffectorPolicyTest.java      |  123 ++
 .../action/ScheduledEffectorPolicyTest.java     |  121 ++
 .../action/ScheduledPolicyRebindTest.java       |  137 +++
 .../AutoScalerPolicyPoolSizeTest.java           |   18 +-
 .../AbstractLoadBalancingPolicyTest.java        |    1 -
 pom.xml                                         |    4 +-
 rest/rest-api/pom.xml                           |    2 +-
 .../org/apache/brooklyn/rest/api/BundleApi.java |  147 +++
 .../apache/brooklyn/rest/api/CatalogApi.java    |   53 +
 .../org/apache/brooklyn/rest/api/TypeApi.java   |  100 ++
 .../domain/BundleInstallationRestResult.java    |   67 ++
 .../brooklyn/rest/domain/BundleSummary.java     |  106 ++
 .../brooklyn/rest/domain/ConfigSummary.java     |   79 +-
 .../rest/domain/EnricherConfigSummary.java      |   52 +-
 .../rest/domain/EntityConfigSummary.java        |   67 +-
 .../rest/domain/LocationConfigSummary.java      |   40 +-
 .../rest/domain/PolicyConfigSummary.java        |   53 +-
 .../brooklyn/rest/domain/TaskSummary.java       |    9 +-
 .../apache/brooklyn/rest/domain/TypeDetail.java |   87 ++
 .../brooklyn/rest/domain/TypeSummary.java       |  293 +++++
 rest/rest-resources/pom.xml                     |    2 +-
 .../apache/brooklyn/rest/BrooklynRestApi.java   |    6 +-
 .../brooklyn/rest/resources/BundleResource.java |  169 +++
 .../rest/resources/CatalogResource.java         |   40 +-
 .../brooklyn/rest/resources/TypeResource.java   |  207 ++++
 .../rest/transform/EntityTransformer.java       |   26 +-
 .../rest/transform/PolicyTransformer.java       |    3 +-
 .../rest/transform/TypeTransformer.java         |  197 ++++
 .../resources/BundleAndTypeResourcesTest.java   | 1098 ++++++++++++++++++
 rest/rest-server/pom.xml                        |    2 +-
 server-cli/pom.xml                              |    2 +-
 .../main/resources/brooklyn/default.catalog.bom |    2 +-
 server-cli/src/main/resources/catalog.bom       |    2 +-
 software/base/pom.xml                           |    2 +-
 .../entity/brooklynnode/BrooklynNode.java       |    2 +-
 .../brooklynnode/BrooklynNodeSshDriver.java     |    2 +-
 .../brooklyn/entity/java/JmxmpSslSupport.java   |    2 +-
 .../software/base/EmptySoftwareProcessImpl.java |    4 +-
 .../software/base/VanillaSoftwareProcess.java   |    6 +
 .../base/VanillaSoftwareProcessImpl.java        |   16 +-
 software/base/src/main/resources/catalog.bom    |    2 +-
 software/winrm/pom.xml                          |    2 +-
 test-framework/pom.xml                          |    2 +-
 test-framework/src/main/resources/catalog.bom   |    6 +-
 test-support/pom.xml                            |    2 +-
 .../osgi/com-example-entities/pom.xml           |    2 +-
 utils/common/dependencies/osgi/entities/pom.xml |    2 +-
 .../dependencies/osgi/more-entities-v1/pom.xml  |    2 +-
 .../osgi/more-entities-v2-evil-twin/pom.xml     |    2 +-
 .../dependencies/osgi/more-entities-v2/pom.xml  |    2 +-
 utils/common/pom.xml                            |    2 +-
 .../brooklyn/util/osgi/VersionedName.java       |   24 +-
 .../brooklyn/util/maven/MavenArtifactTest.java  |    4 +-
 utils/groovy/pom.xml                            |    2 +-
 utils/jmx/jmxmp-ssl-agent/pom.xml               |    2 +-
 utils/jmx/jmxrmi-agent/pom.xml                  |    2 +-
 utils/rest-swagger/pom.xml                      |    2 +-
 utils/rt-felix/pom.xml                          |    2 +-
 utils/test-support/pom.xml                      |    2 +-
 121 files changed, 5095 insertions(+), 486 deletions(-)
----------------------------------------------------------------------



[06/23] brooklyn-server git commit: entity adjuncts have extra tag for execution context, used in subscription delivery

Posted by he...@apache.org.
entity adjuncts have extra tag for execution context, used in subscription delivery


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/d4c9fe12
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/d4c9fe12
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/d4c9fe12

Branch: refs/heads/master
Commit: d4c9fe12ecfdf884ea8e1945e51cc07e026d7610
Parents: 8ecf395
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Tue Sep 19 13:10:17 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Tue Sep 19 13:12:11 2017 +0100

----------------------------------------------------------------------
 .../brooklyn/api/mgmt/ManagementContext.java    |  4 ++
 .../brooklyn/core/config/ConfigConstraints.java |  9 ++-
 .../org/apache/brooklyn/core/feed/Poller.java   |  2 +-
 .../brooklyn/core/mgmt/BrooklynTaskTags.java    | 64 ++++++++++++++++----
 .../internal/AbstractManagementContext.java     |  8 +++
 .../mgmt/internal/EntityManagementSupport.java  | 22 ++++---
 .../NonDeploymentManagementContext.java         | 20 ++++--
 .../core/objs/AbstractEntityAdjunct.java        | 32 ++++++++--
 .../brooklyn/core/objs/AdjunctConfigMap.java    |  4 +-
 .../SshCommandMembershipTrackingPolicy.java     |  2 +-
 .../apache/brooklyn/feed/shell/ShellFeed.java   |  2 +-
 .../util/core/task/BasicExecutionContext.java   |  5 +-
 .../internal/EntityExecutionManagerTest.java    |  5 +-
 .../brooklyn/util/core/task/TasksTest.java      |  4 +-
 .../policy/jclouds/os/CreateUserPolicy.java     |  6 +-
 .../policy/ha/AbstractFailureDetector.java      | 16 +++--
 .../policy/ha/ServiceFailureDetector.java       |  7 ++-
 .../brooklyn/policy/ha/ServiceReplacer.java     | 13 +---
 .../brooklyn/policy/ha/ServiceRestarter.java    |  5 +-
 .../brooklyn/entity/chef/ChefAttributeFeed.java | 16 +++--
 .../windows/WindowsPerformanceCounterFeed.java  | 21 +++----
 21 files changed, 172 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
index 515ec6b..00dd4d3 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
@@ -34,6 +34,7 @@ import org.apache.brooklyn.api.mgmt.entitlement.EntitlementManager;
 import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityManager;
 import org.apache.brooklyn.api.mgmt.rebind.RebindManager;
 import org.apache.brooklyn.api.objs.BrooklynObject;
+import org.apache.brooklyn.api.objs.EntityAdjunct;
 import org.apache.brooklyn.api.typereg.BrooklynTypeRegistry;
 import org.apache.brooklyn.config.StringConfigMap;
 import org.apache.brooklyn.util.guava.Maybe;
@@ -176,6 +177,9 @@ public interface ManagementContext {
      */
     SubscriptionContext getSubscriptionContext(Entity entity);
 
+    /** As {@link #getSubscriptionContext(Entity)} where there is also an adjunct */
+    SubscriptionContext getSubscriptionContext(Entity e, EntityAdjunct a);
+    
     /**
      * Returns a {@link SubscriptionContext} instance representing subscriptions
      * (from the {@link SubscriptionManager}) associated with this location, and capable 

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java b/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java
index b891c2b..c4ce81d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java
+++ b/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.mgmt.ExecutionContext;
 import org.apache.brooklyn.api.objs.BrooklynObject;
 import org.apache.brooklyn.api.objs.EntityAdjunct;
 import org.apache.brooklyn.config.ConfigKey;
@@ -114,8 +115,12 @@ public abstract class ConfigConstraints<T extends BrooklynObject> {
     abstract Iterable<ConfigKey<?>> getBrooklynObjectTypeConfigKeys();
 
     public Iterable<ConfigKey<?>> getViolations() {
-        if (getBrooklynObject() instanceof EntityInternal) {
-            return ((EntityInternal) getBrooklynObject()).getExecutionContext().get(
+        ExecutionContext exec = 
+            getBrooklynObject() instanceof EntityInternal ? ((EntityInternal)getBrooklynObject()).getExecutionContext() :
+            // getBrooklynObject() instanceof AbstractEntityAdjunct ? ((AbstractEntityAdjunct)getBrooklynObject()).getExecutionContext() :
+            null;
+        if (exec!=null) {
+            return exec.get(
                 Tasks.<Iterable<ConfigKey<?>>>builder().dynamic(false).displayName("Validating config").body(
                     () -> validateAll() ).build() );
         } else {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
index 940057e..dd7d22d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
@@ -139,7 +139,7 @@ public class Poller<V> {
         
         for (final Callable<?> oneOffJob : oneOffJobs) {
             Task<?> task = Tasks.builder().dynamic(false).body((Callable<Object>) oneOffJob).displayName("Poll").description("One-time poll job "+oneOffJob).build();
-            oneOffTasks.add(((EntityInternal)entity).getExecutionContext().submit(task));
+            oneOffTasks.add(feed.getExecutionContext().submit(task));
         }
         
         Duration minPeriod = null;

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
index 5cb1027..a68df32 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
@@ -34,6 +34,7 @@ import org.apache.brooklyn.api.mgmt.ExecutionManager;
 import org.apache.brooklyn.api.mgmt.ManagementContext;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.mgmt.entitlement.EntitlementContext;
+import org.apache.brooklyn.api.objs.EntityAdjunct;
 import org.apache.brooklyn.core.mgmt.internal.AbstractManagementContext;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.core.task.DynamicTasks;
@@ -59,13 +60,14 @@ import com.google.common.collect.ImmutableSet;
 /** Provides utilities for making Tasks easier to work with in Brooklyn.
  * Main thing at present is to supply (and find) wrapped entities for tasks to understand the
  * relationship of the entity to the task.
- * TODO Longer term it would be better to remove 'tags' on Tasks and use a strongly typed context object.
+ * <p>
+ * Eventually it may be better to replace these 'tags' on Tasks with strongly typed context objects.
  * (Tags there are used mainly for determining who called it (caller), what they called it on (target entity),
  * and what type of task it is (effector, schedule/sensor, etc).)
  */
 public class BrooklynTaskTags extends TaskTags {
 
-    private static final Logger log = LoggerFactory.getLogger(BrooklynTaskTags.WrappedEntity.class);
+    private static final Logger log = LoggerFactory.getLogger(BrooklynTaskTags.class);
 
     /** Tag for tasks which are running on behalf of the management server, rather than any entity */
     public static final String BROOKLYN_SERVER_TASK_TAG = "BROOKLYN-SERVER";
@@ -85,37 +87,66 @@ public class BrooklynTaskTags extends TaskTags {
 
     // ------------- entity tags -------------------------
     
-    public static class WrappedEntity {
+    public abstract static class WrappedItem<T> {
+        /** @deprecated since 0.12.0 going private; use {@link #getWrappingType()} */
+        @Deprecated
         public final String wrappingType;
-        public final Entity entity;
-        protected WrappedEntity(String wrappingType, Entity entity) {
+        protected WrappedItem(String wrappingType) {
             Preconditions.checkNotNull(wrappingType);
-            Preconditions.checkNotNull(entity);
             this.wrappingType = wrappingType;
-            this.entity = entity;
+        }
+        public abstract T unwrap();
+        public String getWrappingType() {
+            return wrappingType;
         }
         @Override
         public String toString() {
-            return "Wrapped["+wrappingType+":"+entity+"]";
+            return "Wrapped["+getWrappingType()+":"+unwrap()+"]";
         }
         @Override
         public int hashCode() {
-            return Objects.hashCode(entity, wrappingType);
+            return Objects.hashCode(unwrap(), getWrappingType());
         }
         @Override
         public boolean equals(Object obj) {
             if (this==obj) return true;
-            if (!(obj instanceof WrappedEntity)) return false;
+            if (!(obj instanceof WrappedItem)) return false;
             return 
-                Objects.equal(entity, ((WrappedEntity)obj).entity) &&
-                Objects.equal(wrappingType, ((WrappedEntity)obj).wrappingType);
+                Objects.equal(unwrap(), ((WrappedItem<?>)obj).unwrap()) &&
+                Objects.equal(getWrappingType(), ((WrappedItem<?>)obj).getWrappingType());
+        }
+    }
+    public static class WrappedEntity extends WrappedItem<Entity> {
+        /** @deprecated since 0.12.0 going private; use {@link #unwrap()} */
+        @Deprecated
+        public final Entity entity;
+        protected WrappedEntity(String wrappingType, Entity entity) {
+            super(wrappingType);
+            this.entity = Preconditions.checkNotNull(entity);
+        }
+        @Override
+        public Entity unwrap() {
+            return entity;
         }
     }
+    public static class WrappedObject<T> extends WrappedItem<T> {
+        private final T object;
+        protected WrappedObject(String wrappingType, T object) {
+            super(wrappingType);
+            this.object = Preconditions.checkNotNull(object);
+        }
+        @Override
+        public T unwrap() {
+            return object;
+        }        
+    }
     
     public static final String CONTEXT_ENTITY = "contextEntity";
     public static final String CALLER_ENTITY = "callerEntity";
     public static final String TARGET_ENTITY = "targetEntity";
     
+    public static final String CONTEXT_ADJUNCT = "contextAdjunct";
+    
     /**
      * Marks a task as running in the context of the entity. This means
      * resolving any relative/context sensitive values against that entity.
@@ -138,6 +169,15 @@ public class BrooklynTaskTags extends TaskTags {
         return new WrappedEntity(TARGET_ENTITY, entity);
     }
 
+    /**
+     * As {@link #tagForContextEntity(Entity)} but wrapping an adjunct.
+     * Tasks with this tag will also have a {@link #tagForContextEntity(Entity)}.
+     */
+    public static WrappedObject<EntityAdjunct> tagForContextAdjunct(EntityAdjunct adjunct) {
+        return new WrappedObject<EntityAdjunct>(CONTEXT_ADJUNCT, adjunct);
+    }
+    
+
     public static WrappedEntity getWrappedEntityTagOfType(Task<?> t, String wrappingType) {
         if (t==null) return null;
         return getWrappedEntityTagOfType( getTagsFast(t), wrappingType);

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
index 52eed44..f0c9335 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
@@ -51,6 +51,7 @@ import org.apache.brooklyn.api.mgmt.entitlement.EntitlementManager;
 import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityManager;
 import org.apache.brooklyn.api.mgmt.rebind.RebindManager;
 import org.apache.brooklyn.api.objs.BrooklynObject;
+import org.apache.brooklyn.api.objs.EntityAdjunct;
 import org.apache.brooklyn.api.typereg.BrooklynTypeRegistry;
 import org.apache.brooklyn.api.typereg.RegisteredType;
 import org.apache.brooklyn.config.StringConfigMap;
@@ -258,6 +259,13 @@ public abstract class AbstractManagementContext implements ManagementContextInte
         Map<String, ?> flags = ImmutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(e)));
         return new BasicSubscriptionContext(flags, getSubscriptionManager(), e);
     }
+    
+    @Override
+    public SubscriptionContext getSubscriptionContext(Entity e, EntityAdjunct a) {
+        // BSC is a thin wrapper around SM so fine to create a new one here
+        Map<String, ?> flags = ImmutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(e), BrooklynTaskTags.tagForContextAdjunct(a)));
+        return new BasicSubscriptionContext(flags, getSubscriptionManager(), e);
+    }
 
     @Override
     public SubscriptionContext getSubscriptionContext(Location loc) {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
index 96cfd33..e192edd 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
@@ -84,8 +84,8 @@ public class EntityManagementSupport {
     
     protected transient ManagementContext initialManagementContext;
     protected transient ManagementContext managementContext;
-    protected transient SubscriptionContext subscriptionContext;
-    protected transient ExecutionContext executionContext;
+    protected transient volatile SubscriptionContext subscriptionContext;
+    protected transient volatile ExecutionContext executionContext;
     
     protected final AtomicBoolean managementContextUsable = new AtomicBoolean(false);
     protected final AtomicBoolean currentlyDeployed = new AtomicBoolean(false);
@@ -357,19 +357,25 @@ public class EntityManagementSupport {
         return (managementContextUsable.get()) ? managementContext : nonDeploymentManagementContext;
     }    
     
-    public synchronized ExecutionContext getExecutionContext() {
+    public ExecutionContext getExecutionContext() {
         if (executionContext!=null) return executionContext;
         if (managementContextUsable.get()) {
-            executionContext = managementContext.getExecutionContext(entity);
-            return executionContext;
+            synchronized (this) {
+                if (executionContext!=null) return executionContext;
+                executionContext = managementContext.getExecutionContext(entity);
+                return executionContext;
+            }
         }
         return nonDeploymentManagementContext.getExecutionContext(entity);
     }
-    public synchronized SubscriptionContext getSubscriptionContext() {
+    public SubscriptionContext getSubscriptionContext() {
         if (subscriptionContext!=null) return subscriptionContext;
         if (managementContextUsable.get()) {
-            subscriptionContext = managementContext.getSubscriptionContext(entity);
-            return subscriptionContext;
+            synchronized (this) {
+                if (subscriptionContext!=null) return subscriptionContext;
+                subscriptionContext = managementContext.getSubscriptionContext(entity);
+                return subscriptionContext;
+            }
         }
         return nonDeploymentManagementContext.getSubscriptionContext(entity);
     }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
index 2e9238f..91375d2 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
@@ -58,6 +58,7 @@ import org.apache.brooklyn.api.mgmt.rebind.RebindManager;
 import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoPersister;
 import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoRawData;
 import org.apache.brooklyn.api.objs.BrooklynObject;
+import org.apache.brooklyn.api.objs.EntityAdjunct;
 import org.apache.brooklyn.api.typereg.BrooklynTypeRegistry;
 import org.apache.brooklyn.config.StringConfigMap;
 import org.apache.brooklyn.core.catalog.internal.CatalogInitialization;
@@ -101,7 +102,6 @@ public class NonDeploymentManagementContext implements ManagementContextInternal
     private ManagementContextInternal initialManagementContext;
     
     private final QueueingSubscriptionManager qsm;
-    private final BasicSubscriptionContext subscriptionContext;
     private NonDeploymentEntityManager entityManager;
     private NonDeploymentLocationManager locationManager;
     private NonDeploymentAccessManager accessManager;
@@ -112,10 +112,6 @@ public class NonDeploymentManagementContext implements ManagementContextInternal
         this.mode = checkNotNull(mode, "mode");
         qsm = new QueueingSubscriptionManager();
         
-        // For subscription flags, see AbstractManagementContext.getSubscriptionContext. This is 
-        // needed for callbacks, to ensure the correct entity context is set.
-        Map<String, ?> subscriptionFlags = ImmutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(entity)));
-        subscriptionContext = new BasicSubscriptionContext(subscriptionFlags, qsm, entity);
         entityManager = new NonDeploymentEntityManager(null);
         locationManager = new NonDeploymentLocationManager(null);
         accessManager = new NonDeploymentAccessManager(null);
@@ -254,7 +250,19 @@ public class NonDeploymentManagementContext implements ManagementContextInternal
         if (!this.entity.equals(entity)) throw new IllegalStateException("Non-deployment context "+this+" can only use a single Entity: has "+this.entity+", but passed "+entity);
         if (mode==NonDeploymentManagementContextMode.MANAGEMENT_STOPPED)
             throw new IllegalStateException("Entity "+entity+" is no longer managed; subscription context not available");
-        return subscriptionContext;
+        // see also AbstractManagementContext.getSubscriptionContext - needed for callbacks, to ensure the correct entity context is set
+        Map<String, ?> subscriptionFlags = ImmutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(entity)));
+        return new BasicSubscriptionContext(subscriptionFlags, qsm, entity);
+    }
+    
+    @Override
+    public SubscriptionContext getSubscriptionContext(Entity entity, EntityAdjunct adjunct) {
+        if (!this.entity.equals(entity)) throw new IllegalStateException("Non-deployment context "+this+" can only use a single Entity: has "+this.entity+", but passed "+entity);
+        if (mode==NonDeploymentManagementContextMode.MANAGEMENT_STOPPED)
+            throw new IllegalStateException("Entity "+entity+" is no longer managed; subscription context not available");
+        // see also AbstractManagementContext.getSubscriptionContext - needed for callbacks, to ensure the correct entity context is set
+        Map<String, ?> subscriptionFlags = ImmutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(entity), BrooklynTaskTags.tagForContextAdjunct(adjunct)));
+        return new BasicSubscriptionContext(subscriptionFlags, qsm, entity);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
index 2c4c4b4..9018d5d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
@@ -29,7 +29,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 
 import javax.annotation.Nullable;
 
@@ -37,6 +36,7 @@ import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntityLocal;
 import org.apache.brooklyn.api.entity.Group;
 import org.apache.brooklyn.api.mgmt.ExecutionContext;
+import org.apache.brooklyn.api.mgmt.ManagementContext;
 import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.objs.BrooklynObject;
@@ -54,12 +54,15 @@ import org.apache.brooklyn.core.enricher.AbstractEnricher;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.entity.internal.ConfigUtilsInternal;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.mgmt.internal.SubscriptionTracker;
+import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.core.flags.FlagUtils;
 import org.apache.brooklyn.util.core.flags.SetFromFlag;
 import org.apache.brooklyn.util.core.flags.TypeCoercions;
+import org.apache.brooklyn.util.core.task.BasicExecutionContext;
 import org.apache.brooklyn.util.text.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,6 +89,8 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
     @Deprecated
     protected Map<String,Object> leftoverProperties = Maps.newLinkedHashMap();
 
+    /** @deprecated since 0.12.0, going private, use {@link #getExecutionContext()} */
+    @Deprecated
     protected transient ExecutionContext execution;
 
     private final BasicConfigurationSupport config = new BasicConfigurationSupport();
@@ -213,6 +218,14 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
         return _legacyNoConstructionInit;
     }
 
+    /** If the entity has been set, returns the execution context indicating this adjunct.
+     * Primarily intended for this adjunct to execute tasks, but in some cases, mainly low level,
+     * it may make sense for other components to execute tasks against this adjunct. */
+    @Beta
+    public ExecutionContext getExecutionContext() {
+        return execution;
+    }
+    
     @Override
     public ConfigurationSupportInternal config() {
         return config;
@@ -276,7 +289,7 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
             synchronized (AbstractEntityAdjunct.this) {
                 if (_subscriptionTracker!=null) return _subscriptionTracker;
                 if (entity==null) return null;
-                _subscriptionTracker = new SubscriptionTracker(((EntityInternal)entity).subscriptions().getSubscriptionContext());
+                _subscriptionTracker = new SubscriptionTracker(getManagementContext().getSubscriptionContext(entity, AbstractEntityAdjunct.this));
                 return _subscriptionTracker;
             }
         }
@@ -334,7 +347,7 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
 
         @Override
         protected ExecutionContext getContext() {
-            return AbstractEntityAdjunct.this.execution;
+            return AbstractEntityAdjunct.this.getExecutionContext();
         }
 
         @Override
@@ -402,10 +415,21 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
         this.name = name;
     }
 
+    @Override
+    public ManagementContext getManagementContext() {
+        ManagementContext result = super.getManagementContext();
+        if (result!=null) return result;
+        if (entity!=null) {
+            return ((EntityInternal)entity).getManagementContext();
+        }
+        return null;
+    }
+    
     public void setEntity(EntityLocal entity) {
         if (destroyed.get()) throw new IllegalStateException("Cannot set entity on a destroyed entity adjunct");
         this.entity = entity;
-        this.execution = ((EntityInternal) entity).getExecutionContext();
+        this.execution = new BasicExecutionContext( getManagementContext().getExecutionManager(),
+                MutableList.of(BrooklynTaskTags.tagForContextAdjunct(this), BrooklynTaskTags.tagForContextEntity(entity)) );
         if (entity!=null && getCatalogItemId() == null) {
             setCatalogItemIdAndSearchPath(entity.getCatalogItemId(), entity.getCatalogItemIdSearchPath());
         }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/main/java/org/apache/brooklyn/core/objs/AdjunctConfigMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AdjunctConfigMap.java b/core/src/main/java/org/apache/brooklyn/core/objs/AdjunctConfigMap.java
index 602d943..71fe16c 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/AdjunctConfigMap.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/AdjunctConfigMap.java
@@ -69,9 +69,7 @@ public class AdjunctConfigMap extends AbstractConfigMapImpl<EntityAdjunct> {
 
     @Override
     protected ExecutionContext getExecutionContext(BrooklynObject bo) {
-        // TODO expose ((AbstractEntityAdjunct)bo).execution ?
-        Entity entity = ((AbstractEntityAdjunct)bo).entity;
-        return (entity != null) ? ((EntityInternal)entity).getExecutionContext() : null;
+        return ((AbstractEntityAdjunct)bo).getExecutionContext();
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/main/java/org/apache/brooklyn/entity/group/SshCommandMembershipTrackingPolicy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/SshCommandMembershipTrackingPolicy.java b/core/src/main/java/org/apache/brooklyn/entity/group/SshCommandMembershipTrackingPolicy.java
index ea2ec55..373c667 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/SshCommandMembershipTrackingPolicy.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/SshCommandMembershipTrackingPolicy.java
@@ -186,7 +186,7 @@ public class SshCommandMembershipTrackingPolicy extends AbstractMembershipTracki
 
         // Try to resolve the configuration in the env Map
         try {
-            env = (Map<String, Object>) Tasks.resolveDeepValue(env, Object.class, ((EntityInternal) entity).getExecutionContext());
+            env = (Map<String, Object>) Tasks.resolveDeepValue(env, Object.class, getExecutionContext());
         } catch (InterruptedException | ExecutionException e) {
             throw Exceptions.propagate(e);
         }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java b/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java
index ca41304..3f78c3a 100644
--- a/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java
+++ b/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java
@@ -229,7 +229,7 @@ public class ShellFeed extends AbstractFeed {
 
             final ProcessTaskFactory<?> taskFactory = newTaskFactory(pollInfo.command, pollInfo.env, pollInfo.dir, 
                     pollInfo.input, pollInfo.context, pollInfo.timeout);
-            final ExecutionContext executionContext = ((EntityInternal) entity).getExecutionContext();
+            final ExecutionContext executionContext = getExecutionContext();
 
             getPoller().scheduleAtFixedRate(
                     new Callable<SshPollValue>() {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
index 429cad7..1b85663 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
@@ -44,6 +44,7 @@ import org.apache.brooklyn.api.mgmt.entitlement.EntitlementContext;
 import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedItem;
 import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.collections.MutableSet;
@@ -107,8 +108,8 @@ public class BasicExecutionContext extends AbstractExecutionContext {
         // which may require access to internal methods
         // (could remove this check if generalizing; it has been here for a long time and the problem seems gone)
         for (Object tag: tags) {
-            if (tag instanceof BrooklynTaskTags.WrappedEntity) {
-                if (Proxy.isProxyClass(((WrappedEntity)tag).entity.getClass())) {
+            if (tag instanceof BrooklynTaskTags.WrappedItem) {
+                if (Proxy.isProxyClass(((WrappedItem<?>)tag).unwrap().getClass())) {
                     log.warn(""+this+" has entity proxy in "+tag);
                 }
             }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
index 59ee3cf..97f02b7 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
@@ -40,6 +40,7 @@ import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.internal.BrooklynProperties;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedItem;
 import org.apache.brooklyn.core.sensor.BasicAttributeSensor;
 import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.core.test.entity.LocalManagementContextForTests;
@@ -318,8 +319,8 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
             if (tag instanceof Entity && ((Entity)tag).getId().equals(eId)) {
                 fail("tags contains unmanaged entity "+tag);
             }
-            if ((tag instanceof WrappedEntity) && ((WrappedEntity)tag).entity.getId().equals(eId) 
-                    && ((WrappedEntity)tag).wrappingType.equals(BrooklynTaskTags.CONTEXT_ENTITY)) {
+            if ((tag instanceof WrappedEntity) && ((WrappedEntity)tag).unwrap().getId().equals(eId) 
+                    && ((WrappedItem<?>)tag).getWrappingType().equals(BrooklynTaskTags.CONTEXT_ENTITY)) {
                 fail("tags contains unmanaged entity (wrapped) "+tag);
             }
         }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/core/src/test/java/org/apache/brooklyn/util/core/task/TasksTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/TasksTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/TasksTest.java
index 990e7f7..8c656a3 100644
--- a/core/src/test/java/org/apache/brooklyn/util/core/task/TasksTest.java
+++ b/core/src/test/java/org/apache/brooklyn/util/core/task/TasksTest.java
@@ -248,8 +248,8 @@ public class TasksTest extends BrooklynAppUnitTestSupport {
             for (Object tag : Tasks.current().getTags()) {
                 if (tag instanceof WrappedEntity) {
                     WrappedEntity wrapped = (WrappedEntity)tag;
-                    if (BrooklynTaskTags.CONTEXT_ENTITY.equals(wrapped.wrappingType)) {
-                        context.add(wrapped.entity);
+                    if (BrooklynTaskTags.CONTEXT_ENTITY.equals(wrapped.getWrappingType())) {
+                        context.add(wrapped.unwrap());
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/locations/jclouds/src/main/java/org/apache/brooklyn/policy/jclouds/os/CreateUserPolicy.java
----------------------------------------------------------------------
diff --git a/locations/jclouds/src/main/java/org/apache/brooklyn/policy/jclouds/os/CreateUserPolicy.java b/locations/jclouds/src/main/java/org/apache/brooklyn/policy/jclouds/os/CreateUserPolicy.java
index 2078779..555d7cb 100644
--- a/locations/jclouds/src/main/java/org/apache/brooklyn/policy/jclouds/os/CreateUserPolicy.java
+++ b/locations/jclouds/src/main/java/org/apache/brooklyn/policy/jclouds/os/CreateUserPolicy.java
@@ -110,11 +110,7 @@ public class CreateUserPolicy extends AbstractPolicy implements SensorEventListe
     }
 
     protected void addUserAsync(final Entity entity, final SshMachineLocation machine) {
-        ((EntityInternal)entity).getExecutionContext().execute(new Runnable() {
-            @Override
-            public void run() {
-                addUser(entity, machine);
-            }});
+        getExecutionContext().execute(() -> addUser(entity, machine));
     }
     
     protected void addUser(Entity entity, SshMachineLocation machine) {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java
index 840335a..c6bd33d 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java
@@ -21,7 +21,6 @@ package org.apache.brooklyn.policy.ha;
 import static org.apache.brooklyn.util.time.Time.makeTimeStringRounded;
 
 import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -30,19 +29,18 @@ import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.sensor.Sensor;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.policy.AbstractPolicy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor;
-import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.flags.SetFromFlag;
 import org.apache.brooklyn.util.core.task.BasicTask;
 import org.apache.brooklyn.util.core.task.ScheduledTask;
+import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.time.Duration;
 import org.apache.brooklyn.util.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.reflect.TypeToken;
 
@@ -194,7 +192,7 @@ public abstract class AbstractFailureDetector extends AbstractPolicy {
     protected void doStartPolling() {
         if (scheduledTask == null || scheduledTask.isDone()) {
             ScheduledTask task = ScheduledTask.builder(pollingTaskFactory).displayName( getTaskName() ).period(getPollPeriod()).build();
-            scheduledTask = ((EntityInternal)entity).getExecutionContext().submit(task);
+            scheduledTask = getExecutionContext().submit(task);
         }
     }
 
@@ -270,7 +268,6 @@ public abstract class AbstractFailureDetector extends AbstractPolicy {
         schedulePublish(0);
     }
 
-    @SuppressWarnings("unchecked")
     protected void schedulePublish(long delay) {
         if (isRunning() && executorQueued.compareAndSet(false, true)) {
             long now = System.currentTimeMillis();
@@ -279,8 +276,9 @@ public abstract class AbstractFailureDetector extends AbstractPolicy {
 
             Runnable job = new PublishJob();
 
-            ScheduledTask task = new ScheduledTask(MutableMap.of("delay", Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask<Void>(job));
-            ((EntityInternal)entity).getExecutionContext().submit(task);
+            ScheduledTask task = ScheduledTask.builder(() -> Tasks.builder().body(job).dynamic(false).displayName("Failure detector iteration").build())
+                .delay(Duration.millis(delay)).displayName("Failure detector scheduler").build();
+            getExecutionContext().submit(task);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java
index 2cbbf28..e143582 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java
@@ -39,6 +39,7 @@ import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.flags.SetFromFlag;
 import org.apache.brooklyn.util.core.task.BasicTask;
 import org.apache.brooklyn.util.core.task.ScheduledTask;
+import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.guava.Maybe;
 import org.apache.brooklyn.util.time.Duration;
@@ -314,7 +315,6 @@ public class ServiceFailureDetector extends ServiceStateLogic.ComputeServiceStat
         return description;
     }
     
-    @SuppressWarnings({ "rawtypes" })
     protected void recomputeAfterDelay(long delay) {
         // TODO Execute in same thread as other onEvent calls are done in (i.e. same conceptually 
         // single-threaded executor as the subscription-manager will use).
@@ -352,8 +352,9 @@ public class ServiceFailureDetector extends ServiceStateLogic.ComputeServiceStat
             }
         };
         
-        ScheduledTask task = new ScheduledTask(MutableMap.of("delay", Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask(job));
-        ((EntityInternal)entity).getExecutionContext().submit(task);
+        ScheduledTask task = ScheduledTask.builder(() -> Tasks.builder().body(job).dynamic(false).displayName("Failure detector recompute").build())
+            .delay(Duration.millis(delay)).displayName("Failure detector recompute after delay").build();
+        getExecutionContext().submit(task);
     }
     
     private String getTimeStringSince(Long time) {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java
index 3a1ba80..e14433d 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java
@@ -129,10 +129,7 @@ public class ServiceReplacer extends AbstractPolicy {
                     if (isRunning()) {
                         highlightViolation("Failure detected");
                         LOG.warn("ServiceReplacer notified; dispatching job for "+entity+" ("+event.getValue()+")");
-                        ((EntityInternal)entity).getExecutionContext().submit(MutableMap.of(), new Runnable() {
-                            @Override public void run() {
-                                onDetectedFailure(event);
-                            }});
+                        getExecutionContext().submit(() -> onDetectedFailure(event));
                     } else {
                         LOG.warn("ServiceReplacer not running, so not acting on failure detected at "+entity+" ("+event.getValue()+", child of "+entity+")");
                     }
@@ -176,10 +173,7 @@ public class ServiceReplacer extends AbstractPolicy {
         
         highlightViolation(violationText+", triggering restart");
         LOG.warn("ServiceReplacer acting on failure detected at "+failedEntity+" ("+reason+", child of "+entity+")");
-        Task<?> t = ((EntityInternal)entity).getExecutionContext().submit(MutableMap.of(), new Runnable() {
-
-            @Override
-            public void run() {
+        Task<?> t = getExecutionContext().submit(() -> {
                 try {
                     Entities.invokeEffectorWithArgs(entity, entity, MemberReplaceable.REPLACE_MEMBER, failedEntity.getId()).get();
                     consecutiveReplacementFailureTimes.clear();
@@ -191,8 +185,7 @@ public class ServiceReplacer extends AbstractPolicy {
                     highlightViolation(violationText+" and replace attempt failed: "+Exceptions.collapseText(e));
                     onReplacementFailed("Replace failure ("+Exceptions.collapseText(e)+") at "+entity+": "+reason);
                 }
-            }
-        });
+            });
         highlightAction("Replacing "+failedEntity, t);
     }
 

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java
index 8faaf89..a31e3c0 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java
@@ -112,10 +112,7 @@ public class ServiceRestarter extends AbstractPolicy {
                     
                     if (isRunning()) {
                         LOG.info("ServiceRestarter notified; dispatching job for "+entity+" ("+event.getValue()+")");
-                        ((EntityInternal)entity).getExecutionContext().submit(MutableMap.of(), new Runnable() {
-                            @Override public void run() {
-                                onDetectedFailure(event);
-                            }});
+                        getExecutionContext().submit(() -> onDetectedFailure(event));
                     } else {
                         LOG.warn("ServiceRestarter not running, so not acting on failure detected at "+entity+" ("+event.getValue()+")");
                     }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefAttributeFeed.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefAttributeFeed.java b/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefAttributeFeed.java
index f6d2615..0252dc2 100644
--- a/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefAttributeFeed.java
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefAttributeFeed.java
@@ -225,9 +225,8 @@ public class ChefAttributeFeed extends AbstractFeed {
             @Override
             public SshPollValue call() throws Exception {
                 ProcessTaskWrapper<String> taskWrapper = knifeTaskFactory.newTask();
-                final ExecutionContext executionContext = ((EntityInternal) entity).getExecutionContext();
                 log.debug("START: Running knife to query attributes of Chef node {}", nodeName);
-                executionContext.submit(taskWrapper);
+                getExecutionContext().submit(taskWrapper);
                 taskWrapper.block();
                 log.debug("DONE:  Running knife to query attributes of Chef node {}", nodeName);
                 return new SshPollValue(null, taskWrapper.getExitCode(), taskWrapper.getStdout(), taskWrapper.getStderr());
@@ -235,7 +234,7 @@ public class ChefAttributeFeed extends AbstractFeed {
         };
 
         getPoller().scheduleAtFixedRate(
-                new CallInEntityExecutionContext<SshPollValue>(entity, getAttributesFromKnife),
+                new CallInExecutionContext<SshPollValue>(this, getAttributesFromKnife),
                 new SendChefAttributesToSensors(entity, polls),
                 minPeriod);
     }
@@ -269,20 +268,19 @@ public class ChefAttributeFeed extends AbstractFeed {
      *
      * @param <T> The type of the {@link Callable}.
      */
-    private static class CallInEntityExecutionContext<T> implements Callable<T> {
+    private static class CallInExecutionContext<T> implements Callable<T> {
 
         private final Callable<T> job;
-        private Entity entity;
+        private AbstractFeed feed;
 
-        private CallInEntityExecutionContext(Entity entity, Callable<T> job) {
+        private CallInExecutionContext(AbstractFeed feed, Callable<T> job) {
             this.job = job;
-            this.entity = entity;
+            this.feed = feed;
         }
 
         @Override
         public T call() throws Exception {
-            final ExecutionContext executionContext = ((EntityInternal) entity).getExecutionContext();
-            return executionContext.submit(Maps.newHashMap(), job).get();
+            return feed.getExecutionContext().submit(Maps.newHashMap(), job).get();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4c9fe12/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java
----------------------------------------------------------------------
diff --git a/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java b/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java
index d5d9751..55b273e 100644
--- a/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java
+++ b/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java
@@ -211,14 +211,14 @@ public class WindowsPerformanceCounterFeed extends AbstractFeed {
         String command = JOINER_ON_SPACE.join(allParams);
         log.debug("Windows performance counter poll command for {} will be: {}", entity, command);
 
-        GetPerformanceCountersJob<WinRmToolResponse> job = new GetPerformanceCountersJob(getEntity(), command);
+        GetPerformanceCountersJob job = new GetPerformanceCountersJob(getEntity(), command);
         getPoller().scheduleAtFixedRate(
-                new CallInEntityExecutionContext(entity, job),
+                new CallInExecutionContext<WinRmToolResponse>(this, job),
                 new SendPerfCountersToSensors(getEntity(), polls),
                 minPeriod);
     }
 
-    private static class GetPerformanceCountersJob<T> implements Callable<T> {
+    private static class GetPerformanceCountersJob implements Callable<WinRmToolResponse> {
 
         private final Entity entity;
         private final String command;
@@ -229,15 +229,14 @@ public class WindowsPerformanceCounterFeed extends AbstractFeed {
         }
 
         @Override
-        @SuppressWarnings("unchecked")
-        public T call() throws Exception {
+        public WinRmToolResponse call() throws Exception {
             Maybe<WinRmMachineLocation> machineLocationMaybe = Machines.findUniqueMachineLocation(entity.getLocations(), WinRmMachineLocation.class);
             if (machineLocationMaybe.isAbsent()) {
                 return null;
             }
             WinRmMachineLocation machine = EffectorTasks.getMachine(entity, WinRmMachineLocation.class);
             WinRmToolResponse response = machine.executePsScript(command);
-            return (T)response;
+            return response;
         }
     }
 
@@ -254,18 +253,18 @@ public class WindowsPerformanceCounterFeed extends AbstractFeed {
      *
      * @param <T> The type of the {@link java.util.concurrent.Callable}.
      */
-    private static class CallInEntityExecutionContext<T> implements Callable<T> {
+    private static class CallInExecutionContext<T> implements Callable<T> {
         private final Callable<T> job;
-        private Entity entity;
+        private AbstractFeed feed;
 
-        private CallInEntityExecutionContext(Entity entity, Callable<T> job) {
+        private CallInExecutionContext(AbstractFeed feed, Callable<T> job) {
             this.job = job;
-            this.entity = entity;
+            this.feed = feed;
         }
 
         @Override
         public T call() throws Exception {
-            ExecutionContext executionContext = ((EntityInternal) entity).getExecutionContext();
+            ExecutionContext executionContext = feed.getExecutionContext();
             return executionContext.submit(Maps.newHashMap(), job).get();
         }
     }


[02/23] brooklyn-server git commit: task visibility: entity mgmt create and startup wrapped in its own task

Posted by he...@apache.org.
task visibility: entity mgmt create and startup wrapped in its own task


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/7f4d7bd8
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/7f4d7bd8
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/7f4d7bd8

Branch: refs/heads/master
Commit: 7f4d7bd87e0e0e1d98ed49d875e601a21e0635c8
Parents: 37b6b11
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Tue Sep 12 15:01:09 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Mon Sep 18 17:10:54 2017 +0100

----------------------------------------------------------------------
 .../core/mgmt/EntityManagementUtils.java        |  8 +++---
 .../mgmt/internal/EntityManagementSupport.java  | 19 ++++++++-----
 .../mgmt/internal/LocalSubscriptionManager.java |  2 ++
 .../internal/QueueingSubscriptionManager.java   |  4 +--
 .../internal/EntityExecutionManagerTest.java    | 28 ++++++++++++++------
 5 files changed, 41 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7f4d7bd8/core/src/main/java/org/apache/brooklyn/core/mgmt/EntityManagementUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/EntityManagementUtils.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/EntityManagementUtils.java
index 0cd18fc..de9964c 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/EntityManagementUtils.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/EntityManagementUtils.java
@@ -91,9 +91,11 @@ public class EntityManagementUtils {
      */
     @Beta
     public static <T extends Application> T createUnstarted(ManagementContext mgmt, EntitySpec<T> spec, Optional<String> entityId) {
-        // TODO wrap in task
-        T app = ((EntityManagerInternal)mgmt.getEntityManager()).createEntity(spec, entityId);
-        return app;
+        return mgmt.getServerExecutionContext().get(Tasks.<T>builder().dynamic(false)
+            .displayName("Creating entity "+
+                (Strings.isNonBlank(spec.getDisplayName()) ? spec.getDisplayName() : spec.getType().getName()) )
+            .body(() -> ((EntityManagerInternal)mgmt.getEntityManager()).createEntity(spec, entityId))
+            .build() );
     }
 
     /** as {@link #createUnstarted(ManagementContext, EntitySpec)} but for a string plan (e.g. camp yaml) */

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7f4d7bd8/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
index 97c7bac..96cfd33 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java
@@ -38,10 +38,12 @@ import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.entity.AbstractEntity;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
 import org.apache.brooklyn.core.mgmt.entitlement.Entitlements.EntityAndItem;
 import org.apache.brooklyn.core.mgmt.entitlement.Entitlements.StringAndArgument;
 import org.apache.brooklyn.core.mgmt.internal.NonDeploymentManagementContext.NonDeploymentManagementContextMode;
+import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -159,9 +161,10 @@ public class EntityManagementSupport {
     }
     
     public void onManagementStarting(ManagementTransitionInfo info) {
-        try {
-            // TODO same-thread task on this entity, with internal tag ?
-            synchronized (this) {
+        info.getManagementContext().getExecutionContext(entity).get( Tasks.builder().displayName("Management starting")
+            .dynamic(false)
+            .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
+            .body(() -> { try { synchronized (this) {
                 boolean alreadyManaging = isDeployed();
                 
                 if (alreadyManaging) {
@@ -212,13 +215,15 @@ public class EntityManagementSupport {
         } catch (Throwable t) {
             managementFailed.set(true);
             throw Exceptions.propagate(t);
-        }
+        }}).build() );
     }
 
     @SuppressWarnings("deprecation")
     public void onManagementStarted(ManagementTransitionInfo info) {
-        try {
-            synchronized (this) {
+        info.getManagementContext().getExecutionContext(entity).get( Tasks.builder().displayName("Management started")
+            .dynamic(false)
+            .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
+            .body(() -> { try { synchronized (this) {
                 boolean alreadyManaged = isFullyManaged();
                 
                 if (alreadyManaged) {
@@ -265,7 +270,7 @@ public class EntityManagementSupport {
         } catch (Throwable t) {
             managementFailed.set(true);
             throw Exceptions.propagate(t);
-        }
+        }}).build() );
     }
     
     @SuppressWarnings("deprecation")

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7f4d7bd8/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index 2349c73..983b307 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -229,6 +229,8 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
             .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags)
             .add(s.subscriberExecutionManagerTag)
             .add(BrooklynTaskTags.SENSOR_TAG)
+            // associate the publish event with the publisher (though on init it might be triggered by subscriber)
+            .addIfNotNull(event.getSource()!=null ? BrooklynTaskTags.tagForTargetEntity(event.getSource()) : null)
             .build()
             .asUnmodifiable();
         

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7f4d7bd8/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java
index 83facda..290520d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java
@@ -76,7 +76,7 @@ public class QueueingSubscriptionManager extends AbstractSubscriptionManager {
     
     @SuppressWarnings("unchecked")
     public synchronized void startDelegatingForSubscribing() {
-        // TODO wrap in same-thread task
+        // could wrap in same-thread task, but there's enough context without it
         assert delegate!=null;
         for (QueuedSubscription s: queuedSubscriptions) {
             delegate.subscribe(s.flags, s.s);
@@ -87,7 +87,7 @@ public class QueueingSubscriptionManager extends AbstractSubscriptionManager {
     
     @SuppressWarnings("unchecked")
     public synchronized void startDelegatingForPublishing() {
-        // TODO wrap in same-thread task
+        // could wrap in same-thread task, but there's enough context without it
         assert delegate!=null;
         for (SensorEvent evt: queuedSensorEvents) {
             delegate.publish(evt);

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/7f4d7bd8/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
index 51f5bdc..59ee3cf 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java
@@ -46,8 +46,10 @@ import org.apache.brooklyn.core.test.entity.LocalManagementContextForTests;
 import org.apache.brooklyn.core.test.entity.TestEntity;
 import org.apache.brooklyn.test.Asserts;
 import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.core.task.BasicExecutionManager;
 import org.apache.brooklyn.util.core.task.ExecutionListener;
+import org.apache.brooklyn.util.core.task.ScheduledTask;
 import org.apache.brooklyn.util.core.task.TaskBuilder;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.javalang.JavaClassNames;
@@ -129,18 +131,28 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
         return Tasks.builder().displayName(name).dynamic(false).body(Callables.returning(null));
     }
 
-    protected void assertTaskCountForEntityEventually(final Entity entity, final int expectedCount) {
+    protected void assertImportantTaskCountForEntityEventually(final Entity entity, final int expectedCount) {
         // Dead task (and initialization task) should have been GC'd on completion.
         // However, the GC'ing happens in a listener, executed in a different thread - the task.get()
         // doesn't block for it. Therefore can't always guarantee it will be GC'ed by now.
         Asserts.succeedsEventually(new Runnable() {
             @Override public void run() {
-                forceGc();
-                Collection<Task<?>> tasks = BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity);
+                forceGc();  
+                Collection<Task<?>> tasks = removeSystemTasks(BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity));
                 Assert.assertEquals(tasks.size(), expectedCount, "Tasks were "+tasks);
             }});
     }
 
+    static Set<Task<?>> removeSystemTasks(Iterable<Task<?>> tasks) {
+        Set<Task<?>> result = MutableSet.of();
+        for (Task<?> t: tasks) {
+            if (t instanceof ScheduledTask) continue;
+            if (t.getTags().contains(BrooklynTaskTags.SENSOR_TAG)) continue;
+            result.add(t);
+        }
+        return result;
+    }
+
     // Needed because of https://issues.apache.org/jira/browse/BROOKLYN-401
     protected void assertTaskMaxCountForEntityEventually(final Entity entity, final int expectedMaxCount) {
         // Dead task (and initialization task) should have been GC'd on completion.
@@ -149,7 +161,7 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
         Asserts.succeedsEventually(new Runnable() {
             @Override public void run() {
                 forceGc();
-                Collection<Task<?>> tasks = BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity);
+                Collection<Task<?>> tasks = removeSystemTasks( BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity) );
                 Assert.assertTrue(tasks.size() <= expectedMaxCount,
                         "Expected tasks count max of " + expectedMaxCount + ". Tasks were "+tasks);
             }});
@@ -161,8 +173,8 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
         final Task<?> task = runEmptyTaskWithNameAndTags(e, "should-be-kept", ManagementContextInternal.NON_TRANSIENT_TASK_TAG);
         runEmptyTaskWithNameAndTags(e, "should-be-gcd", ManagementContextInternal.TRANSIENT_TASK_TAG);
         
-        assertTaskCountForEntityEventually(e, 1);
-        Collection<Task<?>> tasks = BrooklynTaskTags.getTasksInEntityContext(app.getManagementContext().getExecutionManager(), e);
+        assertImportantTaskCountForEntityEventually(e, 1);
+        Collection<Task<?>> tasks = removeSystemTasks( BrooklynTaskTags.getTasksInEntityContext(app.getManagementContext().getExecutionManager(), e) );
         assertEquals(tasks, ImmutableList.of(task), "Mismatched tasks, got: "+tasks);
     }
 
@@ -320,7 +332,7 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
         // allow background enrichers to complete
         Time.sleep(Duration.ONE_SECOND);
         forceGc();
-        List<Task<?>> t1 = em.getAllTasks();
+        Collection<Task<?>> t1 = em.getAllTasks();
 
         TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
         entity.sensors().set(TestEntity.NAME, "bob");
@@ -328,7 +340,7 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport {
         Entities.destroy(entity);
         Time.sleep(Duration.ONE_SECOND);
         forceGc();
-        List<Task<?>> t2 = em.getAllTasks();
+        Collection<Task<?>> t2 = em.getAllTasks();
 
         Assert.assertEquals(t1.size(), t2.size(), "lists are different:\n"+t1+"\n"+t2+"\n");
     }


[17/23] brooklyn-server git commit: Merge branch 'master' into tasks-4

Posted by he...@apache.org.
Merge branch 'master' into tasks-4


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/0c2e1f60
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/0c2e1f60
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/0c2e1f60

Branch: refs/heads/master
Commit: 0c2e1f60e7781def8b4a8be744e4ee9244658212
Parents: bb26d32 baf8070
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Wed Oct 4 03:00:04 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Oct 4 03:00:04 2017 +0100

----------------------------------------------------------------------
 .../org/apache/brooklyn/api/entity/Entity.java  |   3 +
 .../brooklyn/api/mgmt/rebind/RebindManager.java |  14 +-
 .../org/apache/brooklyn/api/sensor/Feed.java    |   2 +-
 ...loudsCustomizerInstantiationYamlDslTest.java | 156 +++++--
 .../catalog/SpecParameterUnwrappingTest.java    |   2 +-
 .../brooklyn/core/catalog/CatalogLoadMode.java  |  73 ----
 .../catalog/internal/BasicBrooklynCatalog.java  |  28 +-
 .../catalog/internal/CatalogInitialization.java | 278 ++++++++-----
 .../core/catalog/internal/CatalogUtils.java     |  31 +-
 .../brooklyn/core/entity/AbstractEntity.java    |  34 +-
 .../core/entity/BrooklynConfigKeys.java         |   2 +-
 .../brooklyn/core/entity/EntityAdjuncts.java    |  28 +-
 .../brooklyn/core/entity/EntityInternal.java    |  16 +-
 .../apache/brooklyn/core/feed/AbstractFeed.java |   2 +-
 .../mgmt/ha/HighAvailabilityManagerImpl.java    |  45 +-
 .../core/mgmt/ha/OsgiArchiveInstaller.java      | 210 +++++++---
 .../brooklyn/core/mgmt/ha/OsgiManager.java      |  36 +-
 .../internal/AbstractManagementContext.java     |   5 +-
 .../core/mgmt/internal/LocalUsageManager.java   |  31 +-
 .../ManagementNodeStateListenerManager.java     | 175 ++++++++
 .../NonDeploymentManagementContext.java         |  19 -
 .../rebind/ImmediateDeltaChangeListener.java    | 160 --------
 .../core/mgmt/rebind/RebindIteration.java       | 213 ++++------
 .../core/mgmt/rebind/RebindManagerImpl.java     |  30 --
 .../mgmt/rebind/dto/BasicEntityMemento.java     |   2 +-
 .../mgmt/rebind/dto/MementosGenerators.java     |   5 +-
 .../mgmt/usage/ManagementNodeStateListener.java |  53 +++
 .../core/objs/BasicConfigurableObject.java      |   4 +-
 .../apache/brooklyn/core/policy/Policies.java   |  66 +--
 .../core/server/BrooklynServerConfig.java       |  28 +-
 .../FixedListMachineProvisioningLocation.java   |   3 +-
 .../brooklyn/util/core/ClassLoaderUtils.java    |   7 +-
 .../brooklyn/util/core/flags/TypeCoercions.java |  21 +
 .../brooklyn/util/core/osgi/BundleMaker.java    |  38 +-
 .../brooklyn/util/core/task/BasicTask.java      |   5 +-
 .../core/entity/EntityConcurrencyTest.java      |   2 +-
 .../ha/HighAvailabilityManagerInMemoryTest.java |   5 -
 .../ha/HighAvailabilityManagerTestFixture.java  |  61 ++-
 .../mgmt/rebind/RebindCatalogEntityTest.java    |   2 +-
 .../core/mgmt/rebind/RebindCatalogItemTest.java |   2 -
 .../core/mgmt/rebind/RebindFailuresTest.java    |  18 +-
 .../core/mgmt/rebind/RebindFeedWithHaTest.java  |   2 +-
 .../usage/ManagementNodeStateListenerTest.java  | 160 ++++++++
 .../util/core/ClassLoaderUtilsTest.java         |   6 +-
 .../util/core/internal/TypeCoercionsTest.java   |  31 ++
 .../core/internal/ssh/RecordingSshTool.java     |   7 +
 .../util/core/osgi/BundleMakerTest.java         |   8 +-
 .../bundlemaker/withmanifest/myemptyfile.txt    |   0
 .../launcher/osgi/OsgiLauncherImpl.java         |   2 +-
 .../init/src/main/resources/catalog-classes.bom | 364 ----------------
 karaf/init/src/main/resources/catalog.bom       |  25 ++
 .../BrooklynLauncherRebindCatalogTest.java      |   2 +-
 .../brooklyn/launcher/BrooklynLauncherTest.java |   2 +-
 .../launcher/CleanOrphanedAdjunctsTest.java     |   6 +-
 .../CleanOrphanedLocationsIntegrationTest.java  |   2 +-
 .../jclouds/LocationCustomizerDelegate.java     |   8 +-
 ...ailabilityManagerJcloudsObjectStoreTest.java |   7 +-
 .../apache/brooklyn/rest/api/AdjunctApi.java    | 237 +++++++++++
 .../apache/brooklyn/rest/api/CatalogApi.java    |  34 +-
 .../brooklyn/rest/api/EntityConfigApi.java      |  10 +-
 .../org/apache/brooklyn/rest/api/PolicyApi.java |  16 +-
 .../brooklyn/rest/api/PolicyConfigApi.java      |  35 +-
 .../brooklyn/rest/domain/AdjunctDetail.java     |  72 ++++
 .../brooklyn/rest/domain/AdjunctSummary.java    | 148 +++++++
 .../brooklyn/rest/domain/ApplicationSpec.java   |  12 +-
 .../rest/domain/CatalogEnricherSummary.java     |   4 +
 .../rest/domain/CatalogEntitySummary.java       |   4 +
 .../rest/domain/CatalogItemSummary.java         |   4 +
 .../rest/domain/CatalogLocationSummary.java     |   4 +
 .../rest/domain/CatalogPolicySummary.java       |   4 +
 .../brooklyn/rest/domain/EffectorSummary.java   |   9 +-
 .../brooklyn/rest/domain/EntityDetail.java      |  14 +-
 .../brooklyn/rest/domain/EntitySummary.java     |  11 +-
 .../rest/domain/PolicyConfigSummary.java        |   2 +-
 .../brooklyn/rest/domain/PolicySummary.java     |  83 +---
 .../rest/domain/ScriptExecutionSummary.java     |  11 +-
 .../brooklyn/rest/domain/SensorSummary.java     |   7 +-
 .../org/apache/brooklyn/rest/domain/Status.java |   7 +-
 .../apache/brooklyn/rest/BrooklynRestApi.java   |   2 +
 .../rest/resources/AdjunctResource.java         | 273 ++++++++++++
 .../rest/resources/CatalogResource.java         |  21 +-
 .../rest/resources/EntityConfigResource.java    |  10 +-
 .../rest/resources/PolicyConfigResource.java    |   1 +
 .../brooklyn/rest/resources/PolicyResource.java |   2 +-
 .../rest/transform/AdjunctTransformer.java      |  96 +++++
 .../rest/transform/CatalogTransformer.java      |  30 +-
 .../rest/transform/ConfigTransformer.java       | 175 ++++++++
 .../rest/transform/EntityTransformer.java       | 107 +++--
 .../rest/transform/PolicyTransformer.java       |   3 +
 .../rest/transform/SensorTransformer.java       |  21 +-
 .../rest/transform/TypeTransformer.java         |  10 +-
 .../rest/util/BrooklynRestResourceUtils.java    |  44 ++
 .../rest/resources/AdjunctResourceTest.java     | 198 +++++++++
 .../rest/testing/BrooklynRestResourceTest.java  |   2 +-
 .../main/java/org/apache/brooklyn/cli/Main.java |  21 +-
 .../brooklyn/cli/lister/ItemDescriptors.java    |  26 +-
 .../java/org/apache/brooklyn/cli/CliTest.java   |  32 +-
 .../brooklyn/entity/chef/ChefAttributeFeed.java | 411 -------------------
 .../entity/chef/ChefAttributePollConfig.java    |  61 ---
 .../brooklyn/entity/chef/ChefBashCommands.java  |  42 --
 .../apache/brooklyn/entity/chef/ChefConfig.java |  94 -----
 .../brooklyn/entity/chef/ChefConfigs.java       | 102 -----
 .../apache/brooklyn/entity/chef/ChefEntity.java |  26 --
 .../brooklyn/entity/chef/ChefEntityImpl.java    |  39 --
 .../entity/chef/ChefLifecycleEffectorTasks.java | 364 ----------------
 .../brooklyn/entity/chef/ChefServerTasks.java   |  97 -----
 .../brooklyn/entity/chef/ChefSoloDriver.java    |  85 ----
 .../brooklyn/entity/chef/ChefSoloTasks.java     |  70 ----
 .../apache/brooklyn/entity/chef/ChefTasks.java  | 154 -------
 .../entity/chef/KnifeConvergeTaskFactory.java   | 249 -----------
 .../brooklyn/entity/chef/KnifeTaskFactory.java  | 241 -----------
 .../entity/resolve/ChefEntitySpecResolver.java  |  42 --
 ...oklyn.core.resolve.entity.EntitySpecResolver |   1 -
 .../resources/OSGI-INF/blueprint/blueprint.xml  |   5 -
 software/base/src/main/resources/catalog.bom    |  88 ----
 .../brooklyn/entity/chef/ChefConfigsTest.java   |  40 --
 .../entity/chef/ChefLiveTestSupport.java        |  99 -----
 .../chef/ChefServerTasksIntegrationTest.java    | 109 -----
 .../AbstractChefToyMySqlEntityLiveTest.java     |  41 --
 .../ChefSoloDriverMySqlEntityLiveTest.java      |  49 ---
 .../mysql/ChefSoloDriverToyMySqlEntity.java     |  89 ----
 ...micChefAutodetectToyMySqlEntityLiveTest.java |  43 --
 ...DynamicChefServerToyMySqlEntityLiveTest.java |  50 ---
 .../DynamicChefSoloToyMySqlEntityLiveTest.java  |  43 --
 .../chef/mysql/DynamicToyMySqlEntityChef.java   |  81 ----
 .../chef/mysql/TypedToyMySqlEntityChef.java     |  55 ---
 .../apache/brooklyn/util/stream/Streams.java    |  39 +-
 .../brooklyn/util/stream/StreamsTest.java       |  17 +
 128 files changed, 2905 insertions(+), 4370 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0c2e1f60/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0c2e1f60/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0c2e1f60/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0c2e1f60/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedWithHaTest.java
----------------------------------------------------------------------


[08/23] brooklyn-server git commit: task visibility: entity initialization

Posted by he...@apache.org.
task visibility: entity initialization


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/e1f948ad
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/e1f948ad
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/e1f948ad

Branch: refs/heads/master
Commit: e1f948ad583af01138519a17a9d79d400ad0511a
Parents: 0a1acec
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Tue Sep 19 13:19:39 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Tue Sep 19 13:19:39 2017 +0100

----------------------------------------------------------------------
 .../core/mgmt/internal/LocalSubscriptionManager.java        | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/e1f948ad/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index a9fb70b..65e4b14 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -145,8 +145,13 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
                 LOG.warn("Cannot notifyOfInitialValue for subscription with non-attribute sensor: "+s);
             } else {
                 if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {} to {}", new Object[] {s.producer, s.sensor, s});
-                T val = (T) s.producer.getAttribute((AttributeSensor<?>) s.sensor);
-                submitPublishEvent(s, new BasicSensorEvent<T>(s.sensor, s.producer, val), true);
+                em.submit(
+                    MutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(s.producer)),
+                        "displayName", "Initial publication of "+s.sensor.getName()),
+                    () -> {
+                        T val = (T) s.producer.getAttribute((AttributeSensor<?>) s.sensor);
+                        submitPublishEvent(s, new BasicSensorEvent<T>(s.sensor, s.producer, val), true);
+                    });
             }
         }
         


[10/23] brooklyn-server git commit: include adjunct info as a subscription description

Posted by he...@apache.org.
include adjunct info as a subscription description

description could be used for more things, exposed more broadly, but this is needed for now to make sense of those tasks


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/b0556dec
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/b0556dec
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/b0556dec

Branch: refs/heads/master
Commit: b0556decc88a1d5bdc1c45500ceeb7c2eb558716
Parents: e1f948a
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Tue Sep 19 14:30:19 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Sep 20 10:10:25 2017 +0100

----------------------------------------------------------------------
 .../brooklyn/core/mgmt/internal/AbstractManagementContext.java  | 3 ++-
 .../core/mgmt/internal/AbstractSubscriptionManager.java         | 5 +++++
 .../brooklyn/core/mgmt/internal/LocalSubscriptionManager.java   | 5 +++++
 .../core/mgmt/internal/QueueingSubscriptionManager.java         | 1 +
 .../org/apache/brooklyn/core/mgmt/internal/Subscription.java    | 1 +
 5 files changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/b0556dec/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
index f0c9335..86d2e06 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
@@ -263,7 +263,8 @@ public abstract class AbstractManagementContext implements ManagementContextInte
     @Override
     public SubscriptionContext getSubscriptionContext(Entity e, EntityAdjunct a) {
         // BSC is a thin wrapper around SM so fine to create a new one here
-        Map<String, ?> flags = ImmutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(e), BrooklynTaskTags.tagForContextAdjunct(a)));
+        Map<String, ?> flags = ImmutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(e), BrooklynTaskTags.tagForContextAdjunct(a)),
+            "subscriptionDescription", "adjunct "+a.getId());
         return new BasicSubscriptionContext(flags, getSubscriptionManager(), e);
     }
 

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/b0556dec/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractSubscriptionManager.java
index c15f770..e7ae59e 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractSubscriptionManager.java
@@ -30,6 +30,7 @@ import org.apache.brooklyn.api.mgmt.SubscriptionManager;
 import org.apache.brooklyn.api.sensor.Sensor;
 import org.apache.brooklyn.api.sensor.SensorEvent;
 import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.util.text.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -147,4 +148,8 @@ public abstract class AbstractSubscriptionManager implements SubscriptionManager
         return s.subscriber!=null ? s.subscriber : flags.containsKey("subscriber") ? flags.remove("subscriber") : s.listener;
     }
 
+    protected <T> String getSubscriptionDescription(Map<String, Object> flags, Subscription<T> s) {
+        return s.subscriptionDescription!=null ? s.subscriptionDescription : flags.containsKey("subscriptionDescription") ? Strings.toString(flags.remove("subscriptionDescription")) : null;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/b0556dec/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index 65e4b14..6b1a5d9 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -106,6 +106,7 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
         Entity producer = s.producer;
         Sensor<T> sensor= s.sensor;
         s.subscriber = getSubscriber(flags, s);
+        s.subscriptionDescription = getSubscriptionDescription(flags, s);
         if (flags.containsKey("tags") || flags.containsKey("tag")) {
             Iterable<?> tags = (Iterable<?>) flags.get("tags");
             Object tag = flags.get("tag");
@@ -255,6 +256,10 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
         description.append(sourceName==null ? "<null-source>" : sourceName);
         description.append(" publishing to ");
         description.append(s.subscriber instanceof Entity ? ((Entity)s.subscriber).getId() : s.subscriber);
+        if (Strings.isNonBlank(s.subscriptionDescription)) {
+            description.append(", ");
+            description.append(s.subscriptionDescription);
+        }
         
         if (includeDescriptionForSensorTask(event)) {
             name.append(" ");

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/b0556dec/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java
index 290520d..57ad073 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java
@@ -55,6 +55,7 @@ public class QueueingSubscriptionManager extends AbstractSubscriptionManager {
         QueuedSubscription<T> qs = new QueuedSubscription<T>();
         qs.flags = flags;
         s.subscriber = getSubscriber(flags, s);
+        s.subscriptionDescription = getSubscriptionDescription(flags, s);
         qs.s = s;
         queuedSubscriptions.add(qs);
         return s;

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/b0556dec/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java
index 5e71701..1cf0ad7 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java
@@ -37,6 +37,7 @@ class Subscription<T> implements SubscriptionHandle {
     public Object subscriberExecutionManagerTag;
     /** whether the tag was supplied by user, in which case we should not clear execution semantics */
     public boolean subscriberExecutionManagerTagSupplied;
+    public String subscriptionDescription;
     public Iterable<?> subscriberExtraExecTags;
     public final Entity producer;
     public final Sensor<T> sensor;


[11/23] brooklyn-server git commit: task visibility: ensure all tasks have a name, updating exec.submit() methods to take name

Posted by he...@apache.org.
task visibility: ensure all tasks have a name, updating exec.submit() methods to take name


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/79cc9bcc
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/79cc9bcc
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/79cc9bcc

Branch: refs/heads/master
Commit: 79cc9bccf0ca80816ecd09846ec385d5cad3fbf8
Parents: aeecd3e
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Wed Sep 20 10:09:05 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Sep 20 10:10:25 2017 +0100

----------------------------------------------------------------------
 .../brooklyn/api/mgmt/ExecutionContext.java     | 17 +++-
 .../brooklyn/api/mgmt/ExecutionManager.java     | 14 ++-
 .../camp/brooklyn/ConfigParametersYamlTest.java | 13 ++-
 .../brooklyn/camp/brooklyn/spi/dsl/DslTest.java |  4 +-
 .../core/mgmt/rebind/RebindManagerImpl.java     | 10 +--
 .../core/task/AbstractExecutionContext.java     | 10 ++-
 .../util/core/task/BasicExecutionContext.java   |  5 ++
 .../util/core/task/BasicExecutionManager.java   |  8 +-
 .../core/config/DeferredConfigTest.java         |  8 +-
 .../entity/ApplicationLifecycleStateTest.java   | 18 ++--
 .../brooklyn/core/entity/EntityAssertsTest.java | 92 ++++---------------
 .../mgmt/persist/XmlMementoSerializerTest.java  |  2 +-
 .../qa/performance/TaskPerformanceTest.java     |  6 +-
 .../ssh/SshMachineLocationIntegrationTest.java  |  6 +-
 .../location/ssh/SshMachineLocationTest.java    | 12 +--
 .../util/core/task/TaskPredicatesTest.java      |  5 +-
 .../util/core/task/ValueResolverTest.java       | 93 +++++++++++++++++---
 .../brooklyn/policy/ha/ServiceReplacer.java     |  6 +-
 .../brooklyn/policy/ha/ServiceRestarter.java    |  4 +-
 .../rest/resources/SensorResourceTest.java      |  2 +-
 .../entity/brooklynnode/BrooklynNodeImpl.java   | 12 +--
 21 files changed, 178 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
index 142e664..c940ca0 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
@@ -52,12 +52,22 @@ public interface ExecutionContext extends Executor {
      */
     <T> Task<T> submit(Map<?,?> properties, Callable<T> callable);
 
-    /** {@link ExecutionManager#submit(Runnable) */
+    /** {@link ExecutionManager#submit(Runnable) 
+     * @deprecated since 0.12.0 pass a display name or a more detailed map */
+    @Deprecated
     Task<?> submit(Runnable runnable);
  
-    /** {@link ExecutionManager#submit(Callable) */
+    /** {@link ExecutionManager#submit(Callable)
+     * @deprecated since 0.12.0 pass a display name or a more detailed map */
+    @Deprecated
     <T> Task<T> submit(Callable<T> callable);
 
+    /** {@link ExecutionManager#submit(String Runnable) */
+    Task<?> submit(String displayName, Runnable runnable);
+ 
+    /** {@link ExecutionManager#submit(String, Callable) */
+    <T> Task<T> submit(String displayName, Callable<T> callable);
+
     /** See {@link ExecutionManager#submit(Map, TaskAdaptable)}. */
     <T> Task<T> submit(TaskAdaptable<T> task);
     
@@ -80,6 +90,9 @@ public interface ExecutionContext extends Executor {
     // TODO reference ImmediateSupplier when that class is moved to utils project
     @Beta
     <T> Maybe<T> getImmediately(Object callableOrSupplierOrTask);
+    /** As {@link #getImmediately(Object)} but strongly typed for a task. */
+    @Beta
+    <T> Maybe<T> getImmediately(Task<T> callableOrSupplierOrTask);
 
     /**
      * Efficient shortcut for {@link #submit(TaskAdaptable)} followed by an immediate {@link Task#get()}.

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java
index 97108ab..d4d1db4 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java
@@ -79,11 +79,21 @@ public interface ExecutionManager {
 //    /** returns all tasks known to this manager (immutable) */
 //    public Set<Task<?>> getAllTasks();
 
-    /** see {@link #submit(Map, TaskAdaptable)} */
+    /** see {@link #submit(Map, TaskAdaptable)} 
+     * @deprecated since 0.12.0 pass displayName or map */
+    @Deprecated
     public Task<?> submit(Runnable r);
 
+    /** see {@link #submit(Map, TaskAdaptable)} 
+     * @deprecated since 0.12.0 pass displayName or map */
+    @Deprecated
+    public <T> Task<T> submit(Callable<T> r);
+
+    /** see {@link #submit(Map, TaskAdaptable)} */
+    public Task<?> submit(String displayName, Runnable c);
+
     /** see {@link #submit(Map, TaskAdaptable)} */
-    public <T> Task<T> submit(Callable<T> c);
+    public <T> Task<T> submit(String displayName, Callable<T> c);
 
     /** see {@link #submit(Map, TaskAdaptable)} */
     public <T> Task<T> submit(TaskAdaptable<T> task);

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigParametersYamlTest.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigParametersYamlTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigParametersYamlTest.java
index 98ccb0b..3840e2d 100644
--- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigParametersYamlTest.java
+++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigParametersYamlTest.java
@@ -934,13 +934,12 @@ public class ConfigParametersYamlTest extends AbstractYamlRebindTest {
         Entity app = createStartWaitAndLogApplication(yaml);
         final TestEntity entity1 = (TestEntity) Iterables.getOnlyElement(app.getChildren());
 
-        TestEntity entity2 = entity1.getExecutionContext().submit(new Callable<TestEntity>() {
-            public TestEntity call() {
-                TestEntity entity2 = entity1.addChild(EntitySpec.create(TestEntity.class));
-                entity2.start(Collections.<Location>emptyList());
-                return entity2;
-            } 
-        }).get();
+        TestEntity entity2 = entity1.getExecutionContext().submit("create and start", () -> {
+                TestEntity entity2i = entity1.addChild(EntitySpec.create(TestEntity.class));
+                entity2i.start(Collections.<Location>emptyList());
+                return entity2i;
+            })
+            .get();
         
         Entities.dumpInfo(app);
         

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java
index 63aba8e..04b6bf1 100644
--- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java
+++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java
@@ -327,7 +327,7 @@ public class DslTest extends BrooklynAppUnitTestSupport {
     protected void runConcurrentWorker(Supplier<Runnable> taskSupplier) {
         Collection<Task<?>> results = new ArrayList<>();
         for (int i = 0; i < MAX_PARALLEL_RESOLVERS; i++) {
-            Task<?> result = app.getExecutionContext().submit(taskSupplier.get());
+            Task<?> result = app.getExecutionContext().submit("parallel "+i, taskSupplier.get());
             results.add(result);
         }
         for (Task<?> result : results) {
@@ -550,7 +550,7 @@ public class DslTest extends BrooklynAppUnitTestSupport {
             }
         };
         if (execInTask) {
-            Task<Maybe<?>> task = ((EntityInternal)context).getExecutionContext().submit(job);
+            Task<Maybe<?>> task = ((EntityInternal)context).getExecutionContext().submit("Resolving DSL for test: "+dsl, job);
             task.get(Asserts.DEFAULT_LONG_TIMEOUT);
             assertTrue(task.isDone());
             return task.get();

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
index d896376..381fee8 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
@@ -520,15 +520,7 @@ public class RebindManagerImpl implements RebindManager {
         ExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext();
         if (ec == null) {
             ec = managementContext.getServerExecutionContext();
-            Task<List<Application>> task = ec.submit(new Callable<List<Application>>() {
-                @Override public List<Application> call() throws Exception {
-                    return rebindImpl(classLoader, exceptionHandler, mode);
-                }});
-            try {
-                return task.get();
-            } catch (Exception e) {
-                throw Exceptions.propagate(e);
-            }
+            return ec.get(Tasks.<List<Application>>builder().displayName("rebind").dynamic(false).body(() -> rebindImpl(classLoader, exceptionHandler, mode)).build());
         } else {
             return rebindImpl(classLoader, exceptionHandler, mode);
         }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/main/java/org/apache/brooklyn/util/core/task/AbstractExecutionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/AbstractExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/AbstractExecutionContext.java
index e7debb9..424bbfc 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/AbstractExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/AbstractExecutionContext.java
@@ -25,6 +25,7 @@ import org.apache.brooklyn.api.mgmt.ExecutionContext;
 import org.apache.brooklyn.api.mgmt.ExecutionManager;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.mgmt.TaskAdaptable;
+import org.apache.brooklyn.util.collections.MutableMap;
 
 import com.google.common.collect.Maps;
 
@@ -41,11 +42,16 @@ public abstract class AbstractExecutionContext implements ExecutionContext {
     public Task<?> submit(Map<?,?> properties, Runnable runnable) { return submitInternal(properties, runnable); }
     
     /** @see #submit(Map, Runnable) */
-    @Override
+    @Override 
+    public Task<?> submit(String displayName, Runnable runnable) { return submitInternal(MutableMap.of("displayName", displayName), runnable); }
+    @Override @Deprecated
     public Task<?> submit(Runnable runnable) { return submitInternal(Maps.newLinkedHashMap(), runnable); }
+    
  
     /** @see #submit(Map, Runnable) */
-    @Override
+    @Override 
+    public <T> Task<T> submit(String displayName, Callable<T> callable) { return submitInternal(MutableMap.of("displayName", displayName), callable); }
+    @Override @Deprecated
     public <T> Task<T> submit(Callable<T> callable) { return submitInternal(Maps.newLinkedHashMap(), callable); }
     
     /** @see #submit(Map, Runnable) */

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
index 1b85663..1236219 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
@@ -239,6 +239,11 @@ public class BasicExecutionContext extends AbstractExecutionContext {
         }
     }
     
+    @Override
+    public <T> Maybe<T> getImmediately(Task<T> callableOrSupplier) {
+        return getImmediately((Object) callableOrSupplier);
+    }
+    
     /** performs execution without spawning a new task thread, though it does temporarily set a fake task for the purpose of getting context;
      * currently supports {@link Supplier}, {@link Callable}, {@link Runnable}, or {@link Task} instances; 
      * with tasks if it is submitted or in progress,

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
index 46e501e8..e85bf2f 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
@@ -57,7 +57,7 @@ import org.apache.brooklyn.core.config.Sanitizer;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.util.collections.MutableList;
-import org.apache.brooklyn.util.collections.MutableSet;
+import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
@@ -388,10 +388,12 @@ public class BasicExecutionManager implements ExecutionManager {
         }
     }
 
-    @Override public Task<?> submit(Runnable r) { return submit(new LinkedHashMap<Object,Object>(1), r); }
+    @Override @Deprecated public Task<?> submit(Runnable r) { return submit(new LinkedHashMap<Object,Object>(1), r); }
+    @Override public Task<?> submit(String displayName, Runnable r) { return submit(MutableMap.of("displayName", displayName), r); }
     @Override public Task<?> submit(Map<?,?> flags, Runnable r) { return submit(flags, new BasicTask<Void>(flags, r)); }
 
-    @Override public <T> Task<T> submit(Callable<T> c) { return submit(new LinkedHashMap<Object,Object>(1), c); }
+    @Override @Deprecated public <T> Task<T> submit(Callable<T> c) { return submit(new LinkedHashMap<Object,Object>(1), c); }
+    @Override public <T> Task<T> submit(String displayName, Callable<T> c) { return submit(MutableMap.of("displayName", displayName), c); }
     @Override public <T> Task<T> submit(Map<?,?> flags, Callable<T> c) { return submit(flags, new BasicTask<T>(flags, c)); }
 
     @Override public <T> Task<T> submit(TaskAdaptable<T> t) { return submit(new LinkedHashMap<Object,Object>(1), t); }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/test/java/org/apache/brooklyn/core/config/DeferredConfigTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/config/DeferredConfigTest.java b/core/src/test/java/org/apache/brooklyn/core/config/DeferredConfigTest.java
index 9decca8..c2cf908 100644
--- a/core/src/test/java/org/apache/brooklyn/core/config/DeferredConfigTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/config/DeferredConfigTest.java
@@ -21,7 +21,6 @@ package org.apache.brooklyn.core.config;
 import static org.testng.Assert.assertEquals;
 
 import java.util.List;
-import java.util.concurrent.Callable;
 
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.sensor.Sensor;
@@ -55,13 +54,10 @@ public class DeferredConfigTest extends BrooklynAppUnitTestSupport {
     
     void doTestDeferredConfigInList(final boolean delay) throws Exception {
         // Simulate a deferred value
-        Task<Sensor<?>> sensorFuture = app.getExecutionContext().submit(new Callable<Sensor<?>>() {
-            @Override
-            public Sensor<?> call() throws Exception {
+        Task<Sensor<?>> sensorFuture = app.getExecutionContext().submit("deferred return sensor", () -> {
                 if (delay) Time.sleep(Duration.FIVE_SECONDS);
                 return TestApplication.MY_ATTRIBUTE;
-            }
-        });
+            });
         app.config().set(SENSORS_UNTYPED, (Object)ImmutableList.of(sensorFuture));
 
         if (!delay) sensorFuture.get(Duration.ONE_SECOND);

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/test/java/org/apache/brooklyn/core/entity/ApplicationLifecycleStateTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/ApplicationLifecycleStateTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/ApplicationLifecycleStateTest.java
index e8a3eee..f301b3c 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/ApplicationLifecycleStateTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/ApplicationLifecycleStateTest.java
@@ -267,20 +267,14 @@ public class ApplicationLifecycleStateTest extends BrooklynMgmtUnitTestSupport {
             }
         });
 
-        Task<?> first = mgmt.getExecutionManager().submit(new Runnable() {
-            @Override
-            public void run() {
+        Task<?> first = mgmt.getExecutionManager().submit("setting test sensor", () -> {
                 app.sensors().set(TEST_SENSOR, "first");
                 log.debug("set first");
-            }
-        });
-        Task<?> second = mgmt.getExecutionManager().submit(new Runnable() {
-            @Override
-            public void run() {
+            });
+        Task<?> second = mgmt.getExecutionManager().submit("setting test sensor", () -> {
                 app.sensors().set(TEST_SENSOR, "second");
                 log.debug("set second");
-            }
-        });
+            });
         first.blockUntilEnded();
         second.blockUntilEnded();
 
@@ -394,8 +388,8 @@ public class ApplicationLifecycleStateTest extends BrooklynMgmtUnitTestSupport {
         };
 
         // Simulates firing the emit method from event handlers in different threads
-        mgmt.getExecutionManager().submit(overrideJob);
-        mgmt.getExecutionManager().submit(overrideJob);
+        mgmt.getExecutionManager().submit("emitting test sensor", overrideJob);
+        mgmt.getExecutionManager().submit("emitting test sensor", overrideJob);
 
         Asserts.eventually(Suppliers.ofInstance(seenValues), CollectionFunctionals.sizeEquals(2));
         Asserts.succeedsContinually(new Runnable() {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/test/java/org/apache/brooklyn/core/entity/EntityAssertsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntityAssertsTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntityAssertsTest.java
index bfdac3c..3160518 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/EntityAssertsTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntityAssertsTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.brooklyn.core.entity;
 
-import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 
@@ -33,7 +32,6 @@ import org.apache.brooklyn.util.time.Duration;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -85,12 +83,7 @@ public class EntityAssertsTest extends BrooklynAppUnitTestSupport {
         entity.sensors().set(TestEntity.NAME, "before");
         final String after = "after";
 
-        Task<?> assertValue = entity.getExecutionContext().submit(new Runnable() {
-            @Override
-            public void run() {
-                EntityAsserts.assertAttributeEqualsEventually(entity, TestEntity.NAME, after);
-            }
-        });
+        Task<?> assertValue = entity.getExecutionContext().submit("assert attr equals", () -> EntityAsserts.assertAttributeEqualsEventually(entity, TestEntity.NAME, after));
         entity.sensors().set(TestEntity.NAME, after);
         assertValue.get();
     }
@@ -105,12 +98,7 @@ public class EntityAssertsTest extends BrooklynAppUnitTestSupport {
     @Test
     public void shouldAssertAttributeEventuallyNonNull() throws Exception {
         EntityAsserts.assertAttributeEquals(entity, TestEntity.NAME, null);
-        Task<?> assertValue = entity.getExecutionContext().submit(new Runnable() {
-            @Override
-            public void run() {
-                EntityAsserts.assertAttributeEventuallyNonNull(entity, TestEntity.NAME);
-            }
-        });
+        Task<?> assertValue = entity.getExecutionContext().submit("assert attr non-null", () -> EntityAsserts.assertAttributeEventuallyNonNull(entity, TestEntity.NAME));
         entity.sensors().set(TestEntity.NAME, "something");
         assertValue.get();
     }
@@ -118,18 +106,11 @@ public class EntityAssertsTest extends BrooklynAppUnitTestSupport {
     @Test
     public void shouldAssertAttributeEventually() throws Exception {
         final CountDownLatch eventuallyEntered = new CountDownLatch(2);
-        Task<?> assertValue = entity.getExecutionContext().submit(new Runnable() {
-            @Override
-            public void run() {
-                EntityAsserts.assertAttributeEventually(entity, TestEntity.NAME, new Predicate<String>() {
-                    @Override
-                    public boolean apply(String input) {
-                        eventuallyEntered.countDown();
-                        return input.matches(".*\\d+");
-                    }
-                });
-            }
-        });
+        Task<?> assertValue = entity.getExecutionContext().submit("assert attribute", () -> EntityAsserts.assertAttributeEventually(entity, TestEntity.NAME, 
+            (input) -> {
+                eventuallyEntered.countDown();
+                return input.matches(".*\\d+");
+            }) );
         eventuallyEntered.await();
         entity.sensors().set(TestEntity.NAME, "testing testing 123");
         assertValue.get();
@@ -146,18 +127,11 @@ public class EntityAssertsTest extends BrooklynAppUnitTestSupport {
     public void shouldAssertPredicateEventuallyTrue() throws Exception {
         final int testVal = 987654321;
         final CountDownLatch eventuallyEntered = new CountDownLatch(2);
-        Task<?> assertValue = entity.getExecutionContext().submit(new Runnable() {
-            @Override
-            public void run() {
-                EntityAsserts.assertPredicateEventuallyTrue(entity, new Predicate<TestEntity>() {
-                    @Override
-                    public boolean apply(TestEntity input) {
-                        eventuallyEntered.countDown();
-                        return testVal == input.getSequenceValue();
-                    }
-                });
-            }
-        });
+        Task<?> assertValue = entity.getExecutionContext().submit("assert predicate", () -> EntityAsserts.assertPredicateEventuallyTrue(entity, 
+            (input) -> {
+                eventuallyEntered.countDown();
+                return testVal == input.getSequenceValue();
+            }));
         eventuallyEntered.await();
         entity.setSequenceValue(testVal);
         assertValue.get();
@@ -175,12 +149,7 @@ public class EntityAssertsTest extends BrooklynAppUnitTestSupport {
     public void shouldFailAssertAttributeEqualsContinually() throws Throwable {
         final String myName = "myname";
         entity.sensors().set(TestEntity.NAME, myName);
-        Task<?> assertValue = entity.getExecutionContext().submit(new Runnable() {
-            @Override
-            public void run() {
-                EntityAsserts.assertAttributeEqualsContinually(entity, TestEntity.NAME, myName);
-            }
-        });
+        Task<?> assertValue = entity.getExecutionContext().submit("check attr equals", () -> EntityAsserts.assertAttributeEqualsContinually(entity, TestEntity.NAME, myName));
         entity.sensors().set(TestEntity.NAME, "something");
         try {
             assertValue.get();
@@ -199,20 +168,10 @@ public class EntityAssertsTest extends BrooklynAppUnitTestSupport {
         app.createAndManageChild(stooge);
         app.createAndManageChild(stooge);
 
-        Task<?> assertValue1 = entity.getExecutionContext().submit(new Runnable() {
-            @Override
-            public void run() {
-                EntityAsserts.assertGroupSizeEqualsEventually(ImmutableMap.of("timeout", "2s"), stooges, 3);
-            }
-        });
+        Task<?> assertValue1 = entity.getExecutionContext().submit("assert size", () -> EntityAsserts.assertGroupSizeEqualsEventually(ImmutableMap.of("timeout", "2s"), stooges, 3));
         stooges.setEntityFilter(EntityPredicates.configEqualTo(TestEntity.CONF_NAME, STOOGE));
         assertValue1.get();
-        Task<?> assertValue2 = entity.getExecutionContext().submit(new Runnable() {
-            @Override
-            public void run() {
-                EntityAsserts.assertGroupSizeEqualsEventually(stooges, 0);
-            }
-        });
+        Task<?> assertValue2 = entity.getExecutionContext().submit("assert size 0", () -> EntityAsserts.assertGroupSizeEqualsEventually(stooges, 0));
         stooges.setEntityFilter(EntityPredicates.configEqualTo(TestEntity.CONF_NAME, "Marx Brother"));
         assertValue2.get();
     }
@@ -220,24 +179,11 @@ public class EntityAssertsTest extends BrooklynAppUnitTestSupport {
     @Test
     public void shouldAssertAttributeChangesEventually () throws Exception{
         entity.sensors().set(TestEntity.NAME, "before");
-        final Task<?> assertValue = entity.getExecutionContext().submit(new Runnable() {
-            @Override
-            public void run() {
-                EntityAsserts.assertAttributeChangesEventually(entity, TestEntity.NAME);
-            }
-        });
+        final Task<?> assertValue = entity.getExecutionContext().submit("check attr change", () -> EntityAsserts.assertAttributeChangesEventually(entity, TestEntity.NAME));
         Repeater.create()
-            .repeat(new Runnable() {
-                @Override
-                public void run() {
-                    entity.sensors().set(TestEntity.NAME, "after" + System.currentTimeMillis());
-                }
-            }).until(new Callable<Boolean>() {
-                @Override
-                public Boolean call() throws Exception {
-                    return assertValue.isDone();
-                }
-            }).every(Duration.millis(10))
+            .repeat(() -> entity.sensors().set(TestEntity.NAME, "after" + System.currentTimeMillis()))
+            .until(() -> assertValue.isDone())
+            .every(Duration.millis(10))
             .run();
         assertValue.get();
     }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/test/java/org/apache/brooklyn/core/mgmt/persist/XmlMementoSerializerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/persist/XmlMementoSerializerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/persist/XmlMementoSerializerTest.java
index 59ea8f2..88082ff 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/persist/XmlMementoSerializerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/persist/XmlMementoSerializerTest.java
@@ -611,7 +611,7 @@ public class XmlMementoSerializerTest {
     public void testTask() throws Exception {
         final TestApplication app = TestApplication.Factory.newManagedInstanceForTests();
         mgmt = app.getManagementContext();
-        Task<String> completedTask = app.getExecutionContext().submit(Callables.returning("myval"));
+        Task<String> completedTask = app.getExecutionContext().submit("return myval", Callables.returning("myval"));
         completedTask.get();
         
         String loggerName = UnwantedStateLoggingMapper.class.getName();

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/test/java/org/apache/brooklyn/core/test/qa/performance/TaskPerformanceTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/test/qa/performance/TaskPerformanceTest.java b/core/src/test/java/org/apache/brooklyn/core/test/qa/performance/TaskPerformanceTest.java
index a7a531b..949f139 100644
--- a/core/src/test/java/org/apache/brooklyn/core/test/qa/performance/TaskPerformanceTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/test/qa/performance/TaskPerformanceTest.java
@@ -74,11 +74,7 @@ public class TaskPerformanceTest extends AbstractPerformanceTest {
                 .summary("TaskPerformanceTest.testExecuteSimplestRunnable")
                 .iterations(numIterations)
                 .minAcceptablePerSecond(minRatePerSec)
-                .job(new Runnable() {
-                    @Override
-                    public void run() {
-                        executionManager.submit(work);
-                    }})
+                .job(() -> executionManager.submit("inner", work))
                 .completionLatch(completionLatch));
     }
     

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationIntegrationTest.java b/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationIntegrationTest.java
index 9ff39c1..e648c32 100644
--- a/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationIntegrationTest.java
+++ b/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationIntegrationTest.java
@@ -123,11 +123,7 @@ public class SshMachineLocationIntegrationTest extends SshMachineLocationTest {
         BasicExecutionManager execManager = new BasicExecutionManager("mycontextid");
         BasicExecutionContext execContext = new BasicExecutionContext(execManager);
         try {
-            MachineDetails details = execContext.submit(new Callable<MachineDetails>() {
-                @Override
-                public MachineDetails call() {
-                    return host.getMachineDetails();
-                }}).get();
+            MachineDetails details = execContext.submit("get details", () -> host.getMachineDetails()).get();
             LOG.info("machineDetails="+details);
             assertNotNull(details);
         } finally {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationTest.java b/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationTest.java
index 5c6e918..d25991c 100644
--- a/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationTest.java
+++ b/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationTest.java
@@ -131,11 +131,7 @@ public class SshMachineLocationTest extends BrooklynAppUnitTestSupport {
         BasicExecutionManager execManager = new BasicExecutionManager("mycontextid");
         BasicExecutionContext execContext = new BasicExecutionContext(execManager);
         try {
-            MachineDetails details = execContext.submit(new Callable<MachineDetails>() {
-                @Override
-                public MachineDetails call() {
-                    return host.getMachineDetails();
-                }}).get();
+            MachineDetails details = execContext.submit("get details", () -> host.getMachineDetails()).get();
             LOG.info("machineDetails="+details);
             assertNotNull(details);
             
@@ -166,11 +162,7 @@ public class SshMachineLocationTest extends BrooklynAppUnitTestSupport {
         BasicExecutionManager execManager = new BasicExecutionManager("mycontextid");
         BasicExecutionContext execContext = new BasicExecutionContext(execManager);
         try {
-            MachineDetails details = execContext.submit(new Callable<MachineDetails>() {
-                @Override
-                public MachineDetails call() {
-                    return host.getMachineDetails();
-                }}).get();
+            MachineDetails details = execContext.submit("get details", () -> host.getMachineDetails()).get();
             LOG.info("machineDetails="+details);
             assertNotNull(details);
             

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java
index 90d6b06..2656c80 100644
--- a/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java
+++ b/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java
@@ -82,14 +82,13 @@ public class TaskPredicatesTest extends BrooklynAppUnitTestSupport {
     @Test
     public void testIsDone() throws Exception {
         CountDownLatch latch = new CountDownLatch(1);
-        Task<?> task = app.getExecutionContext().submit(new Runnable() {
-            public void run() {
+        Task<?> task = app.getExecutionContext().submit("await latch", () -> {
                 try {
                     latch.await();
                 } catch (InterruptedException e) {
                     throw Exceptions.propagate(e);
                 }
-            }});
+            });
         
         assertFalse(TaskPredicates.isDone().apply(task));
         

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
index 1f5c754..455f8ad 100644
--- a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
+++ b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
@@ -24,6 +24,7 @@ import static org.testng.Assert.fail;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -87,19 +88,17 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
         
         Assert.assertTrue(result.isAbsent(), "result="+result);
         Exception exception = Maybe.getException(result);
-        Assert.assertTrue(exception.toString().contains("no execution context available"), "exception="+exception);
+        Asserts.assertStringContains(exception.toString(), "no execution context available");
+        
+        Asserts.assertThat(t, (tt) -> !tt.isBegun());
     }
 
     public void testUnsubmittedTaskWithExecutionContextExecutesAndReturns() {
         final Task<String> t = newSleepTask(Duration.ZERO, "foo");
         
         // Below, we call ValueResolver.getMaybe() in app's execution context. Therefore it will execute the task.
-        Maybe<String>  result = app.getExecutionContext()
-                .submit(new Callable<Maybe<String> >() {
-                    @Override
-                    public Maybe<String>  call() throws Exception {
-                        return Tasks.resolving(t).as(String.class).timeout(Asserts.DEFAULT_LONG_TIMEOUT).getMaybe();
-                    }})
+        Maybe<String> result = app.getExecutionContext()
+                .submit("resolving sleep task", () -> Tasks.resolving(t).as(String.class).timeout(Asserts.DEFAULT_LONG_TIMEOUT).getMaybe())
                 .getUnchecked();
         
         Assert.assertEquals(result.get(), "foo");
@@ -110,17 +109,83 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
         
         // Below, we call ValueResolver.getMaybe() in app's execution context. Therefore it will execute the task.
         // However, it will quickly timeout as the task will not have completed.
-        Maybe<String>  result = app.getExecutionContext()
-                .submit(new Callable<Maybe<String> >() {
-                    @Override
-                    public Maybe<String>  call() throws Exception {
-                        return Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe();
-                    }})
+        Maybe<String> result = app.getExecutionContext()
+                .submit("resolving sleep task", () -> Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe())
                 .getUnchecked();
         
         Assert.assertTrue(result.isAbsent(), "result="+result);
         Exception exception = Maybe.getException(result);
         Assert.assertTrue(exception.toString().contains("not completed when immediate completion requested"), "exception="+exception);
+        
+        Asserts.eventually(() -> t, (tt) -> tt.isBegun(), Duration.TEN_SECONDS);
+        Asserts.assertThat(t, (tt) -> !tt.isDone());
+    }
+    
+    public void testUnsubmittedTaskWithExecutionContextExecutesAndReturnsForeground() {
+        final Task<String> t = newSleepTask(Duration.ZERO, "foo");
+        
+        // Below, we call ValueResolver.getMaybe() in app's execution context. Therefore it will execute the task.
+        Maybe<String> result = app.getExecutionContext()
+                .get(new BasicTask<>( () -> Tasks.resolving(t).as(String.class).timeout(Asserts.DEFAULT_LONG_TIMEOUT).getMaybe() ));
+        
+        Assert.assertEquals(result.get(), "foo");
+    }
+
+    public void testUnsubmittedTaskWithExecutionContextExecutesAndTimesOutForeground() {
+        final Task<String> t = newSleepTask(Duration.ONE_MINUTE, "foo");
+        
+        // Below, we call ValueResolver.getMaybe() in app's execution context. Therefore it will execute the task.
+        // However, it will quickly timeout as the task will not have completed.
+        Maybe<String> result = app.getExecutionContext()
+            .get(new BasicTask<>( () -> Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe() ));
+        
+        Assert.assertTrue(result.isAbsent(), "result="+result);
+        Exception exception = Maybe.getException(result);
+        Assert.assertTrue(exception.toString().contains("not completed when immediate completion requested"), "exception="+exception);
+        
+        Asserts.eventually(() -> t, (tt) -> tt.isBegun(), Duration.TEN_SECONDS);
+        Asserts.assertThat(t, (tt) -> !tt.isDone());
+    }
+
+    public void testUnsubmittedTaskWithExecutionContextTimesOutWhenImmediate() {
+        final Task<String> t = newSleepTask(Duration.ZERO, "foo");
+        
+        // Below, we call ValueResolver.getMaybe() in app's execution context. Therefore it will execute the task
+        Maybe<Maybe<String>> result = app.getExecutionContext()
+                .getImmediately(new BasicTask<>( () -> Tasks.resolving(t).as(String.class).timeout(Asserts.DEFAULT_LONG_TIMEOUT).getMaybe() ));
+        
+        // However, the resubmission will not be waited upon
+        Assert.assertTrue(result.isPresent(), "result="+result);
+        Assert.assertTrue(result.get().isAbsent(), "result="+result);
+        Exception exception = Maybe.getException(result.get());
+        
+        Asserts.assertStringContainsIgnoreCase(exception.toString(), "immediate", "not", "available");
+
+        // But the underlying task is running
+        Asserts.eventually(() -> t, (tt) -> tt.isBegun(), Duration.TEN_SECONDS);
+        Asserts.eventually(() -> t, (tt) -> tt.isDone(), Duration.TEN_SECONDS);
+        Asserts.assertThat(t, (tt) -> Objects.equals(tt.getUnchecked(), "foo"));
+        
+        // And subsequent get _is_ immediate
+        result = app.getExecutionContext()
+            .getImmediately(new BasicTask<>( () -> Tasks.resolving(t).as(String.class).timeout(Asserts.DEFAULT_LONG_TIMEOUT).getMaybe() ));
+        Assert.assertEquals(result.get().get(), "foo");
+    }
+
+    public void testUnsubmittedTaskWithExecutionContextExecutesAndTimesOutImmediate() {
+        final Task<String> t = newSleepTask(Duration.ONE_MINUTE, "foo");
+        
+        // Below, we call ValueResolver.getMaybe() in app's execution context. Therefore it will execute the task.
+        // However, it will quickly timeout as the task will not have completed.
+        Maybe<Maybe<String>> result = app.getExecutionContext()
+            .getImmediately(new BasicTask<>( () -> Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe() ));
+        
+        Assert.assertTrue(result.isPresent(), "result="+result);
+        Assert.assertTrue(result.get().isAbsent(), "result="+result);
+        Exception exception = Maybe.getException(result.get());
+        Asserts.assertStringContainsIgnoreCase(exception.toString(), "immediate", "not", "available");
+        Asserts.eventually(() -> t, (tt) -> tt.isBegun(), Duration.TEN_SECONDS);
+        Asserts.assertThat(t, (tt) -> !tt.isDone());
     }
 
     public void testSwallowError() {
@@ -161,7 +226,7 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
     
     public void testGetImmediatelyInTask() throws Exception {
         final MyImmediateAndDeferredSupplier supplier = new MyImmediateAndDeferredSupplier();
-        Task<CallInfo> task = app.getExecutionContext().submit(new Callable<CallInfo>() {
+        Task<CallInfo> task = app.getExecutionContext().submit("test task for call stack", new Callable<CallInfo>() {
             @Override
             public CallInfo call() {
                 return myUniquelyNamedMethod();

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java
index e14433d..c799336 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java
@@ -129,7 +129,7 @@ public class ServiceReplacer extends AbstractPolicy {
                     if (isRunning()) {
                         highlightViolation("Failure detected");
                         LOG.warn("ServiceReplacer notified; dispatching job for "+entity+" ("+event.getValue()+")");
-                        getExecutionContext().submit(() -> onDetectedFailure(event));
+                        getExecutionContext().submit("Analyzing detected failure", () -> onDetectedFailure(event));
                     } else {
                         LOG.warn("ServiceReplacer not running, so not acting on failure detected at "+entity+" ("+event.getValue()+", child of "+entity+")");
                     }
@@ -171,9 +171,9 @@ public class ServiceReplacer extends AbstractPolicy {
             return;
         }
         
-        highlightViolation(violationText+", triggering restart");
+        highlightViolation(violationText+", triggering replacement");
         LOG.warn("ServiceReplacer acting on failure detected at "+failedEntity+" ("+reason+", child of "+entity+")");
-        Task<?> t = getExecutionContext().submit(() -> {
+        Task<?> t = getExecutionContext().submit("Replace member on failure", () -> {
                 try {
                     Entities.invokeEffectorWithArgs(entity, entity, MemberReplaceable.REPLACE_MEMBER, failedEntity.getId()).get();
                     consecutiveReplacementFailureTimes.clear();

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java
index a31e3c0..3c0341e 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java
@@ -29,14 +29,12 @@ import org.apache.brooklyn.api.sensor.SensorEventListener;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
 import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic;
 import org.apache.brooklyn.core.entity.trait.Startable;
 import org.apache.brooklyn.core.policy.AbstractPolicy;
 import org.apache.brooklyn.core.sensor.BasicNotificationSensor;
 import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor;
-import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.flags.SetFromFlag;
 import org.apache.brooklyn.util.javalang.JavaClassNames;
 import org.apache.brooklyn.util.time.Duration;
@@ -112,7 +110,7 @@ public class ServiceRestarter extends AbstractPolicy {
                     
                     if (isRunning()) {
                         LOG.info("ServiceRestarter notified; dispatching job for "+entity+" ("+event.getValue()+")");
-                        getExecutionContext().submit(() -> onDetectedFailure(event));
+                        getExecutionContext().submit("Analyzing detected failure", () -> onDetectedFailure(event));
                     } else {
                         LOG.warn("ServiceRestarter not running, so not acting on failure detected at "+entity+" ("+event.getValue()+")");
                     }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/resources/SensorResourceTest.java
----------------------------------------------------------------------
diff --git a/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/resources/SensorResourceTest.java b/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/resources/SensorResourceTest.java
index 391f2bb..e5fc5b2 100644
--- a/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/resources/SensorResourceTest.java
+++ b/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/resources/SensorResourceTest.java
@@ -325,7 +325,7 @@ public class SensorResourceTest extends BrooklynRestResourceTest {
     
     @Test
     public void testGetSensorValueOfTypeCompletedTask() throws Exception {
-        Task<String> task = entity.getExecutionContext().submit(Callables.returning("myval"));
+        Task<String> task = entity.getExecutionContext().submit("returning myval", Callables.returning("myval"));
         task.get();
         entity.sensors().set(Sensors.newSensor(Task.class, "myTask"), task);
         doGetSensorTest("myTask", String.class, "\"myval\"");

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/79cc9bcc/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
index 54c81d1..d6e3fce 100644
--- a/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
@@ -50,32 +50,32 @@ import org.apache.brooklyn.entity.brooklynnode.EntityHttpClient.ResponseCodePred
 import org.apache.brooklyn.entity.brooklynnode.effector.BrooklynNodeUpgradeEffectorBody;
 import org.apache.brooklyn.entity.brooklynnode.effector.SetHighAvailabilityModeEffectorBody;
 import org.apache.brooklyn.entity.brooklynnode.effector.SetHighAvailabilityPriorityEffectorBody;
-import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl;
 import org.apache.brooklyn.entity.software.base.SoftwareProcess.StopSoftwareParameters.StopMode;
+import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl;
 import org.apache.brooklyn.entity.software.base.lifecycle.MachineLifecycleEffectorTasks;
 import org.apache.brooklyn.feed.http.HttpFeed;
 import org.apache.brooklyn.feed.http.HttpPollConfig;
 import org.apache.brooklyn.feed.http.HttpValueFunctions;
 import org.apache.brooklyn.feed.http.JsonFunctions;
-import org.apache.http.HttpStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.brooklyn.util.collections.Jsonya;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.config.ConfigBag;
-import org.apache.brooklyn.util.http.HttpToolResponse;
 import org.apache.brooklyn.util.core.task.DynamicTasks;
 import org.apache.brooklyn.util.core.task.TaskTags;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.exceptions.PropagatedRuntimeException;
 import org.apache.brooklyn.util.guava.Functionals;
+import org.apache.brooklyn.util.http.HttpToolResponse;
 import org.apache.brooklyn.util.javalang.Enums;
 import org.apache.brooklyn.util.javalang.JavaClassNames;
 import org.apache.brooklyn.util.repeat.Repeater;
 import org.apache.brooklyn.util.text.Strings;
 import org.apache.brooklyn.util.time.Duration;
 import org.apache.brooklyn.util.time.Time;
+import org.apache.http.HttpStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Functions;
@@ -222,7 +222,7 @@ public class BrooklynNodeImpl extends SoftwareProcessImpl implements BrooklynNod
             // we could wait for BrooklynTaskTags.getTasksInEntityContext(ExecutionManager, this).isEmpty();
             Task<?> stopEffectorTask = BrooklynTaskTags.getClosestEffectorTask(Tasks.current(), Startable.STOP);
             Task<?> topEntityTask = getTopEntityTask(stopEffectorTask);
-            getManagementContext().getExecutionManager().submit(new UnmanageTask(topEntityTask, this));
+            getManagementContext().getExecutionManager().submit("Unmanage Brooklyn entity after stop", new UnmanageTask(topEntityTask, this));
         }
     }
 


[22/23] brooklyn-server git commit: address review comments

Posted by he...@apache.org.
address review comments


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/2dcb0a03
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/2dcb0a03
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/2dcb0a03

Branch: refs/heads/master
Commit: 2dcb0a03a9e9b27a81535a13bd48d1958f8984d9
Parents: 9213f0e
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Fri Oct 6 01:04:34 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Fri Oct 6 01:12:43 2017 +0100

----------------------------------------------------------------------
 .../apache/brooklyn/api/mgmt/ExecutionContext.java  |  2 +-
 .../apache/brooklyn/api/mgmt/ManagementContext.java |  3 +++
 .../brooklyn/core/config/ConfigConstraints.java     |  2 +-
 .../mgmt/internal/AbstractManagementContext.java    | 16 ++++++++++++++++
 .../internal/NonDeploymentManagementContext.java    |  9 +++++++++
 .../core/mgmt/rebind/RebindManagerImpl.java         |  3 ++-
 .../brooklyn/core/objs/AbstractEntityAdjunct.java   |  6 +-----
 .../util/core/task/BasicExecutionManager.java       |  5 ++++-
 .../brooklyn/core/entity/EntityAssertsTest.java     |  3 ++-
 .../mgmt/internal/LocalSubscriptionManagerTest.java |  8 +++-----
 10 files changed, 42 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2dcb0a03/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
index 2d8fe23..d303ba9 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
@@ -62,7 +62,7 @@ public interface ExecutionContext extends Executor {
     @Deprecated
     <T> Task<T> submit(Callable<T> callable);
 
-    /** {@link ExecutionManager#submit(String Runnable) */
+    /** {@link ExecutionManager#submit(String, Runnable) */
     Task<?> submit(String displayName, Runnable runnable);
  
     /** {@link ExecutionManager#submit(String, Callable) */

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2dcb0a03/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
index 00dd4d3..c1a2c21 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
@@ -166,6 +166,9 @@ public interface ManagementContext {
      */
     ExecutionContext getExecutionContext(Entity entity);
     
+    /** As {@link #getExecutionContext(Entity)} where there is also an adjunct */
+    ExecutionContext getExecutionContext(Entity e, EntityAdjunct a);
+    
     /**
      * Returns a {@link SubscriptionContext} instance representing subscriptions
      * (from the {@link SubscriptionManager}) associated with this entity, and capable 

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2dcb0a03/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java b/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java
index c4ce81d..682dedb 100644
--- a/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java
+++ b/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java
@@ -117,7 +117,7 @@ public abstract class ConfigConstraints<T extends BrooklynObject> {
     public Iterable<ConfigKey<?>> getViolations() {
         ExecutionContext exec = 
             getBrooklynObject() instanceof EntityInternal ? ((EntityInternal)getBrooklynObject()).getExecutionContext() :
-            // getBrooklynObject() instanceof AbstractEntityAdjunct ? ((AbstractEntityAdjunct)getBrooklynObject()).getExecutionContext() :
+            getBrooklynObject() instanceof AbstractEntityAdjunct ? ((AbstractEntityAdjunct)getBrooklynObject()).getExecutionContext() :
             null;
         if (exec!=null) {
             return exec.get(

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2dcb0a03/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
index 7a0fabd..27645bc 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
@@ -71,6 +71,7 @@ import org.apache.brooklyn.core.mgmt.classloading.JavaBrooklynClassLoadingContex
 import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
 import org.apache.brooklyn.core.mgmt.ha.HighAvailabilityManagerImpl;
 import org.apache.brooklyn.core.mgmt.rebind.RebindManagerImpl;
+import org.apache.brooklyn.core.objs.AbstractEntityAdjunct;
 import org.apache.brooklyn.core.typereg.BasicBrooklynTypeRegistry;
 import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.core.ResourceUtils;
@@ -245,6 +246,21 @@ public abstract class AbstractManagementContext implements ManagementContextInte
             return ((EntityInternal)e).getExecutionContext();
         }
     }
+    
+    @Override
+    public ExecutionContext getExecutionContext(Entity e, EntityAdjunct adjunct) {
+        // BEC is a thin wrapper around EM so fine to create a new one here; but make sure it gets the real entity
+        if (e instanceof AbstractEntityAdjunct) {
+            ImmutableSet<Object> tags = ImmutableSet.<Object>of(
+                    BrooklynTaskTags.tagForContextAdjunct(adjunct),
+                    BrooklynTaskTags.tagForContextEntity(e),
+                    this
+            );
+            return new BasicExecutionContext(getExecutionManager(), tags);
+        } else {
+            return ((EntityInternal)e).getExecutionContext();
+        }
+    }
 
     @Override
     public ExecutionContext getServerExecutionContext() {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2dcb0a03/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
index bac7aeb..b5fd0b5 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
@@ -282,6 +282,15 @@ public class NonDeploymentManagementContext implements ManagementContextInternal
     }
 
     @Override
+    public ExecutionContext getExecutionContext(Entity entity, EntityAdjunct adjunct) {
+        if (!this.entity.equals(entity)) throw new IllegalStateException("Non-deployment context "+this+" can only use a single Entity: has "+this.entity+", but passed "+entity);
+        if (mode==NonDeploymentManagementContextMode.MANAGEMENT_STOPPED)
+            throw new IllegalStateException("Entity "+entity+" is no longer managed; execution context not available");
+        checkInitialManagementContextReal();
+        return initialManagementContext.getExecutionContext(entity, adjunct);
+    }
+
+    @Override
     public ExecutionContext getServerExecutionContext() {
         return initialManagementContext.getServerExecutionContext();
     }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2dcb0a03/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
index f5a1c0c..0830900 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
@@ -490,7 +490,8 @@ public class RebindManagerImpl implements RebindManager {
         ExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext();
         if (ec == null) {
             ec = managementContext.getServerExecutionContext();
-            return ec.get(Tasks.<List<Application>>builder().displayName("rebind").dynamic(false).body(() -> rebindImpl(classLoader, exceptionHandler, mode)).build());
+            return ec.get(Tasks.<List<Application>>builder().displayName("rebind").dynamic(false)
+                .body(() -> rebindImpl(classLoader, exceptionHandler, mode)).build());
         } else {
             return rebindImpl(classLoader, exceptionHandler, mode);
         }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2dcb0a03/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
index c6cb74c..7b2f6ad 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
@@ -54,15 +54,12 @@ import org.apache.brooklyn.core.enricher.AbstractEnricher;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.entity.internal.ConfigUtilsInternal;
-import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.mgmt.internal.SubscriptionTracker;
-import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.core.flags.FlagUtils;
 import org.apache.brooklyn.util.core.flags.SetFromFlag;
 import org.apache.brooklyn.util.core.flags.TypeCoercions;
-import org.apache.brooklyn.util.core.task.BasicExecutionContext;
 import org.apache.brooklyn.util.text.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -428,8 +425,7 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
     public void setEntity(EntityLocal entity) {
         if (destroyed.get()) throw new IllegalStateException("Cannot set entity on a destroyed entity adjunct");
         this.entity = entity;
-        this.execution = new BasicExecutionContext( getManagementContext().getExecutionManager(),
-                MutableList.of(BrooklynTaskTags.tagForContextAdjunct(this), BrooklynTaskTags.tagForContextEntity(entity)) );
+        this.execution = getManagementContext().getExecutionContext(entity, this);
         if (entity!=null && getCatalogItemId() == null) {
             setCatalogItemIdAndSearchPath(entity.getCatalogItemId(), entity.getCatalogItemIdSearchPath());
         }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2dcb0a03/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
index c7a9855..c53277d 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
@@ -799,7 +799,10 @@ public class BasicExecutionManager implements ExecutionManager {
             Task<T> t = Tasks.<T>builder().dynamic(false).displayName(displayName+" (placeholder for "+id+")")
                 .description("Details of the original task have been forgotten.")
                 .body(Callables.returning((T)null)).build();
-            ((BasicTask<T>)t).ignoreIfNotRun();
+            // don't really want anyone executing the "gone" task...
+            // also if we are GC'ing tasks then cancelled may help with cleanup 
+            // of sub-tasks that have lost their submitted-by-task reference ?
+            // also don't want warnings when it's finalized, this means we don't need ignoreIfNotRun()
             ((BasicTask<T>)t).cancelled = true;
             return t;
         }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2dcb0a03/core/src/test/java/org/apache/brooklyn/core/entity/EntityAssertsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntityAssertsTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntityAssertsTest.java
index 3160518..2967f22 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/EntityAssertsTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntityAssertsTest.java
@@ -83,7 +83,8 @@ public class EntityAssertsTest extends BrooklynAppUnitTestSupport {
         entity.sensors().set(TestEntity.NAME, "before");
         final String after = "after";
 
-        Task<?> assertValue = entity.getExecutionContext().submit("assert attr equals", () -> EntityAsserts.assertAttributeEqualsEventually(entity, TestEntity.NAME, after));
+        Task<?> assertValue = entity.getExecutionContext().submit("assert attr equals", 
+            () -> EntityAsserts.assertAttributeEqualsEventually(entity, TestEntity.NAME, after));
         entity.sensors().set(TestEntity.NAME, after);
         assertValue.get();
     }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2dcb0a03/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
index 1373545..5c04978 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
@@ -193,8 +193,7 @@ public class LocalSubscriptionManagerTest extends BrooklynAppUnitTestSupport {
         // delivery should be in subscription order, so 123 then 456
         entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SEQUENCE, listener);
         // wait for the above delivery - otherwise it might get dropped
-        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), () -> { 
-            Asserts.assertSize(listener.getEvents(), 1); });
+        Asserts.succeedsEventually(() -> Asserts.assertSize(listener.getEvents(), 1));
         entity.sensors().set(TestEntity.SEQUENCE, 456);
         
         // notifications don't have "initial value" so don't get -1
@@ -207,8 +206,7 @@ public class LocalSubscriptionManagerTest extends BrooklynAppUnitTestSupport {
         entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SERVICE_STATE_ACTUAL, listener);
         entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.NAME, listener);
         
-        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), new Runnable() {
-            @Override public void run() {
+        Asserts.succeedsEventually(() -> {
                 Asserts.assertEquals(listener.getEvents(), ImmutableList.of(
                         new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity, 123),
                         new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity, 456),
@@ -216,7 +214,7 @@ public class LocalSubscriptionManagerTest extends BrooklynAppUnitTestSupport {
                         new BasicSensorEvent<Lifecycle>(TestEntity.SERVICE_STATE_ACTUAL, entity, Lifecycle.STOPPING),
                         new BasicSensorEvent<String>(TestEntity.NAME, entity, "myname")),
                     "actually got: "+listener.getEvents());
-            }});
+            });
     }
     
     @Test


[04/23] brooklyn-server git commit: task visibility: better names for config retrieval tasks

Posted by he...@apache.org.
task visibility: better names for config retrieval tasks


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/8ecf3950
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/8ecf3950
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/8ecf3950

Branch: refs/heads/master
Commit: 8ecf3950ac1c158e6cac815c596ae0906439363b
Parents: 4430f76
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Fri Sep 15 11:24:43 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Mon Sep 18 17:10:55 2017 +0100

----------------------------------------------------------------------
 .../core/objs/AbstractConfigurationSupportInternal.java | 12 ++----------
 .../brooklyn/core/objs/BrooklynObjectInternal.java      |  2 ++
 2 files changed, 4 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/8ecf3950/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java
index 460c8c4..f67f1f5 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java
@@ -106,8 +106,7 @@ public abstract class AbstractConfigurationSupportInternal implements BrooklynOb
             }
         };
 
-        // TODO can we remove the DST ?  this is structured so maybe not
-        Task<T> t = Tasks.<T>builder().body(job)
+        Task<T> t = Tasks.<T>builder().dynamic(false).body(job)
                 .displayName("Resolving config "+key.getName())
                 .description("Internal non-blocking structured key resolution")
                 .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
@@ -126,20 +125,13 @@ public abstract class AbstractConfigurationSupportInternal implements BrooklynOb
      * See {@link #getNonBlockingResolvingStructuredKey(ConfigKey)}.
      */
     protected <T> Maybe<T> getNonBlockingResolvingSimple(ConfigKey<T> key) {
-        // TODO See AbstractConfigMapImpl.getConfigImpl, for how it looks up the "container" of the
-        // key, so that it gets the right context entity etc.
-
-        // getRaw returns Maybe(val) if the key was explicitly set (where val can be null)
-        // or Absent if the config key was unset.
         Object unresolved = getRaw(key).or(key.getDefaultValue());
-        // TODO add description that we are evaluating this config key to be used if the code below submits futher tasks
-        // and look at other uses of "description" method
-        // and make sure it is marked transient
         Maybe<Object> resolved = Tasks.resolving(unresolved)
                 .as(Object.class)
                 .immediately(true)
                 .deep(true)
                 .context(getContext())
+                .description("Resolving raw value of simple config "+key)
                 .getMaybe();
         if (resolved.isAbsent()) return Maybe.Absent.<T>castAbsent(resolved);
         

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/8ecf3950/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java b/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java
index 6ad42f4..972612b 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java
@@ -96,7 +96,9 @@ public interface BrooklynObjectInternal extends BrooklynObject, Rebindable {
         /**
          * Returns the uncoerced value for this config key, if available, not taking any default.
          * If there is no local value and there is an explicit inherited value, will return the inherited.
+         * May return a {@link Maybe}-wrapped null if the value is explicitly null.
          * Returns {@link Maybe#absent()} if the key is not explicitly set on this object or an ancestor.
+         * Often this is used with {@link Maybe#or(Object))} to return default value.
          * <p>
          * See also {@link #getLocalRaw(ConfigKey).
          */


[05/23] brooklyn-server git commit: fix visibility: entity init runs in same thread

Posted by he...@apache.org.
fix visibility: entity init runs in same thread


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/84d24d1a
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/84d24d1a
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/84d24d1a

Branch: refs/heads/master
Commit: 84d24d1a949275e6bad3bbabefe6bf422f9fade7
Parents: d4c9fe1
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Tue Sep 19 13:09:39 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Tue Sep 19 13:12:11 2017 +0100

----------------------------------------------------------------------
 .../apache/brooklyn/core/objs/proxy/InternalEntityFactory.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/84d24d1a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
index a14225b..bd6cc1d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
@@ -309,7 +309,7 @@ public class InternalEntityFactory extends InternalFactory {
          * which currently show up at the top level once the initializer task completes.
          * TODO It would be nice if these schedule tasks were grouped in a bucket! 
          */
-        ((EntityInternal)entity).getExecutionContext().submit(Tasks.builder().dynamic(false).displayName("Entity initialization")
+        ((EntityInternal)entity).getExecutionContext().get(Tasks.builder().dynamic(false).displayName("Entity initialization")
                 .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
                 .body(new Runnable() {
             @Override
@@ -354,7 +354,7 @@ public class InternalEntityFactory extends InternalFactory {
                     initEntityAndDescendants(child.getId(), entitiesByEntityId, specsByEntityId);
                 }
             }
-        }).build()).getUnchecked();
+        }).build());
     }
     
     /**