You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by ha...@apache.org on 2015/08/17 21:18:05 UTC
[34/42] incubator-brooklyn git commit: [BROOKLYN-162] Refactor
package in ./core/util
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/BasicTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/BasicTask.java b/core/src/main/java/brooklyn/util/task/BasicTask.java
deleted file mode 100644
index 57b2bb2..0000000
--- a/core/src/main/java/brooklyn/util/task/BasicTask.java
+++ /dev/null
@@ -1,892 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import static brooklyn.util.JavaGroovyEquivalents.asString;
-import static brooklyn.util.JavaGroovyEquivalents.elvisString;
-import groovy.lang.Closure;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.lang.management.LockInfo;
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadInfo;
-import java.util.Collections;
-import java.util.ConcurrentModificationException;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.brooklyn.api.management.HasTaskChildren;
-import org.apache.brooklyn.api.management.Task;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.util.GroovyJavaMethods;
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.guava.Maybe;
-import brooklyn.util.text.Identifiers;
-import brooklyn.util.text.Strings;
-import brooklyn.util.time.Duration;
-import brooklyn.util.time.Time;
-
-import com.google.common.annotations.Beta;
-import com.google.common.base.Function;
-import com.google.common.base.Objects;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Callables;
-import com.google.common.util.concurrent.ExecutionList;
-import com.google.common.util.concurrent.ListenableFuture;
-
-/**
- * The basic concrete implementation of a {@link Task} to be executed.
- *
- * A {@link Task} is a wrapper for an executable unit, such as a {@link Closure} or a {@link Runnable} or
- * {@link Callable} and will run in its own {@link Thread}.
- * <p>
- * The task can be given an optional displayName and description in its constructor (as named
- * arguments in the first {@link Map} parameter). It is guaranteed to have {@link Object#notify()} called
- * once whenever the task starts running and once again when the task is about to complete. Due to
- * the way executors work it is ugly to guarantee notification <em>after</em> completion, so instead we
- * notify just before then expect the user to call {@link #get()} - which will throw errors if the underlying job
- * did so - or {@link #blockUntilEnded()} which will not throw errors.
- */
-public class BasicTask<T> implements TaskInternal<T> {
- private static final Logger log = LoggerFactory.getLogger(BasicTask.class);
-
- private String id = Identifiers.makeRandomId(8);
- protected Callable<T> job;
- public final String displayName;
- public final String description;
-
- protected final Set<Object> tags = Sets.newConcurrentHashSet();
- // for debugging, to record where tasks were created
-// { tags.add(new Throwable("Creation stack trace")); }
-
- protected Task<?> proxyTargetTask = null;
-
- protected String blockingDetails = null;
- protected Task<?> blockingTask = null;
- Object extraStatusText = null;
-
- /** listeners attached at task level; these are stored here, but run on the underlying ListenableFuture */
- protected final ExecutionList listeners = new ExecutionList();
-
- /**
- * Constructor needed to prevent confusion in groovy stubs when looking for default constructor,
- *
- * The generics on {@link Closure} break it if that is first constructor.
- */
- protected BasicTask() { this(Collections.emptyMap()); }
- protected BasicTask(Map<?,?> flags) { this(flags, (Callable<T>) null); }
-
- public BasicTask(Callable<T> job) { this(Collections.emptyMap(), job); }
-
- public BasicTask(Map<?,?> flags, Callable<T> job) {
- this.job = job;
-
- if (flags.containsKey("tag")) tags.add(flags.remove("tag"));
- Object ftags = flags.remove("tags");
- if (ftags!=null) {
- if (ftags instanceof Iterable) Iterables.addAll(tags, (Iterable<?>)ftags);
- else {
- log.info("deprecated use of non-collection argument for 'tags' ("+ftags+") in "+this, new Throwable("trace of discouraged use of non-colleciton tags argument"));
- tags.add(ftags);
- }
- }
-
- description = elvisString(flags.remove("description"), "");
- String d = asString(flags.remove("displayName"));
- displayName = (d==null ? "" : d);
- }
-
- public BasicTask(Runnable job) { this(GroovyJavaMethods.<T>callableFromRunnable(job)); }
- public BasicTask(Map<?,?> flags, Runnable job) { this(flags, GroovyJavaMethods.<T>callableFromRunnable(job)); }
- public BasicTask(Closure<T> job) { this(GroovyJavaMethods.callableFromClosure(job)); }
- public BasicTask(Map<?,?> flags, Closure<T> job) { this(flags, GroovyJavaMethods.callableFromClosure(job)); }
-
- @Override
- public String getId() {
- return id;
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(id);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof Task)
- return ((Task<?>)obj).getId().equals(getId());
- return false;
- }
-
- @Override
- public String toString() {
- // give display name plus id, or job and tags plus id; some jobs have been extended to include nice tostrings
- return "Task["+
- (Strings.isNonEmpty(displayName) ?
- displayName :
- (job + (tags!=null && !tags.isEmpty() ? ";"+tags : "")) ) +
- ":"+getId()+"]";
- }
-
- @Override
- public Task<T> asTask() {
- return this;
- }
-
- // housekeeping --------------------
-
- /*
- * These flags are set by BasicExecutionManager.submit.
- *
- * Order is guaranteed to be as shown below, in order of #. Within each # line it is currently in the order specified by commas but this is not guaranteed.
- * (The spaces between the # section indicate longer delays / logical separation ... it should be clear!)
- *
- * # submitter, submit time set, tags and other submit-time fields set
- *
- * # thread set, ThreadLocal getCurrentTask set
- * # start time set, isBegun is true
- * # task end callback run, if supplied
- *
- * # task runs
- *
- * # task end callback run, if supplied
- * # end time set
- * # thread cleared, ThreadLocal getCurrentTask set
- * # Task.notifyAll()
- * # Task.get() (result.get()) available, Task.isDone is true
- *
- * Few _consumers_ should care, but internally we rely on this so that, for example, status is displayed correctly.
- * Tests should catch most things, but be careful if you change any of the above semantics.
- */
-
- protected long queuedTimeUtc = -1;
- protected long submitTimeUtc = -1;
- protected long startTimeUtc = -1;
- protected long endTimeUtc = -1;
- protected Maybe<Task<?>> submittedByTask;
-
- protected volatile Thread thread = null;
- private volatile boolean cancelled = false;
- /** normally a {@link ListenableFuture}, except for scheduled tasks when it may be a {@link ScheduledFuture} */
- protected volatile Future<T> internalFuture = null;
-
- @Override
- public synchronized void initInternalFuture(ListenableFuture<T> result) {
- if (this.internalFuture != null)
- throw new IllegalStateException("task "+this+" is being given a result twice");
- this.internalFuture = result;
- notifyAll();
- }
-
- // metadata accessors ------------
-
- @Override
- public Set<Object> getTags() { return Collections.unmodifiableSet(new LinkedHashSet<Object>(tags)); }
-
- /** if the job is queued for submission (e.g. by another task) it can indicate that fact (and time) here;
- * note tasks can (and often are) submitted without any queueing, in which case this value may be -1 */
- @Override
- public long getQueuedTimeUtc() { return queuedTimeUtc; }
-
- @Override
- public long getSubmitTimeUtc() { return submitTimeUtc; }
-
- @Override
- public long getStartTimeUtc() { return startTimeUtc; }
-
- @Override
- public long getEndTimeUtc() { return endTimeUtc; }
-
- @Override
- public Future<T> getInternalFuture() { return internalFuture; }
-
- @Override
- public Task<?> getSubmittedByTask() {
- if (submittedByTask==null) return null;
- return submittedByTask.orNull();
- }
-
- /** the thread where the task is running, if it is running */
- @Override
- public Thread getThread() { return thread; }
-
- // basic fields --------------------
-
- @Override
- public boolean isQueued() {
- return (queuedTimeUtc >= 0);
- }
-
- @Override
- public boolean isQueuedOrSubmitted() {
- return isQueued() || isSubmitted();
- }
-
- @Override
- public boolean isQueuedAndNotSubmitted() {
- return isQueued() && (!isSubmitted());
- }
-
- @Override
- public boolean isSubmitted() {
- return submitTimeUtc >= 0;
- }
-
- @Override
- public boolean isBegun() {
- return startTimeUtc >= 0;
- }
-
- /** marks the task as queued for execution */
- @Override
- public void markQueued() {
- if (queuedTimeUtc<0)
- queuedTimeUtc = System.currentTimeMillis();
- }
-
- @Override
- public final synchronized boolean cancel() { return cancel(true); }
-
- /** doesn't resume it, just means if something was cancelled but not submitted it could now be submitted;
- * probably going to be removed and perhaps some mechanism for running again made available
- * @since 0.7.0 */
- @Beta
- public synchronized boolean uncancel() {
- boolean wasCancelled = cancelled;
- cancelled = false;
- return wasCancelled;
- }
-
- @Override
- public synchronized boolean cancel(boolean mayInterruptIfRunning) {
- if (isDone()) return false;
- boolean cancel = true;
- cancelled = true;
- if (internalFuture!=null) {
- cancel = internalFuture.cancel(mayInterruptIfRunning);
- }
- notifyAll();
- return cancel;
- }
-
- @Override
- public boolean isCancelled() {
- return cancelled || (internalFuture!=null && internalFuture.isCancelled());
- }
-
- @Override
- public boolean isDone() {
- // if endTime is set, result might not be completed yet, but it will be set very soon
- // (the two values are set close in time, result right after the endTime;
- // but callback hooks might not see the result yet)
- return cancelled || (internalFuture!=null && internalFuture.isDone()) || endTimeUtc>0;
- }
-
- /**
- * Returns true if the task has had an error.
- *
- * Only true if calling {@link #get()} will throw an exception when it completes (including cancel).
- * Implementations may set this true before completion if they have that insight, or
- * (the default) they may compute it lazily after completion (returning false before completion).
- */
- @Override
- public boolean isError() {
- if (!isDone()) return false;
- if (isCancelled()) return true;
- try {
- get();
- return false;
- } catch (Throwable t) {
- return true;
- }
- }
-
- // future value --------------------
-
- @Override
- public T get() throws InterruptedException, ExecutionException {
- try {
- if (!isDone())
- Tasks.setBlockingTask(this);
- blockUntilStarted();
- return internalFuture.get();
- } finally {
- Tasks.resetBlockingTask();
- }
- }
-
- @Override
- public T getUnchecked() {
- try {
- return get();
- } catch (Exception e) {
- throw Exceptions.propagate(e);
- }
- }
-
- @Override
- public synchronized void blockUntilStarted() {
- blockUntilStarted(null);
- }
-
- @Override
- public synchronized boolean blockUntilStarted(Duration timeout) {
- Long endTime = timeout==null ? null : System.currentTimeMillis() + timeout.toMillisecondsRoundingUp();
- while (true) {
- if (cancelled) throw new CancellationException();
- if (internalFuture==null)
- try {
- if (timeout==null) {
- wait();
- } else {
- long remaining = endTime - System.currentTimeMillis();
- if (remaining>0)
- wait(remaining);
- else
- return false;
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- Throwables.propagate(e);
- }
- if (internalFuture!=null) return true;
- }
- }
-
- @Override
- public void blockUntilEnded() {
- blockUntilEnded(null);
- }
-
- @Override
- public boolean blockUntilEnded(Duration timeout) {
- Long endTime = timeout==null ? null : System.currentTimeMillis() + timeout.toMillisecondsRoundingUp();
- try {
- boolean started = blockUntilStarted(timeout);
- if (!started) return false;
- if (timeout==null) {
- internalFuture.get();
- } else {
- long remaining = endTime - System.currentTimeMillis();
- if (remaining>0)
- internalFuture.get(remaining, TimeUnit.MILLISECONDS);
- }
- return isDone();
- } catch (Throwable t) {
- Exceptions.propagateIfFatal(t);
- if (!(t instanceof TimeoutException) && log.isDebugEnabled())
- log.debug("call from "+Thread.currentThread()+", blocking until '"+this+"' finishes, ended with error: "+t);
- return isDone();
- }
- }
-
- @Override
- public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- return get(new Duration(timeout, unit));
- }
-
- @Override
- public T get(Duration duration) throws InterruptedException, ExecutionException, TimeoutException {
- long start = System.currentTimeMillis();
- Long end = duration==null ? null : start + duration.toMillisecondsRoundingUp();
- while (end==null || end > System.currentTimeMillis()) {
- if (cancelled) throw new CancellationException();
- if (internalFuture == null) {
- synchronized (this) {
- long remaining = end - System.currentTimeMillis();
- if (internalFuture==null && remaining>0)
- wait(remaining);
- }
- }
- if (internalFuture != null) break;
- }
- Long remaining = end==null ? null : end - System.currentTimeMillis();
- if (isDone()) {
- return internalFuture.get(1, TimeUnit.MILLISECONDS);
- } else if (remaining == null) {
- return internalFuture.get();
- } else if (remaining > 0) {
- return internalFuture.get(remaining, TimeUnit.MILLISECONDS);
- } else {
- throw new TimeoutException();
- }
- }
-
- @Override
- public T getUnchecked(Duration duration) {
- try {
- return get(duration);
- } catch (Exception e) {
- throw Exceptions.propagate(e);
- }
- }
-
- // ------------------ status ---------------------------
-
- /**
- * Returns a brief status string
- *
- * Plain-text format. Reported status if there is one, otherwise state which will be one of:
- * <ul>
- * <li>Not submitted
- * <li>Submitted for execution
- * <li>Ended by error
- * <li>Ended by cancellation
- * <li>Ended normally
- * <li>Running
- * <li>Waiting
- * </ul>
- */
- @Override
- public String getStatusSummary() {
- return getStatusString(0);
- }
-
- /**
- * Returns detailed status, suitable for a hover
- *
- * Plain-text format, with new-lines (and sometimes extra info) if multiline enabled.
- */
- @Override
- public String getStatusDetail(boolean multiline) {
- return getStatusString(multiline?2:1);
- }
-
- /**
- * This method is useful for callers to see the status of a task.
- *
- * Also for developers to see best practices for examining status fields etc
- *
- * @param verbosity 0 = brief, 1 = one-line with some detail, 2 = lots of detail
- */
- protected String getStatusString(int verbosity) {
-// Thread t = getThread();
- String rv;
- if (submitTimeUtc <= 0) rv = "Not submitted";
- else if (!isCancelled() && startTimeUtc <= 0) {
- rv = "Submitted for execution";
- if (verbosity>0) {
- long elapsed = System.currentTimeMillis() - submitTimeUtc;
- rv += " "+Time.makeTimeStringRoundedSince(elapsed)+" ago";
- }
- if (verbosity >= 2 && getExtraStatusText()!=null) {
- rv += "\n\n"+getExtraStatusText();
- }
- } else if (isDone()) {
- long elapsed = endTimeUtc - submitTimeUtc;
- String duration = Time.makeTimeStringRounded(elapsed);
- if (isCancelled()) {
- rv = "Cancelled";
- if (verbosity >= 1) rv+=" after "+duration;
-
- if (verbosity >= 2 && getExtraStatusText()!=null) {
- rv += "\n\n"+getExtraStatusText();
- }
- } else if (isError()) {
- rv = "Failed";
- if (verbosity >= 1) {
- rv += " after "+duration;
- Throwable error = Tasks.getError(this);
-
- if (verbosity >= 2 && getExtraStatusText()!=null) {
- rv += "\n\n"+getExtraStatusText();
- }
-
- //remove outer ExecException which is reported by the get(), we want the exception the task threw
- while (error instanceof ExecutionException) error = error.getCause();
- String errorMessage = Exceptions.collapseText(error);
-
- if (verbosity == 1) rv += ": "+abbreviate(errorMessage);
- if (verbosity >= 2) {
- rv += ": "+errorMessage;
- StringWriter sw = new StringWriter();
- ((Throwable)error).printStackTrace(new PrintWriter(sw));
- rv += "\n\n"+sw.getBuffer();
- }
- }
- } else {
- rv = "Completed";
- if (verbosity>=1) {
- if (verbosity==1) {
- try {
- Object v = get();
- rv += ", " +(v==null ? "no return value (null)" : "result: "+abbreviate(v.toString()));
- } catch (Exception e) {
- rv += ", but error accessing result ["+e+"]"; //shouldn't happen
- }
- } else {
- rv += " after "+duration;
- try {
- Object v = get();
- rv += "\n\n" + (v==null ? "No return value (null)" : "Result: "+v);
- } catch (Exception e) {
- rv += " at first\n" +
- "Error accessing result ["+e+"]"; //shouldn't happen
- }
- if (verbosity >= 2 && getExtraStatusText()!=null) {
- rv += "\n\n"+getExtraStatusText();
- }
- }
- }
- }
- } else {
- rv = getActiveTaskStatusString(verbosity);
- }
- return rv;
- }
-
- private static String abbreviate(String s) {
- s = Strings.getFirstLine(s);
- if (s.length()>255) s = s.substring(0, 252)+ "...";
- return s;
- }
-
- protected String getActiveTaskStatusString(int verbosity) {
- String rv = "";
- Thread t = getThread();
-
- // Normally, it's not possible for thread==null as we were started and not ended
-
- // However, there is a race where the task starts sand completes between the calls to getThread()
- // at the start of the method and this call to getThread(), so both return null even though
- // the intermediate checks returned started==true isDone()==false.
- if (t == null) {
- if (isDone()) {
- return getStatusString(verbosity);
- } else {
- //should only happen for repeating task which is not active
- return "Sleeping";
- }
- }
-
- ThreadInfo ti = ManagementFactory.getThreadMXBean().getThreadInfo(t.getId(), (verbosity<=0 ? 0 : verbosity==1 ? 1 : Integer.MAX_VALUE));
- if (getThread()==null)
- //thread might have moved on to a new task; if so, recompute (it should now say "done")
- return getStatusString(verbosity);
-
- if (verbosity >= 1 && Strings.isNonBlank(blockingDetails)) {
- if (verbosity==1)
- // short status string will just show blocking details
- return blockingDetails;
- //otherwise show the blocking details, then a new line, then additional information
- rv = blockingDetails + "\n\n";
- }
-
- if (verbosity >= 1 && blockingTask!=null) {
- if (verbosity==1)
- // short status string will just show blocking details
- return "Waiting on "+blockingTask;
- //otherwise show the blocking details, then a new line, then additional information
- rv = "Waiting on "+blockingTask + "\n\n";
- }
-
- if (verbosity>=2) {
- if (getExtraStatusText()!=null) {
- rv += getExtraStatusText()+"\n\n";
- }
-
- rv += ""+toString()+"\n";
- if (submittedByTask!=null) {
- rv += "Submitted by "+submittedByTask+"\n";
- }
-
- if (this instanceof HasTaskChildren) {
- // list children tasks for compound tasks
- try {
- Iterable<Task<?>> childrenTasks = ((HasTaskChildren)this).getChildren();
- if (childrenTasks.iterator().hasNext()) {
- rv += "Children:\n";
- for (Task<?> child: childrenTasks) {
- rv += " "+child+": "+child.getStatusDetail(false)+"\n";
- }
- }
- } catch (ConcurrentModificationException exc) {
- rv += " (children not available - currently being modified)\n";
- }
- }
- rv += "\n";
- }
-
- LockInfo lock = ti.getLockInfo();
- rv += "In progress";
- if (verbosity>=1) {
- if (lock==null && ti.getThreadState()==Thread.State.RUNNABLE) {
- //not blocked
- if (ti.isSuspended()) {
- // when does this happen?
- rv += ", thread suspended";
- } else {
- if (verbosity >= 2) rv += " ("+ti.getThreadState()+")";
- }
- } else {
- rv +=", thread waiting ";
- if (ti.getThreadState() == Thread.State.BLOCKED) {
- rv += "(mutex) on "+lookup(lock);
- //TODO could say who holds it
- } else if (ti.getThreadState() == Thread.State.WAITING) {
- rv += "(notify) on "+lookup(lock);
- } else if (ti.getThreadState() == Thread.State.TIMED_WAITING) {
- rv += "(timed) on "+lookup(lock);
- } else {
- rv = "("+ti.getThreadState()+") on "+lookup(lock);
- }
- }
- }
- if (verbosity>=2) {
- StackTraceElement[] st = ti.getStackTrace();
- st = brooklyn.util.javalang.StackTraceSimplifier.cleanStackTrace(st);
- if (st!=null && st.length>0)
- rv += "\n" +"At: "+st[0];
- for (int ii=1; ii<st.length; ii++) {
- rv += "\n" +" "+st[ii];
- }
- }
- return rv;
- }
-
- protected String lookup(LockInfo info) {
- return info!=null ? ""+info : "unknown (sleep)";
- }
-
- @Override
- public String getDisplayName() {
- return displayName;
- }
-
- @Override
- public String getDescription() {
- return description;
- }
-
-
- /** allows a task user to specify why a task is blocked; for use immediately before a blocking/wait,
- * and typically cleared immediately afterwards; referenced by management api to inspect a task
- * which is blocking
- */
- @Override
- public String setBlockingDetails(String blockingDetails) {
- String old = this.blockingDetails;
- this.blockingDetails = blockingDetails;
- return old;
- }
-
- @Override
- public Task<?> setBlockingTask(Task<?> blockingTask) {
- Task<?> old = this.blockingTask;
- this.blockingTask = blockingTask;
- return old;
- }
-
- @Override
- public void resetBlockingDetails() {
- this.blockingDetails = null;
- }
-
- @Override
- public void resetBlockingTask() {
- this.blockingTask = null;
- }
-
- /** returns a textual message giving details while the task is blocked */
- @Override
- public String getBlockingDetails() {
- return blockingDetails;
- }
-
- /** returns a task that this task is blocked on */
- @Override
- public Task<?> getBlockingTask() {
- return blockingTask;
- }
-
- @Override
- public void setExtraStatusText(Object extraStatus) {
- this.extraStatusText = extraStatus;
- }
-
- @Override
- public Object getExtraStatusText() {
- return extraStatusText;
- }
-
- // ---- add a way to warn if task is not run
-
- public interface TaskFinalizer {
- public void onTaskFinalization(Task<?> t);
- }
-
- public static final TaskFinalizer WARN_IF_NOT_RUN = new TaskFinalizer() {
- @Override
- public void onTaskFinalization(Task<?> t) {
- if (!Tasks.isAncestorCancelled(t) && !t.isSubmitted()) {
- log.warn(t+" was never submitted; did the code create it and forget to run it? ('cancel' the task to suppress this message)");
- log.debug("Detail of unsubmitted task "+t+":\n"+t.getStatusDetail(true));
- return;
- }
- if (!t.isDone()) {
- // shouldn't happen
- // TODO But does happen if management context was terminated (e.g. running test suite).
- // Should check if Execution Manager is running, and only log if it was not terminated?
- log.warn("Task "+t+" is being finalized before completion");
- return;
- }
- }
- };
-
- public static final TaskFinalizer NO_OP = new TaskFinalizer() {
- @Override
- public void onTaskFinalization(Task<?> t) {
- }
- };
-
- public void ignoreIfNotRun() {
- setFinalizer(NO_OP);
- }
-
- public void setFinalizer(TaskFinalizer f) {
- TaskFinalizer finalizer = Tasks.tag(this, TaskFinalizer.class, false);
- if (finalizer!=null && finalizer!=f)
- throw new IllegalStateException("Cannot apply multiple finalizers");
- if (isDone())
- throw new IllegalStateException("Finalizer cannot be set on task "+this+" after it is finished");
- tags.add(f);
- }
-
- @Override
- protected void finalize() throws Throwable {
- TaskFinalizer finalizer = Tasks.tag(this, TaskFinalizer.class, false);
- if (finalizer==null) finalizer = WARN_IF_NOT_RUN;
- finalizer.onTaskFinalization(this);
- }
-
- public static class SubmissionErrorCatchingExecutor implements Executor {
- final Executor target;
- public SubmissionErrorCatchingExecutor(Executor target) {
- this.target = target;
- }
- @Override
- public void execute(Runnable command) {
- if (isShutdown()) {
- log.debug("Skipping execution of task callback hook "+command+" because executor is shutdown.");
- return;
- }
- try {
- target.execute(command);
- } catch (Exception e) {
- if (isShutdown()) {
- log.debug("Ignoring failed execution of task callback hook "+command+" because executor is shutdown.");
- } else {
- log.warn("Execution of task callback hook "+command+" failed: "+e, e);
- }
- }
- }
- protected boolean isShutdown() {
- return target instanceof ExecutorService && ((ExecutorService)target).isShutdown();
- }
- }
-
- @Override
- public void addListener(Runnable listener, Executor executor) {
- listeners.add(listener, new SubmissionErrorCatchingExecutor(executor));
- }
-
- @Override
- public void runListeners() {
- listeners.execute();
- }
-
- @Override
- public void setEndTimeUtc(long val) {
- endTimeUtc = val;
- }
-
- @Override
- public void setThread(Thread thread) {
- this.thread = thread;
- }
-
- @Override
- public Callable<T> getJob() {
- return job;
- }
-
- @Override
- public void setJob(Callable<T> job) {
- this.job = job;
- }
-
- @Override
- public ExecutionList getListeners() {
- return listeners;
- }
-
- @Override
- public void setSubmitTimeUtc(long val) {
- submitTimeUtc = val;
- }
-
- private static <T> Task<T> newGoneTaskFor(Task<?> task) {
- Task<T> t = Tasks.<T>builder().dynamic(false).name(task.getDisplayName())
- .description("Details of the original task "+task+" have been forgotten.")
- .body(Callables.returning((T)null)).build();
- ((BasicTask<T>)t).ignoreIfNotRun();
- return t;
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- @Override
- public void setSubmittedByTask(Task<?> task) {
- submittedByTask = (Maybe)Maybe.softThen((Task)task, (Maybe)Maybe.of(BasicTask.newGoneTaskFor(task)));
- }
-
- @Override
- public Set<Object> getMutableTags() {
- return tags;
- }
-
- @Override
- public void setStartTimeUtc(long val) {
- startTimeUtc = val;
- }
-
- @Override
- public void applyTagModifier(Function<Set<Object>,Void> modifier) {
- modifier.apply(tags);
- }
-
- @Override
- public Task<?> getProxyTarget() {
- return proxyTargetTask;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/CanSetName.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/CanSetName.java b/core/src/main/java/brooklyn/util/task/CanSetName.java
deleted file mode 100644
index 760c99e..0000000
--- a/core/src/main/java/brooklyn/util/task/CanSetName.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-public interface CanSetName {
-
- void setName(String name);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/CompoundTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/CompoundTask.java b/core/src/main/java/brooklyn/util/task/CompoundTask.java
deleted file mode 100644
index e33120c..0000000
--- a/core/src/main/java/brooklyn/util/task/CompoundTask.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import groovy.lang.Closure;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.brooklyn.api.management.HasTaskChildren;
-import org.apache.brooklyn.api.management.Task;
-import org.apache.brooklyn.api.management.TaskAdaptable;
-import org.apache.brooklyn.core.management.internal.ManagementContextInternal;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.entity.basic.BrooklynTaskTags;
-import brooklyn.util.collections.MutableMap;
-
-
-/**
- * A {@link Task} that is comprised of other units of work: possibly a heterogeneous mix of {@link Task},
- * {@link Runnable}, {@link Callable} and {@link Closure} instances.
- *
- * This class holds the collection of child tasks, but subclasses have the responsibility of executing them in a
- * sensible manner by implementing the abstract {@link #runJobs} method.
- */
-public abstract class CompoundTask<T> extends BasicTask<List<T>> implements HasTaskChildren {
-
- @SuppressWarnings("unused")
- private static final Logger log = LoggerFactory.getLogger(CompoundTask.class);
-
- protected final List<Task<? extends T>> children;
- protected final List<Object> result;
-
- /**
- * Constructs a new compound task containing the specified units of work.
- *
- * @param jobs A potentially heterogeneous mixture of {@link Runnable}, {@link Callable}, {@link Closure} and {@link Task} can be provided.
- * @throws IllegalArgumentException if any of the passed child jobs is not one of the above types
- */
- public CompoundTask(Object... jobs) {
- this( Arrays.asList(jobs) );
- }
-
- /**
- * Constructs a new compound task containing the specified units of work.
- *
- * @param jobs A potentially heterogeneous mixture of {@link Runnable}, {@link Callable}, {@link Closure} and {@link Task} can be provided.
- * @throws IllegalArgumentException if any of the passed child jobs is not one of the above types
- */
- public CompoundTask(Collection<?> jobs) {
- this(MutableMap.of("tag", "compound"), jobs);
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public CompoundTask(Map<String,?> flags, Collection<?> jobs) {
- super(flags);
- super.job = new Callable<List<T>>() {
- @Override public List<T> call() throws Exception {
- return runJobs();
- }
- };
-
- this.result = new ArrayList<Object>(jobs.size());
- this.children = new ArrayList<Task<? extends T>>(jobs.size());
- for (Object job : jobs) {
- Task subtask;
- if (job instanceof TaskAdaptable) { subtask = ((TaskAdaptable)job).asTask(); }
- else if (job instanceof Closure) { subtask = new BasicTask<T>((Closure) job); }
- else if (job instanceof Callable) { subtask = new BasicTask<T>((Callable) job); }
- else if (job instanceof Runnable) { subtask = new BasicTask<T>((Runnable) job); }
-
- else throw new IllegalArgumentException("Invalid child "+(job == null ? null : job.getClass() + " ("+job+")")+
- " passed to compound task; must be Runnable, Callable, Closure or Task");
-
- BrooklynTaskTags.addTagDynamically(subtask, ManagementContextInternal.SUB_TASK_TAG);
- children.add(subtask);
- }
-
- for (Task<?> t: getChildren()) {
- ((TaskInternal<?>)t).markQueued();
- }
- }
-
- /** return value needs to be specified by subclass; subclass should also setBlockingDetails
- * @throws ExecutionException
- * @throws InterruptedException */
- protected abstract List<T> runJobs() throws InterruptedException, ExecutionException;
-
- protected void submitIfNecessary(TaskAdaptable<?> task) {
- if (!task.asTask().isSubmitted()) {
- if (BasicExecutionContext.getCurrentExecutionContext() == null) {
- throw new IllegalStateException("Compound task ("+task+") launched from "+this+" missing required execution context");
- } else {
- BasicExecutionContext.getCurrentExecutionContext().submit(task);
- }
- }
- }
-
- public List<Task<? extends T>> getChildrenTyped() {
- return children;
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public List<Task<?>> getChildren() {
- return (List) getChildrenTyped();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/DeferredSupplier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/DeferredSupplier.java b/core/src/main/java/brooklyn/util/task/DeferredSupplier.java
deleted file mode 100644
index d82b3fb..0000000
--- a/core/src/main/java/brooklyn/util/task/DeferredSupplier.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import com.google.common.base.Supplier;
-
-/**
- * A class that supplies objects of a single type. When used as a ConfigKey value,
- * the evaluation is deferred until getConfig() is called. The returned value will then
- * be coerced to the correct type.
- *
- * Subsequent calls to getConfig will result in further calls to deferredProvider.get(),
- * rather than reusing the result. If you want to reuse the result, consider instead
- * using a Future.
- *
- * Note that this functionality replaces the ues of Closure in brooklyn 0.4.0, which
- * served the same purpose.
- */
-public interface DeferredSupplier<T> extends Supplier<T> {
- @Override
- T get();
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java b/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java
deleted file mode 100644
index 455a889..0000000
--- a/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java
+++ /dev/null
@@ -1,480 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import groovy.lang.Closure;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.brooklyn.api.management.HasTaskChildren;
-import org.apache.brooklyn.api.management.Task;
-import org.apache.brooklyn.api.management.TaskQueueingContext;
-import org.apache.brooklyn.core.management.internal.ManagementContextInternal;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.entity.basic.BrooklynTaskTags;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.time.CountdownTimer;
-import brooklyn.util.time.Duration;
-
-import com.google.common.annotations.Beta;
-import com.google.common.collect.ImmutableList;
-
-/** Represents a task whose run() method can create other tasks
- * which are run sequentially, but that sequence runs in parallel to this task
- * <p>
- * There is an optional primary job run with this task, along with multiple secondary children.
- * If any secondary task fails (assuming it isn't {@link Tasks#markInessential()} then by default
- * subsequent tasks are not submitted and the primary task fails (but no tasks are cancelled or interrupted).
- * You can change the behavior of this task with fields in {@link FailureHandlingConfig},
- * or the convenience {@link TaskQueueingContext#swallowChildrenFailures()}
- * (and {@link DynamicTasks#swallowChildrenFailures()} if you are inside the task).
- * <p>
- * This synchronizes on secondary tasks when submitting them, in case they may be manually submitted
- * and the submitter wishes to ensure it is only submitted once.
- * <p>
- * Improvements which would be nice to have:
- * <li> unqueued tasks not visible in api; would like that
- * <li> uses an extra thread (submitted as background task) to monitor the secondary jobs; would be nice to remove this,
- * and rely on {@link BasicExecutionManager} to run the jobs sequentially (combined with fix to item above)
- * <li> would be nice to have cancel, resume, and possibly skipQueue available as operations (ideally in the REST API and GUI)
- **/
-public class DynamicSequentialTask<T> extends BasicTask<T> implements HasTaskChildren, TaskQueueingContext {
-
- private static final Logger log = LoggerFactory.getLogger(CompoundTask.class);
-
- protected final Queue<Task<?>> secondaryJobsAll = new ConcurrentLinkedQueue<Task<?>>();
- protected final Queue<Task<?>> secondaryJobsRemaining = new ConcurrentLinkedQueue<Task<?>>();
- protected final Object jobTransitionLock = new Object();
- protected volatile boolean primaryStarted = false;
- protected volatile boolean primaryFinished = false;
- protected volatile boolean secondaryQueueAborted = false;
- protected Thread primaryThread;
- protected DstJob dstJob;
- protected FailureHandlingConfig failureHandlingConfig = FailureHandlingConfig.DEFAULT;
-
- // default values for how to handle the various failures
- @Beta
- public static class FailureHandlingConfig {
- /** secondary queue runs independently of primary task (submitting and blocking on each secondary task in order),
- * but can set it up not to submit any more tasks if the primary fails */
- public final boolean abortSecondaryQueueOnPrimaryFailure;
- /** as {@link #abortSecondaryQueueOnPrimaryFailure} but controls cancelling of secondary queue*/
- public final boolean cancelSecondariesOnPrimaryFailure;
- /** secondary queue can continue submitting+blocking tasks even if a secondary task fails (unusual;
- * typically handled by {@link TaskTags#markInessential(Task)} on the secondary tasks, in which case
- * the secondary queue is never aborted */
- public final boolean abortSecondaryQueueOnSecondaryFailure;
- /** unsubmitted secondary tasks (ie those further in the queue) can be cancelled if a secondary task fails */
- public final boolean cancelSecondariesOnSecondaryFailure;
- /** whether to issue cancel against primary task if a secondary task fails */
- public final boolean cancelPrimaryOnSecondaryFailure;
- /** whether to fail this task if a secondary task fails */
- public final boolean failParentOnSecondaryFailure;
-
- @Beta
- public FailureHandlingConfig(
- boolean abortSecondaryQueueOnPrimaryFailure, boolean cancelSecondariesOnPrimaryFailure,
- boolean abortSecondaryQueueOnSecondaryFailure, boolean cancelSecondariesOnSecondaryFailure,
- boolean cancelPrimaryOnSecondaryFailure, boolean failParentOnSecondaryFailure) {
- this.abortSecondaryQueueOnPrimaryFailure = abortSecondaryQueueOnPrimaryFailure;
- this.cancelSecondariesOnPrimaryFailure = cancelSecondariesOnPrimaryFailure;
- this.abortSecondaryQueueOnSecondaryFailure = abortSecondaryQueueOnSecondaryFailure;
- this.cancelSecondariesOnSecondaryFailure = cancelSecondariesOnSecondaryFailure;
- this.cancelPrimaryOnSecondaryFailure = cancelPrimaryOnSecondaryFailure;
- this.failParentOnSecondaryFailure = failParentOnSecondaryFailure;
- }
-
- public static final FailureHandlingConfig DEFAULT = new FailureHandlingConfig(false, false, true, false, false, true);
- public static final FailureHandlingConfig SWALLOWING_CHILDREN_FAILURES = new FailureHandlingConfig(false, false, false, false, false, false);
- }
-
- public static class QueueAbortedException extends IllegalStateException {
- private static final long serialVersionUID = -7569362887826818524L;
-
- public QueueAbortedException(String msg) {
- super(msg);
- }
- public QueueAbortedException(String msg, Throwable cause) {
- super(msg, cause);
- }
- }
-
- /**
- * Constructs a new compound task containing the specified units of work.
- *
- * @param jobs A potentially heterogeneous mixture of {@link Runnable}, {@link Callable}, {@link Closure} and {@link Task} can be provided.
- * @throws IllegalArgumentException if any of the passed child jobs is not one of the above types
- */
- public DynamicSequentialTask() {
- this(null);
- }
-
- public DynamicSequentialTask(Callable<T> mainJob) {
- this(MutableMap.of("tag", "compound"), mainJob);
- }
-
- public DynamicSequentialTask(Map<?,?> flags, Callable<T> mainJob) {
- super(flags);
- this.job = dstJob = new DstJob(mainJob);
- }
-
- @Override
- public void queue(Task<?> t) {
- synchronized (jobTransitionLock) {
- if (primaryFinished)
- throw new IllegalStateException("Cannot add a task to "+this+" which is already finished (trying to add "+t+")");
- if (secondaryQueueAborted)
- throw new QueueAbortedException("Cannot add a task to "+this+" whose queue has been aborted (trying to add "+t+")");
- secondaryJobsAll.add(t);
- secondaryJobsRemaining.add(t);
- BrooklynTaskTags.addTagsDynamically(t, ManagementContextInternal.SUB_TASK_TAG);
- ((TaskInternal<?>)t).markQueued();
- jobTransitionLock.notifyAll();
- }
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return cancel(mayInterruptIfRunning, mayInterruptIfRunning, true);
- }
- public boolean cancel(boolean mayInterruptTask, boolean interruptPrimaryThread, boolean alsoCancelChildren) {
- if (isDone()) return false;
- if (log.isTraceEnabled()) log.trace("cancelling {}", this);
- boolean cancel = super.cancel(mayInterruptTask);
- if (alsoCancelChildren) {
- for (Task<?> t: secondaryJobsAll)
- cancel |= t.cancel(mayInterruptTask);
- }
- synchronized (jobTransitionLock) {
- if (primaryThread!=null) {
- if (interruptPrimaryThread) {
- if (log.isTraceEnabled()) log.trace("cancelling {} - interrupting", this);
- primaryThread.interrupt();
- }
- cancel = true;
- }
- }
- return cancel;
- }
-
- @Override
- public synchronized boolean uncancel() {
- secondaryQueueAborted = false;
- return super.uncancel();
- }
-
- @Override
- public Iterable<Task<?>> getChildren() {
- return Collections.unmodifiableCollection(secondaryJobsAll);
- }
-
- /** submits the indicated task for execution in the current execution context, and returns immediately */
- protected void submitBackgroundInheritingContext(Task<?> task) {
- BasicExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext();
- if (log.isTraceEnabled()) {
- log.trace("task {} - submitting background task {} ({})", new Object[] { Tasks.current(), task, ec });
- }
- if (ec==null) {
- String message = Tasks.current()!=null ?
- // user forgot ExecContext:
- "Task "+this+" submitting background task requires an ExecutionContext (an ExecutionManager is not enough): submitting "+task+" in "+Tasks.current()
- : // should not happen:
- "Cannot submit tasks inside DST when not in a task : submitting "+task+" in "+this;
- log.warn(message+" (rethrowing)");
- throw new IllegalStateException(message);
- }
- synchronized (task) {
- if (task.isSubmitted()) {
- if (log.isTraceEnabled()) {
- log.trace("DST "+this+" skipping submission of child "+task+" because it is already submitted");
- }
- } else {
- try {
- ec.submit(task);
- } catch (Exception e) {
- Exceptions.propagateIfFatal(e);
- // Give some context when the submit fails (happens when the target is already unmanaged)
- throw new IllegalStateException("Failure submitting task "+task+" in "+this+": "+e.getMessage(), e);
- }
- }
- }
- }
-
- public void setFailureHandlingConfig(FailureHandlingConfig failureHandlingConfig) {
- this.failureHandlingConfig = failureHandlingConfig;
- }
- @Override
- public void swallowChildrenFailures() {
- setFailureHandlingConfig(FailureHandlingConfig.SWALLOWING_CHILDREN_FAILURES);
- }
-
- protected class DstJob implements Callable<T> {
- protected Callable<T> primaryJob;
- /** currently executing (or just completed) secondary task, or null if none;
- * with jobTransitionLock notified on change and completion */
- protected volatile Task<?> currentSecondary = null;
- protected volatile boolean finishedSecondaries = false;
-
- public DstJob(Callable<T> mainJob) {
- this.primaryJob = mainJob;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public T call() throws Exception {
-
- synchronized (jobTransitionLock) {
- primaryStarted = true;
- primaryThread = Thread.currentThread();
- for (Task<?> t: secondaryJobsAll)
- ((TaskInternal<?>)t).markQueued();
- }
- // TODO overkill having a thread/task for this, but it works
- // optimisation would either use newTaskEndCallback property on task to submit
- // or use some kind of single threaded executor for the queued tasks
- Task<List<Object>> secondaryJobMaster = Tasks.<List<Object>>builder().dynamic(false)
- .name("DST manager (internal)")
- // TODO marking it transient helps it be GC'd sooner,
- // but ideally we wouldn't have this,
- // or else it would be a child
- .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
- .body(new Callable<List<Object>>() {
-
- @Override
- public List<Object> call() throws Exception {
- List<Object> result = new ArrayList<Object>();
- try {
- while (!secondaryQueueAborted && (!primaryFinished || !secondaryJobsRemaining.isEmpty())) {
- synchronized (jobTransitionLock) {
- if (!primaryFinished && secondaryJobsRemaining.isEmpty()) {
- currentSecondary = null;
- jobTransitionLock.wait(1000);
- }
- }
- @SuppressWarnings("rawtypes")
- Task secondaryJob = secondaryJobsRemaining.poll();
- if (secondaryJob != null) {
- synchronized (jobTransitionLock) {
- currentSecondary = secondaryJob;
- submitBackgroundInheritingContext(secondaryJob);
- jobTransitionLock.notifyAll();
- }
- try {
- result.add(secondaryJob.get());
- } catch (Exception e) {
- if (TaskTags.isInessential(secondaryJob)) {
- result.add(Tasks.getError(secondaryJob));
- if (log.isDebugEnabled())
- log.debug("Secondary job queue for "+DynamicSequentialTask.this+" ignoring error in inessential task "+secondaryJob+": "+e);
- } else {
- if (failureHandlingConfig.cancelSecondariesOnSecondaryFailure) {
- if (log.isDebugEnabled())
- log.debug("Secondary job queue for "+DynamicSequentialTask.this+" cancelling "+secondaryJobsRemaining.size()+" remaining, due to error in task "+secondaryJob+": "+e);
- synchronized (jobTransitionLock) {
- for (Task<?> t: secondaryJobsRemaining)
- t.cancel(true);
- jobTransitionLock.notifyAll();
- }
- }
-
- if (failureHandlingConfig.abortSecondaryQueueOnSecondaryFailure) {
- if (log.isDebugEnabled())
- log.debug("Aborting secondary job queue for "+DynamicSequentialTask.this+" due to error in child task "+secondaryJob+" ("+e+", being rethrown)");
- secondaryQueueAborted = true;
- throw e;
- }
-
- if (!primaryFinished && failureHandlingConfig.cancelPrimaryOnSecondaryFailure) {
- cancel(true, false, false);
- }
-
- result.add(Tasks.getError(secondaryJob));
- if (log.isDebugEnabled())
- log.debug("Secondary job queue for "+DynamicSequentialTask.this+" continuing in presence of error in child task "+secondaryJob+" ("+e+", being remembered)");
- }
- }
- }
- }
- } finally {
- synchronized (jobTransitionLock) {
- currentSecondary = null;
- finishedSecondaries = true;
- jobTransitionLock.notifyAll();
- }
- }
- return result;
- }
- }).build();
- ((BasicTask<?>)secondaryJobMaster).proxyTargetTask = DynamicSequentialTask.this;
-
- submitBackgroundInheritingContext(secondaryJobMaster);
-
- T result = null;
- Throwable error = null;
- Throwable uninterestingSelfError = null;
- boolean errorIsFromChild = false;
- try {
- if (log.isTraceEnabled()) log.trace("calling primary job for {}", this);
- if (primaryJob!=null) result = primaryJob.call();
- } catch (Throwable selfException) {
- Exceptions.propagateIfFatal(selfException);
- if (Exceptions.getFirstThrowableOfType(selfException, QueueAbortedException.class) != null) {
- // Error was caused by the task already having failed, and this thread calling queue() to try
- // to queue more work. The underlying cause will be much more interesting.
- // Without this special catch, we record error = "Cannot add a task to ... whose queue has been aborted",
- // which gets propagated instead of the more interesting child exception.
- uninterestingSelfError = selfException;
- } else {
- error = selfException;
- errorIsFromChild = false;
- }
- if (failureHandlingConfig.abortSecondaryQueueOnPrimaryFailure) {
- if (log.isDebugEnabled())
- log.debug("Secondary job queue for "+DynamicSequentialTask.this+" aborting with "+secondaryJobsRemaining.size()+" remaining, due to error in primary task: "+selfException);
- secondaryQueueAborted = true;
- }
- if (failureHandlingConfig.cancelSecondariesOnPrimaryFailure) {
- if (log.isDebugEnabled())
- log.debug(DynamicSequentialTask.this+" cancelling "+secondaryJobsRemaining.size()+" remaining, due to error in primary task: "+selfException);
- synchronized (jobTransitionLock) {
- for (Task<?> t: secondaryJobsRemaining)
- t.cancel(true);
- // do this early to prevent additions; and note we notify very soon below, so not notify is help off until below
- primaryThread = null;
- primaryFinished = true;
- }
- }
- } finally {
- try {
- if (log.isTraceEnabled()) log.trace("cleaning up for {}", this);
- synchronized (jobTransitionLock) {
- // semaphore might be nicer here (aled notes as it is this is a little hard to read)
- primaryThread = null;
- primaryFinished = true;
- jobTransitionLock.notifyAll();
- }
- if (!isCancelled() && !Thread.currentThread().isInterrupted()) {
- if (log.isTraceEnabled()) log.trace("waiting for secondaries for {}", this);
- // wait on tasks sequentially so that blocking information is more interesting
- DynamicTasks.waitForLast();
- List<Object> result2 = secondaryJobMaster.get();
- try {
- if (primaryJob==null) result = (T)result2;
- } catch (ClassCastException e) { /* ignore class cast exception; leave the result as null */ }
- }
- } catch (Throwable childException) {
- Exceptions.propagateIfFatal(childException);
- if (error==null) {
- error = childException;
- errorIsFromChild = true;
- } else {
- if (log.isDebugEnabled()) log.debug("Parent task "+this+" ignoring child error ("+childException+") in presence of our own error ("+error+")");
- }
- }
- }
- if (error!=null) {
- handleException(error, errorIsFromChild);
- }
- if (uninterestingSelfError != null) {
- handleException(uninterestingSelfError, false);
- }
- return result;
- }
-
- @Override
- public String toString() {
- return "DstJob:"+DynamicSequentialTask.this.getId();
- }
-
- /** waits for this job to complete, or the given time to elapse */
- public void join(boolean includePrimary, Duration optionalTimeout) throws InterruptedException {
- CountdownTimer timeLeft = optionalTimeout!=null ? CountdownTimer.newInstanceStarted(optionalTimeout) : null;
- while (true) {
- Task<?> cs;
- Duration remaining;
- synchronized (jobTransitionLock) {
- cs = currentSecondary;
- if (finishedSecondaries) return;
- remaining = timeLeft==null ? Duration.ONE_SECOND : timeLeft.getDurationRemaining();
- if (!remaining.isPositive()) return;
- if (cs==null) {
- if (!includePrimary && secondaryJobsRemaining.isEmpty()) return;
- // parent still running, no children though
- Tasks.setBlockingTask(DynamicSequentialTask.this);
- jobTransitionLock.wait(remaining.toMilliseconds());
- Tasks.resetBlockingDetails();
- }
- }
- if (cs!=null) {
- Tasks.setBlockingTask(cs);
- cs.blockUntilEnded(remaining);
- Tasks.resetBlockingDetails();
- }
- }
- }
- }
-
- @Override
- public List<Task<?>> getQueue() {
- return ImmutableList.copyOf(secondaryJobsAll);
- }
-
- public void handleException(Throwable throwable, boolean fromChild) throws Exception {
- Exceptions.propagateIfFatal(throwable);
- if (fromChild && !failureHandlingConfig.failParentOnSecondaryFailure) {
- log.debug("Parent task "+this+" swallowing child error: "+throwable);
- return;
- }
- handleException(throwable);
- }
- public void handleException(Throwable throwable) throws Exception {
- Exceptions.propagateIfFatal(throwable);
- if (throwable instanceof Exception) {
- // allow checked exceptions to be passed through
- throw (Exception)throwable;
- }
- throw Exceptions.propagate(throwable);
- }
-
- @Override
- public void drain(Duration optionalTimeout, boolean includePrimary, boolean throwFirstError) {
- try {
- dstJob.join(includePrimary, optionalTimeout);
- } catch (InterruptedException e) {
- throw Exceptions.propagate(e);
- }
- if (throwFirstError) {
- if (isError())
- getUnchecked();
- for (Task<?> t: getQueue())
- if (t.isError() && !TaskTags.isInessential(t))
- t.getUnchecked();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/DynamicTasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/DynamicTasks.java b/core/src/main/java/brooklyn/util/task/DynamicTasks.java
deleted file mode 100644
index 9d552c6..0000000
--- a/core/src/main/java/brooklyn/util/task/DynamicTasks.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.List;
-import java.util.concurrent.Callable;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.management.ExecutionContext;
-import org.apache.brooklyn.api.management.Task;
-import org.apache.brooklyn.api.management.TaskAdaptable;
-import org.apache.brooklyn.api.management.TaskFactory;
-import org.apache.brooklyn.api.management.TaskQueueingContext;
-import org.apache.brooklyn.api.management.TaskWrapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.entity.basic.EntityInternal;
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.time.Duration;
-
-import com.google.common.annotations.Beta;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-
-/**
- * Contains static methods which detect and use the current {@link TaskQueueingContext} to execute tasks.
- *
- * @since 0.6.0
- */
-@Beta
-public class DynamicTasks {
-
- private static final Logger log = LoggerFactory.getLogger(DynamicTasks.class);
- private static final ThreadLocal<TaskQueueingContext> taskQueueingContext = new ThreadLocal<TaskQueueingContext>();
-
- public static void setTaskQueueingContext(TaskQueueingContext newTaskQC) {
- taskQueueingContext.set(newTaskQC);
- }
-
- public static TaskQueueingContext getThreadTaskQueuingContext() {
- return taskQueueingContext.get();
- }
-
- public static TaskQueueingContext getTaskQueuingContext() {
- TaskQueueingContext adder = getThreadTaskQueuingContext();
- if (adder!=null) return adder;
- Task<?> t = Tasks.current();
- if (t instanceof TaskQueueingContext) return (TaskQueueingContext) t;
- return null;
- }
-
-
- public static void removeTaskQueueingContext() {
- taskQueueingContext.remove();
- }
-
- public static class TaskQueueingResult<T> implements TaskWrapper<T> {
- private final Task<T> task;
- private final boolean wasQueued;
- private ExecutionContext execContext = null;
-
- private TaskQueueingResult(TaskAdaptable<T> task, boolean wasQueued) {
- this.task = task.asTask();
- this.wasQueued = wasQueued;
- }
- @Override
- public Task<T> asTask() {
- return task;
- }
- @Override
- public Task<T> getTask() {
- return task;
- }
- /** returns true if the task was queued */
- public boolean wasQueued() {
- return wasQueued;
- }
- /** returns true if the task either is currently queued or has been submitted */
- public boolean isQueuedOrSubmitted() {
- return wasQueued || Tasks.isQueuedOrSubmitted(task);
- }
- /** specifies an execContext to use if the task has to be explicitly submitted;
- * if omitted it will attempt to find one based on the current thread's context */
- public TaskQueueingResult<T> executionContext(ExecutionContext execContext) {
- this.execContext = execContext;
- return this;
- }
- /** as {@link #executionContext(ExecutionContext)} but inferring from the entity */
- public TaskQueueingResult<T> executionContext(Entity entity) {
- this.execContext = ((EntityInternal)entity).getManagementSupport().getExecutionContext();
- return this;
- }
- private boolean orSubmitInternal() {
- if (!wasQueued()) {
- if (isQueuedOrSubmitted()) {
- log.warn("Redundant call to execute "+getTask()+"; skipping");
- return false;
- } else {
- ExecutionContext ec = execContext;
- if (ec==null)
- ec = BasicExecutionContext.getCurrentExecutionContext();
- if (ec==null)
- throw new IllegalStateException("Cannot execute "+getTask()+" without an execution context; ensure caller is in an ExecutionContext");
- ec.submit(getTask());
- return true;
- }
- } else {
- return false;
- }
- }
- /** causes the task to be submitted (asynchronously) if it hasn't already been,
- * requiring an entity execution context (will try to find a default if not set) */
- public TaskQueueingResult<T> orSubmitAsync() {
- orSubmitInternal();
- return this;
- }
- /** convenience for setting {@link #executionContext(ExecutionContext)} then submitting async */
- public TaskQueueingResult<T> orSubmitAsync(Entity entity) {
- executionContext(entity);
- return orSubmitAsync();
- }
- /** causes the task to be submitted *synchronously* if it hasn't already been submitted;
- * useful in contexts such as libraries where callers may be either on a legacy call path
- * (which assumes all commands complete immediately);
- * requiring an entity execution context (will try to find a default if not set) */
- public TaskQueueingResult<T> orSubmitAndBlock() {
- if (orSubmitInternal()) task.getUnchecked();
- return this;
- }
- /** convenience for setting {@link #executionContext(ExecutionContext)} then submitting blocking */
- public TaskQueueingResult<T> orSubmitAndBlock(Entity entity) {
- executionContext(entity);
- return orSubmitAndBlock();
- }
- /** blocks for the task to be completed
- * <p>
- * needed in any context where subsequent commands assume the task has completed.
- * not needed in a context where the task is simply being built up and queued.
- * <p>
- * throws if there are any errors
- */
- public T andWaitForSuccess() {
- return task.getUnchecked();
- }
- public void orCancel() {
- if (!wasQueued()) {
- task.cancel(false);
- }
- }
- }
-
- /**
- * Tries to add the task to the current addition context if there is one, otherwise does nothing.
- * <p/>
- * Call {@link TaskQueueingResult#orSubmitAsync() orSubmitAsync()} on the returned
- * {@link TaskQueueingResult TaskQueueingResult} to handle execution of tasks in a
- * {@link BasicExecutionContext}.
- */
- public static <T> TaskQueueingResult<T> queueIfPossible(TaskAdaptable<T> task) {
- TaskQueueingContext adder = getTaskQueuingContext();
- boolean result = false;
- if (adder!=null)
- result = Tasks.tryQueueing(adder, task);
- return new TaskQueueingResult<T>(task, result);
- }
-
- /** @see #queueIfPossible(TaskAdaptable) */
- public static <T> TaskQueueingResult<T> queueIfPossible(TaskFactory<? extends TaskAdaptable<T>> task) {
- return queueIfPossible(task.newTask());
- }
-
- /** adds the given task to the nearest task addition context,
- * either set as a thread-local, or in the current task, or the submitter of the task, etc
- * <p>
- * throws if it cannot add */
- public static <T> Task<T> queueInTaskHierarchy(Task<T> task) {
- Preconditions.checkNotNull(task, "Task to queue cannot be null");
- Preconditions.checkState(!Tasks.isQueuedOrSubmitted(task), "Task to queue must not yet be submitted: {}", task);
-
- TaskQueueingContext adder = getTaskQueuingContext();
- if (adder!=null) {
- if (Tasks.tryQueueing(adder, task)) {
- log.debug("Queued task {} at context {} (no hierarchy)", task, adder);
- return task;
- }
- }
-
- Task<?> t = Tasks.current();
- Preconditions.checkState(t!=null || adder!=null, "No task addition context available for queueing task "+task);
-
- while (t!=null) {
- if (t instanceof TaskQueueingContext) {
- if (Tasks.tryQueueing((TaskQueueingContext)t, task)) {
- log.debug("Queued task {} at hierarchical context {}", task, t);
- return task;
- }
- }
- t = t.getSubmittedByTask();
- }
-
- throw new IllegalStateException("No task addition context available in current task hierarchy for adding task "+task);
- }
-
- /**
- * Queues the given task.
- * <p/>
- * This method is only valid within a dynamic task. Use {@link #queueIfPossible(TaskAdaptable)}
- * and {@link TaskQueueingResult#orSubmitAsync()} if the calling context is a basic task.
- *
- * @param task The task to queue
- * @throws IllegalStateException if no task queueing context is available
- * @return The queued task
- */
- public static <V extends TaskAdaptable<?>> V queue(V task) {
- try {
- Preconditions.checkNotNull(task, "Task to queue cannot be null");
- Preconditions.checkState(!Tasks.isQueued(task), "Task to queue must not yet be queued: %s", task);
- TaskQueueingContext adder = getTaskQueuingContext();
- if (adder==null) {
- throw new IllegalStateException("Task "+task+" cannot be queued here; no queueing context available");
- }
- adder.queue(task.asTask());
- return task;
- } catch (Throwable e) {
- log.warn("Error queueing "+task+" (rethrowing): "+e);
- throw Exceptions.propagate(e);
- }
- }
-
- /** @see #queue(org.apache.brooklyn.api.management.TaskAdaptable) */
- public static void queue(TaskAdaptable<?> task1, TaskAdaptable<?> task2, TaskAdaptable<?> ...tasks) {
- queue(task1);
- queue(task2);
- for (TaskAdaptable<?> task: tasks) queue(task);
- }
-
- /** @see #queue(org.apache.brooklyn.api.management.TaskAdaptable) */
- public static <T extends TaskAdaptable<?>> T queue(TaskFactory<T> taskFactory) {
- return queue(taskFactory.newTask());
- }
-
- /** @see #queue(org.apache.brooklyn.api.management.TaskAdaptable) */
- public static void queue(TaskFactory<?> task1, TaskFactory<?> task2, TaskFactory<?> ...tasks) {
- queue(task1.newTask());
- queue(task2.newTask());
- for (TaskFactory<?> task: tasks) queue(task.newTask());
- }
-
- /** @see #queue(org.apache.brooklyn.api.management.TaskAdaptable) */
- public static <T> Task<T> queue(String name, Callable<T> job) {
- return DynamicTasks.queue(Tasks.<T>builder().name(name).body(job).build());
- }
-
- /** @see #queue(org.apache.brooklyn.api.management.TaskAdaptable) */
- public static <T> Task<T> queue(String name, Runnable job) {
- return DynamicTasks.queue(Tasks.<T>builder().name(name).body(job).build());
- }
-
- /** queues the task if needed, i.e. if it is not yet submitted (so it will run),
- * or if it is submitted but not queued and we are in a queueing context (so it is available for informational purposes) */
- public static <T extends TaskAdaptable<?>> T queueIfNeeded(T task) {
- if (!Tasks.isQueued(task)) {
- if (Tasks.isSubmitted(task) && getTaskQueuingContext()==null) {
- // already submitted and not in a queueing context, don't try to queue
- } else {
- // needs submitting, put it in the queue
- // (will throw an error if we are not a queueing context)
- queue(task);
- }
- }
- return task;
- }
-
- /** submits/queues the given task if needed, and gets the result (unchecked)
- * only permitted in a queueing context (ie a DST main job) if the task is not yet submitted */
- // things get really confusing if you try to queueInTaskHierarchy -- easy to cause deadlocks!
- public static <T> T get(TaskAdaptable<T> t) {
- return queueIfNeeded(t).asTask().getUnchecked();
- }
-
- /** As {@link #drain(Duration, boolean)} waiting forever and throwing the first error
- * (excluding errors in inessential tasks),
- * then returning the last task in the queue (which is guaranteed to have finished without error,
- * if this method returns without throwing) */
- public static Task<?> waitForLast() {
- drain(null, true);
- // this call to last is safe, as the above guarantees everything will have run
- // (on errors the above will throw so we won't come here)
- List<Task<?>> q = DynamicTasks.getTaskQueuingContext().getQueue();
- return q.isEmpty() ? null : Iterables.getLast(q);
- }
-
- /** Calls {@link TaskQueueingContext#drain(Duration, boolean, boolean)} on the current task context */
- public static TaskQueueingContext drain(Duration optionalTimeout, boolean throwFirstError) {
- TaskQueueingContext qc = DynamicTasks.getTaskQueuingContext();
- Preconditions.checkNotNull(qc, "Cannot drain when there is no queueing context");
- qc.drain(optionalTimeout, false, throwFirstError);
- return qc;
- }
-
- /** as {@link Tasks#swallowChildrenFailures()} but requiring a {@link TaskQueueingContext}. */
- @Beta
- public static void swallowChildrenFailures() {
- Preconditions.checkNotNull(DynamicTasks.getTaskQueuingContext(), "Task queueing context required here");
- Tasks.swallowChildrenFailures();
- }
-
- /** same as {@link Tasks#markInessential()}
- * (but included here for convenience as it is often used in conjunction with {@link DynamicTasks}) */
- public static void markInessential() {
- Tasks.markInessential();
- }
-
- /** queues the task if possible, otherwise submits it asynchronously; returns the task for callers to
- * {@link Task#getUnchecked()} or {@link Task#blockUntilEnded()} */
- public static <T> Task<T> submit(TaskAdaptable<T> task, Entity entity) {
- return queueIfPossible(task).orSubmitAsync(entity).asTask();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/ExecutionListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/ExecutionListener.java b/core/src/main/java/brooklyn/util/task/ExecutionListener.java
deleted file mode 100644
index 7753588..0000000
--- a/core/src/main/java/brooklyn/util/task/ExecutionListener.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import org.apache.brooklyn.api.management.Task;
-
-public interface ExecutionListener {
-
- /** invoked when a task completes:
- * {@link Task#getEndTimeUtc()} and {@link Task#isDone()} are guaranteed to be set,
- * and {@link Task#get()} should return immediately for most Task implementations
- * (care has been taken to avoid potential deadlocks here, waiting for a result!) */
- public void onTaskDone(Task<?> task);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/ExecutionUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/ExecutionUtils.java b/core/src/main/java/brooklyn/util/task/ExecutionUtils.java
deleted file mode 100644
index 37c19d2..0000000
--- a/core/src/main/java/brooklyn/util/task/ExecutionUtils.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import groovy.lang.Closure;
-
-import java.util.concurrent.Callable;
-
-import com.google.common.base.Function;
-import com.google.common.base.Throwables;
-
-public class ExecutionUtils {
- /**
- * Attempts to run/call the given object, with the given arguments if possible, preserving the return value if there is one (null otherwise);
- * throws exception if the callable is a non-null object which cannot be invoked (not a callable or runnable)
- * @deprecated since 0.7.0 ; this super-loose typing should be avoided; if it is needed, let's move it to one of the Groovy compatibility classes
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public static Object invoke(Object callable, Object ...args) {
- if (callable instanceof Closure) return ((Closure<?>)callable).call(args);
- if (callable instanceof Callable) {
- try {
- return ((Callable<?>)callable).call();
- } catch (Throwable t) {
- throw Throwables.propagate(t);
- }
- }
- if (callable instanceof Runnable) { ((Runnable)callable).run(); return null; }
- if (callable instanceof Function && args.length == 1) { return ((Function)callable).apply(args[0]); }
- if (callable==null) return null;
- throw new IllegalArgumentException("Cannot invoke unexpected object "+callable+" of type "+callable.getClass()+", with "+args.length+" args");
- }
-}