You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/18 13:00:35 UTC
[20/64] incubator-brooklyn git commit: [BROOKLYN-162] Refactor
package in ./core/util
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/BasicTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/BasicTask.java b/core/src/main/java/org/apache/brooklyn/core/util/task/BasicTask.java
new file mode 100644
index 0000000..c776e4d
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/BasicTask.java
@@ -0,0 +1,892 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import static brooklyn.util.JavaGroovyEquivalents.asString;
+import static brooklyn.util.JavaGroovyEquivalents.elvisString;
+import groovy.lang.Closure;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.management.LockInfo;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.brooklyn.api.management.HasTaskChildren;
+import org.apache.brooklyn.api.management.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.util.GroovyJavaMethods;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.guava.Maybe;
+import brooklyn.util.text.Identifiers;
+import brooklyn.util.text.Strings;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Function;
+import com.google.common.base.Objects;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Callables;
+import com.google.common.util.concurrent.ExecutionList;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * The basic concrete implementation of a {@link Task} to be executed.
+ *
+ * A {@link Task} is a wrapper for an executable unit, such as a {@link Closure} or a {@link Runnable} or
+ * {@link Callable} and will run in its own {@link Thread}.
+ * <p>
+ * The task can be given an optional displayName and description in its constructor (as named
+ * arguments in the first {@link Map} parameter). It is guaranteed to have {@link Object#notify()} called
+ * once whenever the task starts running and once again when the task is about to complete. Due to
+ * the way executors work it is ugly to guarantee notification <em>after</em> completion, so instead we
+ * notify just before then expect the user to call {@link #get()} - which will throw errors if the underlying job
+ * did so - or {@link #blockUntilEnded()} which will not throw errors.
+ */
+public class BasicTask<T> implements TaskInternal<T> {
+ private static final Logger log = LoggerFactory.getLogger(BasicTask.class);
+
+ private String id = Identifiers.makeRandomId(8);
+ protected Callable<T> job;
+ public final String displayName;
+ public final String description;
+
+ protected final Set<Object> tags = Sets.newConcurrentHashSet();
+ // for debugging, to record where tasks were created
+// { tags.add(new Throwable("Creation stack trace")); }
+
+ protected Task<?> proxyTargetTask = null;
+
+ protected String blockingDetails = null;
+ protected Task<?> blockingTask = null;
+ Object extraStatusText = null;
+
+ /** listeners attached at task level; these are stored here, but run on the underlying ListenableFuture */
+ protected final ExecutionList listeners = new ExecutionList();
+
+ /**
+ * Constructor needed to prevent confusion in groovy stubs when looking for default constructor,
+ *
+ * The generics on {@link Closure} break it if that is first constructor.
+ */
+ protected BasicTask() { this(Collections.emptyMap()); }
+ protected BasicTask(Map<?,?> flags) { this(flags, (Callable<T>) null); }
+
+ public BasicTask(Callable<T> job) { this(Collections.emptyMap(), job); }
+
+ public BasicTask(Map<?,?> flags, Callable<T> job) {
+ this.job = job;
+
+ if (flags.containsKey("tag")) tags.add(flags.remove("tag"));
+ Object ftags = flags.remove("tags");
+ if (ftags!=null) {
+ if (ftags instanceof Iterable) Iterables.addAll(tags, (Iterable<?>)ftags);
+ else {
+ log.info("deprecated use of non-collection argument for 'tags' ("+ftags+") in "+this, new Throwable("trace of discouraged use of non-colleciton tags argument"));
+ tags.add(ftags);
+ }
+ }
+
+ description = elvisString(flags.remove("description"), "");
+ String d = asString(flags.remove("displayName"));
+ displayName = (d==null ? "" : d);
+ }
+
+ public BasicTask(Runnable job) { this(GroovyJavaMethods.<T>callableFromRunnable(job)); }
+ public BasicTask(Map<?,?> flags, Runnable job) { this(flags, GroovyJavaMethods.<T>callableFromRunnable(job)); }
+ public BasicTask(Closure<T> job) { this(GroovyJavaMethods.callableFromClosure(job)); }
+ public BasicTask(Map<?,?> flags, Closure<T> job) { this(flags, GroovyJavaMethods.callableFromClosure(job)); }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(id);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof Task)
+ return ((Task<?>)obj).getId().equals(getId());
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ // give display name plus id, or job and tags plus id; some jobs have been extended to include nice tostrings
+ return "Task["+
+ (Strings.isNonEmpty(displayName) ?
+ displayName :
+ (job + (tags!=null && !tags.isEmpty() ? ";"+tags : "")) ) +
+ ":"+getId()+"]";
+ }
+
+ @Override
+ public Task<T> asTask() {
+ return this;
+ }
+
+ // housekeeping --------------------
+
+ /*
+ * These flags are set by BasicExecutionManager.submit.
+ *
+ * Order is guaranteed to be as shown below, in order of #. Within each # line it is currently in the order specified by commas but this is not guaranteed.
+ * (The spaces between the # section indicate longer delays / logical separation ... it should be clear!)
+ *
+ * # submitter, submit time set, tags and other submit-time fields set
+ *
+ * # thread set, ThreadLocal getCurrentTask set
+ * # start time set, isBegun is true
+ * # task end callback run, if supplied
+ *
+ * # task runs
+ *
+ * # task end callback run, if supplied
+ * # end time set
+ * # thread cleared, ThreadLocal getCurrentTask set
+ * # Task.notifyAll()
+ * # Task.get() (result.get()) available, Task.isDone is true
+ *
+ * Few _consumers_ should care, but internally we rely on this so that, for example, status is displayed correctly.
+ * Tests should catch most things, but be careful if you change any of the above semantics.
+ */
+
+ protected long queuedTimeUtc = -1;
+ protected long submitTimeUtc = -1;
+ protected long startTimeUtc = -1;
+ protected long endTimeUtc = -1;
+ protected Maybe<Task<?>> submittedByTask;
+
+ protected volatile Thread thread = null;
+ private volatile boolean cancelled = false;
+ /** normally a {@link ListenableFuture}, except for scheduled tasks when it may be a {@link ScheduledFuture} */
+ protected volatile Future<T> internalFuture = null;
+
+ @Override
+ public synchronized void initInternalFuture(ListenableFuture<T> result) {
+ if (this.internalFuture != null)
+ throw new IllegalStateException("task "+this+" is being given a result twice");
+ this.internalFuture = result;
+ notifyAll();
+ }
+
+ // metadata accessors ------------
+
+ @Override
+ public Set<Object> getTags() { return Collections.unmodifiableSet(new LinkedHashSet<Object>(tags)); }
+
+ /** if the job is queued for submission (e.g. by another task) it can indicate that fact (and time) here;
+ * note tasks can (and often are) submitted without any queueing, in which case this value may be -1 */
+ @Override
+ public long getQueuedTimeUtc() { return queuedTimeUtc; }
+
+ @Override
+ public long getSubmitTimeUtc() { return submitTimeUtc; }
+
+ @Override
+ public long getStartTimeUtc() { return startTimeUtc; }
+
+ @Override
+ public long getEndTimeUtc() { return endTimeUtc; }
+
+ @Override
+ public Future<T> getInternalFuture() { return internalFuture; }
+
+ @Override
+ public Task<?> getSubmittedByTask() {
+ if (submittedByTask==null) return null;
+ return submittedByTask.orNull();
+ }
+
+ /** the thread where the task is running, if it is running */
+ @Override
+ public Thread getThread() { return thread; }
+
+ // basic fields --------------------
+
+ @Override
+ public boolean isQueued() {
+ return (queuedTimeUtc >= 0);
+ }
+
+ @Override
+ public boolean isQueuedOrSubmitted() {
+ return isQueued() || isSubmitted();
+ }
+
+ @Override
+ public boolean isQueuedAndNotSubmitted() {
+ return isQueued() && (!isSubmitted());
+ }
+
+ @Override
+ public boolean isSubmitted() {
+ return submitTimeUtc >= 0;
+ }
+
+ @Override
+ public boolean isBegun() {
+ return startTimeUtc >= 0;
+ }
+
+ /** marks the task as queued for execution */
+ @Override
+ public void markQueued() {
+ if (queuedTimeUtc<0)
+ queuedTimeUtc = System.currentTimeMillis();
+ }
+
+ @Override
+ public final synchronized boolean cancel() { return cancel(true); }
+
+ /** doesn't resume it, just means if something was cancelled but not submitted it could now be submitted;
+ * probably going to be removed and perhaps some mechanism for running again made available
+ * @since 0.7.0 */
+ @Beta
+ public synchronized boolean uncancel() {
+ boolean wasCancelled = cancelled;
+ cancelled = false;
+ return wasCancelled;
+ }
+
+ @Override
+ public synchronized boolean cancel(boolean mayInterruptIfRunning) {
+ if (isDone()) return false;
+ boolean cancel = true;
+ cancelled = true;
+ if (internalFuture!=null) {
+ cancel = internalFuture.cancel(mayInterruptIfRunning);
+ }
+ notifyAll();
+ return cancel;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return cancelled || (internalFuture!=null && internalFuture.isCancelled());
+ }
+
+ @Override
+ public boolean isDone() {
+ // if endTime is set, result might not be completed yet, but it will be set very soon
+ // (the two values are set close in time, result right after the endTime;
+ // but callback hooks might not see the result yet)
+ return cancelled || (internalFuture!=null && internalFuture.isDone()) || endTimeUtc>0;
+ }
+
+ /**
+ * Returns true if the task has had an error.
+ *
+ * Only true if calling {@link #get()} will throw an exception when it completes (including cancel).
+ * Implementations may set this true before completion if they have that insight, or
+ * (the default) they may compute it lazily after completion (returning false before completion).
+ */
+ @Override
+ public boolean isError() {
+ if (!isDone()) return false;
+ if (isCancelled()) return true;
+ try {
+ get();
+ return false;
+ } catch (Throwable t) {
+ return true;
+ }
+ }
+
+ // future value --------------------
+
+ @Override
+ public T get() throws InterruptedException, ExecutionException {
+ try {
+ if (!isDone())
+ Tasks.setBlockingTask(this);
+ blockUntilStarted();
+ return internalFuture.get();
+ } finally {
+ Tasks.resetBlockingTask();
+ }
+ }
+
+ @Override
+ public T getUnchecked() {
+ try {
+ return get();
+ } catch (Exception e) {
+ throw Exceptions.propagate(e);
+ }
+ }
+
+ @Override
+ public synchronized void blockUntilStarted() {
+ blockUntilStarted(null);
+ }
+
+ @Override
+ public synchronized boolean blockUntilStarted(Duration timeout) {
+ Long endTime = timeout==null ? null : System.currentTimeMillis() + timeout.toMillisecondsRoundingUp();
+ while (true) {
+ if (cancelled) throw new CancellationException();
+ if (internalFuture==null)
+ try {
+ if (timeout==null) {
+ wait();
+ } else {
+ long remaining = endTime - System.currentTimeMillis();
+ if (remaining>0)
+ wait(remaining);
+ else
+ return false;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ Throwables.propagate(e);
+ }
+ if (internalFuture!=null) return true;
+ }
+ }
+
+ @Override
+ public void blockUntilEnded() {
+ blockUntilEnded(null);
+ }
+
+ @Override
+ public boolean blockUntilEnded(Duration timeout) {
+ Long endTime = timeout==null ? null : System.currentTimeMillis() + timeout.toMillisecondsRoundingUp();
+ try {
+ boolean started = blockUntilStarted(timeout);
+ if (!started) return false;
+ if (timeout==null) {
+ internalFuture.get();
+ } else {
+ long remaining = endTime - System.currentTimeMillis();
+ if (remaining>0)
+ internalFuture.get(remaining, TimeUnit.MILLISECONDS);
+ }
+ return isDone();
+ } catch (Throwable t) {
+ Exceptions.propagateIfFatal(t);
+ if (!(t instanceof TimeoutException) && log.isDebugEnabled())
+ log.debug("call from "+Thread.currentThread()+", blocking until '"+this+"' finishes, ended with error: "+t);
+ return isDone();
+ }
+ }
+
+ @Override
+ public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return get(new Duration(timeout, unit));
+ }
+
+ @Override
+ public T get(Duration duration) throws InterruptedException, ExecutionException, TimeoutException {
+ long start = System.currentTimeMillis();
+ Long end = duration==null ? null : start + duration.toMillisecondsRoundingUp();
+ while (end==null || end > System.currentTimeMillis()) {
+ if (cancelled) throw new CancellationException();
+ if (internalFuture == null) {
+ synchronized (this) {
+ long remaining = end - System.currentTimeMillis();
+ if (internalFuture==null && remaining>0)
+ wait(remaining);
+ }
+ }
+ if (internalFuture != null) break;
+ }
+ Long remaining = end==null ? null : end - System.currentTimeMillis();
+ if (isDone()) {
+ return internalFuture.get(1, TimeUnit.MILLISECONDS);
+ } else if (remaining == null) {
+ return internalFuture.get();
+ } else if (remaining > 0) {
+ return internalFuture.get(remaining, TimeUnit.MILLISECONDS);
+ } else {
+ throw new TimeoutException();
+ }
+ }
+
+ @Override
+ public T getUnchecked(Duration duration) {
+ try {
+ return get(duration);
+ } catch (Exception e) {
+ throw Exceptions.propagate(e);
+ }
+ }
+
+ // ------------------ status ---------------------------
+
+ /**
+ * Returns a brief status string
+ *
+ * Plain-text format. Reported status if there is one, otherwise state which will be one of:
+ * <ul>
+ * <li>Not submitted
+ * <li>Submitted for execution
+ * <li>Ended by error
+ * <li>Ended by cancellation
+ * <li>Ended normally
+ * <li>Running
+ * <li>Waiting
+ * </ul>
+ */
+ @Override
+ public String getStatusSummary() {
+ return getStatusString(0);
+ }
+
+ /**
+ * Returns detailed status, suitable for a hover
+ *
+ * Plain-text format, with new-lines (and sometimes extra info) if multiline enabled.
+ */
+ @Override
+ public String getStatusDetail(boolean multiline) {
+ return getStatusString(multiline?2:1);
+ }
+
+ /**
+ * This method is useful for callers to see the status of a task.
+ *
+ * Also for developers to see best practices for examining status fields etc
+ *
+ * @param verbosity 0 = brief, 1 = one-line with some detail, 2 = lots of detail
+ */
+ protected String getStatusString(int verbosity) {
+// Thread t = getThread();
+ String rv;
+ if (submitTimeUtc <= 0) rv = "Not submitted";
+ else if (!isCancelled() && startTimeUtc <= 0) {
+ rv = "Submitted for execution";
+ if (verbosity>0) {
+ long elapsed = System.currentTimeMillis() - submitTimeUtc;
+ rv += " "+Time.makeTimeStringRoundedSince(elapsed)+" ago";
+ }
+ if (verbosity >= 2 && getExtraStatusText()!=null) {
+ rv += "\n\n"+getExtraStatusText();
+ }
+ } else if (isDone()) {
+ long elapsed = endTimeUtc - submitTimeUtc;
+ String duration = Time.makeTimeStringRounded(elapsed);
+ if (isCancelled()) {
+ rv = "Cancelled";
+ if (verbosity >= 1) rv+=" after "+duration;
+
+ if (verbosity >= 2 && getExtraStatusText()!=null) {
+ rv += "\n\n"+getExtraStatusText();
+ }
+ } else if (isError()) {
+ rv = "Failed";
+ if (verbosity >= 1) {
+ rv += " after "+duration;
+ Throwable error = Tasks.getError(this);
+
+ if (verbosity >= 2 && getExtraStatusText()!=null) {
+ rv += "\n\n"+getExtraStatusText();
+ }
+
+ //remove outer ExecException which is reported by the get(), we want the exception the task threw
+ while (error instanceof ExecutionException) error = error.getCause();
+ String errorMessage = Exceptions.collapseText(error);
+
+ if (verbosity == 1) rv += ": "+abbreviate(errorMessage);
+ if (verbosity >= 2) {
+ rv += ": "+errorMessage;
+ StringWriter sw = new StringWriter();
+ ((Throwable)error).printStackTrace(new PrintWriter(sw));
+ rv += "\n\n"+sw.getBuffer();
+ }
+ }
+ } else {
+ rv = "Completed";
+ if (verbosity>=1) {
+ if (verbosity==1) {
+ try {
+ Object v = get();
+ rv += ", " +(v==null ? "no return value (null)" : "result: "+abbreviate(v.toString()));
+ } catch (Exception e) {
+ rv += ", but error accessing result ["+e+"]"; //shouldn't happen
+ }
+ } else {
+ rv += " after "+duration;
+ try {
+ Object v = get();
+ rv += "\n\n" + (v==null ? "No return value (null)" : "Result: "+v);
+ } catch (Exception e) {
+ rv += " at first\n" +
+ "Error accessing result ["+e+"]"; //shouldn't happen
+ }
+ if (verbosity >= 2 && getExtraStatusText()!=null) {
+ rv += "\n\n"+getExtraStatusText();
+ }
+ }
+ }
+ }
+ } else {
+ rv = getActiveTaskStatusString(verbosity);
+ }
+ return rv;
+ }
+
+ private static String abbreviate(String s) {
+ s = Strings.getFirstLine(s);
+ if (s.length()>255) s = s.substring(0, 252)+ "...";
+ return s;
+ }
+
+ protected String getActiveTaskStatusString(int verbosity) {
+ String rv = "";
+ Thread t = getThread();
+
+ // Normally, it's not possible for thread==null as we were started and not ended
+
+ // However, there is a race where the task starts sand completes between the calls to getThread()
+ // at the start of the method and this call to getThread(), so both return null even though
+ // the intermediate checks returned started==true isDone()==false.
+ if (t == null) {
+ if (isDone()) {
+ return getStatusString(verbosity);
+ } else {
+ //should only happen for repeating task which is not active
+ return "Sleeping";
+ }
+ }
+
+ ThreadInfo ti = ManagementFactory.getThreadMXBean().getThreadInfo(t.getId(), (verbosity<=0 ? 0 : verbosity==1 ? 1 : Integer.MAX_VALUE));
+ if (getThread()==null)
+ //thread might have moved on to a new task; if so, recompute (it should now say "done")
+ return getStatusString(verbosity);
+
+ if (verbosity >= 1 && Strings.isNonBlank(blockingDetails)) {
+ if (verbosity==1)
+ // short status string will just show blocking details
+ return blockingDetails;
+ //otherwise show the blocking details, then a new line, then additional information
+ rv = blockingDetails + "\n\n";
+ }
+
+ if (verbosity >= 1 && blockingTask!=null) {
+ if (verbosity==1)
+ // short status string will just show blocking details
+ return "Waiting on "+blockingTask;
+ //otherwise show the blocking details, then a new line, then additional information
+ rv = "Waiting on "+blockingTask + "\n\n";
+ }
+
+ if (verbosity>=2) {
+ if (getExtraStatusText()!=null) {
+ rv += getExtraStatusText()+"\n\n";
+ }
+
+ rv += ""+toString()+"\n";
+ if (submittedByTask!=null) {
+ rv += "Submitted by "+submittedByTask+"\n";
+ }
+
+ if (this instanceof HasTaskChildren) {
+ // list children tasks for compound tasks
+ try {
+ Iterable<Task<?>> childrenTasks = ((HasTaskChildren)this).getChildren();
+ if (childrenTasks.iterator().hasNext()) {
+ rv += "Children:\n";
+ for (Task<?> child: childrenTasks) {
+ rv += " "+child+": "+child.getStatusDetail(false)+"\n";
+ }
+ }
+ } catch (ConcurrentModificationException exc) {
+ rv += " (children not available - currently being modified)\n";
+ }
+ }
+ rv += "\n";
+ }
+
+ LockInfo lock = ti.getLockInfo();
+ rv += "In progress";
+ if (verbosity>=1) {
+ if (lock==null && ti.getThreadState()==Thread.State.RUNNABLE) {
+ //not blocked
+ if (ti.isSuspended()) {
+ // when does this happen?
+ rv += ", thread suspended";
+ } else {
+ if (verbosity >= 2) rv += " ("+ti.getThreadState()+")";
+ }
+ } else {
+ rv +=", thread waiting ";
+ if (ti.getThreadState() == Thread.State.BLOCKED) {
+ rv += "(mutex) on "+lookup(lock);
+ //TODO could say who holds it
+ } else if (ti.getThreadState() == Thread.State.WAITING) {
+ rv += "(notify) on "+lookup(lock);
+ } else if (ti.getThreadState() == Thread.State.TIMED_WAITING) {
+ rv += "(timed) on "+lookup(lock);
+ } else {
+ rv = "("+ti.getThreadState()+") on "+lookup(lock);
+ }
+ }
+ }
+ if (verbosity>=2) {
+ StackTraceElement[] st = ti.getStackTrace();
+ st = brooklyn.util.javalang.StackTraceSimplifier.cleanStackTrace(st);
+ if (st!=null && st.length>0)
+ rv += "\n" +"At: "+st[0];
+ for (int ii=1; ii<st.length; ii++) {
+ rv += "\n" +" "+st[ii];
+ }
+ }
+ return rv;
+ }
+
+ protected String lookup(LockInfo info) {
+ return info!=null ? ""+info : "unknown (sleep)";
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+
+
+ /** allows a task user to specify why a task is blocked; for use immediately before a blocking/wait,
+ * and typically cleared immediately afterwards; referenced by management api to inspect a task
+ * which is blocking
+ */
+ @Override
+ public String setBlockingDetails(String blockingDetails) {
+ String old = this.blockingDetails;
+ this.blockingDetails = blockingDetails;
+ return old;
+ }
+
+ @Override
+ public Task<?> setBlockingTask(Task<?> blockingTask) {
+ Task<?> old = this.blockingTask;
+ this.blockingTask = blockingTask;
+ return old;
+ }
+
+ @Override
+ public void resetBlockingDetails() {
+ this.blockingDetails = null;
+ }
+
+ @Override
+ public void resetBlockingTask() {
+ this.blockingTask = null;
+ }
+
+ /** returns a textual message giving details while the task is blocked */
+ @Override
+ public String getBlockingDetails() {
+ return blockingDetails;
+ }
+
+ /** returns a task that this task is blocked on */
+ @Override
+ public Task<?> getBlockingTask() {
+ return blockingTask;
+ }
+
+ @Override
+ public void setExtraStatusText(Object extraStatus) {
+ this.extraStatusText = extraStatus;
+ }
+
+ @Override
+ public Object getExtraStatusText() {
+ return extraStatusText;
+ }
+
+ // ---- add a way to warn if task is not run
+
+ public interface TaskFinalizer {
+ public void onTaskFinalization(Task<?> t);
+ }
+
+ public static final TaskFinalizer WARN_IF_NOT_RUN = new TaskFinalizer() {
+ @Override
+ public void onTaskFinalization(Task<?> t) {
+ if (!Tasks.isAncestorCancelled(t) && !t.isSubmitted()) {
+ log.warn(t+" was never submitted; did the code create it and forget to run it? ('cancel' the task to suppress this message)");
+ log.debug("Detail of unsubmitted task "+t+":\n"+t.getStatusDetail(true));
+ return;
+ }
+ if (!t.isDone()) {
+ // shouldn't happen
+ // TODO But does happen if management context was terminated (e.g. running test suite).
+ // Should check if Execution Manager is running, and only log if it was not terminated?
+ log.warn("Task "+t+" is being finalized before completion");
+ return;
+ }
+ }
+ };
+
+ public static final TaskFinalizer NO_OP = new TaskFinalizer() {
+ @Override
+ public void onTaskFinalization(Task<?> t) {
+ }
+ };
+
+ public void ignoreIfNotRun() {
+ setFinalizer(NO_OP);
+ }
+
+ public void setFinalizer(TaskFinalizer f) {
+ TaskFinalizer finalizer = Tasks.tag(this, TaskFinalizer.class, false);
+ if (finalizer!=null && finalizer!=f)
+ throw new IllegalStateException("Cannot apply multiple finalizers");
+ if (isDone())
+ throw new IllegalStateException("Finalizer cannot be set on task "+this+" after it is finished");
+ tags.add(f);
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ TaskFinalizer finalizer = Tasks.tag(this, TaskFinalizer.class, false);
+ if (finalizer==null) finalizer = WARN_IF_NOT_RUN;
+ finalizer.onTaskFinalization(this);
+ }
+
+ public static class SubmissionErrorCatchingExecutor implements Executor {
+ final Executor target;
+ public SubmissionErrorCatchingExecutor(Executor target) {
+ this.target = target;
+ }
+ @Override
+ public void execute(Runnable command) {
+ if (isShutdown()) {
+ log.debug("Skipping execution of task callback hook "+command+" because executor is shutdown.");
+ return;
+ }
+ try {
+ target.execute(command);
+ } catch (Exception e) {
+ if (isShutdown()) {
+ log.debug("Ignoring failed execution of task callback hook "+command+" because executor is shutdown.");
+ } else {
+ log.warn("Execution of task callback hook "+command+" failed: "+e, e);
+ }
+ }
+ }
+ protected boolean isShutdown() {
+ return target instanceof ExecutorService && ((ExecutorService)target).isShutdown();
+ }
+ }
+
+ @Override
+ public void addListener(Runnable listener, Executor executor) {
+ listeners.add(listener, new SubmissionErrorCatchingExecutor(executor));
+ }
+
+ @Override
+ public void runListeners() {
+ listeners.execute();
+ }
+
+ @Override
+ public void setEndTimeUtc(long val) {
+ endTimeUtc = val;
+ }
+
+ @Override
+ public void setThread(Thread thread) {
+ this.thread = thread;
+ }
+
+ @Override
+ public Callable<T> getJob() {
+ return job;
+ }
+
+ @Override
+ public void setJob(Callable<T> job) {
+ this.job = job;
+ }
+
+ @Override
+ public ExecutionList getListeners() {
+ return listeners;
+ }
+
+ @Override
+ public void setSubmitTimeUtc(long val) {
+ submitTimeUtc = val;
+ }
+
+ private static <T> Task<T> newGoneTaskFor(Task<?> task) {
+ Task<T> t = Tasks.<T>builder().dynamic(false).name(task.getDisplayName())
+ .description("Details of the original task "+task+" have been forgotten.")
+ .body(Callables.returning((T)null)).build();
+ ((BasicTask<T>)t).ignoreIfNotRun();
+ return t;
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Override
+ public void setSubmittedByTask(Task<?> task) {
+ submittedByTask = (Maybe)Maybe.softThen((Task)task, (Maybe)Maybe.of(BasicTask.newGoneTaskFor(task)));
+ }
+
+ @Override
+ public Set<Object> getMutableTags() {
+ return tags;
+ }
+
+ @Override
+ public void setStartTimeUtc(long val) {
+ startTimeUtc = val;
+ }
+
+ @Override
+ public void applyTagModifier(Function<Set<Object>,Void> modifier) {
+ modifier.apply(tags);
+ }
+
+ @Override
+ public Task<?> getProxyTarget() {
+ return proxyTargetTask;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/CanSetName.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/CanSetName.java b/core/src/main/java/org/apache/brooklyn/core/util/task/CanSetName.java
new file mode 100644
index 0000000..407a93a
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/CanSetName.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+public interface CanSetName {
+
+ void setName(String name);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/CompoundTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/CompoundTask.java b/core/src/main/java/org/apache/brooklyn/core/util/task/CompoundTask.java
new file mode 100644
index 0000000..8fdb146
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/CompoundTask.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import groovy.lang.Closure;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.brooklyn.api.management.HasTaskChildren;
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.api.management.TaskAdaptable;
+import org.apache.brooklyn.core.management.internal.ManagementContextInternal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.BrooklynTaskTags;
+import brooklyn.util.collections.MutableMap;
+
+
+/**
+ * A {@link Task} that is comprised of other units of work: possibly a heterogeneous mix of {@link Task},
+ * {@link Runnable}, {@link Callable} and {@link Closure} instances.
+ *
+ * This class holds the collection of child tasks, but subclasses have the responsibility of executing them in a
+ * sensible manner by implementing the abstract {@link #runJobs} method.
+ */
+public abstract class CompoundTask<T> extends BasicTask<List<T>> implements HasTaskChildren {
+
+ @SuppressWarnings("unused")
+ private static final Logger log = LoggerFactory.getLogger(CompoundTask.class);
+
+ protected final List<Task<? extends T>> children;
+ protected final List<Object> result;
+
+ /**
+ * Constructs a new compound task containing the specified units of work.
+ *
+ * @param jobs A potentially heterogeneous mixture of {@link Runnable}, {@link Callable}, {@link Closure} and {@link Task} can be provided.
+ * @throws IllegalArgumentException if any of the passed child jobs is not one of the above types
+ */
+ public CompoundTask(Object... jobs) {
+ this( Arrays.asList(jobs) );
+ }
+
+ /**
+ * Constructs a new compound task containing the specified units of work.
+ *
+ * @param jobs A potentially heterogeneous mixture of {@link Runnable}, {@link Callable}, {@link Closure} and {@link Task} can be provided.
+ * @throws IllegalArgumentException if any of the passed child jobs is not one of the above types
+ */
+ public CompoundTask(Collection<?> jobs) {
+ this(MutableMap.of("tag", "compound"), jobs);
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public CompoundTask(Map<String,?> flags, Collection<?> jobs) {
+ super(flags);
+ super.job = new Callable<List<T>>() {
+ @Override public List<T> call() throws Exception {
+ return runJobs();
+ }
+ };
+
+ this.result = new ArrayList<Object>(jobs.size());
+ this.children = new ArrayList<Task<? extends T>>(jobs.size());
+ for (Object job : jobs) {
+ Task subtask;
+ if (job instanceof TaskAdaptable) { subtask = ((TaskAdaptable)job).asTask(); }
+ else if (job instanceof Closure) { subtask = new BasicTask<T>((Closure) job); }
+ else if (job instanceof Callable) { subtask = new BasicTask<T>((Callable) job); }
+ else if (job instanceof Runnable) { subtask = new BasicTask<T>((Runnable) job); }
+
+ else throw new IllegalArgumentException("Invalid child "+(job == null ? null : job.getClass() + " ("+job+")")+
+ " passed to compound task; must be Runnable, Callable, Closure or Task");
+
+ BrooklynTaskTags.addTagDynamically(subtask, ManagementContextInternal.SUB_TASK_TAG);
+ children.add(subtask);
+ }
+
+ for (Task<?> t: getChildren()) {
+ ((TaskInternal<?>)t).markQueued();
+ }
+ }
+
+ /** return value needs to be specified by subclass; subclass should also setBlockingDetails
+ * @throws ExecutionException
+ * @throws InterruptedException */
+ protected abstract List<T> runJobs() throws InterruptedException, ExecutionException;
+
+ protected void submitIfNecessary(TaskAdaptable<?> task) {
+ if (!task.asTask().isSubmitted()) {
+ if (BasicExecutionContext.getCurrentExecutionContext() == null) {
+ throw new IllegalStateException("Compound task ("+task+") launched from "+this+" missing required execution context");
+ } else {
+ BasicExecutionContext.getCurrentExecutionContext().submit(task);
+ }
+ }
+ }
+
+ public List<Task<? extends T>> getChildrenTyped() {
+ return children;
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public List<Task<?>> getChildren() {
+ return (List) getChildrenTyped();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/DeferredSupplier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/DeferredSupplier.java b/core/src/main/java/org/apache/brooklyn/core/util/task/DeferredSupplier.java
new file mode 100644
index 0000000..ad9416b
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/DeferredSupplier.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import com.google.common.base.Supplier;
+
+/**
+ * A class that supplies objects of a single type. When used as a ConfigKey value,
+ * the evaluation is deferred until getConfig() is called. The returned value will then
+ * be coerced to the correct type.
+ *
+ * Subsequent calls to getConfig will result in further calls to deferredProvider.get(),
+ * rather than reusing the result. If you want to reuse the result, consider instead
+ * using a Future.
+ *
+ * Note that this functionality replaces the ues of Closure in brooklyn 0.4.0, which
+ * served the same purpose.
+ */
+public interface DeferredSupplier<T> extends Supplier<T> {
+ @Override
+ T get();
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/DynamicSequentialTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/DynamicSequentialTask.java b/core/src/main/java/org/apache/brooklyn/core/util/task/DynamicSequentialTask.java
new file mode 100644
index 0000000..e197705
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/DynamicSequentialTask.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import groovy.lang.Closure;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.brooklyn.api.management.HasTaskChildren;
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.api.management.TaskQueueingContext;
+import org.apache.brooklyn.core.management.internal.ManagementContextInternal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.BrooklynTaskTags;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.time.CountdownTimer;
+import brooklyn.util.time.Duration;
+
+import com.google.common.annotations.Beta;
+import com.google.common.collect.ImmutableList;
+
+/** Represents a task whose run() method can create other tasks
+ * which are run sequentially, but that sequence runs in parallel to this task
+ * <p>
+ * There is an optional primary job run with this task, along with multiple secondary children.
+ * If any secondary task fails (assuming it isn't {@link Tasks#markInessential()} then by default
+ * subsequent tasks are not submitted and the primary task fails (but no tasks are cancelled or interrupted).
+ * You can change the behavior of this task with fields in {@link FailureHandlingConfig},
+ * or the convenience {@link TaskQueueingContext#swallowChildrenFailures()}
+ * (and {@link DynamicTasks#swallowChildrenFailures()} if you are inside the task).
+ * <p>
+ * This synchronizes on secondary tasks when submitting them, in case they may be manually submitted
+ * and the submitter wishes to ensure it is only submitted once.
+ * <p>
+ * Improvements which would be nice to have:
+ * <li> unqueued tasks not visible in api; would like that
+ * <li> uses an extra thread (submitted as background task) to monitor the secondary jobs; would be nice to remove this,
+ * and rely on {@link BasicExecutionManager} to run the jobs sequentially (combined with fix to item above)
+ * <li> would be nice to have cancel, resume, and possibly skipQueue available as operations (ideally in the REST API and GUI)
+ **/
+public class DynamicSequentialTask<T> extends BasicTask<T> implements HasTaskChildren, TaskQueueingContext {
+
+ private static final Logger log = LoggerFactory.getLogger(CompoundTask.class);
+
+ protected final Queue<Task<?>> secondaryJobsAll = new ConcurrentLinkedQueue<Task<?>>();
+ protected final Queue<Task<?>> secondaryJobsRemaining = new ConcurrentLinkedQueue<Task<?>>();
+ protected final Object jobTransitionLock = new Object();
+ protected volatile boolean primaryStarted = false;
+ protected volatile boolean primaryFinished = false;
+ protected volatile boolean secondaryQueueAborted = false;
+ protected Thread primaryThread;
+ protected DstJob dstJob;
+ protected FailureHandlingConfig failureHandlingConfig = FailureHandlingConfig.DEFAULT;
+
+ // default values for how to handle the various failures
+ @Beta
+ public static class FailureHandlingConfig {
+ /** secondary queue runs independently of primary task (submitting and blocking on each secondary task in order),
+ * but can set it up not to submit any more tasks if the primary fails */
+ public final boolean abortSecondaryQueueOnPrimaryFailure;
+ /** as {@link #abortSecondaryQueueOnPrimaryFailure} but controls cancelling of secondary queue*/
+ public final boolean cancelSecondariesOnPrimaryFailure;
+ /** secondary queue can continue submitting+blocking tasks even if a secondary task fails (unusual;
+ * typically handled by {@link TaskTags#markInessential(Task)} on the secondary tasks, in which case
+ * the secondary queue is never aborted */
+ public final boolean abortSecondaryQueueOnSecondaryFailure;
+ /** unsubmitted secondary tasks (ie those further in the queue) can be cancelled if a secondary task fails */
+ public final boolean cancelSecondariesOnSecondaryFailure;
+ /** whether to issue cancel against primary task if a secondary task fails */
+ public final boolean cancelPrimaryOnSecondaryFailure;
+ /** whether to fail this task if a secondary task fails */
+ public final boolean failParentOnSecondaryFailure;
+
+ @Beta
+ public FailureHandlingConfig(
+ boolean abortSecondaryQueueOnPrimaryFailure, boolean cancelSecondariesOnPrimaryFailure,
+ boolean abortSecondaryQueueOnSecondaryFailure, boolean cancelSecondariesOnSecondaryFailure,
+ boolean cancelPrimaryOnSecondaryFailure, boolean failParentOnSecondaryFailure) {
+ this.abortSecondaryQueueOnPrimaryFailure = abortSecondaryQueueOnPrimaryFailure;
+ this.cancelSecondariesOnPrimaryFailure = cancelSecondariesOnPrimaryFailure;
+ this.abortSecondaryQueueOnSecondaryFailure = abortSecondaryQueueOnSecondaryFailure;
+ this.cancelSecondariesOnSecondaryFailure = cancelSecondariesOnSecondaryFailure;
+ this.cancelPrimaryOnSecondaryFailure = cancelPrimaryOnSecondaryFailure;
+ this.failParentOnSecondaryFailure = failParentOnSecondaryFailure;
+ }
+
+ public static final FailureHandlingConfig DEFAULT = new FailureHandlingConfig(false, false, true, false, false, true);
+ public static final FailureHandlingConfig SWALLOWING_CHILDREN_FAILURES = new FailureHandlingConfig(false, false, false, false, false, false);
+ }
+
+ public static class QueueAbortedException extends IllegalStateException {
+ private static final long serialVersionUID = -7569362887826818524L;
+
+ public QueueAbortedException(String msg) {
+ super(msg);
+ }
+ public QueueAbortedException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+ }
+
+ /**
+ * Constructs a new compound task containing the specified units of work.
+ *
+ * @param jobs A potentially heterogeneous mixture of {@link Runnable}, {@link Callable}, {@link Closure} and {@link Task} can be provided.
+ * @throws IllegalArgumentException if any of the passed child jobs is not one of the above types
+ */
+ public DynamicSequentialTask() {
+ this(null);
+ }
+
+ public DynamicSequentialTask(Callable<T> mainJob) {
+ this(MutableMap.of("tag", "compound"), mainJob);
+ }
+
+ public DynamicSequentialTask(Map<?,?> flags, Callable<T> mainJob) {
+ super(flags);
+ this.job = dstJob = new DstJob(mainJob);
+ }
+
+ @Override
+ public void queue(Task<?> t) {
+ synchronized (jobTransitionLock) {
+ if (primaryFinished)
+ throw new IllegalStateException("Cannot add a task to "+this+" which is already finished (trying to add "+t+")");
+ if (secondaryQueueAborted)
+ throw new QueueAbortedException("Cannot add a task to "+this+" whose queue has been aborted (trying to add "+t+")");
+ secondaryJobsAll.add(t);
+ secondaryJobsRemaining.add(t);
+ BrooklynTaskTags.addTagsDynamically(t, ManagementContextInternal.SUB_TASK_TAG);
+ ((TaskInternal<?>)t).markQueued();
+ jobTransitionLock.notifyAll();
+ }
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return cancel(mayInterruptIfRunning, mayInterruptIfRunning, true);
+ }
+ public boolean cancel(boolean mayInterruptTask, boolean interruptPrimaryThread, boolean alsoCancelChildren) {
+ if (isDone()) return false;
+ if (log.isTraceEnabled()) log.trace("cancelling {}", this);
+ boolean cancel = super.cancel(mayInterruptTask);
+ if (alsoCancelChildren) {
+ for (Task<?> t: secondaryJobsAll)
+ cancel |= t.cancel(mayInterruptTask);
+ }
+ synchronized (jobTransitionLock) {
+ if (primaryThread!=null) {
+ if (interruptPrimaryThread) {
+ if (log.isTraceEnabled()) log.trace("cancelling {} - interrupting", this);
+ primaryThread.interrupt();
+ }
+ cancel = true;
+ }
+ }
+ return cancel;
+ }
+
+ @Override
+ public synchronized boolean uncancel() {
+ secondaryQueueAborted = false;
+ return super.uncancel();
+ }
+
+ @Override
+ public Iterable<Task<?>> getChildren() {
+ return Collections.unmodifiableCollection(secondaryJobsAll);
+ }
+
+ /** submits the indicated task for execution in the current execution context, and returns immediately */
+ protected void submitBackgroundInheritingContext(Task<?> task) {
+ BasicExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext();
+ if (log.isTraceEnabled()) {
+ log.trace("task {} - submitting background task {} ({})", new Object[] { Tasks.current(), task, ec });
+ }
+ if (ec==null) {
+ String message = Tasks.current()!=null ?
+ // user forgot ExecContext:
+ "Task "+this+" submitting background task requires an ExecutionContext (an ExecutionManager is not enough): submitting "+task+" in "+Tasks.current()
+ : // should not happen:
+ "Cannot submit tasks inside DST when not in a task : submitting "+task+" in "+this;
+ log.warn(message+" (rethrowing)");
+ throw new IllegalStateException(message);
+ }
+ synchronized (task) {
+ if (task.isSubmitted()) {
+ if (log.isTraceEnabled()) {
+ log.trace("DST "+this+" skipping submission of child "+task+" because it is already submitted");
+ }
+ } else {
+ try {
+ ec.submit(task);
+ } catch (Exception e) {
+ Exceptions.propagateIfFatal(e);
+ // Give some context when the submit fails (happens when the target is already unmanaged)
+ throw new IllegalStateException("Failure submitting task "+task+" in "+this+": "+e.getMessage(), e);
+ }
+ }
+ }
+ }
+
+ public void setFailureHandlingConfig(FailureHandlingConfig failureHandlingConfig) {
+ this.failureHandlingConfig = failureHandlingConfig;
+ }
+ @Override
+ public void swallowChildrenFailures() {
+ setFailureHandlingConfig(FailureHandlingConfig.SWALLOWING_CHILDREN_FAILURES);
+ }
+
+ protected class DstJob implements Callable<T> {
+ protected Callable<T> primaryJob;
+ /** currently executing (or just completed) secondary task, or null if none;
+ * with jobTransitionLock notified on change and completion */
+ protected volatile Task<?> currentSecondary = null;
+ protected volatile boolean finishedSecondaries = false;
+
+ public DstJob(Callable<T> mainJob) {
+ this.primaryJob = mainJob;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public T call() throws Exception {
+
+ synchronized (jobTransitionLock) {
+ primaryStarted = true;
+ primaryThread = Thread.currentThread();
+ for (Task<?> t: secondaryJobsAll)
+ ((TaskInternal<?>)t).markQueued();
+ }
+ // TODO overkill having a thread/task for this, but it works
+ // optimisation would either use newTaskEndCallback property on task to submit
+ // or use some kind of single threaded executor for the queued tasks
+ Task<List<Object>> secondaryJobMaster = Tasks.<List<Object>>builder().dynamic(false)
+ .name("DST manager (internal)")
+ // TODO marking it transient helps it be GC'd sooner,
+ // but ideally we wouldn't have this,
+ // or else it would be a child
+ .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
+ .body(new Callable<List<Object>>() {
+
+ @Override
+ public List<Object> call() throws Exception {
+ List<Object> result = new ArrayList<Object>();
+ try {
+ while (!secondaryQueueAborted && (!primaryFinished || !secondaryJobsRemaining.isEmpty())) {
+ synchronized (jobTransitionLock) {
+ if (!primaryFinished && secondaryJobsRemaining.isEmpty()) {
+ currentSecondary = null;
+ jobTransitionLock.wait(1000);
+ }
+ }
+ @SuppressWarnings("rawtypes")
+ Task secondaryJob = secondaryJobsRemaining.poll();
+ if (secondaryJob != null) {
+ synchronized (jobTransitionLock) {
+ currentSecondary = secondaryJob;
+ submitBackgroundInheritingContext(secondaryJob);
+ jobTransitionLock.notifyAll();
+ }
+ try {
+ result.add(secondaryJob.get());
+ } catch (Exception e) {
+ if (TaskTags.isInessential(secondaryJob)) {
+ result.add(Tasks.getError(secondaryJob));
+ if (log.isDebugEnabled())
+ log.debug("Secondary job queue for "+DynamicSequentialTask.this+" ignoring error in inessential task "+secondaryJob+": "+e);
+ } else {
+ if (failureHandlingConfig.cancelSecondariesOnSecondaryFailure) {
+ if (log.isDebugEnabled())
+ log.debug("Secondary job queue for "+DynamicSequentialTask.this+" cancelling "+secondaryJobsRemaining.size()+" remaining, due to error in task "+secondaryJob+": "+e);
+ synchronized (jobTransitionLock) {
+ for (Task<?> t: secondaryJobsRemaining)
+ t.cancel(true);
+ jobTransitionLock.notifyAll();
+ }
+ }
+
+ if (failureHandlingConfig.abortSecondaryQueueOnSecondaryFailure) {
+ if (log.isDebugEnabled())
+ log.debug("Aborting secondary job queue for "+DynamicSequentialTask.this+" due to error in child task "+secondaryJob+" ("+e+", being rethrown)");
+ secondaryQueueAborted = true;
+ throw e;
+ }
+
+ if (!primaryFinished && failureHandlingConfig.cancelPrimaryOnSecondaryFailure) {
+ cancel(true, false, false);
+ }
+
+ result.add(Tasks.getError(secondaryJob));
+ if (log.isDebugEnabled())
+ log.debug("Secondary job queue for "+DynamicSequentialTask.this+" continuing in presence of error in child task "+secondaryJob+" ("+e+", being remembered)");
+ }
+ }
+ }
+ }
+ } finally {
+ synchronized (jobTransitionLock) {
+ currentSecondary = null;
+ finishedSecondaries = true;
+ jobTransitionLock.notifyAll();
+ }
+ }
+ return result;
+ }
+ }).build();
+ ((BasicTask<?>)secondaryJobMaster).proxyTargetTask = DynamicSequentialTask.this;
+
+ submitBackgroundInheritingContext(secondaryJobMaster);
+
+ T result = null;
+ Throwable error = null;
+ Throwable uninterestingSelfError = null;
+ boolean errorIsFromChild = false;
+ try {
+ if (log.isTraceEnabled()) log.trace("calling primary job for {}", this);
+ if (primaryJob!=null) result = primaryJob.call();
+ } catch (Throwable selfException) {
+ Exceptions.propagateIfFatal(selfException);
+ if (Exceptions.getFirstThrowableOfType(selfException, QueueAbortedException.class) != null) {
+ // Error was caused by the task already having failed, and this thread calling queue() to try
+ // to queue more work. The underlying cause will be much more interesting.
+ // Without this special catch, we record error = "Cannot add a task to ... whose queue has been aborted",
+ // which gets propagated instead of the more interesting child exception.
+ uninterestingSelfError = selfException;
+ } else {
+ error = selfException;
+ errorIsFromChild = false;
+ }
+ if (failureHandlingConfig.abortSecondaryQueueOnPrimaryFailure) {
+ if (log.isDebugEnabled())
+ log.debug("Secondary job queue for "+DynamicSequentialTask.this+" aborting with "+secondaryJobsRemaining.size()+" remaining, due to error in primary task: "+selfException);
+ secondaryQueueAborted = true;
+ }
+ if (failureHandlingConfig.cancelSecondariesOnPrimaryFailure) {
+ if (log.isDebugEnabled())
+ log.debug(DynamicSequentialTask.this+" cancelling "+secondaryJobsRemaining.size()+" remaining, due to error in primary task: "+selfException);
+ synchronized (jobTransitionLock) {
+ for (Task<?> t: secondaryJobsRemaining)
+ t.cancel(true);
+ // do this early to prevent additions; and note we notify very soon below, so not notify is help off until below
+ primaryThread = null;
+ primaryFinished = true;
+ }
+ }
+ } finally {
+ try {
+ if (log.isTraceEnabled()) log.trace("cleaning up for {}", this);
+ synchronized (jobTransitionLock) {
+ // semaphore might be nicer here (aled notes as it is this is a little hard to read)
+ primaryThread = null;
+ primaryFinished = true;
+ jobTransitionLock.notifyAll();
+ }
+ if (!isCancelled() && !Thread.currentThread().isInterrupted()) {
+ if (log.isTraceEnabled()) log.trace("waiting for secondaries for {}", this);
+ // wait on tasks sequentially so that blocking information is more interesting
+ DynamicTasks.waitForLast();
+ List<Object> result2 = secondaryJobMaster.get();
+ try {
+ if (primaryJob==null) result = (T)result2;
+ } catch (ClassCastException e) { /* ignore class cast exception; leave the result as null */ }
+ }
+ } catch (Throwable childException) {
+ Exceptions.propagateIfFatal(childException);
+ if (error==null) {
+ error = childException;
+ errorIsFromChild = true;
+ } else {
+ if (log.isDebugEnabled()) log.debug("Parent task "+this+" ignoring child error ("+childException+") in presence of our own error ("+error+")");
+ }
+ }
+ }
+ if (error!=null) {
+ handleException(error, errorIsFromChild);
+ }
+ if (uninterestingSelfError != null) {
+ handleException(uninterestingSelfError, false);
+ }
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "DstJob:"+DynamicSequentialTask.this.getId();
+ }
+
+ /** waits for this job to complete, or the given time to elapse */
+ public void join(boolean includePrimary, Duration optionalTimeout) throws InterruptedException {
+ CountdownTimer timeLeft = optionalTimeout!=null ? CountdownTimer.newInstanceStarted(optionalTimeout) : null;
+ while (true) {
+ Task<?> cs;
+ Duration remaining;
+ synchronized (jobTransitionLock) {
+ cs = currentSecondary;
+ if (finishedSecondaries) return;
+ remaining = timeLeft==null ? Duration.ONE_SECOND : timeLeft.getDurationRemaining();
+ if (!remaining.isPositive()) return;
+ if (cs==null) {
+ if (!includePrimary && secondaryJobsRemaining.isEmpty()) return;
+ // parent still running, no children though
+ Tasks.setBlockingTask(DynamicSequentialTask.this);
+ jobTransitionLock.wait(remaining.toMilliseconds());
+ Tasks.resetBlockingDetails();
+ }
+ }
+ if (cs!=null) {
+ Tasks.setBlockingTask(cs);
+ cs.blockUntilEnded(remaining);
+ Tasks.resetBlockingDetails();
+ }
+ }
+ }
+ }
+
+ @Override
+ public List<Task<?>> getQueue() {
+ return ImmutableList.copyOf(secondaryJobsAll);
+ }
+
+ public void handleException(Throwable throwable, boolean fromChild) throws Exception {
+ Exceptions.propagateIfFatal(throwable);
+ if (fromChild && !failureHandlingConfig.failParentOnSecondaryFailure) {
+ log.debug("Parent task "+this+" swallowing child error: "+throwable);
+ return;
+ }
+ handleException(throwable);
+ }
+ public void handleException(Throwable throwable) throws Exception {
+ Exceptions.propagateIfFatal(throwable);
+ if (throwable instanceof Exception) {
+ // allow checked exceptions to be passed through
+ throw (Exception)throwable;
+ }
+ throw Exceptions.propagate(throwable);
+ }
+
+ @Override
+ public void drain(Duration optionalTimeout, boolean includePrimary, boolean throwFirstError) {
+ try {
+ dstJob.join(includePrimary, optionalTimeout);
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+ if (throwFirstError) {
+ if (isError())
+ getUnchecked();
+ for (Task<?> t: getQueue())
+ if (t.isError() && !TaskTags.isInessential(t))
+ t.getUnchecked();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/DynamicTasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/DynamicTasks.java b/core/src/main/java/org/apache/brooklyn/core/util/task/DynamicTasks.java
new file mode 100644
index 0000000..ed46558
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/DynamicTasks.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.management.ExecutionContext;
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.api.management.TaskAdaptable;
+import org.apache.brooklyn.api.management.TaskFactory;
+import org.apache.brooklyn.api.management.TaskQueueingContext;
+import org.apache.brooklyn.api.management.TaskWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.time.Duration;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+/**
+ * Contains static methods which detect and use the current {@link TaskQueueingContext} to execute tasks.
+ *
+ * @since 0.6.0
+ */
+@Beta
+public class DynamicTasks {
+
+ private static final Logger log = LoggerFactory.getLogger(DynamicTasks.class);
+ private static final ThreadLocal<TaskQueueingContext> taskQueueingContext = new ThreadLocal<TaskQueueingContext>();
+
+ public static void setTaskQueueingContext(TaskQueueingContext newTaskQC) {
+ taskQueueingContext.set(newTaskQC);
+ }
+
+ public static TaskQueueingContext getThreadTaskQueuingContext() {
+ return taskQueueingContext.get();
+ }
+
+ public static TaskQueueingContext getTaskQueuingContext() {
+ TaskQueueingContext adder = getThreadTaskQueuingContext();
+ if (adder!=null) return adder;
+ Task<?> t = Tasks.current();
+ if (t instanceof TaskQueueingContext) return (TaskQueueingContext) t;
+ return null;
+ }
+
+
+ public static void removeTaskQueueingContext() {
+ taskQueueingContext.remove();
+ }
+
+ public static class TaskQueueingResult<T> implements TaskWrapper<T> {
+ private final Task<T> task;
+ private final boolean wasQueued;
+ private ExecutionContext execContext = null;
+
+ private TaskQueueingResult(TaskAdaptable<T> task, boolean wasQueued) {
+ this.task = task.asTask();
+ this.wasQueued = wasQueued;
+ }
+ @Override
+ public Task<T> asTask() {
+ return task;
+ }
+ @Override
+ public Task<T> getTask() {
+ return task;
+ }
+ /** returns true if the task was queued */
+ public boolean wasQueued() {
+ return wasQueued;
+ }
+ /** returns true if the task either is currently queued or has been submitted */
+ public boolean isQueuedOrSubmitted() {
+ return wasQueued || Tasks.isQueuedOrSubmitted(task);
+ }
+ /** specifies an execContext to use if the task has to be explicitly submitted;
+ * if omitted it will attempt to find one based on the current thread's context */
+ public TaskQueueingResult<T> executionContext(ExecutionContext execContext) {
+ this.execContext = execContext;
+ return this;
+ }
+ /** as {@link #executionContext(ExecutionContext)} but inferring from the entity */
+ public TaskQueueingResult<T> executionContext(Entity entity) {
+ this.execContext = ((EntityInternal)entity).getManagementSupport().getExecutionContext();
+ return this;
+ }
+ private boolean orSubmitInternal() {
+ if (!wasQueued()) {
+ if (isQueuedOrSubmitted()) {
+ log.warn("Redundant call to execute "+getTask()+"; skipping");
+ return false;
+ } else {
+ ExecutionContext ec = execContext;
+ if (ec==null)
+ ec = BasicExecutionContext.getCurrentExecutionContext();
+ if (ec==null)
+ throw new IllegalStateException("Cannot execute "+getTask()+" without an execution context; ensure caller is in an ExecutionContext");
+ ec.submit(getTask());
+ return true;
+ }
+ } else {
+ return false;
+ }
+ }
+ /** causes the task to be submitted (asynchronously) if it hasn't already been,
+ * requiring an entity execution context (will try to find a default if not set) */
+ public TaskQueueingResult<T> orSubmitAsync() {
+ orSubmitInternal();
+ return this;
+ }
+ /** convenience for setting {@link #executionContext(ExecutionContext)} then submitting async */
+ public TaskQueueingResult<T> orSubmitAsync(Entity entity) {
+ executionContext(entity);
+ return orSubmitAsync();
+ }
+ /** causes the task to be submitted *synchronously* if it hasn't already been submitted;
+ * useful in contexts such as libraries where callers may be either on a legacy call path
+ * (which assumes all commands complete immediately);
+ * requiring an entity execution context (will try to find a default if not set) */
+ public TaskQueueingResult<T> orSubmitAndBlock() {
+ if (orSubmitInternal()) task.getUnchecked();
+ return this;
+ }
+ /** convenience for setting {@link #executionContext(ExecutionContext)} then submitting blocking */
+ public TaskQueueingResult<T> orSubmitAndBlock(Entity entity) {
+ executionContext(entity);
+ return orSubmitAndBlock();
+ }
+ /** blocks for the task to be completed
+ * <p>
+ * needed in any context where subsequent commands assume the task has completed.
+ * not needed in a context where the task is simply being built up and queued.
+ * <p>
+ * throws if there are any errors
+ */
+ public T andWaitForSuccess() {
+ return task.getUnchecked();
+ }
+ public void orCancel() {
+ if (!wasQueued()) {
+ task.cancel(false);
+ }
+ }
+ }
+
+ /**
+ * Tries to add the task to the current addition context if there is one, otherwise does nothing.
+ * <p/>
+ * Call {@link TaskQueueingResult#orSubmitAsync() orSubmitAsync()} on the returned
+ * {@link TaskQueueingResult TaskQueueingResult} to handle execution of tasks in a
+ * {@link BasicExecutionContext}.
+ */
+ public static <T> TaskQueueingResult<T> queueIfPossible(TaskAdaptable<T> task) {
+ TaskQueueingContext adder = getTaskQueuingContext();
+ boolean result = false;
+ if (adder!=null)
+ result = Tasks.tryQueueing(adder, task);
+ return new TaskQueueingResult<T>(task, result);
+ }
+
+ /** @see #queueIfPossible(TaskAdaptable) */
+ public static <T> TaskQueueingResult<T> queueIfPossible(TaskFactory<? extends TaskAdaptable<T>> task) {
+ return queueIfPossible(task.newTask());
+ }
+
+ /** adds the given task to the nearest task addition context,
+ * either set as a thread-local, or in the current task, or the submitter of the task, etc
+ * <p>
+ * throws if it cannot add */
+ public static <T> Task<T> queueInTaskHierarchy(Task<T> task) {
+ Preconditions.checkNotNull(task, "Task to queue cannot be null");
+ Preconditions.checkState(!Tasks.isQueuedOrSubmitted(task), "Task to queue must not yet be submitted: {}", task);
+
+ TaskQueueingContext adder = getTaskQueuingContext();
+ if (adder!=null) {
+ if (Tasks.tryQueueing(adder, task)) {
+ log.debug("Queued task {} at context {} (no hierarchy)", task, adder);
+ return task;
+ }
+ }
+
+ Task<?> t = Tasks.current();
+ Preconditions.checkState(t!=null || adder!=null, "No task addition context available for queueing task "+task);
+
+ while (t!=null) {
+ if (t instanceof TaskQueueingContext) {
+ if (Tasks.tryQueueing((TaskQueueingContext)t, task)) {
+ log.debug("Queued task {} at hierarchical context {}", task, t);
+ return task;
+ }
+ }
+ t = t.getSubmittedByTask();
+ }
+
+ throw new IllegalStateException("No task addition context available in current task hierarchy for adding task "+task);
+ }
+
+ /**
+ * Queues the given task.
+ * <p/>
+ * This method is only valid within a dynamic task. Use {@link #queueIfPossible(TaskAdaptable)}
+ * and {@link TaskQueueingResult#orSubmitAsync()} if the calling context is a basic task.
+ *
+ * @param task The task to queue
+ * @throws IllegalStateException if no task queueing context is available
+ * @return The queued task
+ */
+ public static <V extends TaskAdaptable<?>> V queue(V task) {
+ try {
+ Preconditions.checkNotNull(task, "Task to queue cannot be null");
+ Preconditions.checkState(!Tasks.isQueued(task), "Task to queue must not yet be queued: %s", task);
+ TaskQueueingContext adder = getTaskQueuingContext();
+ if (adder==null) {
+ throw new IllegalStateException("Task "+task+" cannot be queued here; no queueing context available");
+ }
+ adder.queue(task.asTask());
+ return task;
+ } catch (Throwable e) {
+ log.warn("Error queueing "+task+" (rethrowing): "+e);
+ throw Exceptions.propagate(e);
+ }
+ }
+
+ /** @see #queue(org.apache.brooklyn.api.management.TaskAdaptable) */
+ public static void queue(TaskAdaptable<?> task1, TaskAdaptable<?> task2, TaskAdaptable<?> ...tasks) {
+ queue(task1);
+ queue(task2);
+ for (TaskAdaptable<?> task: tasks) queue(task);
+ }
+
+ /** @see #queue(org.apache.brooklyn.api.management.TaskAdaptable) */
+ public static <T extends TaskAdaptable<?>> T queue(TaskFactory<T> taskFactory) {
+ return queue(taskFactory.newTask());
+ }
+
+ /** @see #queue(org.apache.brooklyn.api.management.TaskAdaptable) */
+ public static void queue(TaskFactory<?> task1, TaskFactory<?> task2, TaskFactory<?> ...tasks) {
+ queue(task1.newTask());
+ queue(task2.newTask());
+ for (TaskFactory<?> task: tasks) queue(task.newTask());
+ }
+
+ /** @see #queue(org.apache.brooklyn.api.management.TaskAdaptable) */
+ public static <T> Task<T> queue(String name, Callable<T> job) {
+ return DynamicTasks.queue(Tasks.<T>builder().name(name).body(job).build());
+ }
+
+ /** @see #queue(org.apache.brooklyn.api.management.TaskAdaptable) */
+ public static <T> Task<T> queue(String name, Runnable job) {
+ return DynamicTasks.queue(Tasks.<T>builder().name(name).body(job).build());
+ }
+
+ /** queues the task if needed, i.e. if it is not yet submitted (so it will run),
+ * or if it is submitted but not queued and we are in a queueing context (so it is available for informational purposes) */
+ public static <T extends TaskAdaptable<?>> T queueIfNeeded(T task) {
+ if (!Tasks.isQueued(task)) {
+ if (Tasks.isSubmitted(task) && getTaskQueuingContext()==null) {
+ // already submitted and not in a queueing context, don't try to queue
+ } else {
+ // needs submitting, put it in the queue
+ // (will throw an error if we are not a queueing context)
+ queue(task);
+ }
+ }
+ return task;
+ }
+
+ /** submits/queues the given task if needed, and gets the result (unchecked)
+ * only permitted in a queueing context (ie a DST main job) if the task is not yet submitted */
+ // things get really confusing if you try to queueInTaskHierarchy -- easy to cause deadlocks!
+ public static <T> T get(TaskAdaptable<T> t) {
+ return queueIfNeeded(t).asTask().getUnchecked();
+ }
+
+ /** As {@link #drain(Duration, boolean)} waiting forever and throwing the first error
+ * (excluding errors in inessential tasks),
+ * then returning the last task in the queue (which is guaranteed to have finished without error,
+ * if this method returns without throwing) */
+ public static Task<?> waitForLast() {
+ drain(null, true);
+ // this call to last is safe, as the above guarantees everything will have run
+ // (on errors the above will throw so we won't come here)
+ List<Task<?>> q = DynamicTasks.getTaskQueuingContext().getQueue();
+ return q.isEmpty() ? null : Iterables.getLast(q);
+ }
+
+ /** Calls {@link TaskQueueingContext#drain(Duration, boolean, boolean)} on the current task context */
+ public static TaskQueueingContext drain(Duration optionalTimeout, boolean throwFirstError) {
+ TaskQueueingContext qc = DynamicTasks.getTaskQueuingContext();
+ Preconditions.checkNotNull(qc, "Cannot drain when there is no queueing context");
+ qc.drain(optionalTimeout, false, throwFirstError);
+ return qc;
+ }
+
+ /** as {@link Tasks#swallowChildrenFailures()} but requiring a {@link TaskQueueingContext}. */
+ @Beta
+ public static void swallowChildrenFailures() {
+ Preconditions.checkNotNull(DynamicTasks.getTaskQueuingContext(), "Task queueing context required here");
+ Tasks.swallowChildrenFailures();
+ }
+
+ /** same as {@link Tasks#markInessential()}
+ * (but included here for convenience as it is often used in conjunction with {@link DynamicTasks}) */
+ public static void markInessential() {
+ Tasks.markInessential();
+ }
+
+ /** queues the task if possible, otherwise submits it asynchronously; returns the task for callers to
+ * {@link Task#getUnchecked()} or {@link Task#blockUntilEnded()} */
+ public static <T> Task<T> submit(TaskAdaptable<T> task, Entity entity) {
+ return queueIfPossible(task).orSubmitAsync(entity).asTask();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/ExecutionListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ExecutionListener.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ExecutionListener.java
new file mode 100644
index 0000000..5341b21
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ExecutionListener.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import org.apache.brooklyn.api.management.Task;
+
+public interface ExecutionListener {
+
+ /** invoked when a task completes:
+ * {@link Task#getEndTimeUtc()} and {@link Task#isDone()} are guaranteed to be set,
+ * and {@link Task#get()} should return immediately for most Task implementations
+ * (care has been taken to avoid potential deadlocks here, waiting for a result!) */
+ public void onTaskDone(Task<?> task);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/ExecutionUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ExecutionUtils.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ExecutionUtils.java
new file mode 100644
index 0000000..be677e3
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ExecutionUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import groovy.lang.Closure;
+
+import java.util.concurrent.Callable;
+
+import com.google.common.base.Function;
+import com.google.common.base.Throwables;
+
+public class ExecutionUtils {
+ /**
+ * Attempts to run/call the given object, with the given arguments if possible, preserving the return value if there is one (null otherwise);
+ * throws exception if the callable is a non-null object which cannot be invoked (not a callable or runnable)
+ * @deprecated since 0.7.0 ; this super-loose typing should be avoided; if it is needed, let's move it to one of the Groovy compatibility classes
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public static Object invoke(Object callable, Object ...args) {
+ if (callable instanceof Closure) return ((Closure<?>)callable).call(args);
+ if (callable instanceof Callable) {
+ try {
+ return ((Callable<?>)callable).call();
+ } catch (Throwable t) {
+ throw Throwables.propagate(t);
+ }
+ }
+ if (callable instanceof Runnable) { ((Runnable)callable).run(); return null; }
+ if (callable instanceof Function && args.length == 1) { return ((Function)callable).apply(args[0]); }
+ if (callable==null) return null;
+ throw new IllegalArgumentException("Cannot invoke unexpected object "+callable+" of type "+callable.getClass()+", with "+args.length+" args");
+ }
+}