You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2021/09/14 21:56:23 UTC
[brooklyn-server] 04/27: tidy task before/after model,
ensure scheduled tasks clear their task context
This is an automated email from the ASF dual-hosted git repository.
heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 85d965153a491789f1367d3265d690f8e479422c
Author: Alex Heneveld <al...@cloudsoftcorp.com>
AuthorDate: Mon Sep 13 17:08:04 2021 +0100
tidy task before/after model, ensure scheduled tasks clear their task context
---
.../util/core/task/BasicExecutionContext.java | 50 +++--
.../util/core/task/BasicExecutionManager.java | 196 ++++++++++------
.../apache/brooklyn/util/core/task/BasicTask.java | 3 +-
.../brooklyn/util/core/task/ScheduledTask.java | 2 +
.../util/core/task/ScheduledExecutionTest.java | 248 +++++++++++++++------
5 files changed, 338 insertions(+), 161 deletions(-)
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
index ca9e158..70322ce 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
@@ -361,33 +361,41 @@ public class BasicExecutionContext extends AbstractExecutionContext {
taskTags.addAll(tags);
- if (Tasks.current()!=null && BrooklynTaskTags.isTransient(Tasks.current())
+ if (Tasks.current()!=null && BrooklynTaskTags.isTransient(Tasks.current())
&& !taskTags.contains(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG) && !taskTags.contains(BrooklynTaskTags.TRANSIENT_TASK_TAG)) {
// tag as transient if submitter is transient, unless explicitly tagged as non-transient
taskTags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG);
}
- final Object startCallback = properties.get("newTaskStartCallback");
- properties.put("newTaskStartCallback", new Function<Task<?>,Void>() {
- @Override
- public Void apply(Task<?> it) {
- registerPerThreadExecutionContext();
- if (startCallback!=null) BasicExecutionManager.invokeCallback(startCallback, it);
- return null;
- }});
-
- final Object endCallback = properties.get("newTaskEndCallback");
- properties.put("newTaskEndCallback", new Function<Task<?>,Void>() {
- @Override
- public Void apply(Task<?> it) {
- try {
- if (endCallback!=null) BasicExecutionManager.invokeCallback(endCallback, it);
- } finally {
- clearPerThreadExecutionContext();
+ if (task instanceof ScheduledTask) {
+ // not run for scheduler
+ ((ScheduledTask)task).executionContext = this;
+
+ } else {
+ final Object startCallback = properties.get("newTaskStartCallback");
+ properties.put("newTaskStartCallback", new Function<Task<?>, Void>() {
+ @Override
+ public Void apply(Task<?> it) {
+ registerPerThreadExecutionContext();
+ if (startCallback != null) BasicExecutionManager.invokeCallback(startCallback, it);
+ return null;
}
- return null;
- }});
-
+ });
+
+ final Object endCallback = properties.get("newTaskEndCallback");
+ properties.put("newTaskEndCallback", new Function<Task<?>, Void>() {
+ @Override
+ public Void apply(Task<?> it) {
+ try {
+ if (endCallback != null) BasicExecutionManager.invokeCallback(endCallback, it);
+ } finally {
+ clearPerThreadExecutionContext();
+ }
+ return null;
+ }
+ });
+ }
+
if (task instanceof Task) {
return executionManager.submit(properties, (Task)task);
} else if (task instanceof Callable) {
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
index 29984e2..f1496fc 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
@@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.internal.BrooklynLoggingCategories;
+import org.apache.brooklyn.api.mgmt.ExecutionContext;
import org.apache.brooklyn.api.mgmt.ExecutionManager;
import org.apache.brooklyn.api.mgmt.HasTaskChildren;
import org.apache.brooklyn.api.mgmt.Task;
@@ -69,6 +70,7 @@ import org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.javalang.JavaClassNames;
import org.apache.brooklyn.util.text.Identifiers;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.CountdownTimer;
@@ -106,6 +108,8 @@ public class BasicExecutionManager implements ExecutionManager {
public static final String LOGGING_MDC_KEY_ENTITY_IDS = "entity.ids";
public static final String LOGGING_MDC_KEY_TASK_ID = "task.id";
+ private static final boolean SCHEDULED_TASKS_COUNT_AS_ACTIVE = false;
+
private boolean jitterThreads = BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_JITTER_THREADS);
private int jitterThreadsMaxDelay = Integer.getInteger(JITTER_THREADS_MAX_DELAY_PROPERTY, 200);
@@ -525,10 +529,13 @@ public class BasicExecutionManager implements ExecutionManager {
}
protected Task<?> submitNewScheduledTask(final Map<?,?> flags, final ScheduledTask task) {
- beforeSubmitScheduledTaskAllIterations(flags, task);
-
- if (!submitSubsequentScheduledTask(flags, task)) {
- afterEndScheduledTaskAllIterations(flags, task, null);
+ boolean result = false;
+ try {
+ result = submitSubsequentScheduledTask(flags, task);
+ } finally {
+ if (!result) {
+ afterEndScheduledTaskAllIterations(flags, task, null);
+ }
}
return task;
}
@@ -555,27 +562,39 @@ public class BasicExecutionManager implements ExecutionManager {
@Override
@SuppressWarnings({ "rawtypes", "unchecked" })
public Object call() {
- if (task.startTimeUtc==-1) {
- // this is overwritten on each run; not sure if that's best or not
- task.startTimeUtc = System.currentTimeMillis();
- }
- TaskInternal<?> taskScheduled = null;
+ TaskInternal<?> taskIteration = null;
Throwable error = null;
try {
- taskScheduled = (TaskInternal<?>) task.newTask();
- taskScheduled.setSubmittedByTask(task);
- beforeStartScheduledTaskSubmissionIteration(flags, task, taskScheduled);
- final Callable<?> oldJob = taskScheduled.getJob();
- final TaskInternal<?> taskScheduledF = taskScheduled;
- taskScheduled.setJob(new Callable() { @Override public Object call() {
+ if (task.startTimeUtc==-1) {
+ beforeSubmitScheduledTaskAllIterations(flags, task);
+ beforeStartScheduledTaskAllIterations(flags, task);
+
+ task.startTimeUtc = System.currentTimeMillis();
+ }
+
+ taskIteration = (TaskInternal<?>) task.newTask();
+ taskIteration.setSubmittedByTask(task);
+
+ beforeSubmitScheduledTaskSubmissionIteration(flags, task);
+
+ final Callable<?> oldJob = taskIteration.getJob();
+ final TaskInternal<?> taskIterationF = taskIteration;
+ taskIteration.setJob(new Callable() { @Override public Object call() {
if (task.isCancelled()) {
- afterEndScheduledTaskAllIterations(flags, task, new CancellationException("cancel detected"));
- throw new CancellationException("cancel detected"); // above throws, but for good measure
+ CancellationException cancelDetected = new CancellationException("cancel detected");
+ try {
+ afterEndScheduledTaskSubmissionIteration(flags, task, taskIterationF, cancelDetected);
+ } finally {
+ // do in finally block so runs even if above throws cancelDetected
+ afterEndScheduledTaskAllIterations(flags, task, cancelDetected);
+ }
+ throw cancelDetected;
}
Throwable lastError = null;
boolean shouldResubmit = true;
- task.recentRun = taskScheduledF;
- try (BrooklynTaskLoggingMdc mdc = BrooklynTaskLoggingMdc.create(task).start()) {
+ task.recentRun = taskIterationF;
+ try (BrooklynTaskLoggingMdc mdc = BrooklynTaskLoggingMdc.create(taskIterationF).start()) {
+ beforeStartScheduledTaskSubmissionIteration(flags, task, taskIterationF);
synchronized (task) {
task.notifyAll();
}
@@ -590,26 +609,33 @@ public class BasicExecutionManager implements ExecutionManager {
}
return result;
} finally {
- // do in finally block in case we were interrupted
- if (shouldResubmit && resubmit()) {
- // resubmitted fine, no-op
- } else {
- // not resubmitted, note ending
- afterEndScheduledTaskAllIterations(flags, task, lastError);
+ if (!task.isCancelled() || task.getEndTimeUtc()<=0) {
+ // don't re-run on cancellation
+
+ afterEndScheduledTaskSubmissionIteration(flags, task, taskIterationF, lastError);
+ // do in finally block in case we were interrupted
+ if (shouldResubmit && resubmit()) {
+ // resubmitted fine, no-op
+ } else {
+ // not resubmitted, note ending
+ afterEndScheduledTaskAllIterations(flags, task, lastError);
+ }
}
}
}});
- task.nextRun = taskScheduled;
- BasicExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext();
- if (ec!=null) return ec.submit(taskScheduled);
- else return submit(taskScheduled);
+ task.nextRun = taskIteration;
+ ExecutionContext ec =
+ // no longer associated the execution context on each execution;
+// BasicExecutionContext.getCurrentExecutionContext();
+ // instead it is set on the task
+ task.executionContext;
+ if (ec!=null) return ec.submit(taskIteration);
+ else return submit(taskIteration);
} catch (Exception e) {
error = e;
+ afterEndScheduledTaskSubmissionIteration(flags, task, taskIteration, error);
throw Exceptions.propagate(e);
-
- } finally {
- afterEndScheduledTaskSubmissionIteration(flags, task, taskScheduled, error);
}
}
@@ -804,8 +830,9 @@ public class BasicExecutionManager implements ExecutionManager {
}
}
- if (task instanceof ScheduledTask)
- return (Task<T>) submitNewScheduledTask(flags, (ScheduledTask)task);
+ if (task instanceof ScheduledTask) {
+ return (Task<T>) submitNewScheduledTask(flags, (ScheduledTask) task);
+ }
beforeSubmitAtomicTask(flags, task);
@@ -850,6 +877,11 @@ public class BasicExecutionManager implements ExecutionManager {
}
protected void beforeSubmitScheduledTaskAllIterations(Map<?,?> flags, Task<?> task) {
+ // for these, beforeSubmitAtomicTask is not called,
+ // but beforeStartAtomic and afterSubmitAtomic _are_ called
+ internalBeforeSubmit(flags, task);
+ }
+ protected void beforeSubmitScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> task) {
internalBeforeSubmit(flags, task);
}
protected void beforeSubmitAtomicTask(Map<?,?> flags, Task<?> task) {
@@ -916,20 +948,25 @@ public class BasicExecutionManager implements ExecutionManager {
}
}
- protected void beforeStartScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> taskRepeatedlyScheduling, Task<?> taskIteration) {
- internalBeforeStart(flags, taskRepeatedlyScheduling, true);
+ /** normally (if not interrupted) called once for each call to {@link #beforeSubmitScheduledTaskAllIterations(Map, Task)} */
+ protected void beforeStartScheduledTaskAllIterations(Map<?,?> flags, Task<?> taskDoingTheInitialSchedule) {
+ internalBeforeStart(flags, taskDoingTheInitialSchedule, !SCHEDULED_TASKS_COUNT_AS_ACTIVE, true, true);
+ }
+ protected void beforeStartScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> taskDoingTheScheduling, Task<?> taskIteration) {
+ // no-op, because handled as an atomic task
+ // internalBeforeStart(flags, taskIteration, true, false);
}
protected void beforeStartAtomicTask(Map<?,?> flags, Task<?> task) {
- internalBeforeStart(flags, task, true);
+ internalBeforeStart(flags, task, false, true, false);
}
protected void beforeStartInSameThreadTask(Map<?,?> flags, Task<?> task) {
- internalBeforeStart(flags, task, false);
+ internalBeforeStart(flags, task, false, false, false);
}
/** invoked in a task's thread when a task is starting to run (may be some time after submitted),
* but before doing any of the task's work, so that we can update bookkeeping and notify callbacks */
- protected void internalBeforeStart(Map<?,?> flags, Task<?> task, boolean allowJitter) {
- int count = activeTaskCount.incrementAndGet();
+ protected void internalBeforeStart(Map<?,?> flags, Task<?> task, boolean skipIncrementCounter, boolean allowJitter, boolean startingThisThreadMightEndElsewhere) {
+ int count = skipIncrementCounter ? activeTaskCount.get() : activeTaskCount.incrementAndGet();
if (count % 1000==0) {
log.warn("High number of active tasks: task #"+count+" is "+task);
}
@@ -939,19 +976,21 @@ public class BasicExecutionManager implements ExecutionManager {
if (!task.isCancelled()) {
Thread thread = Thread.currentThread();
((TaskInternal<?>)task).setThread(thread);
- if (RENAME_THREADS) {
- threadOriginalName.set(thread.getName());
- String newThreadName = "brooklyn-" + CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, task.getDisplayName().replace(" ", "")) + "-" + task.getId().substring(0, 8);
- thread.setName(newThreadName);
+ if (!startingThisThreadMightEndElsewhere) {
+ if (RENAME_THREADS) {
+ threadOriginalName.set(thread.getName());
+ String newThreadName = "brooklyn-" + CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, task.getDisplayName().replace(" ", "")) + "-" + task.getId().substring(0, 8);
+ thread.setName(newThreadName);
+ }
+ PerThreadCurrentTaskHolder.perThreadCurrentTask.set(task);
}
- PerThreadCurrentTaskHolder.perThreadCurrentTask.set(task);
((TaskInternal<?>)task).setStartTimeUtc(System.currentTimeMillis());
}
if (allowJitter) {
jitterThreadStart(task);
}
- if (flags!=null) {
+ if (flags!=null && !startingThisThreadMightEndElsewhere) {
invokeCallback(flags.get("newTaskStartCallback"), task);
}
}
@@ -992,23 +1031,33 @@ public class BasicExecutionManager implements ExecutionManager {
}
private static boolean loggedClosureDeprecatedInInvokeCallback;
- /** normally (if not interrupted) called once for each call to {@link #beforeSubmitScheduledTaskAllIterations(Map, Task)} */
- protected void afterEndScheduledTaskAllIterations(Map<?,?> flags, Task<?> taskRepeatedlyScheduling, Throwable error) {
- internalAfterEnd(flags, taskRepeatedlyScheduling, false, true, error);
+ /** normally (if not interrupted) called once for each call to {@link #beforeStartScheduledTaskAllIterations(Map, Task)} */
+ protected void afterEndScheduledTaskAllIterations(Map<?,?> flags, Task<?> taskDoingTheInitialSchedule, Throwable error) {
+ boolean taskWasSubmittedAndNotYetEnded = true;
+ try {
+ taskWasSubmittedAndNotYetEnded = internalAfterEnd(flags, taskDoingTheInitialSchedule, !SCHEDULED_TASKS_COUNT_AS_ACTIVE, false, error);
+ } finally {
+ synchronized (taskDoingTheInitialSchedule) { taskDoingTheInitialSchedule.notifyAll(); }
+ if (taskWasSubmittedAndNotYetEnded) {
+ // prevent from running twice on cancellation after start
+ ((TaskInternal<?>) taskDoingTheInitialSchedule).runListeners();
+ }
+ }
}
/** called once for each call to {@link #beforeStartScheduledTaskSubmissionIteration(Map, Task, Task)},
* with a per-iteration task generated by the surrounding scheduled task */
- protected void afterEndScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> taskRepeatedlyScheduling, Task<?> taskIteration, Throwable error) {
- internalAfterEnd(flags, taskRepeatedlyScheduling, true, false, error);
+ protected void afterEndScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> taskDoingTheInitialSchedule, Task<?> taskIteration, Throwable error) {
+ // no-op because handled as an atomic task
+ // internalAfterEnd(flags, taskIteration, false, true, error);
}
/** called once for each task on which {@link #beforeStartAtomicTask(Map, Task)} is invoked,
* and normally (if not interrupted prior to start)
* called once for each task on which {@link #beforeSubmitAtomicTask(Map, Task)} */
protected void afterEndAtomicTask(Map<?,?> flags, Task<?> task, Throwable error) {
- internalAfterEnd(flags, task, true, true, error);
+ internalAfterEnd(flags, task, false, true, error);
}
protected void afterEndInSameThreadTask(Map<?,?> flags, Task<?> task, Throwable error) {
- internalAfterEnd(flags, task, true, true, error);
+ internalAfterEnd(flags, task, false, true, error);
}
protected void afterEndForCancelBeforeStart(Map<?,?> flags, Task<?> task, boolean calledFromCanceller) {
if (calledFromCanceller) {
@@ -1025,28 +1074,37 @@ public class BasicExecutionManager implements ExecutionManager {
// to ensure listeners and callback only invoked once
}
}
- internalAfterEnd(flags, task, !calledFromCanceller, true, null);
+ internalAfterEnd(flags, task, true, !calledFromCanceller, null);
}
/** normally (if not interrupted) called once for each call to {@link #internalBeforeSubmit(Map, Task)},
* and, for atomic tasks and scheduled-task submission iterations where
- * always called once if {@link #internalBeforeStart(Map, Task, boolean)} is invoked and in the same thread as that method */
- protected void internalAfterEnd(Map<?,?> flags, Task<?> task, boolean startedInThisThread, boolean isEndingAllIterations, Throwable error) {
+ * always called once if {@link #internalBeforeStart(Map, Task, boolean, boolean, boolean)} is invoked and if possible
+ * (but not possible for isEndingAllIterations) in the same thread as that method */
+ protected boolean internalAfterEnd(Map<?,?> flags, Task<?> task, boolean skipDecrementCounter, boolean startedGuaranteedToEndInSameThreadAndEndingSameThread, Throwable error) {
boolean taskWasSubmittedAndNotYetEnded = true;
try {
if (log.isTraceEnabled()) log.trace(this+" afterEnd, task: "+task);
- if (startedInThisThread) {
+ taskWasSubmittedAndNotYetEnded = incompleteTaskIds.remove(task.getId());
+ // this method might be called more than once, eg if cancelled, so use the above as a guard where single invocation is needed (eg counts)
+
+ if (!skipDecrementCounter && taskWasSubmittedAndNotYetEnded) {
activeTaskCount.decrementAndGet();
}
- if (isEndingAllIterations) {
- taskWasSubmittedAndNotYetEnded = incompleteTaskIds.remove(task.getId());
- if (flags!=null && taskWasSubmittedAndNotYetEnded) {
- invokeCallback(flags.get("newTaskEndCallback"), task);
+
+ if (flags!=null && taskWasSubmittedAndNotYetEnded && startedGuaranteedToEndInSameThreadAndEndingSameThread) {
+ invokeCallback(flags.get("newTaskEndCallback"), task);
+ }
+ if (task.getEndTimeUtc()>0) {
+ if (taskWasSubmittedAndNotYetEnded) {
+ // shouldn't happen
+ log.debug("Task "+task+" has end time "+task.getEndTimeUtc()+" but was marked as incomplete");
}
- ((TaskInternal<?>)task).setEndTimeUtc(System.currentTimeMillis());
+ } else {
+ ((TaskInternal<?>) task).setEndTimeUtc(System.currentTimeMillis());
}
-
- if (startedInThisThread) {
+
+ if (startedGuaranteedToEndInSameThreadAndEndingSameThread) {
PerThreadCurrentTaskHolder.perThreadCurrentTask.remove();
//clear thread _after_ endTime set, so we won't get a null thread when there is no end-time
if (RENAME_THREADS) {
@@ -1058,15 +1116,18 @@ public class BasicExecutionManager implements ExecutionManager {
threadOriginalName.remove();
}
}
- ((TaskInternal<?>)task).setThread(null);
}
+ ((TaskInternal<?>)task).setThread(null);
+
} finally {
try {
if (error!=null) {
/* we throw, after logging debug.
* the throw means the error is available for task submitters to monitor.
* however it is possible no one is monitoring it, in which case we will have debug logging only for errors.
- * (the alternative, of warn-level logging in lots of places where we don't want it, seems worse!)
+ * (the alternative, of warn-level logging in lots of places where we don't want it, seems worse!)
+ *
+ * Note in particular that scheduled tasks will typically swallow this and simply re-submit
*/
if (log.isDebugEnabled()) {
// debug only here, because most submitters will handle failures
@@ -1088,12 +1149,13 @@ public class BasicExecutionManager implements ExecutionManager {
}
} finally {
synchronized (task) { task.notifyAll(); }
- if (isEndingAllIterations && taskWasSubmittedAndNotYetEnded) {
+ if (taskWasSubmittedAndNotYetEnded) {
// prevent from running twice on cancellation after start
((TaskInternal<?>)task).runListeners();
}
}
}
+ return taskWasSubmittedAndNotYetEnded;
}
public TaskScheduler getTaskSchedulerForTag(Object tag) {
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java
index b5304e1..a6fe8d0 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java
@@ -553,8 +553,7 @@ public class BasicTask<T> implements TaskInternal<T> {
else if (!isCancelled() && startTimeUtc <= 0) {
rv = "Submitted for execution";
if (verbosity>0) {
- long elapsed = System.currentTimeMillis() - submitTimeUtc;
- rv += " "+Time.makeTimeStringRoundedSince(elapsed)+" ago";
+ rv += " "+Time.makeTimeStringRoundedSince(submitTimeUtc)+" ago";
}
if (verbosity >= 2 && getExtraStatusText()!=null) {
rv += "\n\n"+getExtraStatusText();
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
index beabe30..8851971 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
@@ -18,6 +18,7 @@
*/
package org.apache.brooklyn.util.core.task;
+import org.apache.brooklyn.api.mgmt.ExecutionContext;
import static org.apache.brooklyn.util.JavaGroovyEquivalents.elvis;
import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth;
@@ -73,6 +74,7 @@ public class ScheduledTask extends BasicTask<Object> {
*/
protected boolean cancelOnException = true;
+ protected ExecutionContext executionContext;
protected int runCount=0;
protected Task<?> recentRun, nextRun;
Class<? extends Exception> lastThrownType;
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
index 166da59..fea7234 100644
--- a/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
+++ b/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
@@ -18,6 +18,10 @@
*/
package org.apache.brooklyn.util.core.task;
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
+import org.apache.brooklyn.core.test.entity.TestEntityImpl;
+import org.apache.brooklyn.util.collections.MutableList;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -49,36 +53,37 @@ import com.google.common.collect.Lists;
public class ScheduledExecutionTest {
public static final Logger log = LoggerFactory.getLogger(ScheduledExecutionTest.class);
-
+
@Test
public void testScheduledTask() throws Exception {
Duration PERIOD = Duration.millis(20);
BasicExecutionManager m = new BasicExecutionManager("mycontextid");
final AtomicInteger i = new AtomicInteger(0);
ScheduledTask t = ScheduledTask.builder(() -> new BasicTask<Integer>(() -> {
- log.info("task running: "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
- return i.incrementAndGet();
- }))
+ log.info("task running: " + Tasks.current() + " " + Tasks.current().getStatusDetail(false));
+ return i.incrementAndGet();
+ }))
.displayName("test-1")
.delay(PERIOD.multiply(2))
.period(PERIOD)
.maxIterations(5)
.build();
-
+
log.info("submitting {} {}", t, t.getStatusDetail(false));
m.submit(t);
log.info("submitted {} {}", t, t.getStatusDetail(false));
Integer interimResult = (Integer) t.get();
- log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
- assertTrue(i.get() > 0, "i="+i);
+ log.info("done one ({}) {} {}", new Object[]{interimResult, t, t.getStatusDetail(false)});
+ assertTrue(i.get() > 0, "i=" + i);
t.blockUntilEnded();
Integer finalResult = (Integer) t.get();
- log.info("ended ({}) {} {}", new Object[] {finalResult, t, t.getStatusDetail(false)});
- assertEquals(finalResult, (Integer)5);
+ log.info("ended ({}) {} {}", new Object[]{finalResult, t, t.getStatusDetail(false)});
+ assertEquals(finalResult, (Integer) 5);
assertEquals(i.get(), 5);
+ Asserts.eventually(m::getNumActiveTasks, l -> l==0);
}
- @Test
+ @Test(groups="Integration")
public void testScheduledTaskCancelledIfExceptionThrown() throws Exception {
BasicExecutionManager m = new BasicExecutionManager("mycontextid");
final AtomicInteger calls = new AtomicInteger(0);
@@ -90,12 +95,15 @@ public class ScheduledExecutionTest {
public Integer call() {
calls.incrementAndGet();
throw new RuntimeException("boo");
- }});
- }});
+ }
+ });
+ }
+ });
m.submit(t);
Runnable callsIsOne = new Runnable() {
- @Override public void run() {
+ @Override
+ public void run() {
if (calls.get() != 1) {
throw new RuntimeException("not yet");
}
@@ -104,6 +112,7 @@ public class ScheduledExecutionTest {
};
Asserts.succeedsEventually(callsIsOne);
Asserts.succeedsContinually(callsIsOne);
+ Asserts.eventually(m::getNumActiveTasks, l -> l==0);
}
@Test
@@ -118,44 +127,52 @@ public class ScheduledExecutionTest {
public Integer call() {
calls.incrementAndGet();
throw new RuntimeException("boo");
- }});
- }});
+ }
+ });
+ }
+ });
m.submit(t);
t.blockUntilEnded();
assertEquals(calls.get(), 5, "Expected task to be resubmitted despite throwing an exception");
+ Asserts.eventually(m::getNumActiveTasks, l -> l==0);
}
- /** like testScheduledTask but the loop is terminated by the task itself adjusting the period */
+ /**
+ * like testScheduledTask but the loop is terminated by the task itself adjusting the period
+ */
@Test
public void testScheduledTaskSelfEnding() throws Exception {
int PERIOD = 20;
BasicExecutionManager m = new BasicExecutionManager("mycontextid");
final AtomicInteger i = new AtomicInteger(0);
- ScheduledTask t = new ScheduledTask(MutableMap.of("delay", 2*PERIOD, "period", PERIOD), new Callable<Task<?>>() {
+ ScheduledTask t = new ScheduledTask(MutableMap.of("delay", 2 * PERIOD, "period", PERIOD), new Callable<Task<?>>() {
@Override
public Task<?> call() throws Exception {
return new BasicTask<Integer>(new Callable<Integer>() {
@Override
public Integer call() {
- ScheduledTask submitter = (ScheduledTask) ((BasicTask)Tasks.current()).getSubmittedByTask();
+ ScheduledTask submitter = (ScheduledTask) ((BasicTask) Tasks.current()).getSubmittedByTask();
if (i.get() >= 4) submitter.period = null;
- log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
+ log.info("task running (" + i + "): " + Tasks.current() + " " + Tasks.current().getStatusDetail(false));
return i.incrementAndGet();
- }});
- }});
-
+ }
+ });
+ }
+ });
+
log.info("submitting {} {}", t, t.getStatusDetail(false));
m.submit(t);
log.info("submitted {} {}", t, t.getStatusDetail(false));
Integer interimResult = (Integer) t.get();
- log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
+ log.info("done one ({}) {} {}", new Object[]{interimResult, t, t.getStatusDetail(false)});
assertTrue(i.get() > 0);
t.blockUntilEnded();
Integer finalResult = (Integer) t.get();
- log.info("ended ({}) {} {}", new Object[] {finalResult, t, t.getStatusDetail(false)});
- assertEquals(finalResult, (Integer)5);
+ log.info("ended ({}) {} {}", new Object[]{finalResult, t, t.getStatusDetail(false)});
+ assertEquals(finalResult, (Integer) 5);
assertEquals(i.get(), 5);
+ Asserts.eventually(m::getNumActiveTasks, l -> l==0);
}
@Test
@@ -169,35 +186,38 @@ public class ScheduledExecutionTest {
return new BasicTask<Integer>(new Callable<Integer>() {
@Override
public Integer call() {
- log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
- ScheduledTask submitter = (ScheduledTask) ((BasicTask)Tasks.current()).getSubmittedByTask();
+ log.info("task running (" + i + "): " + Tasks.current() + " " + Tasks.current().getStatusDetail(false));
+ ScheduledTask submitter = (ScheduledTask) ((BasicTask) Tasks.current()).getSubmittedByTask();
i.incrementAndGet();
if (i.get() >= 5) submitter.cancel();
return i.get();
- }});
- }});
-
- log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", t, t.getStatusDetail(false));
+ }
+ });
+ }
+ });
+
+ log.info(JavaClassNames.niceClassAndMethod() + " - submitting {} {}", t, t.getStatusDetail(false));
m.submit(t);
log.info("submitted {} {}", t, t.getStatusDetail(false));
Integer interimResult = (Integer) t.get();
- log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
+ log.info("done one ({}) {} {}", new Object[]{interimResult, t, t.getStatusDetail(false)});
assertTrue(i.get() > 0);
t.blockUntilEnded();
// int finalResult = t.get()
- log.info("ended ({}) {} {}", new Object[] {i, t, t.getStatusDetail(false)});
+ log.info("ended ({}) {} {}", new Object[]{i, t, t.getStatusDetail(false)});
// assertEquals(finalResult, 5)
assertEquals(i.get(), 5);
+ Asserts.eventually(m::getNumActiveTasks, l -> l==0);
}
- @Test(groups="Integration")
+ @Test(groups = "Integration")
public void testScheduledTaskCancelOuter() throws Exception {
final Duration PERIOD = Duration.millis(20);
final Duration CYCLE_DELAY = Duration.ONE_SECOND;
// this should be enough to start the next cycle, but not so much that the cycle ends;
// and enough that when a task is interrupted it terminates within this period
final Duration SMALL_FRACTION_OF_CYCLE_DELAY = PERIOD.add(CYCLE_DELAY.multiply(0.1));
-
+
BasicExecutionManager m = new BasicExecutionManager("mycontextid");
final AtomicInteger i = new AtomicInteger();
ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD.times(2), "period", PERIOD), new Callable<Task<?>>() {
@@ -206,48 +226,51 @@ public class ScheduledExecutionTest {
return new BasicTask<Integer>(new Callable<Integer>() {
@Override
public Integer call() {
- log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
+ log.info("task running (" + i + "): " + Tasks.current() + " " + Tasks.current().getStatusDetail(false));
Time.sleep(CYCLE_DELAY);
i.incrementAndGet();
return i.get();
- }});
- }});
-
- log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", t, t.getStatusDetail(false));
+ }
+ });
+ }
+ });
+
+ log.info(JavaClassNames.niceClassAndMethod() + " - submitting {} {}", t, t.getStatusDetail(false));
m.submit(t);
log.info("submitted {} {}", t, t.getStatusDetail(false));
Integer interimResult = (Integer) t.get();
- log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
+ log.info("done one ({}) {} {}", new Object[]{interimResult, t, t.getStatusDetail(false)});
assertEquals(i.get(), 1);
-
+
Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY);
assertEquals(t.get(), 2);
-
+
Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY);
Stopwatch timer = Stopwatch.createUnstarted();
t.cancel(true);
t.blockUntilEnded();
// int finalResult = t.get()
- log.info("blocked until ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)});
+ log.info("blocked until ended ({}) {} {}, in {}", new Object[]{i, t, t.getStatusDetail(false), Duration.of(timer)});
try {
t.get();
- Assert.fail("Should have failed getting result of cancelled "+t);
+ Assert.fail("Should have failed getting result of cancelled " + t);
} catch (Exception e) {
/* expected */
}
assertEquals(i.get(), 2);
- log.info("ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)});
+ log.info("ended ({}) {} {}, in {}", new Object[]{i, t, t.getStatusDetail(false), Duration.of(timer)});
Assert.assertTrue(Duration.of(timer).isShorterThan(SMALL_FRACTION_OF_CYCLE_DELAY));
+ Asserts.eventually(m::getNumActiveTasks, l -> l==0);
}
- @Test(groups="Integration")
+ @Test(groups = "Integration")
public void testScheduledTaskCancelInterrupts() throws Exception {
final Duration PERIOD = Duration.millis(20);
final Duration CYCLE_DELAY = Duration.ONE_SECOND;
// this should be enough to start the next cycle, but not so much that the cycle ends;
// and enough that when a task is interrupted it terminates within this period
final Duration SMALL_FRACTION_OF_CYCLE_DELAY = PERIOD.add(CYCLE_DELAY.multiply(0.1));
-
+
BasicExecutionManager m = new BasicExecutionManager("mycontextid");
final Semaphore interruptedSemaphore = new Semaphore(0);
final AtomicInteger i = new AtomicInteger();
@@ -258,7 +281,7 @@ public class ScheduledExecutionTest {
@Override
public Integer call() {
try {
- log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
+ log.info("task running (" + i + "): " + Tasks.current() + " " + Tasks.current().getStatusDetail(false));
Time.sleep(CYCLE_DELAY);
i.incrementAndGet();
return i.get();
@@ -266,45 +289,48 @@ public class ScheduledExecutionTest {
interruptedSemaphore.release();
throw Exceptions.propagate(e);
}
- }});
- }});
-
- log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", t, t.getStatusDetail(false));
+ }
+ });
+ }
+ });
+
+ log.info(JavaClassNames.niceClassAndMethod() + " - submitting {} {}", t, t.getStatusDetail(false));
m.submit(t);
log.info("submitted {} {}", t, t.getStatusDetail(false));
Integer interimResult = (Integer) t.get();
- log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
+ log.info("done one ({}) {} {}", new Object[]{interimResult, t, t.getStatusDetail(false)});
assertEquals(i.get(), 1);
-
+
Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY);
assertEquals(t.get(), 2);
-
+
Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY);
Stopwatch timer = Stopwatch.createUnstarted();
t.cancel(true);
t.blockUntilEnded();
// int finalResult = t.get()
- log.info("blocked until ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)});
+ log.info("blocked until ended ({}) {} {}, in {}", new Object[]{i, t, t.getStatusDetail(false), Duration.of(timer)});
try {
t.get();
- Assert.fail("Should have failed getting result of cancelled "+t);
+ Assert.fail("Should have failed getting result of cancelled " + t);
} catch (Exception e) {
/* expected */
}
assertEquals(i.get(), 2);
Assert.assertTrue(interruptedSemaphore.tryAcquire(1, SMALL_FRACTION_OF_CYCLE_DELAY.toMilliseconds(), TimeUnit.MILLISECONDS), "child thread was not interrupted");
- log.info("ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)});
+ log.info("ended ({}) {} {}, in {}", new Object[]{i, t, t.getStatusDetail(false), Duration.of(timer)});
Assert.assertTrue(Duration.of(timer).isShorterThan(SMALL_FRACTION_OF_CYCLE_DELAY));
+ Asserts.eventually(m::getNumActiveTasks, l -> l==0);
}
- @Test(groups="Integration")
+ @Test(groups = "Integration")
public void testScheduledTaskTakesLongerThanPeriod() throws Exception {
final int PERIOD = 1;
final int SLEEP_TIME = 100;
final int EARLY_RETURN_GRACE = 10;
BasicExecutionManager m = new BasicExecutionManager("mycontextid");
final List<Long> execTimes = new CopyOnWriteArrayList<Long>();
-
+
ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD, "period", PERIOD), new Callable<Task<?>>() {
@Override
public Task<?> call() throws Exception {
@@ -317,17 +343,20 @@ public class ScheduledExecutionTest {
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
- }});
- }});
-
+ }
+ });
+ }
+ });
+
m.submit(t);
-
+
Asserts.succeedsEventually(new Runnable() {
@Override
public void run() {
- assertTrue(execTimes.size() > 3, "size="+execTimes.size());
- }});
-
+ assertTrue(execTimes.size() > 3, "size=" + execTimes.size());
+ }
+ });
+
List<Long> timeDiffs = Lists.newArrayList();
long prevExecTime = -1;
for (Long execTime : execTimes) {
@@ -338,9 +367,86 @@ public class ScheduledExecutionTest {
prevExecTime = execTime;
}
}
-
+
for (Long timeDiff : timeDiffs) {
- if (timeDiff < (SLEEP_TIME - EARLY_RETURN_GRACE)) fail("timeDiffs="+timeDiffs+"; execTimes="+execTimes);
+ if (timeDiff < (SLEEP_TIME - EARLY_RETURN_GRACE))
+ fail("timeDiffs=" + timeDiffs + "; execTimes=" + execTimes);
}
+
+ t.cancel();
+ Asserts.eventually(m::getNumActiveTasks, l -> l==0);
}
-}
+
+ @Test(groups="Integration") // because slow
+ public void testScheduledTaskInContextClearing() throws Exception {
+ Duration PERIOD = Duration.millis(50);
+ int COUNT = 10;
+ BasicExecutionManager m = new BasicExecutionManager("mycontextid");
+
+ final List<String> errors = MutableList.of();
+
+ final AtomicInteger i2 = new AtomicInteger();
+ // now start other task, in entity context
+ ScheduledTask t2 = new ScheduledTask(MutableMap.of("displayName", "t2", "period", PERIOD), new Callable<Task<?>>() {
+ @Override
+ public Task<?> call() throws Exception {
+ return new BasicTask<Integer>(MutableMap.of("displayName", "t2-i"), new Callable<Integer>() {
+ @Override
+ public Integer call() {
+ Entity ce = BrooklynTaskTags.getContextEntity(Tasks.current());
+ log.info("entity task t2 running (" + i2 + "): " + Thread.currentThread() + " " + Tasks.current() + " " + Tasks.current().getStatusDetail(false)+" - "+ce);
+ if (ce==null) {
+ errors.add("Scheduled task t2 missing context entity");
+ }
+
+ ScheduledTask submitter = (ScheduledTask) ((BasicTask) Tasks.current()).getSubmittedByTask();
+ i2.incrementAndGet();
+ if (i2.get() >= COUNT) submitter.cancel();
+ return i2.get();
+ }
+ });
+ }
+ });
+ Entity contextEntity = new TestEntityImpl();
+ BasicExecutionContext exec = new BasicExecutionContext(m, MutableList.of(BrooklynTaskTags.tagForContextEntity(contextEntity)));
+ exec.submit(t2);
+
+ final AtomicInteger i1 = new AtomicInteger();
+
+ ScheduledTask t1 = new ScheduledTask(MutableMap.of("displayName", "t1", "period", PERIOD), new Callable<Task<?>>() {
+ @Override
+ public Task<?> call() throws Exception {
+ return new BasicTask<Integer>(MutableMap.of("displayName", "t1-i"), new Callable<Integer>() {
+ @Override
+ public Integer call() {
+ Entity ce = BrooklynTaskTags.getContextEntity(Tasks.current());
+ log.info("non-entity task t1 running (" + i1 + "): " + Thread.currentThread() + " " + Tasks.current() + " " + Tasks.current().getStatusDetail(false)+" - "+ce);
+ if (ce!=null) {
+ errors.add("Scheduled task t1 has context entity "+ce);
+ }
+
+ ScheduledTask submitter = (ScheduledTask) ((BasicTask) Tasks.current()).getSubmittedByTask();
+ i1.incrementAndGet();
+ if (i1.get() >= COUNT) submitter.cancel();
+ return i1.get();
+ }
+ });
+ }
+ });
+
+ Time.sleep(70);
+ log.info(JavaClassNames.niceClassAndMethod() + " - submitting {} {}", t1, t1.getStatusDetail(false));
+ m.submit(t1);
+ log.info("submitted {} {}", t1, t1.getStatusDetail(false));
+ Integer interimResult = (Integer) t1.get();
+ log.info("done one ({}) {} {}", new Object[]{interimResult, t1, t1.getStatusDetail(false)});
+ assertTrue(i1.get() > 0);
+
+ t1.blockUntilEnded();
+ t2.blockUntilEnded();
+
+ Asserts.assertSize(errors, 0);
+ Asserts.eventually(m::getNumActiveTasks, l -> l==0);
+ }
+
+}
\ No newline at end of file