You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by sj...@apache.org on 2015/11/19 15:36:30 UTC
[3/5] incubator-brooklyn git commit: Adds cancelOnException to
ScheduledTask
Adds cancelOnException to ScheduledTask
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/e0014b14
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/e0014b14
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/e0014b14
Branch: refs/heads/master
Commit: e0014b14062ce40fd0269ea5c362a707240ab324
Parents: 26773c0
Author: Sam Corbett <sa...@cloudsoftcorp.com>
Authored: Wed Nov 18 12:47:44 2015 +0000
Committer: Sam Corbett <sa...@cloudsoftcorp.com>
Committed: Thu Nov 19 14:01:52 2015 +0000
----------------------------------------------------------------------
.../util/core/task/BasicExecutionManager.java | 50 +++++++++++++-----
.../brooklyn/util/core/task/ScheduledTask.java | 54 +++++++++++++++-----
.../util/core/task/ScheduledExecutionTest.java | 48 +++++++++++++++--
3 files changed, 123 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/e0014b14/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
index 4c9858a..02277a1 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
@@ -398,7 +398,7 @@ public class BasicExecutionManager implements ExecutionManager {
final Callable<?> oldJob = taskScheduled.getJob();
final TaskInternal<?> taskScheduledF = taskScheduled;
taskScheduled.setJob(new Callable() { public Object call() {
- boolean resubmitted = false;
+ boolean shouldResubmit = true;
task.recentRun = taskScheduledF;
try {
synchronized (task) {
@@ -407,25 +407,19 @@ public class BasicExecutionManager implements ExecutionManager {
Object result;
try {
result = oldJob.call();
+ task.lastThrownType = null;
} catch (Exception e) {
- if (!Tasks.isInterrupted()) {
- log.warn("Error executing "+oldJob+" (scheduled job of "+task+" - "+task.getDescription()+"); cancelling scheduled execution", e);
- } else {
- log.debug("Interrupted executing "+oldJob+" (scheduled job of "+task+" - "+task.getDescription()+"); cancelling scheduled execution: "+e);
- }
+ shouldResubmit = shouldResubmitOnException(oldJob, e);
throw Exceptions.propagate(e);
}
- task.runCount++;
- if (task.period!=null && !task.isCancelled()) {
- task.delay = task.period;
- submitSubsequentScheduledTask(flags, task);
- resubmitted = true;
- }
return result;
} finally {
// do in finally block in case we were interrupted
- if (!resubmitted)
+ if (shouldResubmit) {
+ resubmit();
+ } else {
afterEndScheduledTaskAllIterations(flags, task);
+ }
}
}});
task.nextRun = taskScheduled;
@@ -437,6 +431,36 @@ public class BasicExecutionManager implements ExecutionManager {
}
}
+ private void resubmit() {
+ task.runCount++;
+ if (task.period!=null && !task.isCancelled()) {
+ task.delay = task.period;
+ submitSubsequentScheduledTask(flags, task);
+ }
+ }
+
+ private boolean shouldResubmitOnException(Callable<?> oldJob, Exception e) {
+ String message = "Error executing " + oldJob + " (scheduled job of " + task + " - " + task.getDescription() + ")";
+ if (Tasks.isInterrupted()) {
+ log.debug(message + "; cancelling scheduled execution: " + e);
+ return false;
+ } else if (task.cancelOnException) {
+ log.warn(message + "; cancelling scheduled execution.", e);
+ return false;
+ } else {
+ message += "; resubmitting task and throwing: " + e;
+ if (!e.getClass().equals(task.lastThrownType)) {
+ task.lastThrownType = e.getClass();
+ message += " (logging subsequent exceptions at trace)";
+ log.debug(message);
+ } else {
+ message += " (repeat exception)";
+ log.trace(message);
+ }
+ return true;
+ }
+ }
+
@Override
public String toString() {
return "ScheduledTaskCallable["+task+","+flags+"]";
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/e0014b14/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
index 94327a1..c1ad4f8 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
@@ -46,17 +46,32 @@ import com.google.common.base.Throwables;
public class ScheduledTask extends BasicTask {
final Callable<Task<?>> taskFactory;
- /** initial delay before running, set as flag in constructor; defaults to 0 */
+
+ /**
+ * Initial delay before running, set as flag in constructor; defaults to 0
+ */
protected Duration delay;
- /** time to wait between executions, or null if not to repeat (default), set as flag to constructor;
+
+ /**
+ * The time to wait between executions, or null if not to repeat (default), set as flag to constructor;
* this may be modified for subsequent submissions by a running task generated by the factory
- * using getSubmittedByTask().setPeriod(Duration) */
+ * using {@link #getSubmittedByTask().setPeriod(Duration)}
+ */
protected Duration period = null;
- /** optional, set as flag in constructor; defaults to null meaning no limit */
+
+ /**
+ * Optional, set as flag in constructor; defaults to null meaning no limit.
+ */
protected Integer maxIterations = null;
-
+
+ /**
+ * Set false if the task should be rescheduled after throwing an exception; defaults to true.
+ */
+ protected boolean cancelOnException = true;
+
protected int runCount=0;
protected Task<?> recentRun, nextRun;
+ Class<? extends Exception> lastThrownType;
public int getRunCount() { return runCount; }
public ScheduledFuture<?> getNextScheduled() { return (ScheduledFuture<?>)internalFuture; }
@@ -84,12 +99,15 @@ public class ScheduledTask extends BasicTask {
delay = Duration.of(elvis(flags.remove("delay"), 0));
period = Duration.of(elvis(flags.remove("period"), null));
maxIterations = elvis(flags.remove("maxIterations"), null);
+ Object cancelFlag = flags.remove("cancelOnException");
+ cancelOnException = cancelFlag == null || Boolean.TRUE.equals(cancelFlag);
}
public ScheduledTask delay(Duration d) {
this.delay = d;
return this;
}
+
public ScheduledTask delay(long val) {
return delay(Duration.millis(val));
}
@@ -98,6 +116,7 @@ public class ScheduledTask extends BasicTask {
this.period = d;
return this;
}
+
public ScheduledTask period(long val) {
return period(Duration.millis(val));
}
@@ -107,6 +126,11 @@ public class ScheduledTask extends BasicTask {
return this;
}
+ public ScheduledTask cancelOnException(boolean cancel) {
+ this.cancelOnException = cancel;
+ return this;
+ }
+
public Callable<Task<?>> getTaskFactory() {
return taskFactory;
}
@@ -121,13 +145,17 @@ public class ScheduledTask extends BasicTask {
protected String getActiveTaskStatusString(int verbosity) {
StringBuilder rv = new StringBuilder("Scheduler");
- if (runCount>0) rv.append(", iteration "+(runCount+1));
- if (recentRun!=null) rv.append(", last run "+
- Duration.sinceUtc(recentRun.getStartTimeUtc())+" ms ago");
+ if (runCount > 0) {
+ rv.append(", iteration ").append(runCount + 1);
+ }
+ if (recentRun != null) {
+ Duration start = Duration.sinceUtc(recentRun.getStartTimeUtc());
+ rv.append(", last run ").append(start).append(" ago");
+ }
if (truth(getNextScheduled())) {
Duration untilNext = Duration.millis(getNextScheduled().getDelay(TimeUnit.MILLISECONDS));
if (untilNext.isPositive())
- rv.append(", next in "+untilNext);
+ rv.append(", next in ").append(untilNext);
else
rv.append(", next imminent");
}
@@ -158,7 +186,7 @@ public class ScheduledTask extends BasicTask {
while (!isDone()) super.blockUntilEnded();
}
- /** gets the value of the most recently run task */
+ /** @return The value of the most recently run task */
public Object get() throws InterruptedException, ExecutionException {
blockUntilStarted();
blockUntilFirstScheduleStarted();
@@ -175,8 +203,10 @@ public class ScheduledTask extends BasicTask {
return result;
}
- /** internal method used to allow callers to wait for underlying tasks to finished in the case of cancellation
- * @param duration */
+ /**
+ * Internal method used to allow callers to wait for underlying tasks to finished in the case of cancellation.
+ * @param timeout maximum time to wait
+ */
@Beta
public boolean blockUntilNextRunFinished(Duration timeout) {
return Tasks.blockUntilInternalTasksEnded(nextRun, timeout);
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/e0014b14/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
index 6884040..1d551e8 100644
--- a/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
+++ b/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
@@ -32,10 +32,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.test.Asserts;
import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.core.task.BasicExecutionManager;
-import org.apache.brooklyn.util.core.task.BasicTask;
-import org.apache.brooklyn.util.core.task.ScheduledTask;
-import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
import org.apache.brooklyn.util.javalang.JavaClassNames;
@@ -81,6 +77,50 @@ public class ScheduledExecutionTest {
assertEquals(i.get(), 5);
}
+ @Test
+ public void testScheduledTaskCancelledIfExceptionThrown() throws Exception {
+ BasicExecutionManager m = new BasicExecutionManager("mycontextid");
+ final AtomicInteger calls = new AtomicInteger(0);
+ ScheduledTask t = new ScheduledTask(MutableMap.of("period", Duration.ONE_MILLISECOND, "maxIterations", 5), new Callable<Task<?>>() {
+ public Task<?> call() throws Exception {
+ return new BasicTask<>(new Callable<Integer>() {
+ public Integer call() {
+ calls.incrementAndGet();
+ throw new RuntimeException("boo");
+ }});
+ }});
+
+ m.submit(t);
+ Runnable callsIsOne = new Runnable() {
+ @Override public void run() {
+ if (calls.get() != 1) {
+ throw new RuntimeException("not yet");
+ }
+ }
+
+ };
+ Asserts.succeedsEventually(callsIsOne);
+ Asserts.succeedsContinually(callsIsOne);
+ }
+
+ @Test
+ public void testScheduledTaskResubmittedIfExceptionThrownAndCancelOnExceptionFalse() {
+ BasicExecutionManager m = new BasicExecutionManager("mycontextid");
+ final AtomicInteger calls = new AtomicInteger(0);
+ ScheduledTask t = new ScheduledTask(MutableMap.of("period", Duration.ONE_MILLISECOND, "maxIterations", 5, "cancelOnException", false), new Callable<Task<?>>() {
+ public Task<?> call() throws Exception {
+ return new BasicTask<>(new Callable<Integer>() {
+ public Integer call() {
+ calls.incrementAndGet();
+ throw new RuntimeException("boo");
+ }});
+ }});
+
+ m.submit(t);
+ t.blockUntilEnded();
+ assertEquals(calls.get(), 5, "Expected task to be resubmitted despite throwing an exception");
+ }
+
/** like testScheduledTask but the loop is terminated by the task itself adjusting the period */
@Test
public void testScheduledTaskSelfEnding() throws Exception {