You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2021/09/14 21:56:23 UTC

[brooklyn-server] 04/27: tidy task before/after model, ensure scheduled tasks clear their task context

This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit 85d965153a491789f1367d3265d690f8e479422c
Author: Alex Heneveld <al...@cloudsoftcorp.com>
AuthorDate: Mon Sep 13 17:08:04 2021 +0100

    tidy task before/after model, ensure scheduled tasks clear their task context
---
 .../util/core/task/BasicExecutionContext.java      |  50 +++--
 .../util/core/task/BasicExecutionManager.java      | 196 ++++++++++------
 .../apache/brooklyn/util/core/task/BasicTask.java  |   3 +-
 .../brooklyn/util/core/task/ScheduledTask.java     |   2 +
 .../util/core/task/ScheduledExecutionTest.java     | 248 +++++++++++++++------
 5 files changed, 338 insertions(+), 161 deletions(-)

diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
index ca9e158..70322ce 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
@@ -361,33 +361,41 @@ public class BasicExecutionContext extends AbstractExecutionContext {
 
         taskTags.addAll(tags);
 
-        if (Tasks.current()!=null && BrooklynTaskTags.isTransient(Tasks.current()) 
+        if (Tasks.current()!=null && BrooklynTaskTags.isTransient(Tasks.current())
                 && !taskTags.contains(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG) && !taskTags.contains(BrooklynTaskTags.TRANSIENT_TASK_TAG)) {
             // tag as transient if submitter is transient, unless explicitly tagged as non-transient
             taskTags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG);
         }
 
-        final Object startCallback = properties.get("newTaskStartCallback");
-        properties.put("newTaskStartCallback", new Function<Task<?>,Void>() {
-            @Override
-            public Void apply(Task<?> it) {
-                registerPerThreadExecutionContext();
-                if (startCallback!=null) BasicExecutionManager.invokeCallback(startCallback, it);
-                return null;
-            }});
-        
-        final Object endCallback = properties.get("newTaskEndCallback");
-        properties.put("newTaskEndCallback", new Function<Task<?>,Void>() {
-            @Override
-            public Void apply(Task<?> it) {
-                try {
-                    if (endCallback!=null) BasicExecutionManager.invokeCallback(endCallback, it);
-                } finally {
-                    clearPerThreadExecutionContext();
+        if (task instanceof ScheduledTask) {
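+            // start/end callbacks are not registered for the scheduler task itself; it records this context so its iterations are submitted through it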
+            ((ScheduledTask)task).executionContext = this;
+
+        } else {
+            final Object startCallback = properties.get("newTaskStartCallback");
+            properties.put("newTaskStartCallback", new Function<Task<?>, Void>() {
+                @Override
+                public Void apply(Task<?> it) {
+                    registerPerThreadExecutionContext();
+                    if (startCallback != null) BasicExecutionManager.invokeCallback(startCallback, it);
+                    return null;
                 }
-                return null;
-            }});
-        
+            });
+
+            final Object endCallback = properties.get("newTaskEndCallback");
+            properties.put("newTaskEndCallback", new Function<Task<?>, Void>() {
+                @Override
+                public Void apply(Task<?> it) {
+                    try {
+                        if (endCallback != null) BasicExecutionManager.invokeCallback(endCallback, it);
+                    } finally {
+                        clearPerThreadExecutionContext();
+                    }
+                    return null;
+                }
+            });
+        }
+
         if (task instanceof Task) {
             return executionManager.submit(properties, (Task)task);
         } else if (task instanceof Callable) {
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
index 29984e2..f1496fc 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
@@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.internal.BrooklynLoggingCategories;
+import org.apache.brooklyn.api.mgmt.ExecutionContext;
 import org.apache.brooklyn.api.mgmt.ExecutionManager;
 import org.apache.brooklyn.api.mgmt.HasTaskChildren;
 import org.apache.brooklyn.api.mgmt.Task;
@@ -69,6 +70,7 @@ import org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
 import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.javalang.JavaClassNames;
 import org.apache.brooklyn.util.text.Identifiers;
 import org.apache.brooklyn.util.text.Strings;
 import org.apache.brooklyn.util.time.CountdownTimer;
@@ -106,6 +108,8 @@ public class BasicExecutionManager implements ExecutionManager {
     public static final String LOGGING_MDC_KEY_ENTITY_IDS = "entity.ids";
     public static final String LOGGING_MDC_KEY_TASK_ID = "task.id";
 
+    private static final boolean SCHEDULED_TASKS_COUNT_AS_ACTIVE = false;
+
     private boolean jitterThreads = BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_JITTER_THREADS);
     private int jitterThreadsMaxDelay = Integer.getInteger(JITTER_THREADS_MAX_DELAY_PROPERTY, 200);
 
@@ -525,10 +529,13 @@ public class BasicExecutionManager implements ExecutionManager {
     }
 
     protected Task<?> submitNewScheduledTask(final Map<?,?> flags, final ScheduledTask task) {
-        beforeSubmitScheduledTaskAllIterations(flags, task);
-        
-        if (!submitSubsequentScheduledTask(flags, task)) {
-            afterEndScheduledTaskAllIterations(flags, task, null);
+        boolean result = false;
+        try {
+            result = submitSubsequentScheduledTask(flags, task);
+        } finally {
+            if (!result) {
+                afterEndScheduledTaskAllIterations(flags, task, null);
+            }
         }
         return task;
     }
@@ -555,27 +562,39 @@ public class BasicExecutionManager implements ExecutionManager {
         @Override
         @SuppressWarnings({ "rawtypes", "unchecked" })
         public Object call() {
-            if (task.startTimeUtc==-1) {
-                // this is overwritten on each run; not sure if that's best or not
-                task.startTimeUtc = System.currentTimeMillis();
-            }
-            TaskInternal<?> taskScheduled = null;
+            TaskInternal<?> taskIteration = null;
             Throwable error = null;
             try {
-                taskScheduled = (TaskInternal<?>) task.newTask();
-                taskScheduled.setSubmittedByTask(task);
-                beforeStartScheduledTaskSubmissionIteration(flags, task, taskScheduled);
-                final Callable<?> oldJob = taskScheduled.getJob();
-                final TaskInternal<?> taskScheduledF = taskScheduled;
-                taskScheduled.setJob(new Callable() { @Override public Object call() {
+                if (task.startTimeUtc==-1) {
+                    beforeSubmitScheduledTaskAllIterations(flags, task);
+                    beforeStartScheduledTaskAllIterations(flags, task);
+
+                    task.startTimeUtc = System.currentTimeMillis();
+                }
+
+                taskIteration = (TaskInternal<?>) task.newTask();
+                taskIteration.setSubmittedByTask(task);
+
+                beforeSubmitScheduledTaskSubmissionIteration(flags, task);
+
+                final Callable<?> oldJob = taskIteration.getJob();
+                final TaskInternal<?> taskIterationF = taskIteration;
+                taskIteration.setJob(new Callable() { @Override public Object call() {
                     if (task.isCancelled()) {
-                        afterEndScheduledTaskAllIterations(flags, task, new CancellationException("cancel detected"));
-                        throw new CancellationException("cancel detected");  // above throws, but for good measure
+                        CancellationException cancelDetected = new CancellationException("cancel detected");
+                        try {
+                            afterEndScheduledTaskSubmissionIteration(flags, task, taskIterationF, cancelDetected);
+                        } finally {
+                            // do in finally block so runs even if above throws cancelDetected
+                            afterEndScheduledTaskAllIterations(flags, task, cancelDetected);
+                        }
+                        throw cancelDetected;
                     }
                     Throwable lastError = null;
                     boolean shouldResubmit = true;
-                    task.recentRun = taskScheduledF;
-                    try (BrooklynTaskLoggingMdc mdc = BrooklynTaskLoggingMdc.create(task).start()) {
+                    task.recentRun = taskIterationF;
+                    try (BrooklynTaskLoggingMdc mdc = BrooklynTaskLoggingMdc.create(taskIterationF).start()) {
+                        beforeStartScheduledTaskSubmissionIteration(flags, task, taskIterationF);
                         synchronized (task) {
                             task.notifyAll();
                         }
@@ -590,26 +609,33 @@ public class BasicExecutionManager implements ExecutionManager {
                         }
                         return result;
                     } finally {
-                        // do in finally block in case we were interrupted
-                        if (shouldResubmit && resubmit()) {
-                            // resubmitted fine, no-op
-                        } else {
-                            // not resubmitted, note ending
-                            afterEndScheduledTaskAllIterations(flags, task, lastError);
+                        if (!task.isCancelled() || task.getEndTimeUtc()<=0) {
+                            // don't re-run on cancellation
+
+                            afterEndScheduledTaskSubmissionIteration(flags, task, taskIterationF, lastError);
+                            // do in finally block in case we were interrupted
+                            if (shouldResubmit && resubmit()) {
+                                // resubmitted fine, no-op
+                            } else {
+                                // not resubmitted, note ending
+                                afterEndScheduledTaskAllIterations(flags, task, lastError);
+                            }
                         }
                     }
                 }});
-                task.nextRun = taskScheduled;
-                BasicExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext();
-                if (ec!=null) return ec.submit(taskScheduled);
-                else return submit(taskScheduled);
+                task.nextRun = taskIteration;
+                ExecutionContext ec =
+                        // the execution context is no longer taken from the current thread on each execution;
+//                         BasicExecutionContext.getCurrentExecutionContext();
+                        // instead it is set on the task
+                        task.executionContext;
+                if (ec!=null) return ec.submit(taskIteration);
+                else return submit(taskIteration);
 
             } catch (Exception e) {
                 error = e;
+                afterEndScheduledTaskSubmissionIteration(flags, task, taskIteration, error);
                 throw Exceptions.propagate(e);
-                
-            } finally {
-                afterEndScheduledTaskSubmissionIteration(flags, task, taskScheduled, error);
             }
         }
 
@@ -804,8 +830,9 @@ public class BasicExecutionManager implements ExecutionManager {
             }
         }
         
-        if (task instanceof ScheduledTask)
-            return (Task<T>) submitNewScheduledTask(flags, (ScheduledTask)task);
+        if (task instanceof ScheduledTask) {
+            return (Task<T>) submitNewScheduledTask(flags, (ScheduledTask) task);
+        }
         
         beforeSubmitAtomicTask(flags, task);
         
@@ -850,6 +877,11 @@ public class BasicExecutionManager implements ExecutionManager {
     }
     
     protected void beforeSubmitScheduledTaskAllIterations(Map<?,?> flags, Task<?> task) {
+        // for these, beforeSubmitAtomicTask is not called,
+        // but beforeStartAtomic and afterSubmitAtomic _are_ called
+        internalBeforeSubmit(flags, task);
+    }
+    protected void beforeSubmitScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> task) {
         internalBeforeSubmit(flags, task);
     }
     protected void beforeSubmitAtomicTask(Map<?,?> flags, Task<?> task) {
@@ -916,20 +948,25 @@ public class BasicExecutionManager implements ExecutionManager {
         }
     }
 
-    protected void beforeStartScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> taskRepeatedlyScheduling, Task<?> taskIteration) {
-        internalBeforeStart(flags, taskRepeatedlyScheduling, true);
+    /** normally (if not interrupted) called once for each call to {@link #beforeSubmitScheduledTaskAllIterations(Map, Task)} */
+    protected void beforeStartScheduledTaskAllIterations(Map<?,?> flags, Task<?> taskDoingTheInitialSchedule) {
+        internalBeforeStart(flags, taskDoingTheInitialSchedule, !SCHEDULED_TASKS_COUNT_AS_ACTIVE, true, true);
+    }
+    protected void beforeStartScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> taskDoingTheScheduling, Task<?> taskIteration) {
+        // no-op, because handled as an atomic task
+        // internalBeforeStart(flags, taskIteration, true, false);
     }
     protected void beforeStartAtomicTask(Map<?,?> flags, Task<?> task) {
-        internalBeforeStart(flags, task, true);
+        internalBeforeStart(flags, task, false, true, false);
     }
     protected void beforeStartInSameThreadTask(Map<?,?> flags, Task<?> task) {
-        internalBeforeStart(flags, task, false);
+        internalBeforeStart(flags, task, false, false, false);
     }
     
     /** invoked in a task's thread when a task is starting to run (may be some time after submitted), 
      * but before doing any of the task's work, so that we can update bookkeeping and notify callbacks */
-    protected void internalBeforeStart(Map<?,?> flags, Task<?> task, boolean allowJitter) {
-        int count = activeTaskCount.incrementAndGet();
+    protected void internalBeforeStart(Map<?,?> flags, Task<?> task, boolean skipIncrementCounter, boolean allowJitter, boolean startingThisThreadMightEndElsewhere) {
+        int count = skipIncrementCounter ? activeTaskCount.get() : activeTaskCount.incrementAndGet();
         if (count % 1000==0) {
             log.warn("High number of active tasks: task #"+count+" is "+task);
         }
@@ -939,19 +976,21 @@ public class BasicExecutionManager implements ExecutionManager {
         if (!task.isCancelled()) {
             Thread thread = Thread.currentThread();
             ((TaskInternal<?>)task).setThread(thread);
-            if (RENAME_THREADS) {
-                threadOriginalName.set(thread.getName());
-                String newThreadName = "brooklyn-" + CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, task.getDisplayName().replace(" ", "")) + "-" + task.getId().substring(0, 8);
-                thread.setName(newThreadName);
+            if (!startingThisThreadMightEndElsewhere) {
+                if (RENAME_THREADS) {
+                    threadOriginalName.set(thread.getName());
+                    String newThreadName = "brooklyn-" + CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, task.getDisplayName().replace(" ", "")) + "-" + task.getId().substring(0, 8);
+                    thread.setName(newThreadName);
+                }
+                PerThreadCurrentTaskHolder.perThreadCurrentTask.set(task);
             }
-            PerThreadCurrentTaskHolder.perThreadCurrentTask.set(task);
             ((TaskInternal<?>)task).setStartTimeUtc(System.currentTimeMillis());
         }
 
         if (allowJitter) {
             jitterThreadStart(task);
         }
-        if (flags!=null) {
+        if (flags!=null && !startingThisThreadMightEndElsewhere) {
             invokeCallback(flags.get("newTaskStartCallback"), task);
         }
     }
@@ -992,23 +1031,33 @@ public class BasicExecutionManager implements ExecutionManager {
     }
     private static boolean loggedClosureDeprecatedInInvokeCallback;
     
-    /** normally (if not interrupted) called once for each call to {@link #beforeSubmitScheduledTaskAllIterations(Map, Task)} */
-    protected void afterEndScheduledTaskAllIterations(Map<?,?> flags, Task<?> taskRepeatedlyScheduling, Throwable error) {
-        internalAfterEnd(flags, taskRepeatedlyScheduling, false, true, error);
+    /** normally (if not interrupted) called once for each call to {@link #beforeStartScheduledTaskAllIterations(Map, Task)}  */
+    protected void afterEndScheduledTaskAllIterations(Map<?,?> flags, Task<?> taskDoingTheInitialSchedule, Throwable error) {
+        boolean taskWasSubmittedAndNotYetEnded = true;
+        try {
+            taskWasSubmittedAndNotYetEnded = internalAfterEnd(flags, taskDoingTheInitialSchedule, !SCHEDULED_TASKS_COUNT_AS_ACTIVE, false, error);
+        } finally {
+            synchronized (taskDoingTheInitialSchedule) { taskDoingTheInitialSchedule.notifyAll(); }
+            if (taskWasSubmittedAndNotYetEnded) {
+                // prevent from running twice on cancellation after start
+                ((TaskInternal<?>) taskDoingTheInitialSchedule).runListeners();
+            }
+        }
     }
     /** called once for each call to {@link #beforeStartScheduledTaskSubmissionIteration(Map, Task, Task)},
      * with a per-iteration task generated by the surrounding scheduled task */
-    protected void afterEndScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> taskRepeatedlyScheduling, Task<?> taskIteration, Throwable error) {
-        internalAfterEnd(flags, taskRepeatedlyScheduling, true, false, error);
+    protected void afterEndScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> taskDoingTheInitialSchedule, Task<?> taskIteration, Throwable error) {
+        // no-op because handled as an atomic task
+        // internalAfterEnd(flags, taskIteration, false, true, error);
     }
     /** called once for each task on which {@link #beforeStartAtomicTask(Map, Task)} is invoked,
      * and normally (if not interrupted prior to start) 
      * called once for each task on which {@link #beforeSubmitAtomicTask(Map, Task)} */
     protected void afterEndAtomicTask(Map<?,?> flags, Task<?> task, Throwable error) {
-        internalAfterEnd(flags, task, true, true, error);
+        internalAfterEnd(flags, task, false, true, error);
     }
     protected void afterEndInSameThreadTask(Map<?,?> flags, Task<?> task, Throwable error) {
-        internalAfterEnd(flags, task, true, true, error);
+        internalAfterEnd(flags, task, false, true, error);
     }
     protected void afterEndForCancelBeforeStart(Map<?,?> flags, Task<?> task, boolean calledFromCanceller) {
         if (calledFromCanceller) {
@@ -1025,28 +1074,37 @@ public class BasicExecutionManager implements ExecutionManager {
                 // to ensure listeners and callback only invoked once
             }
         }
-        internalAfterEnd(flags, task, !calledFromCanceller, true, null);
+        internalAfterEnd(flags, task, true, !calledFromCanceller, null);
     }
     
     /** normally (if not interrupted) called once for each call to {@link #internalBeforeSubmit(Map, Task)},
      * and, for atomic tasks and scheduled-task submission iterations where 
-     * always called once if {@link #internalBeforeStart(Map, Task, boolean)} is invoked and in the same thread as that method */
-    protected void internalAfterEnd(Map<?,?> flags, Task<?> task, boolean startedInThisThread, boolean isEndingAllIterations, Throwable error) {
+     * always called once if {@link #internalBeforeStart(Map, Task, boolean, boolean, boolean)} is invoked and if possible
+     * (but not possible for isEndingAllIterations) in the same thread as that method */
+    protected boolean internalAfterEnd(Map<?,?> flags, Task<?> task, boolean skipDecrementCounter, boolean startedGuaranteedToEndInSameThreadAndEndingSameThread, Throwable error) {
         boolean taskWasSubmittedAndNotYetEnded = true;
         try {
             if (log.isTraceEnabled()) log.trace(this+" afterEnd, task: "+task);
-            if (startedInThisThread) {
+            taskWasSubmittedAndNotYetEnded = incompleteTaskIds.remove(task.getId());
+            // this method might be called more than once, eg if cancelled, so use the above as a guard where single invocation is needed (eg counts)
+
+            if (!skipDecrementCounter && taskWasSubmittedAndNotYetEnded) {
                 activeTaskCount.decrementAndGet();
             }
-            if (isEndingAllIterations) {
-                taskWasSubmittedAndNotYetEnded = incompleteTaskIds.remove(task.getId());
-                if (flags!=null && taskWasSubmittedAndNotYetEnded) {
-                    invokeCallback(flags.get("newTaskEndCallback"), task);
+
+            if (flags!=null && taskWasSubmittedAndNotYetEnded && startedGuaranteedToEndInSameThreadAndEndingSameThread) {
+                invokeCallback(flags.get("newTaskEndCallback"), task);
+            }
+            if (task.getEndTimeUtc()>0) {
+                if (taskWasSubmittedAndNotYetEnded) {
+                    // shouldn't happen
+                    log.debug("Task "+task+" has end time "+task.getEndTimeUtc()+" but was marked as incomplete");
                 }
-                ((TaskInternal<?>)task).setEndTimeUtc(System.currentTimeMillis());
+            } else {
+                ((TaskInternal<?>) task).setEndTimeUtc(System.currentTimeMillis());
             }
-    
-            if (startedInThisThread) {
+
+            if (startedGuaranteedToEndInSameThreadAndEndingSameThread) {
                 PerThreadCurrentTaskHolder.perThreadCurrentTask.remove();
                 //clear thread _after_ endTime set, so we won't get a null thread when there is no end-time
                 if (RENAME_THREADS) {
@@ -1058,15 +1116,18 @@ public class BasicExecutionManager implements ExecutionManager {
                         threadOriginalName.remove();
                     }
                 }
-                ((TaskInternal<?>)task).setThread(null);
             }
+            ((TaskInternal<?>)task).setThread(null);
+
         } finally {
             try {
                 if (error!=null) {
                     /* we throw, after logging debug.
                      * the throw means the error is available for task submitters to monitor.
                      * however it is possible no one is monitoring it, in which case we will have debug logging only for errors.
-                     * (the alternative, of warn-level logging in lots of places where we don't want it, seems worse!) 
+                     * (the alternative, of warn-level logging in lots of places where we don't want it, seems worse!)
+                     *
+                     * Note in particular that scheduled tasks will typically swallow this and simply re-submit
                      */
                     if (log.isDebugEnabled()) {
                         // debug only here, because most submitters will handle failures
@@ -1088,12 +1149,13 @@ public class BasicExecutionManager implements ExecutionManager {
                 }
             } finally {
                 synchronized (task) { task.notifyAll(); }
-                if (isEndingAllIterations && taskWasSubmittedAndNotYetEnded) {
+                if (taskWasSubmittedAndNotYetEnded) {
                     // prevent from running twice on cancellation after start
                     ((TaskInternal<?>)task).runListeners();
                 }
             }
         }
+        return taskWasSubmittedAndNotYetEnded;
     }
 
     public TaskScheduler getTaskSchedulerForTag(Object tag) {
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java
index b5304e1..a6fe8d0 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java
@@ -553,8 +553,7 @@ public class BasicTask<T> implements TaskInternal<T> {
         else if (!isCancelled() && startTimeUtc <= 0) {
             rv = "Submitted for execution";
             if (verbosity>0) {
-                long elapsed = System.currentTimeMillis() - submitTimeUtc;
-                rv += " "+Time.makeTimeStringRoundedSince(elapsed)+" ago";
+                rv += " "+Time.makeTimeStringRoundedSince(submitTimeUtc)+" ago";
             }
             if (verbosity >= 2 && getExtraStatusText()!=null) {
                 rv += "\n\n"+getExtraStatusText();
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
index beabe30..8851971 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
@@ -18,6 +18,7 @@
  */
 package org.apache.brooklyn.util.core.task;
 
+import org.apache.brooklyn.api.mgmt.ExecutionContext;
 import static org.apache.brooklyn.util.JavaGroovyEquivalents.elvis;
 import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth;
 
@@ -73,6 +74,7 @@ public class ScheduledTask extends BasicTask<Object> {
      */
     protected boolean cancelOnException = true;
 
+    protected ExecutionContext executionContext;
     protected int runCount=0;
     protected Task<?> recentRun, nextRun;
     Class<? extends Exception> lastThrownType;
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
index 166da59..fea7234 100644
--- a/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
+++ b/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
@@ -18,6 +18,10 @@
  */
 package org.apache.brooklyn.util.core.task;
 
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
+import org.apache.brooklyn.core.test.entity.TestEntityImpl;
+import org.apache.brooklyn.util.collections.MutableList;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -49,36 +53,37 @@ import com.google.common.collect.Lists;
 public class ScheduledExecutionTest {
 
     public static final Logger log = LoggerFactory.getLogger(ScheduledExecutionTest.class);
-    
+
     @Test
     public void testScheduledTask() throws Exception {
         Duration PERIOD = Duration.millis(20);
         BasicExecutionManager m = new BasicExecutionManager("mycontextid");
         final AtomicInteger i = new AtomicInteger(0);
         ScheduledTask t = ScheduledTask.builder(() -> new BasicTask<Integer>(() -> {
-                        log.info("task running: "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
-                        return i.incrementAndGet();
-                    }))
+                    log.info("task running: " + Tasks.current() + " " + Tasks.current().getStatusDetail(false));
+                    return i.incrementAndGet();
+                }))
                 .displayName("test-1")
                 .delay(PERIOD.multiply(2))
                 .period(PERIOD)
                 .maxIterations(5)
                 .build();
-    
+
         log.info("submitting {} {}", t, t.getStatusDetail(false));
         m.submit(t);
         log.info("submitted {} {}", t, t.getStatusDetail(false));
         Integer interimResult = (Integer) t.get();
-        log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
-        assertTrue(i.get() > 0, "i="+i);
+        log.info("done one ({}) {} {}", new Object[]{interimResult, t, t.getStatusDetail(false)});
+        assertTrue(i.get() > 0, "i=" + i);
         t.blockUntilEnded();
         Integer finalResult = (Integer) t.get();
-        log.info("ended ({}) {} {}", new Object[] {finalResult, t, t.getStatusDetail(false)});
-        assertEquals(finalResult, (Integer)5);
+        log.info("ended ({}) {} {}", new Object[]{finalResult, t, t.getStatusDetail(false)});
+        assertEquals(finalResult, (Integer) 5);
         assertEquals(i.get(), 5);
+        Asserts.eventually(m::getNumActiveTasks, l -> l==0);
     }
 
-    @Test
+    @Test(groups="Integration")
     public void testScheduledTaskCancelledIfExceptionThrown() throws Exception {
         BasicExecutionManager m = new BasicExecutionManager("mycontextid");
         final AtomicInteger calls = new AtomicInteger(0);
@@ -90,12 +95,15 @@ public class ScheduledExecutionTest {
                     public Integer call() {
                         calls.incrementAndGet();
                         throw new RuntimeException("boo");
-                    }});
-            }});
+                    }
+                });
+            }
+        });
 
         m.submit(t);
         Runnable callsIsOne = new Runnable() {
-            @Override public void run() {
+            @Override
+            public void run() {
                 if (calls.get() != 1) {
                     throw new RuntimeException("not yet");
                 }
@@ -104,6 +112,7 @@ public class ScheduledExecutionTest {
         };
         Asserts.succeedsEventually(callsIsOne);
         Asserts.succeedsContinually(callsIsOne);
+        Asserts.eventually(m::getNumActiveTasks, l -> l==0);
     }
 
     @Test
@@ -118,44 +127,52 @@ public class ScheduledExecutionTest {
                     public Integer call() {
                         calls.incrementAndGet();
                         throw new RuntimeException("boo");
-                    }});
-            }});
+                    }
+                });
+            }
+        });
 
         m.submit(t);
         t.blockUntilEnded();
         assertEquals(calls.get(), 5, "Expected task to be resubmitted despite throwing an exception");
+        Asserts.eventually(m::getNumActiveTasks, l -> l==0);
     }
 
-    /** like testScheduledTask but the loop is terminated by the task itself adjusting the period */
+    /**
+     * Like testScheduledTask, but the loop is terminated by the task itself setting its submitter's period to null.
+     */
     @Test
     public void testScheduledTaskSelfEnding() throws Exception {
         int PERIOD = 20;
         BasicExecutionManager m = new BasicExecutionManager("mycontextid");
         final AtomicInteger i = new AtomicInteger(0);
-        ScheduledTask t = new ScheduledTask(MutableMap.of("delay", 2*PERIOD, "period", PERIOD), new Callable<Task<?>>() {
+        ScheduledTask t = new ScheduledTask(MutableMap.of("delay", 2 * PERIOD, "period", PERIOD), new Callable<Task<?>>() {
             @Override
             public Task<?> call() throws Exception {
                 return new BasicTask<Integer>(new Callable<Integer>() {
                     @Override
                     public Integer call() {
-                        ScheduledTask submitter = (ScheduledTask) ((BasicTask)Tasks.current()).getSubmittedByTask();
+                        ScheduledTask submitter = (ScheduledTask) ((BasicTask) Tasks.current()).getSubmittedByTask();
                         if (i.get() >= 4) submitter.period = null;
-                        log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
+                        log.info("task running (" + i + "): " + Tasks.current() + " " + Tasks.current().getStatusDetail(false));
                         return i.incrementAndGet();
-                    }});
-            }});
-    
+                    }
+                });
+            }
+        });
+
         log.info("submitting {} {}", t, t.getStatusDetail(false));
         m.submit(t);
         log.info("submitted {} {}", t, t.getStatusDetail(false));
         Integer interimResult = (Integer) t.get();
-        log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
+        log.info("done one ({}) {} {}", new Object[]{interimResult, t, t.getStatusDetail(false)});
         assertTrue(i.get() > 0);
         t.blockUntilEnded();
         Integer finalResult = (Integer) t.get();
-        log.info("ended ({}) {} {}", new Object[] {finalResult, t, t.getStatusDetail(false)});
-        assertEquals(finalResult, (Integer)5);
+        log.info("ended ({}) {} {}", new Object[]{finalResult, t, t.getStatusDetail(false)});
+        assertEquals(finalResult, (Integer) 5);
         assertEquals(i.get(), 5);
+        Asserts.eventually(m::getNumActiveTasks, l -> l==0);
     }
 
     @Test
@@ -169,35 +186,38 @@ public class ScheduledExecutionTest {
                 return new BasicTask<Integer>(new Callable<Integer>() {
                     @Override
                     public Integer call() {
-                        log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
-                        ScheduledTask submitter = (ScheduledTask) ((BasicTask)Tasks.current()).getSubmittedByTask();
+                        log.info("task running (" + i + "): " + Tasks.current() + " " + Tasks.current().getStatusDetail(false));
+                        ScheduledTask submitter = (ScheduledTask) ((BasicTask) Tasks.current()).getSubmittedByTask();
                         i.incrementAndGet();
                         if (i.get() >= 5) submitter.cancel();
                         return i.get();
-                    }});
-            }});
-    
-        log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", t, t.getStatusDetail(false));
+                    }
+                });
+            }
+        });
+
+        log.info(JavaClassNames.niceClassAndMethod() + " - submitting {} {}", t, t.getStatusDetail(false));
         m.submit(t);
         log.info("submitted {} {}", t, t.getStatusDetail(false));
         Integer interimResult = (Integer) t.get();
-        log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
+        log.info("done one ({}) {} {}", new Object[]{interimResult, t, t.getStatusDetail(false)});
         assertTrue(i.get() > 0);
         t.blockUntilEnded();
 //      int finalResult = t.get()
-        log.info("ended ({}) {} {}", new Object[] {i, t, t.getStatusDetail(false)});
+        log.info("ended ({}) {} {}", new Object[]{i, t, t.getStatusDetail(false)});
 //      assertEquals(finalResult, 5)
         assertEquals(i.get(), 5);
+        Asserts.eventually(m::getNumActiveTasks, l -> l==0);
     }
 
-    @Test(groups="Integration")
+    @Test(groups = "Integration")
     public void testScheduledTaskCancelOuter() throws Exception {
         final Duration PERIOD = Duration.millis(20);
         final Duration CYCLE_DELAY = Duration.ONE_SECOND;
         // this should be enough to start the next cycle, but not so much that the cycle ends;
         // and enough that when a task is interrupted it terminates within this period
         final Duration SMALL_FRACTION_OF_CYCLE_DELAY = PERIOD.add(CYCLE_DELAY.multiply(0.1));
-        
+
         BasicExecutionManager m = new BasicExecutionManager("mycontextid");
         final AtomicInteger i = new AtomicInteger();
         ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD.times(2), "period", PERIOD), new Callable<Task<?>>() {
@@ -206,48 +226,51 @@ public class ScheduledExecutionTest {
                 return new BasicTask<Integer>(new Callable<Integer>() {
                     @Override
                     public Integer call() {
-                        log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
+                        log.info("task running (" + i + "): " + Tasks.current() + " " + Tasks.current().getStatusDetail(false));
                         Time.sleep(CYCLE_DELAY);
                         i.incrementAndGet();
                         return i.get();
-                    }});
-            }});
-    
-        log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", t, t.getStatusDetail(false));
+                    }
+                });
+            }
+        });
+
+        log.info(JavaClassNames.niceClassAndMethod() + " - submitting {} {}", t, t.getStatusDetail(false));
         m.submit(t);
         log.info("submitted {} {}", t, t.getStatusDetail(false));
         Integer interimResult = (Integer) t.get();
-        log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
+        log.info("done one ({}) {} {}", new Object[]{interimResult, t, t.getStatusDetail(false)});
         assertEquals(i.get(), 1);
-        
+
         Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY);
         assertEquals(t.get(), 2);
-        
+
         Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY);
         Stopwatch timer = Stopwatch.createUnstarted();
         t.cancel(true);
         t.blockUntilEnded();
 //      int finalResult = t.get()
-        log.info("blocked until ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)});
+        log.info("blocked until ended ({}) {} {}, in {}", new Object[]{i, t, t.getStatusDetail(false), Duration.of(timer)});
         try {
             t.get();
-            Assert.fail("Should have failed getting result of cancelled "+t);
+            Assert.fail("Should have failed getting result of cancelled " + t);
         } catch (Exception e) {
             /* expected */
         }
         assertEquals(i.get(), 2);
-        log.info("ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)});
+        log.info("ended ({}) {} {}, in {}", new Object[]{i, t, t.getStatusDetail(false), Duration.of(timer)});
         Assert.assertTrue(Duration.of(timer).isShorterThan(SMALL_FRACTION_OF_CYCLE_DELAY));
+        Asserts.eventually(m::getNumActiveTasks, l -> l==0);
     }
 
-    @Test(groups="Integration")
+    @Test(groups = "Integration")
     public void testScheduledTaskCancelInterrupts() throws Exception {
         final Duration PERIOD = Duration.millis(20);
         final Duration CYCLE_DELAY = Duration.ONE_SECOND;
         // this should be enough to start the next cycle, but not so much that the cycle ends;
         // and enough that when a task is interrupted it terminates within this period
         final Duration SMALL_FRACTION_OF_CYCLE_DELAY = PERIOD.add(CYCLE_DELAY.multiply(0.1));
-        
+
         BasicExecutionManager m = new BasicExecutionManager("mycontextid");
         final Semaphore interruptedSemaphore = new Semaphore(0);
         final AtomicInteger i = new AtomicInteger();
@@ -258,7 +281,7 @@ public class ScheduledExecutionTest {
                     @Override
                     public Integer call() {
                         try {
-                            log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
+                            log.info("task running (" + i + "): " + Tasks.current() + " " + Tasks.current().getStatusDetail(false));
                             Time.sleep(CYCLE_DELAY);
                             i.incrementAndGet();
                             return i.get();
@@ -266,45 +289,48 @@ public class ScheduledExecutionTest {
                             interruptedSemaphore.release();
                             throw Exceptions.propagate(e);
                         }
-                    }});
-            }});
-    
-        log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", t, t.getStatusDetail(false));
+                    }
+                });
+            }
+        });
+
+        log.info(JavaClassNames.niceClassAndMethod() + " - submitting {} {}", t, t.getStatusDetail(false));
         m.submit(t);
         log.info("submitted {} {}", t, t.getStatusDetail(false));
         Integer interimResult = (Integer) t.get();
-        log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
+        log.info("done one ({}) {} {}", new Object[]{interimResult, t, t.getStatusDetail(false)});
         assertEquals(i.get(), 1);
-        
+
         Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY);
         assertEquals(t.get(), 2);
-        
+
         Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY);
         Stopwatch timer = Stopwatch.createUnstarted();
         t.cancel(true);
         t.blockUntilEnded();
 //      int finalResult = t.get()
-        log.info("blocked until ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)});
+        log.info("blocked until ended ({}) {} {}, in {}", new Object[]{i, t, t.getStatusDetail(false), Duration.of(timer)});
         try {
             t.get();
-            Assert.fail("Should have failed getting result of cancelled "+t);
+            Assert.fail("Should have failed getting result of cancelled " + t);
         } catch (Exception e) {
             /* expected */
         }
         assertEquals(i.get(), 2);
         Assert.assertTrue(interruptedSemaphore.tryAcquire(1, SMALL_FRACTION_OF_CYCLE_DELAY.toMilliseconds(), TimeUnit.MILLISECONDS), "child thread was not interrupted");
-        log.info("ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)});
+        log.info("ended ({}) {} {}, in {}", new Object[]{i, t, t.getStatusDetail(false), Duration.of(timer)});
         Assert.assertTrue(Duration.of(timer).isShorterThan(SMALL_FRACTION_OF_CYCLE_DELAY));
+        Asserts.eventually(m::getNumActiveTasks, l -> l==0);
     }
 
-    @Test(groups="Integration")
+    @Test(groups = "Integration")
     public void testScheduledTaskTakesLongerThanPeriod() throws Exception {
         final int PERIOD = 1;
         final int SLEEP_TIME = 100;
         final int EARLY_RETURN_GRACE = 10;
         BasicExecutionManager m = new BasicExecutionManager("mycontextid");
         final List<Long> execTimes = new CopyOnWriteArrayList<Long>();
-        
+
         ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD, "period", PERIOD), new Callable<Task<?>>() {
             @Override
             public Task<?> call() throws Exception {
@@ -317,17 +343,20 @@ public class ScheduledExecutionTest {
                         } catch (InterruptedException e) {
                             throw Exceptions.propagate(e);
                         }
-                    }});
-            }});
-    
+                    }
+                });
+            }
+        });
+
         m.submit(t);
-        
+
         Asserts.succeedsEventually(new Runnable() {
             @Override
             public void run() {
-                assertTrue(execTimes.size() > 3, "size="+execTimes.size());
-            }});
-        
+                assertTrue(execTimes.size() > 3, "size=" + execTimes.size());
+            }
+        });
+
         List<Long> timeDiffs = Lists.newArrayList();
         long prevExecTime = -1;
         for (Long execTime : execTimes) {
@@ -338,9 +367,86 @@ public class ScheduledExecutionTest {
                 prevExecTime = execTime;
             }
         }
-        
+
         for (Long timeDiff : timeDiffs) {
-            if (timeDiff < (SLEEP_TIME - EARLY_RETURN_GRACE)) fail("timeDiffs="+timeDiffs+"; execTimes="+execTimes);
+            if (timeDiff < (SLEEP_TIME - EARLY_RETURN_GRACE))
+                fail("timeDiffs=" + timeDiffs + "; execTimes=" + execTimes);
         }
+
+        t.cancel();
+        Asserts.eventually(m::getNumActiveTasks, l -> l==0);
     }
-}
+
+    @Test(groups="Integration")  // because slow
+    public void testScheduledTaskInContextClearing() throws Exception {
+        Duration PERIOD = Duration.millis(50);
+        int COUNT = 10;
+        BasicExecutionManager m = new BasicExecutionManager("mycontextid");
+        // appended to from inside both tasks' callables (presumably on executor threads), so use a thread-safe list
+        final List<String> errors = new CopyOnWriteArrayList<String>();
+
+        final AtomicInteger i2 = new AtomicInteger();
+        // t2 is submitted via an execution context tagged with a context entity; every run must see that entity
+        ScheduledTask t2 = new ScheduledTask(MutableMap.of("displayName", "t2", "period", PERIOD), new Callable<Task<?>>() {
+            @Override
+            public Task<?> call() throws Exception {
+                return new BasicTask<Integer>(MutableMap.of("displayName", "t2-i"), new Callable<Integer>() {
+                    @Override
+                    public Integer call() {
+                        Entity ce = BrooklynTaskTags.getContextEntity(Tasks.current());
+                        log.info("entity task t2 running (" + i2 + "): " + Thread.currentThread() + " " + Tasks.current() + " " + Tasks.current().getStatusDetail(false)+" - "+ce);
+                        if (ce==null) {
+                            errors.add("Scheduled task t2 missing context entity");
+                        }
+
+                        ScheduledTask submitter = (ScheduledTask) ((BasicTask<?>) Tasks.current()).getSubmittedByTask();
+                        int count = i2.incrementAndGet();
+                        if (count >= COUNT) submitter.cancel();
+                        return count;
+                    }
+                });
+            }
+        });
+        Entity contextEntity = new TestEntityImpl();
+        BasicExecutionContext exec = new BasicExecutionContext(m, MutableList.of(BrooklynTaskTags.tagForContextEntity(contextEntity)));
+        exec.submit(t2);
+
+        final AtomicInteger i1 = new AtomicInteger();
+        // t1 goes straight to the manager with no context entity; no run may see one (e.g. leaked from t2's context)
+        ScheduledTask t1 = new ScheduledTask(MutableMap.of("displayName", "t1", "period", PERIOD), new Callable<Task<?>>() {
+            @Override
+            public Task<?> call() throws Exception {
+                return new BasicTask<Integer>(MutableMap.of("displayName", "t1-i"), new Callable<Integer>() {
+                    @Override
+                    public Integer call() {
+                        Entity ce = BrooklynTaskTags.getContextEntity(Tasks.current());
+                        log.info("non-entity task t1 running (" + i1 + "): " + Thread.currentThread() + " " + Tasks.current() + " " + Tasks.current().getStatusDetail(false)+" - "+ce);
+                        if (ce!=null) {
+                            errors.add("Scheduled task t1 has context entity "+ce);
+                        }
+
+                        ScheduledTask submitter = (ScheduledTask) ((BasicTask<?>) Tasks.current()).getSubmittedByTask();
+                        int count = i1.incrementAndGet();
+                        if (count >= COUNT) submitter.cancel();
+                        return count;
+                    }
+                });
+            }
+        });
+        // wait a little over one PERIOD (70ms) before submitting t1 -- presumably so t2 is already running; confirm intent
+        Time.sleep(PERIOD.add(Duration.millis(20)));
+        log.info(JavaClassNames.niceClassAndMethod() + " - submitting {} {}", t1, t1.getStatusDetail(false));
+        m.submit(t1);
+        log.info("submitted {} {}", t1, t1.getStatusDetail(false));
+        Integer interimResult = (Integer) t1.get();
+        log.info("done one ({}) {} {}", new Object[]{interimResult, t1, t1.getStatusDetail(false)});
+        assertTrue(i1.get() > 0);
+
+        t1.blockUntilEnded();
+        t2.blockUntilEnded();
+
+        Asserts.assertSize(errors, 0);
+        Asserts.eventually(m::getNumActiveTasks, l -> l==0);
+    }
+
+}
\ No newline at end of file