You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2016/01/20 22:04:36 UTC

[2/6] incubator-brooklyn git commit: clearer semantics for cancel, and cancel dependent submitted tasks by default

clearer semantics for cancel, and cancel dependent submitted tasks by default

Prevents leaks where dependent tasks (e.g. resolveValue) are submitted in the background
and the caller is then cancelled; previously the interruption was not propagated. Now, by default, it is
propagated to child tasks and to submitted transient tasks, with options for other (weaker and stronger) cancellation modes.
See TaskInternal.cancel(TaskCancellationMode) and the new tests for cancelling children in DynamicSequentialTaskTest.

Also removes the deprecated, loosely-typed ExecutionUtils.invoke,
adds more TRACE logging for activities,
and wraps batch config in a task so that more resolutions are nested.


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/aed07863
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/aed07863
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/aed07863

Branch: refs/heads/master
Commit: aed078633aaf40a4a9da907469700808a245cc78
Parents: 2f3e465
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Tue Jan 19 12:48:15 2016 +0000
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Jan 20 16:19:53 2016 +0000

----------------------------------------------------------------------
 brooklyn-docs/guide/misc/release-notes.md       |   5 +
 .../java/org/apache/brooklyn/api/mgmt/Task.java |  18 +++
 .../spi/dsl/BrooklynDslDeferredSupplier.java    |   2 +-
 .../core/objs/proxy/InternalEntityFactory.java  |   6 +-
 .../core/sensor/DependentConfiguration.java     |   3 +-
 .../util/core/task/BasicExecutionContext.java   |  12 +-
 .../util/core/task/BasicExecutionManager.java   | 121 ++++++++++++++++---
 .../brooklyn/util/core/task/BasicTask.java      |  32 +++--
 .../util/core/task/DynamicSequentialTask.java   |  41 +++++--
 .../brooklyn/util/core/task/ExecutionUtils.java |  49 --------
 .../core/task/ListenableForwardingFuture.java   |  28 ++++-
 .../brooklyn/util/core/task/ScheduledTask.java  |  14 +--
 .../brooklyn/util/core/task/TaskInternal.java   |  39 ++++++
 .../brooklyn/util/core/task/TaskPredicates.java |  16 +++
 .../task/BasicTaskExecutionPerformanceTest.java |   3 -
 .../core/task/DynamicSequentialTaskTest.java    |  96 ++++++++++++++-
 .../core/task/NonBasicTaskExecutionTest.java    |   5 +
 .../util/core/task/ScheduledExecutionTest.java  |   2 +-
 .../util/core/task/TaskPredicatesTest.java      |   4 +-
 .../rest/resources/EntityConfigResource.java    |  34 ++++--
 20 files changed, 409 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/aed07863/brooklyn-docs/guide/misc/release-notes.md
----------------------------------------------------------------------
diff --git a/brooklyn-docs/guide/misc/release-notes.md b/brooklyn-docs/guide/misc/release-notes.md
index 03ba4a3..721655d 100644
--- a/brooklyn-docs/guide/misc/release-notes.md
+++ b/brooklyn-docs/guide/misc/release-notes.md
@@ -49,3 +49,8 @@ parent or application root in YAML.
 
 For changes in prior versions, please refer to the release notes for 
 [0.8.0](/v/0.8.0-incubating/misc/release-notes.html).
+
+3. Task cancellation is now propagated to dependent submitted tasks, including backgrounded tasks if they are transient.
+Previously when a task was cancelled the API did not guarantee semantics but the behaviour was to cancel sub-tasks only 
+in very limited cases. Now the semantics are more precise and controllable, and more sub-tasks are cancelled.
+This can prevent some leaked waits on `attributeWhenReady`.

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/aed07863/brooklyn-server/api/src/main/java/org/apache/brooklyn/api/mgmt/Task.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/api/src/main/java/org/apache/brooklyn/api/mgmt/Task.java b/brooklyn-server/api/src/main/java/org/apache/brooklyn/api/mgmt/Task.java
index c8f1c00..42147c5 100644
--- a/brooklyn-server/api/src/main/java/org/apache/brooklyn/api/mgmt/Task.java
+++ b/brooklyn-server/api/src/main/java/org/apache/brooklyn/api/mgmt/Task.java
@@ -81,6 +81,24 @@ public interface Task<T> extends ListenableFuture<T>, TaskAdaptable<T> {
     public boolean isError();
 
     /**
+     * As {@link Future#isDone()}. In particular if cancelled, this will return true
+     * as soon as it is cancelled. The thread for this task may still be running,
+     * if the cancellation (often an interruption, but may be weaker) has not applied,
+     * and submitted threads may also be running depending on cancellation parameters.
+     * <p>
+     * {@link #get()} is guaranteed to return immediately, throwing in the case of cancellation
+     * prior to completion (and including the case above where a thread may still be running).
+     * <p>
+     * To check whether cancelled threads for this task have completed, 
+     * inspect {@link #getEndTimeUtc()}, which is guaranteed to be set when threads complete
+     * if the thread is started (as determinable by whether {@link #getStartTimeUtc()} is set).
+     * (The threads of submitted/child tasks will usually be independent; to determine their
+     * completion requires inspecting the {@link ExecutionManager}.)  
+     */
+    @Override
+    public boolean isDone();
+    
+    /**
      * Causes calling thread to block until the task is started.
      */
     public void blockUntilStarted();

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/aed07863/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java b/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
index a417e32..48a0283 100644
--- a/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
+++ b/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
@@ -115,7 +115,7 @@ public abstract class BrooklynDslDeferredSupplier<T> implements DeferredSupplier
         
         try {
             if (log.isDebugEnabled())
-                log.debug("Queuing task to resolve "+dsl);
+                log.debug("Queuing task to resolve "+dsl+", called by "+Tasks.current());
 
             EntityInternal entity = (EntityInternal) BrooklynTaskTags.getTargetOrContextEntity(Tasks.current());
             ExecutionContext exec =

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/aed07863/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
index 8914ca4..eb4ff10 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
@@ -307,12 +307,16 @@ public class InternalEntityFactory extends InternalFactory {
 
         /* Marked transient so that the task is not needlessly kept around at the highest level.
          * Note that the task is not normally visible in the GUI, because 
-         * (a) while it is running, the entity is parentless (and so not in the tree);
+         * (a) while it is running, the entity is often parentless (and so not in the tree);
          * and (b) when it is completed it is GC'd, as it is transient.
          * However task info is available via the API if you know its ID,
          * and if better subtask querying is available it will be picked up as a background task 
          * of the parent entity creating this child entity
          * (note however such subtasks are currently filtered based on parent entity so is excluded).
+         * <p>
+         * Some of these (initializers and enrichers) submit background scheduled tasks,
+         * which currently show up at the top level once the initializer task completes.
+         * TODO It would be nice if these scheduled tasks were grouped in a bucket! 
          */
         ((EntityInternal)entity).getExecutionContext().submit(Tasks.builder().dynamic(false).displayName("Entity initialization")
                 .tag(BrooklynTaskTags.tagForContextEntity(entity))

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/aed07863/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
index ac4bef5..0c622c3 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
@@ -248,7 +248,7 @@ public class DependentConfiguration {
 
             // return immediately if either the ready predicate or the abort conditions hold
             if (ready(value)) return postProcess(value);
-
+            
             final List<Exception> abortionExceptions = Lists.newCopyOnWriteArrayList();
             long start = System.currentTimeMillis();
             
@@ -790,6 +790,7 @@ public class DependentConfiguration {
                 .displayName("waiting on "+sensor.getName())
                 .description("Waiting on sensor "+sensor.getName()+" from "+source)
                 .tag("attributeWhenReady")
+                .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
                 .body(new WaitInTaskForAttributeReady<T,V>(this))
                 .build();
         }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/aed07863/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
index 13dda46..74e0ddd 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
@@ -175,18 +175,18 @@ public class BasicExecutionContext extends AbstractExecutionContext {
         }
         
         final Object startCallback = properties.get("newTaskStartCallback");
-        properties.put("newTaskStartCallback", new Function<Object,Void>() {
-            public Void apply(Object it) {
+        properties.put("newTaskStartCallback", new Function<Task<?>,Void>() {
+            public Void apply(Task<?> it) {
                 registerPerThreadExecutionContext();
-                if (startCallback!=null) ExecutionUtils.invoke(startCallback, it);
+                if (startCallback!=null) BasicExecutionManager.invokeCallback(startCallback, it);
                 return null;
             }});
         
         final Object endCallback = properties.get("newTaskEndCallback");
-        properties.put("newTaskEndCallback", new Function<Object,Void>() {
-            public Void apply(Object it) {
+        properties.put("newTaskEndCallback", new Function<Task<?>,Void>() {
+            public Void apply(Task<?> it) {
                 try {
-                    if (endCallback!=null) ExecutionUtils.invoke(endCallback, it);
+                    if (endCallback!=null) BasicExecutionManager.invokeCallback(endCallback, it);
                 } finally {
                     clearPerThreadExecutionContext();
                 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/aed07863/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
index d90b1a1..2010613 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
@@ -19,6 +19,7 @@
 package org.apache.brooklyn.util.core.task;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import groovy.lang.Closure;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -51,16 +52,22 @@ import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.mgmt.TaskAdaptable;
 import org.apache.brooklyn.core.BrooklynFeatureEnablement;
 import org.apache.brooklyn.core.config.Sanitizer;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode;
 import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
 import org.apache.brooklyn.util.text.Identifiers;
+import org.apache.brooklyn.util.text.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.Beta;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.CaseFormat;
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ExecutionList;
@@ -368,7 +375,6 @@ public class BasicExecutionManager implements ExecutionManager {
         return submitSubsequentScheduledTask(flags, task);
     }
     
-    @SuppressWarnings("unchecked")
     protected Task<?> submitSubsequentScheduledTask(final Map<?,?> flags, final ScheduledTask task) {
         if (!task.isDone()) {
             task.internalFuture = delayedRunner.schedule(new ScheduledTaskCallable(task, flags),
@@ -508,9 +514,15 @@ public class BasicExecutionManager implements ExecutionManager {
                      */
                     if (log.isDebugEnabled()) {
                         // debug only here, because most submitters will handle failures
-                        log.debug("Exception running task "+task+" (rethrowing): "+error.getMessage(), error);
-                        if (log.isTraceEnabled())
-                            log.trace("Trace for exception running task "+task+" (rethrowing): "+error.getMessage(), error);
+                        if (error instanceof InterruptedException || error instanceof RuntimeInterruptedException) {
+                            log.debug("Detected interruption on task "+task+" (rethrowing)" +
+                                (Strings.isNonBlank(error.getMessage()) ? ": "+error.getMessage() : ""));
+                        } else {
+                            log.debug("Exception running task "+task+" (rethrowing): "+error);
+                        }
+                        if (log.isTraceEnabled()) {
+                            log.trace("Trace for exception running task "+task+" (rethrowing): "+error, error);
+                        }
                     }
                     throw Exceptions.propagate(error);
                 }
@@ -526,19 +538,63 @@ public class BasicExecutionManager implements ExecutionManager {
         }
     }
 
-    private final static class ListenableForwardingFutureForTask<T> extends ListenableForwardingFuture<T> {
+    @SuppressWarnings("deprecation")
+    // TODO do we even need a listenable future here?  possibly if someone wants to interrogate the future it might
+    // be interesting, so possibly it is useful that we implement ListenableFuture...
+    private final static class CancellingListenableForwardingFutureForTask<T> extends ListenableForwardingFuture<T> {
         private final Task<T> task;
+        private BasicExecutionManager execMgmt;
 
-        private ListenableForwardingFutureForTask(Future<T> delegate, ExecutionList list, Task<T> task) {
+        private CancellingListenableForwardingFutureForTask(BasicExecutionManager execMgmt, Future<T> delegate, ExecutionList list, Task<T> task) {
             super(delegate, list);
+            this.execMgmt = execMgmt;
             this.task = task;
         }
 
         @Override
-        public boolean cancel(boolean mayInterruptIfRunning) {
+        public boolean cancel(TaskCancellationMode mode) {
             boolean result = false;
-            if (!task.isCancelled()) result |= task.cancel(mayInterruptIfRunning);
-            result |= super.cancel(mayInterruptIfRunning);
+            if (log.isTraceEnabled()) {
+                log.trace("CLFFT cancelling "+task+" mode "+mode);
+            }
+            if (!task.isCancelled()) result |= ((TaskInternal<T>)task).cancel(mode);
+            result |= delegate().cancel(mode.isAllowedToInterruptTask());
+            
+            if (mode.isAllowedToInterruptAllSubmittedTasks() || mode.isAllowedToInterruptDependentSubmittedTasks()) {
+                int subtasksFound=0;
+                int subtasksReallyCancelled=0;
+                
+                if (task instanceof HasTaskChildren) {
+                    for (Task<?> child: ((HasTaskChildren)task).getChildren()) {
+                        if (log.isTraceEnabled()) {
+                            log.trace("Cancelling "+child+" on recursive cancellation of "+task);
+                        }
+                        subtasksFound++;
+                        if (((TaskInternal<?>)child).cancel(mode)) {
+                            result = true;
+                            subtasksReallyCancelled++;
+                        }
+                    }
+                }
+                for (Task<?> t: execMgmt.getAllTasks()) {
+                    if (task.equals(t.getSubmittedByTask())) {
+                        if (mode.isAllowedToInterruptAllSubmittedTasks() || BrooklynTaskTags.isTransient(t)) {
+                            if (log.isTraceEnabled()) {
+                                log.trace("Cancelling "+t+" on recursive cancellation of "+task);
+                            }
+                            subtasksFound++;
+                            if (((TaskInternal<?>)t).cancel(mode)) {
+                                result = true;
+                                subtasksReallyCancelled++;
+                            }
+                        }
+                    }
+                }
+                if (log.isTraceEnabled()) {
+                    log.trace("On cancel of "+task+", applicable subtask count "+subtasksFound+", of which "+subtasksReallyCancelled+" were actively cancelled");
+                }
+            }
+            
             ((TaskInternal<?>)task).runListeners();
             return result;
         }
@@ -571,9 +627,15 @@ public class BasicExecutionManager implements ExecutionManager {
 
     @SuppressWarnings("unchecked")
     protected <T> Task<T> submitNewTask(final Map<?,?> flags, final Task<T> task) {
-        if (log.isTraceEnabled()) log.trace("Submitting task {} ({}), with flags {}, and tags {}, job {}", 
+        if (log.isTraceEnabled()) {
+            log.trace("Submitting task {} ({}), with flags {}, and tags {}, job {}; caller {}", 
                 new Object[] {task.getId(), task, Sanitizer.sanitize(flags), task.getTags(), 
-                (task instanceof TaskInternal ? ((TaskInternal<T>)task).getJob() : "<unavailable>")});
+                (task instanceof TaskInternal ? ((TaskInternal<T>)task).getJob() : "<unavailable>"),
+                Tasks.current() });
+            if (Tasks.current()==null && BrooklynTaskTags.isTransient(task)) {
+                log.trace("Stack trace for unparented submission of transient "+task, new Throwable("trace only (not an error)"));
+            }
+        }
         
         if (task instanceof ScheduledTask)
             return (Task<T>) submitNewScheduledTask(flags, (ScheduledTask)task);
@@ -604,15 +666,16 @@ public class BasicExecutionManager implements ExecutionManager {
         } else {
             future = runner.submit(job);
         }
-        // on completion, listeners get triggered above; here, below we ensure they get triggered on cancel
-        // (and we make sure the same ExecutionList is used in the future as in the task)
-        ListenableFuture<T> listenableFuture = new ListenableForwardingFutureForTask<T>(future, ((TaskInternal<T>)task).getListeners(), task);
-        // doesn't matter whether the listener is added to the listenableFuture or the task,
-        // except that for the task we can more easily wrap it so that it only logs debug if the executor is shutdown
-        // (avoid a bunch of ugly warnings in tests which start and stop things a lot!)
-        // [probably even nicer to run this in the same thread, it doesn't do much; but that is messier to implement]
+        // SubmissionCallable (above) invokes the listeners on completion;
+        // this future allows a caller to add custom listeners
+        // (it does not notify the listeners; that's our job);
+        // except on cancel we want to listen
+        ListenableFuture<T> listenableFuture = new CancellingListenableForwardingFutureForTask<T>(this, future, ((TaskInternal<T>)task).getListeners(), task);
+        // and we want to make sure *our* (manager) listeners are given suitable callback 
         ((TaskInternal<T>)task).addListener(new SubmissionListenerToCallOtherListeners<T>(task), runner);
+        // NB: can the above mean multiple callbacks to TaskInternal#runListeners?
         
+        // finally expose the future to callers
         ((TaskInternal<T>)task).initInternalFuture(listenableFuture);
         
         return task;
@@ -665,9 +728,27 @@ public class BasicExecutionManager implements ExecutionManager {
             PerThreadCurrentTaskHolder.perThreadCurrentTask.set(task);
             ((TaskInternal<?>)task).setStartTimeUtc(System.currentTimeMillis());
         }
-        ExecutionUtils.invoke(flags.get("newTaskStartCallback"), task);
+        invokeCallback(flags.get("newTaskStartCallback"), task);
     }
 
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    // not ideal, such loose typing on the callback -- should prefer Function<Task,Object>
+    // but at least it's package-private
+    static Object invokeCallback(Object callable, Task<?> task) {
+        if (callable instanceof Closure) return ((Closure<?>)callable).call(task);
+        if (callable instanceof Callable) {
+            try {
+                return ((Callable<?>)callable).call();
+            } catch (Throwable t) {
+                throw Throwables.propagate(t);
+            }
+        }
+        if (callable instanceof Runnable) { ((Runnable)callable).run(); return null; }
+        if (callable instanceof Function) { return ((Function)callable).apply(task); }
+        if (callable==null) return null;
+        throw new IllegalArgumentException("Cannot invoke unexpected callback object "+callable+" of type "+callable.getClass()+" on "+task);
+    }
+    
     /** normally (if not interrupted) called once for each call to {@link #beforeSubmitScheduledTaskAllIterations(Map, Task)} */
     protected void afterEndScheduledTaskAllIterations(Map<?,?> flags, Task<?> task) {
         internalAfterEnd(flags, task, false, true);
@@ -693,7 +774,7 @@ public class BasicExecutionManager implements ExecutionManager {
         }
         if (isEndingAllIterations) {
             incompleteTaskIds.remove(task.getId());
-            ExecutionUtils.invoke(flags.get("newTaskEndCallback"), task);
+            invokeCallback(flags.get("newTaskEndCallback"), task);
             ((TaskInternal<?>)task).setEndTimeUtc(System.currentTimeMillis());
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/aed07863/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java
index 0c26dd1..c727d10 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java
@@ -155,7 +155,7 @@ public class BasicTask<T> implements TaskInternal<T> {
             (Strings.isNonEmpty(displayName) ? 
                 displayName : 
                 (job + (tags!=null && !tags.isEmpty() ? ";"+tags : "")) ) +
-            ":"+getId()+"]";
+            "]@"+getId();
     }
 
     @Override
@@ -196,7 +196,7 @@ public class BasicTask<T> implements TaskInternal<T> {
     protected Maybe<Task<?>> submittedByTask;
 
     protected volatile Thread thread = null;
-    private volatile boolean cancelled = false;
+    protected volatile boolean cancelled = false;
     /** normally a {@link ListenableFuture}, except for scheduled tasks when it may be a {@link ScheduledFuture} */
     protected volatile Future<T> internalFuture = null;
     
@@ -288,15 +288,33 @@ public class BasicTask<T> implements TaskInternal<T> {
     }
     
     @Override
-    public synchronized boolean cancel(boolean mayInterruptIfRunning) {
+    public final synchronized boolean cancel(boolean mayInterruptIfRunning) {
+        // semantics changed in 2016-01, previously "true" was INTERRUPT_TASK_BUT_NOT_SUBMITTED_TASKS
+        return cancel(mayInterruptIfRunning ? TaskCancellationMode.INTERRUPT_TASK_AND_DEPENDENT_SUBMITTED_TASKS
+            : TaskCancellationMode.DO_NOT_INTERRUPT);
+    }
+    
+    public synchronized boolean cancel(TaskCancellationMode mode) {
         if (isDone()) return false;
-        boolean cancel = true;
+        if (log.isTraceEnabled()) {
+            log.trace("BT cancelling "+this+" mode "+mode);
+        }
         cancelled = true;
+        doCancel(mode);
+        notifyAll();
+        return true;
+    }
+    
+    @SuppressWarnings("deprecation")
+    protected boolean doCancel(TaskCancellationMode mode) {
         if (internalFuture!=null) { 
-            cancel = internalFuture.cancel(mayInterruptIfRunning);
+            if (internalFuture instanceof ListenableForwardingFuture) {
+                return ((ListenableForwardingFuture<?>)internalFuture).cancel(mode);
+            } else {
+                return internalFuture.cancel(mode.isAllowedToInterruptTask());
+            }
         }
-        notifyAll();
-        return cancel;
+        return true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/aed07863/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java
index b7985c8..51a4e34 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java
@@ -158,27 +158,44 @@ public class DynamicSequentialTask<T> extends BasicTask<T> implements HasTaskChi
     }
 
     @Override
-    public boolean cancel(boolean mayInterruptIfRunning) {
-        return cancel(mayInterruptIfRunning, mayInterruptIfRunning, true);
+    protected boolean doCancel(TaskCancellationMode mode) {
+        boolean result = false;
+        if (mode.isAllowedToInterruptDependentSubmittedTasks() || mode.isAllowedToInterruptAllSubmittedTasks()) {
+            for (Task<?> t: secondaryJobsAll)
+                result = ((TaskInternal<?>)t).cancel(mode) || result;
+        }
+        return super.doCancel(mode) || result;
+        // returns true if anything is successfully cancelled
+    }
+    
+    public boolean cancel(TaskCancellationMode mode) {
+        return cancel(mode, null);
     }
-    public boolean cancel(boolean mayInterruptTask, boolean interruptPrimaryThread, boolean alsoCancelChildren) {
+    
+    protected boolean cancel(TaskCancellationMode mode, Boolean interruptPrimaryThreadOverride) {
         if (isDone()) return false;
-        if (log.isTraceEnabled()) log.trace("cancelling {}", this);
-        boolean cancel = super.cancel(mayInterruptTask);
-        if (alsoCancelChildren) {
-            for (Task<?> t: secondaryJobsAll)
-                cancel |= t.cancel(mayInterruptTask);
+        if (log.isTraceEnabled()) log.trace("cancelling DST {}", this);
+        
+        // first do the super's cancel, setting cancelled, and calling doCancel to cancel children
+        boolean result = super.cancel(mode);
+        // then come back and ensure our primary thread is cancelled if needed
+        
+        if (interruptPrimaryThreadOverride==null) interruptPrimaryThreadOverride = mode.isAllowedToInterruptTask();
+        if (log.isTraceEnabled()) {
+            log.trace("DST cancelling "+this+" mode "+mode+", interruptPrimary "+interruptPrimaryThreadOverride);
         }
+
         synchronized (jobTransitionLock) {
             if (primaryThread!=null) {
-                if (interruptPrimaryThread) {
+                if (interruptPrimaryThreadOverride) {
                     if (log.isTraceEnabled()) log.trace("cancelling {} - interrupting", this);
                     primaryThread.interrupt();
                 }
-                cancel = true;
+                result = true;
             }
         }
-        return cancel;
+        
+        return result;
     }
     
     @Override
@@ -309,7 +326,7 @@ public class DynamicSequentialTask<T> extends BasicTask<T> implements HasTaskChi
                                         }
 
                                         if (!primaryFinished && failureHandlingConfig.cancelPrimaryOnSecondaryFailure) {
-                                            cancel(true, false, false);
+                                            cancel(TaskCancellationMode.INTERRUPT_TASK_BUT_NOT_SUBMITTED_TASKS, false);
                                         }
                                         
                                         result.add(Tasks.getError(secondaryJob));

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/aed07863/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ExecutionUtils.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ExecutionUtils.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ExecutionUtils.java
deleted file mode 100644
index 72a5ae4..0000000
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ExecutionUtils.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.util.core.task;
-
-import groovy.lang.Closure;
-
-import java.util.concurrent.Callable;
-
-import com.google.common.base.Function;
-import com.google.common.base.Throwables;
-
-public class ExecutionUtils {
-    /**
-     * Attempts to run/call the given object, with the given arguments if possible, preserving the return value if there is one (null otherwise);
-     * throws exception if the callable is a non-null object which cannot be invoked (not a callable or runnable)
-     * @deprecated since 0.7.0 ; this super-loose typing should be avoided; if it is needed, let's move it to one of the Groovy compatibility classes
-     */
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    public static Object invoke(Object callable, Object ...args) {
-        if (callable instanceof Closure) return ((Closure<?>)callable).call(args);
-        if (callable instanceof Callable) {
-            try {
-                return ((Callable<?>)callable).call();
-            } catch (Throwable t) {
-                throw Throwables.propagate(t);
-            }
-        }
-        if (callable instanceof Runnable) { ((Runnable)callable).run(); return null; }
-        if (callable instanceof Function && args.length == 1) { return ((Function)callable).apply(args[0]); }
-        if (callable==null) return null;
-        throw new IllegalArgumentException("Cannot invoke unexpected object "+callable+" of type "+callable.getClass()+", with "+args.length+" args");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/aed07863/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ListenableForwardingFuture.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ListenableForwardingFuture.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ListenableForwardingFuture.java
index 4ce56d1..cbc474c 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ListenableForwardingFuture.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ListenableForwardingFuture.java
@@ -21,15 +21,26 @@ package org.apache.brooklyn.util.core.task;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 
+import org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.util.concurrent.ExecutionList;
 import com.google.common.util.concurrent.ForwardingFuture.SimpleForwardingFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 
-/** Wraps a Future, making it a ListenableForwardingFuture, but with the caller having the resposibility to:
+/** Wraps a Future, making it a ListenableForwardingFuture, but with the caller having the responsibility to:
  * <li> invoke the listeners on job completion (success or error)
- * <li> invoke the listeners on cancel */
+ * <li> invoke the listeners on cancel
+ * 
+ * @deprecated since 0.9.0 likely to leave the public API */
+@Deprecated  // TODO just one subclass, it can hold the behaviour we need from this, 
+// and the methods here are surprising as they expect the caller to notify the list
 public abstract class ListenableForwardingFuture<T> extends SimpleForwardingFuture<T> implements ListenableFuture<T> {
 
+    private static final Logger log = LoggerFactory.getLogger(ListenableForwardingFuture.class);
+    
+    // TODO these are never accessed or used
     final ExecutionList listeners;
     
     protected ListenableForwardingFuture(Future<T> delegate) {
@@ -42,9 +53,22 @@ public abstract class ListenableForwardingFuture<T> extends SimpleForwardingFutu
         this.listeners = list;
     }
 
+    private static boolean warned = false;
+    
     @Override
     public void addListener(Runnable listener, Executor executor) {
+        if (!warned) {
+            log.warn("Use of deprecated ListenableForwardingFuture.addListener at "+this+" (future calls will not be logged)", new Throwable("stack trace"));
+            warned = true;
+        }
+        
         listeners.add(listener, executor);
     }
     
+    public abstract boolean cancel(TaskCancellationMode mode);
+    
+    public final boolean cancel(boolean mayInterrupt) {
+        return cancel(TaskCancellationMode.INTERRUPT_TASK_AND_DEPENDENT_SUBMITTED_TASKS);
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/aed07863/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
index c1ad4f8..219f4f8 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
@@ -43,7 +43,7 @@ import com.google.common.base.Throwables;
  */
 // TODO ScheduledTask is a very pragmatic implementation; would be nice to tighten, 
 // reduce external assumptions about internal structure, and clarify "done" semantics
-public class ScheduledTask extends BasicTask {
+public class ScheduledTask extends BasicTask<Object> {
     
     final Callable<Task<?>> taskFactory;
 
@@ -84,7 +84,7 @@ public class ScheduledTask extends BasicTask {
         this(MutableMap.of(), task);
     }
 
-    public ScheduledTask(Map flags, final Task<?> task){
+    public ScheduledTask(Map<?,?> flags, final Task<?> task){
         this(flags, new Callable<Task<?>>(){
             @Override
             public Task<?> call() throws Exception {
@@ -92,7 +92,7 @@ public class ScheduledTask extends BasicTask {
             }});
     }
 
-    public ScheduledTask(Map flags, Callable<Task<?>> taskFactory) {
+    public ScheduledTask(Map<?,?> flags, Callable<Task<?>> taskFactory) {
         super(flags);
         this.taskFactory = taskFactory;
         
@@ -194,13 +194,11 @@ public class ScheduledTask extends BasicTask {
     }
     
     @Override
-    public synchronized boolean cancel(boolean mayInterrupt) {
-        boolean result = super.cancel(mayInterrupt);
+    protected boolean doCancel(org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode mode) {
         if (nextRun!=null) {
-            nextRun.cancel(mayInterrupt);
-            notifyAll();
+            ((TaskInternal<?>)nextRun).cancel(mode);
         }
-        return result;
+        return super.doCancel(mode);
     }
     
     /**

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/aed07863/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java
index 2bf0fec..99c2773 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java
@@ -29,6 +29,7 @@ import org.apache.brooklyn.util.time.Duration;
 
 import com.google.common.annotations.Beta;
 import com.google.common.base.Function;
+import com.google.common.base.Objects;
 import com.google.common.util.concurrent.ExecutionList;
 import com.google.common.util.concurrent.ListenableFuture;
 
@@ -95,6 +96,8 @@ public interface TaskInternal<T> extends Task<T> {
     
     Object getExtraStatusText();
 
+    /** On task completion (or cancellation) runs the listeners which have been registered using 
+     * {@link #addListener(Runnable, java.util.concurrent.Executor)}. */
     void runListeners();
 
     void setEndTimeUtc(long val);
@@ -120,5 +123,41 @@ public interface TaskInternal<T> extends Task<T> {
     /** if a task is a proxy for another one (used mainly for internal tasks),
      * this returns the "real" task represented by this one */
     Task<?> getProxyTarget();
+
+    /** clearer semantics around cancellation; may be promoted to {@link Task} if it proves useful */
+    @Beta
+    public boolean cancel(TaskCancellationMode mode);
+    
+    @Beta
+    public static class TaskCancellationMode {
+        public static final TaskCancellationMode DO_NOT_INTERRUPT = new TaskCancellationMode(false, false, false);
+        public static final TaskCancellationMode INTERRUPT_TASK_BUT_NOT_SUBMITTED_TASKS = new TaskCancellationMode(true, false, false);
+        public static final TaskCancellationMode INTERRUPT_TASK_AND_DEPENDENT_SUBMITTED_TASKS = new TaskCancellationMode(true, true, false);
+        public static final TaskCancellationMode INTERRUPT_TASK_AND_ALL_SUBMITTED_TASKS = new TaskCancellationMode(true, true, true);
+        
+        private final boolean allowedToInterruptTask, 
+            allowedToInterruptDependentSubmittedTasks, 
+            allowedToInterruptAllSubmittedTasks;
+        
+        private TaskCancellationMode(boolean mayInterruptIfRunning, boolean interruptSubmittedTransients, boolean interruptAllSubmitted) {
+            this.allowedToInterruptTask = mayInterruptIfRunning;
+            this.allowedToInterruptDependentSubmittedTasks = interruptSubmittedTransients;
+            this.allowedToInterruptAllSubmittedTasks = interruptAllSubmitted;
+        }
+        
+        public boolean isAllowedToInterruptTask() { return allowedToInterruptTask; }
+        /** Implementation-dependent what "dependent" means in this context, 
+         * e.g. may be linked to a "transient" tag (that's what Brooklyn does) */ 
+        public boolean isAllowedToInterruptDependentSubmittedTasks() { return allowedToInterruptDependentSubmittedTasks; }
+        public boolean isAllowedToInterruptAllSubmittedTasks() { return allowedToInterruptAllSubmittedTasks; }
+        
+        @Override
+        public String toString() {
+            return Objects.toStringHelper(this).add("interruptTask", allowedToInterruptTask)
+                .add("interruptDependentSubmitted", allowedToInterruptDependentSubmittedTasks)
+                .add("interruptAllSubmitted", allowedToInterruptAllSubmittedTasks)
+                .toString();
+        }
+    }
     
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/aed07863/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskPredicates.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskPredicates.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskPredicates.java
index 8e46002..d8d3764 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskPredicates.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/TaskPredicates.java
@@ -60,4 +60,20 @@ public class TaskPredicates {
             return "displayNameMatches("+matcher+")";
         }
     }
+    
+    public static Predicate<Task<?>> isDone() {
+        return new IsDone();
+    }
+    
+    private static class IsDone implements Predicate<Task<?>> {
+        @Override
+        public boolean apply(Task<?> input) {
+            return input.isDone();
+        }
+        @Override
+        public String toString() {
+            return "isDone()";
+        }
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/aed07863/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/BasicTaskExecutionPerformanceTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/BasicTaskExecutionPerformanceTest.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/BasicTaskExecutionPerformanceTest.java
index a291c53..e8e7890 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/BasicTaskExecutionPerformanceTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/BasicTaskExecutionPerformanceTest.java
@@ -70,7 +70,6 @@ public class BasicTaskExecutionPerformanceTest {
         if (em != null) em.shutdownNow();
     }
     
-    @SuppressWarnings("unchecked")
     @Test
     public void testScheduledTaskExecutedAfterDelay() throws Exception {
         int delay = 100;
@@ -95,7 +94,6 @@ public class BasicTaskExecutionPerformanceTest {
         assertTrue(actualDelay < (delay+MAX_OVERHEAD_MS), "actualDelay="+actualDelay+"; delay="+delay);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testScheduledTaskExecutedAtRegularPeriod() throws Exception {
         final int period = 100;
@@ -127,7 +125,6 @@ public class BasicTaskExecutionPerformanceTest {
         }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testCanCancelScheduledTask() throws Exception {
         final int period = 1;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/aed07863/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/DynamicSequentialTaskTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/DynamicSequentialTaskTest.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/DynamicSequentialTaskTest.java
index ceff29f..95f0952 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/DynamicSequentialTaskTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/DynamicSequentialTaskTest.java
@@ -30,10 +30,13 @@ import java.util.concurrent.TimeoutException;
 
 import org.apache.brooklyn.api.mgmt.HasTaskChildren;
 import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.test.Asserts;
 import org.apache.brooklyn.util.collections.CollectionFunctionals;
 import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.collections.MutableSet;
+import org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.math.MathPredicates;
 import org.apache.brooklyn.util.time.CountdownTimer;
@@ -49,6 +52,8 @@ import org.testng.annotations.Test;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.base.Stopwatch;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
@@ -154,7 +159,17 @@ public class DynamicSequentialTaskTest {
     }
     
     public Task<String> sayTask(String message, Duration duration, String message2) {
-        return Tasks.<String>builder().body(sayCallable(message, duration, message2)).build();
+        return Tasks.<String>builder().displayName("say:"+message).body(sayCallable(message, duration, message2)).build();
+    }
+    
+    public <T> Task<T> submitting(final Task<T> task) {
+        return Tasks.<T>builder().displayName("submitting:"+task.getId()).body(new Callable<T>() {
+            @Override
+            public T call() throws Exception {
+                ec.submit(task);
+                return task.get();
+            }
+        }).build();
     }
     
     @Test
@@ -207,6 +222,85 @@ public class DynamicSequentialTaskTest {
         // but we do _not_ get a mutex from task3 as it does not run (is not interrupted)
         Assert.assertEquals(cancellations.availablePermits(), 0);
     }
+    
+    @Test
+    public void testCancellationModeAndSubmitted() throws Exception {
+        doTestCancellationModeAndSubmitted(true, TaskCancellationMode.DO_NOT_INTERRUPT, false, false);
+        
+        doTestCancellationModeAndSubmitted(true, TaskCancellationMode.INTERRUPT_TASK_AND_ALL_SUBMITTED_TASKS, true, true);
+        doTestCancellationModeAndSubmitted(true, TaskCancellationMode.INTERRUPT_TASK_AND_DEPENDENT_SUBMITTED_TASKS, true, true);
+        doTestCancellationModeAndSubmitted(true, TaskCancellationMode.INTERRUPT_TASK_BUT_NOT_SUBMITTED_TASKS, true, false);
+        
+        // if it's not transient, it should only be cancelled on "all submitted"
+        doTestCancellationModeAndSubmitted(false, TaskCancellationMode.INTERRUPT_TASK_AND_DEPENDENT_SUBMITTED_TASKS, true, false);
+        doTestCancellationModeAndSubmitted(false, TaskCancellationMode.INTERRUPT_TASK_AND_ALL_SUBMITTED_TASKS, true, true);
+        
+        // omitting the cancellation mode should behave like INTERRUPT_TASK_AND_DEPENDENT_SUBMITTED_TASKS, i.e. non-transient background-submitted tasks are not cancelled
+        doTestCancellationModeAndSubmitted(true, null, true, true);
+        doTestCancellationModeAndSubmitted(false, null, true, false);
+        // and 'true' should be the same
+        doTestCancellationModeAndSubmitted(true, true, true, true);
+        doTestCancellationModeAndSubmitted(false, true, true, false);
+        
+        // cancellation mode false should be the same as DO_NOT_INTERRUPT
+        doTestCancellationModeAndSubmitted(true, false, false, false);
+    }
+    
+    public void doTestCancellationModeAndSubmitted(
+            boolean isSubtaskTransient,
+            Object cancellationMode,
+            boolean expectedTaskInterrupted,
+            boolean expectedSubtaskCancelled
+            ) throws Exception {
+        tearDown(); setUp();
+        
+        final Task<String> t1 = sayTask("1-wait", Duration.minutes(10), "1-done");
+        if (isSubtaskTransient) {
+            BrooklynTaskTags.addTagDynamically(t1, BrooklynTaskTags.TRANSIENT_TASK_TAG);
+        }
+        
+        final Task<List<?>> t = Tasks.parallel(
+                submitting(t1),
+                sayTask("2-wait", Duration.minutes(10), "2-done"));
+        ec.submit(t);
+        
+        waitForMessages(Predicates.compose(MathPredicates.greaterThanOrEqual(2), CollectionFunctionals.sizeFunction()), TIMEOUT);
+        Asserts.assertEquals(MutableSet.copyOf(messages), MutableSet.of("1-wait", "2-wait"));
+
+        Time.sleep(Duration.millis(400));
+        
+        if (cancellationMode==null) {
+            ((TaskInternal<?>)t).cancel();
+        } else if (cancellationMode instanceof Boolean) {
+            t.cancel((Boolean)cancellationMode);
+        } else if (cancellationMode instanceof TaskCancellationMode) {
+            ((TaskInternal<?>)t).cancel((TaskCancellationMode)cancellationMode);
+        }
+
+        // the cancelled task always reports cancelled and done
+        Assert.assertEquals(t.isDone(), true);
+        Assert.assertEquals(t.isCancelled(), true);
+        // end time might not be set for another fraction of a second
+        if (expectedTaskInterrupted) { 
+            Asserts.eventually(new Supplier<Number>() {
+                @Override public Number get() { return t.getEndTimeUtc(); }}, 
+                MathPredicates.<Number>greaterThanOrEqual(0));
+        } else {
+            Assert.assertTrue(t.getEndTimeUtc() < 0, "Wrong end time: "+t.getEndTimeUtc());
+        }
+        
+        if (expectedSubtaskCancelled) {
+            Asserts.eventually(Suppliers.ofInstance(t1), TaskPredicates.isDone());
+            Assert.assertTrue(t1.isCancelled());
+            Asserts.eventually(new Supplier<Number>() {
+                @Override public Number get() { return t1.getEndTimeUtc(); }}, 
+                MathPredicates.<Number>greaterThanOrEqual(0));
+        } else {
+            Time.sleep(Duration.millis(5));
+            Assert.assertFalse(t1.isCancelled());
+            Assert.assertFalse(t1.isDone());
+        }
+    }
 
     protected void waitForMessages(Predicate<? super List<String>> predicate, Duration timeout) throws Exception {
         long endtime = System.currentTimeMillis() + timeout.toMilliseconds();

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/aed07863/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/NonBasicTaskExecutionTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/NonBasicTaskExecutionTest.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/NonBasicTaskExecutionTest.java
index 1d1c3af..6fd3021 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/NonBasicTaskExecutionTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/NonBasicTaskExecutionTest.java
@@ -64,6 +64,11 @@ public class NonBasicTaskExecutionTest {
         protected TaskInternal<T> delegate() {
             return delegate;
         }
+
+        @Override
+        public boolean cancel(TaskCancellationMode mode) {
+            return delegate.cancel(mode);
+        }
     }
     
     private BasicExecutionManager em;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/aed07863/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
index 1d551e8..5c11355 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
@@ -45,7 +45,7 @@ import org.testng.annotations.Test;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 
-@SuppressWarnings({"unchecked","rawtypes"})
+@SuppressWarnings({"rawtypes"})
 public class ScheduledExecutionTest {
 
     public static final Logger log = LoggerFactory.getLogger(ScheduledExecutionTest.class);

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/aed07863/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java
index fce6f0f..8a25361 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java
@@ -57,8 +57,8 @@ public class TaskPredicatesTest extends BrooklynAppUnitTestSupport {
                 .body(Callables.<Object>returning("val"))
                 .displayName("myname")
                 .build());
-        assertTrue(TaskPredicates.displayNameMatches(Predicates.equalTo("myname")).apply(task));
-        assertFalse(TaskPredicates.displayNameMatches(Predicates.equalTo("wrong")).apply(task));
+        assertTrue(TaskPredicates.displayNameSatisfies(Predicates.equalTo("myname")).apply(task));
+        assertFalse(TaskPredicates.displayNameSatisfies(Predicates.equalTo("wrong")).apply(task));
     }
     
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/aed07863/brooklyn-server/rest/rest-server/src/main/java/org/apache/brooklyn/rest/resources/EntityConfigResource.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/rest/rest-server/src/main/java/org/apache/brooklyn/rest/resources/EntityConfigResource.java b/brooklyn-server/rest/rest-server/src/main/java/org/apache/brooklyn/rest/resources/EntityConfigResource.java
index e483d0b..a6d3b8e 100644
--- a/brooklyn-server/rest/rest-server/src/main/java/org/apache/brooklyn/rest/resources/EntityConfigResource.java
+++ b/brooklyn-server/rest/rest-server/src/main/java/org/apache/brooklyn/rest/resources/EntityConfigResource.java
@@ -22,6 +22,7 @@ import static com.google.common.collect.Iterables.transform;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.config.ConfigKey;
@@ -35,6 +36,7 @@ import org.apache.brooklyn.rest.filter.HaHotStateRequired;
 import org.apache.brooklyn.rest.transform.EntityTransformer;
 import org.apache.brooklyn.rest.util.WebResourceUtils;
 import org.apache.brooklyn.util.core.flags.TypeCoercions;
+import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.core.task.ValueResolver;
 import org.apache.brooklyn.util.text.Strings;
 import org.apache.brooklyn.util.time.Duration;
@@ -71,14 +73,32 @@ public class EntityConfigResource extends AbstractBrooklynRestResource implement
     public Map<String, Object> batchConfigRead(String application, String entityToken, Boolean raw) {
         // TODO: add test
         Entity entity = brooklyn().getEntity(application, entityToken);
-        Map<ConfigKey<?>, ?> source = ((EntityInternal) entity).config().getBag().getAllConfigAsConfigKeyMap();
-        Map<String, Object> result = Maps.newLinkedHashMap();
-        for (Map.Entry<ConfigKey<?>, ?> ek : source.entrySet()) {
-            Object value = ek.getValue();
-            result.put(ek.getKey().getName(), 
-                resolving(value).preferJson(true).asJerseyOutermostReturnValue(false).raw(raw).context(entity).timeout(Duration.ZERO).renderAs(ek.getKey()).resolve());
+        // wrap in a task for better runtime view
+        return Entities.submit(entity, Tasks.<Map<String,Object>>builder().displayName("REST API batch config read").body(new BatchConfigRead(this, entity, raw)).build()).getUnchecked();
+    }
+    
+    private static class BatchConfigRead implements Callable<Map<String,Object>> {
+        private EntityConfigResource resource;
+        private Entity entity;
+        private Boolean raw;
+
+        public BatchConfigRead(EntityConfigResource resource, Entity entity, Boolean raw) {
+            this.resource = resource;
+            this.entity = entity;
+            this.raw = raw;
+        }
+
+        @Override
+        public Map<String, Object> call() throws Exception {
+            Map<ConfigKey<?>, ?> source = ((EntityInternal) entity).config().getBag().getAllConfigAsConfigKeyMap();
+            Map<String, Object> result = Maps.newLinkedHashMap();
+            for (Map.Entry<ConfigKey<?>, ?> ek : source.entrySet()) {
+                Object value = ek.getValue();
+                result.put(ek.getKey().getName(), 
+                    resource.resolving(value).preferJson(true).asJerseyOutermostReturnValue(false).raw(raw).context(entity).timeout(Duration.ZERO).renderAs(ek.getKey()).resolve());
+            }
+            return result;
         }
-        return result;
     }
 
     @Override