You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by ha...@apache.org on 2015/08/17 21:17:53 UTC

[22/42] incubator-brooklyn git commit: [BROOKLYN-162] Refactor package in ./core/util

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/ForwardingTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ForwardingTask.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ForwardingTask.java
new file mode 100644
index 0000000..794dea9
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ForwardingTask.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.brooklyn.api.management.Task;
+
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ForwardingObject;
+import com.google.common.util.concurrent.ExecutionList;
+import com.google.common.util.concurrent.ListenableFuture;
+
+public abstract class ForwardingTask<T> extends ForwardingObject implements TaskInternal<T> {
+
+    /** Constructor for use by subclasses. */
+    protected ForwardingTask() {}
+
+    @Override
+    protected abstract TaskInternal<T> delegate();
+
+    @Override
+    public void addListener(Runnable listener, Executor executor) {
+        delegate().addListener(listener, executor);
+    }
+
+    @Override
+    public boolean cancel(boolean arg0) {
+        return delegate().cancel(arg0);
+    }
+
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+        return delegate().get();
+    }
+
+    @Override
+    public T get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException {
+        return delegate().get(arg0, arg1);
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return delegate().isCancelled();
+    }
+
+    @Override
+    public boolean isDone() {
+        return delegate().isDone();
+    }
+
+    @Override
+    public Task<T> asTask() {
+        return delegate().asTask();
+    }
+
+    @Override
+    public String getId() {
+        return delegate().getId();
+    }
+
+    @Override
+    public Set<Object> getTags() {
+        return delegate().getTags();
+    }
+
+    @Override
+    public long getSubmitTimeUtc() {
+        return delegate().getSubmitTimeUtc();
+    }
+
+    @Override
+    public long getStartTimeUtc() {
+        return delegate().getStartTimeUtc();
+    }
+
+    @Override
+    public long getEndTimeUtc() {
+        return delegate().getEndTimeUtc();
+    }
+
+    @Override
+    public String getDisplayName() {
+        return delegate().getDisplayName();
+    }
+
+    @Override
+    public String getDescription() {
+        return delegate().getDescription();
+    }
+
+    @Override
+    public Task<?> getSubmittedByTask() {
+        return delegate().getSubmittedByTask();
+    }
+
+    @Override
+    public Thread getThread() {
+        return delegate().getThread();
+    }
+
+    @Override
+    public boolean isSubmitted() {
+        return delegate().isSubmitted();
+    }
+
+    @Override
+    public boolean isBegun() {
+        return delegate().isBegun();
+    }
+
+    @Override
+    public boolean isError() {
+        return delegate().isError();
+    }
+
+    @Override
+    public void blockUntilStarted() {
+        delegate().blockUntilStarted();
+    }
+
+    @Override
+    public void blockUntilEnded() {
+        delegate().blockUntilEnded();
+    }
+
+    @Override
+    public boolean blockUntilEnded(Duration timeout) {
+        return delegate().blockUntilEnded(timeout);
+    }
+
+    @Override
+    public String getStatusSummary() {
+        return delegate().getStatusSummary();
+    }
+
+    @Override
+    public String getStatusDetail(boolean multiline) {
+        return delegate().getStatusDetail(multiline);
+    }
+
+    @Override
+    public T get(Duration duration) throws InterruptedException, ExecutionException, TimeoutException {
+        return delegate().get(duration);
+    }
+
+    @Override
+    public T getUnchecked() {
+        return delegate().getUnchecked();
+    }
+
+    @Override
+    public T getUnchecked(Duration duration) {
+        return delegate().getUnchecked(duration);
+    }
+
+    @Override
+    public void initInternalFuture(ListenableFuture<T> result) {
+        delegate().initInternalFuture(result);
+    }
+
+    @Override
+    public long getQueuedTimeUtc() {
+        return delegate().getQueuedTimeUtc();
+    }
+
+    @Override
+    public Future<T> getInternalFuture() {
+        return delegate().getInternalFuture();
+    }
+
+    @Override
+    public boolean isQueued() {
+        return delegate().isQueued();
+    }
+
+    @Override
+    public boolean isQueuedOrSubmitted() {
+        return delegate().isQueuedOrSubmitted();
+    }
+
+    @Override
+    public boolean isQueuedAndNotSubmitted() {
+        return delegate().isQueuedAndNotSubmitted();
+    }
+
+    @Override
+    public void markQueued() {
+        delegate().markQueued();
+    }
+
+    @Override
+    public boolean cancel() {
+        return delegate().cancel();
+    }
+
+    @Override
+    public boolean blockUntilStarted(Duration timeout) {
+        return delegate().blockUntilStarted(timeout);
+    }
+
+    @Override
+    public String setBlockingDetails(String blockingDetails) {
+        return delegate().setBlockingDetails(blockingDetails);
+    }
+
+    @Override
+    public Task<?> setBlockingTask(Task<?> blockingTask) {
+        return delegate().setBlockingTask(blockingTask);
+    }
+
+    @Override
+    public void resetBlockingDetails() {
+        delegate().resetBlockingDetails();
+    }
+
+    @Override
+    public void resetBlockingTask() {
+        delegate().resetBlockingTask();
+    }
+
+    @Override
+    public String getBlockingDetails() {
+        return delegate().getBlockingDetails();
+    }
+
+    @Override
+    public Task<?> getBlockingTask() {
+        return delegate().getBlockingTask();
+    }
+
+    @Override
+    public void setExtraStatusText(Object extraStatus) {
+        delegate().setExtraStatusText(extraStatus);
+    }
+
+    @Override
+    public Object getExtraStatusText() {
+        return delegate().getExtraStatusText();
+    }
+
+    @Override
+    public void runListeners() {
+        delegate().runListeners();
+    }
+
+    @Override
+    public void setEndTimeUtc(long val) {
+        delegate().setEndTimeUtc(val);
+    }
+
+    @Override
+    public void setThread(Thread thread) {
+        delegate().setThread(thread);
+    }
+
+    @Override
+    public Callable<T> getJob() {
+        return delegate().getJob();
+    }
+
+    @Override
+    public void setJob(Callable<T> job) {
+        delegate().setJob(job);
+    }
+
+    @Override
+    public ExecutionList getListeners() {
+        return delegate().getListeners();
+    }
+
+    @Override
+    public void setSubmitTimeUtc(long currentTimeMillis) {
+        delegate().setSubmitTimeUtc(currentTimeMillis);
+    }
+
+    @Override
+    public void setSubmittedByTask(Task<?> task) {
+        delegate().setSubmittedByTask(task);
+    }
+
+    @Override
+    public Set<Object> getMutableTags() {
+        return delegate().getMutableTags();
+    }
+
+    @Override
+    public void setStartTimeUtc(long currentTimeMillis) {
+        delegate().setStartTimeUtc(currentTimeMillis);
+    }
+
+    @Override
+    public void applyTagModifier(Function<Set<Object>, Void> modifier) {
+        delegate().applyTagModifier(modifier);
+    }
+    
+    @Override
+    public Task<?> getProxyTarget() {
+        return delegate().getProxyTarget();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/ListenableForwardingFuture.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ListenableForwardingFuture.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ListenableForwardingFuture.java
new file mode 100644
index 0000000..bfe88b0
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ListenableForwardingFuture.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+
+import com.google.common.util.concurrent.ExecutionList;
+import com.google.common.util.concurrent.ForwardingFuture.SimpleForwardingFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/** Wraps a Future, making it a ListenableForwardingFuture, but with the caller having the resposibility to:
+ * <li> invoke the listeners on job completion (success or error)
+ * <li> invoke the listeners on cancel */
+public abstract class ListenableForwardingFuture<T> extends SimpleForwardingFuture<T> implements ListenableFuture<T> {
+
+    final ExecutionList listeners;
+    
+    protected ListenableForwardingFuture(Future<T> delegate) {
+        super(delegate);
+        this.listeners = new ExecutionList();
+    }
+
+    protected ListenableForwardingFuture(Future<T> delegate, ExecutionList list) {
+        super(delegate);
+        this.listeners = list;
+    }
+
+    @Override
+    public void addListener(Runnable listener, Executor executor) {
+        listeners.add(listener, executor);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/ParallelTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ParallelTask.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ParallelTask.java
new file mode 100644
index 0000000..10da414
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ParallelTask.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.brooklyn.api.management.Task;
+
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.text.Strings;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * Runs {@link Task}s in parallel.
+ *
+ * No guarantees of order of starting the tasks, but the return value is a
+ * {@link List} of the return values of supplied tasks in the same
+ * order they were passed as arguments.
+ */
+public class ParallelTask<T> extends CompoundTask<T> {
+    public ParallelTask(Object... tasks) { super(tasks); }
+    
+    public ParallelTask(Map<String,?> flags, Collection<? extends Object> tasks) { super(flags, tasks); }
+    public ParallelTask(Collection<? extends Object> tasks) { super(tasks); }
+    
+    public ParallelTask(Map<String,?> flags, Iterable<? extends Object> tasks) { super(flags, ImmutableList.copyOf(tasks)); }
+    public ParallelTask(Iterable<? extends Object> tasks) { super(ImmutableList.copyOf(tasks)); }
+
+    @Override
+    protected List<T> runJobs() throws InterruptedException, ExecutionException {
+        setBlockingDetails("Executing "+
+                (children.size()==1 ? "1 child task" :
+                children.size()+" children tasks in parallel") );
+        for (Task<? extends T> task : children) {
+            submitIfNecessary(task);
+        }
+
+        List<T> result = Lists.newArrayList();
+        List<Exception> exceptions = Lists.newArrayList();
+        for (Task<? extends T> task : children) {
+            T x;
+            try {
+                x = task.get();
+            } catch (Exception e) {
+                Exceptions.propagateIfFatal(e);
+                if (TaskTags.isInessential(task)) {
+                    // ignore exception as it's inessential
+                } else {
+                    exceptions.add(e);
+                }
+                x = null;
+            }
+            result.add(x);
+        }
+        
+        if (exceptions.isEmpty()) {
+            return result;
+        } else {
+            if (result.size()==1 && exceptions.size()==1)
+                throw Exceptions.propagate( exceptions.get(0) );
+            throw Exceptions.propagate(exceptions.size()+" of "+result.size()+" parallel child task"+Strings.s(result.size())+" failed", exceptions);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/ScheduledTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ScheduledTask.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ScheduledTask.java
new file mode 100644
index 0000000..5c4b208
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ScheduledTask.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import static brooklyn.util.GroovyJavaMethods.elvis;
+import static brooklyn.util.GroovyJavaMethods.truth;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.management.Task;
+
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.time.Duration;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Throwables;
+
+/**
+ * A task which runs with a fixed period.
+ * <p>
+ * Note that some termination logic, including {@link #addListener(Runnable, java.util.concurrent.Executor)},
+ * is not precisely defined. 
+ */
+// TODO ScheduledTask is a very pragmatic implementation; would be nice to tighten, 
+// reduce external assumptions about internal structure, and clarify "done" semantics
+public class ScheduledTask extends BasicTask {
+    
+    final Callable<Task<?>> taskFactory;
+    /** initial delay before running, set as flag in constructor; defaults to 0 */
+    protected Duration delay;
+    /** time to wait between executions, or null if not to repeat (default), set as flag to constructor;
+     * this may be modified for subsequent submissions by a running task generated by the factory 
+     * using getSubmittedByTask().setPeriod(Duration) */
+    protected Duration period = null;
+    /** optional, set as flag in constructor; defaults to null meaning no limit */
+    protected Integer maxIterations = null;
+    
+    protected int runCount=0;
+    protected Task<?> recentRun, nextRun;
+
+    public int getRunCount() { return runCount; }
+    public ScheduledFuture<?> getNextScheduled() { return (ScheduledFuture<?>)internalFuture; }
+
+    public ScheduledTask(Callable<Task<?>> taskFactory) {
+        this(MutableMap.of(), taskFactory);
+    }
+
+    public ScheduledTask(final Task<?> task) {
+        this(MutableMap.of(), task);
+    }
+
+    public ScheduledTask(Map flags, final Task<?> task){
+        this(flags, new Callable<Task<?>>(){
+            @Override
+            public Task<?> call() throws Exception {
+                return task;
+            }});
+    }
+
+    public ScheduledTask(Map flags, Callable<Task<?>> taskFactory) {
+        super(flags);
+        this.taskFactory = taskFactory;
+        
+        delay = Duration.of(elvis(flags.remove("delay"), 0));
+        period = Duration.of(elvis(flags.remove("period"), null));
+        maxIterations = elvis(flags.remove("maxIterations"), null);
+    }
+    
+    public ScheduledTask delay(Duration d) {
+        this.delay = d;
+        return this;
+    }
+    public ScheduledTask delay(long val) {
+        return delay(Duration.millis(val));
+    }
+
+    public ScheduledTask period(Duration d) {
+        this.period = d;
+        return this;
+    }
+    public ScheduledTask period(long val) {
+        return period(Duration.millis(val));
+    }
+
+    public ScheduledTask maxIterations(int val) {
+        this.maxIterations = val;
+        return this;
+    }
+
+    public Callable<Task<?>> getTaskFactory() {
+        return taskFactory;
+    }
+
+    public Task<?> newTask() {
+        try {
+            return taskFactory.call();
+        } catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+    
+    protected String getActiveTaskStatusString(int verbosity) {
+        StringBuilder rv = new StringBuilder("Scheduler");
+        if (runCount>0) rv.append(", iteration "+(runCount+1));
+        if (recentRun!=null) rv.append(", last run "+
+            Duration.sinceUtc(recentRun.getStartTimeUtc())+" ms ago");
+        if (truth(getNextScheduled())) {
+            Duration untilNext = Duration.millis(getNextScheduled().getDelay(TimeUnit.MILLISECONDS));
+            if (untilNext.isPositive())
+                rv.append(", next in "+untilNext);
+            else 
+                rv.append(", next imminent");
+        }
+        return rv.toString();
+    }
+    
+    @Override
+    public boolean isDone() {
+        return isCancelled() || (maxIterations!=null && maxIterations <= runCount) || (period==null && nextRun!=null && nextRun.isDone());
+    }
+    
+    public synchronized void blockUntilFirstScheduleStarted() {
+        // TODO Assumes that maxIterations is not negative!
+        while (true) {
+            if (isCancelled()) throw new CancellationException();
+            if (recentRun==null)
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    Throwables.propagate(e);
+                }
+            if (recentRun!=null) return;
+        }
+    }
+    
+    public void blockUntilEnded() {
+        while (!isDone()) super.blockUntilEnded();
+    }
+
+    /** gets the value of the most recently run task */
+    public Object get() throws InterruptedException, ExecutionException {
+        blockUntilStarted();
+        blockUntilFirstScheduleStarted();
+        return (truth(recentRun)) ? recentRun.get() : internalFuture.get();
+    }
+    
+    @Override
+    public synchronized boolean cancel(boolean mayInterrupt) {
+        boolean result = super.cancel(mayInterrupt);
+        if (nextRun!=null) {
+            nextRun.cancel(mayInterrupt);
+            notifyAll();
+        }
+        return result;
+    }
+    
+    /** internal method used to allow callers to wait for underlying tasks to finished in the case of cancellation 
+     * @param duration */ 
+    @Beta
+    public boolean blockUntilNextRunFinished(Duration timeout) {
+        return Tasks.blockUntilInternalTasksEnded(nextRun, timeout);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/SequentialTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/SequentialTask.java b/core/src/main/java/org/apache/brooklyn/core/util/task/SequentialTask.java
new file mode 100644
index 0000000..93ecdf6
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/SequentialTask.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.brooklyn.api.management.Task;
+
+import com.google.common.collect.ImmutableList;
+
+
+/** runs tasks in order, waiting for one to finish before starting the next; return value here is TBD;
+ * (currently is all the return values of individual tasks, but we
+ * might want some pipeline support and eventually only to return final value...) */
+public class SequentialTask<T> extends CompoundTask<T> {
+
+    public SequentialTask(Object... tasks) { super(tasks); }
+    
+    public SequentialTask(Map<String,?> flags, Collection<? extends Object> tasks) { super(flags, tasks); }
+    public SequentialTask(Collection<? extends Object> tasks) { super(tasks); }
+    
+    public SequentialTask(Map<String,?> flags, Iterable<? extends Object> tasks) { super(flags, ImmutableList.copyOf(tasks)); }
+    public SequentialTask(Iterable<? extends Object> tasks) { super(ImmutableList.copyOf(tasks)); }
+    
+    protected List<T> runJobs() throws InterruptedException, ExecutionException {
+        setBlockingDetails("Executing "+
+                (children.size()==1 ? "1 child task" :
+                children.size()+" children tasks sequentially") );
+
+        List<T> result = new ArrayList<T>();
+        for (Task<? extends T> task : children) {
+            submitIfNecessary(task);
+            // throw exception (and cancel subsequent tasks) on error
+            result.add(task.get());
+        }
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/SingleThreadedScheduler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/SingleThreadedScheduler.java b/core/src/main/java/org/apache/brooklyn/core/util/task/SingleThreadedScheduler.java
new file mode 100644
index 0000000..2a5b51d
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/SingleThreadedScheduler.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.brooklyn.api.management.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Instances of this class ensures that {@link Task}s execute with in-order
+ * single-threaded semantics.
+ *
+ * Tasks can be presented through {@link #submit(Callable)}. The order of execution is the
+ * sumbission order.
+ * <p>
+ * This implementation does so by blocking on a {@link ConcurrentLinkedQueue}, <em>after</em>
+ * the task is started in a thread (and {@link Task#isBegun()} returns true), but (of course)
+ * <em>before</em> the {@link TaskInternal#getJob()} actually gets invoked.
+ */
+public class SingleThreadedScheduler implements TaskScheduler, CanSetName {
+    private static final Logger LOG = LoggerFactory.getLogger(SingleThreadedScheduler.class);
+    
+    private final Queue<QueuedSubmission<?>> order = new ConcurrentLinkedQueue<QueuedSubmission<?>>();
+    private int queueSize = 0;
+    private final AtomicBoolean running = new AtomicBoolean(false);
+    
+    private ExecutorService executor;
+
+    private String name;
+    
+    @Override
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public String toString() {
+        return name!=null ? "SingleThreadedScheduler["+name+"]" : super.toString();
+    }
+    
+    @Override
+    public void injectExecutor(ExecutorService executor) {
+        this.executor = executor;
+    }
+
+    @Override
+    public synchronized <T> Future<T> submit(Callable<T> c) {
+        if (running.compareAndSet(false, true)) {
+            return executeNow(c);
+        } else {
+            WrappingFuture<T> f = new WrappingFuture<T>();
+            order.add(new QueuedSubmission<T>(c, f));
+            queueSize++;
+            if (queueSize>0 && (queueSize == 50 || (queueSize<=500 && (queueSize%100)==0) || (queueSize%1000)==0) && queueSize!=lastSizeWarn) {
+                LOG.warn("{} is backing up, {} tasks queued", this, queueSize);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Task queue backing up detail, queue "+this+"; task context is "+Tasks.current()+"; latest task is "+c+"; first task is "+order.peek());
+                }
+                lastSizeWarn = queueSize;
+            }
+            return f;
+        }
+    }
+    int lastSizeWarn = 0;
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private synchronized void onEnd() {
+        boolean done = false;
+        while (!done) {
+            if (order.isEmpty()) {
+                running.set(false);
+                done = true;
+            } else {
+                QueuedSubmission<?> qs = order.remove();
+                queueSize--;
+                if (!qs.f.isCancelled()) {
+                    Future future = executeNow(qs.c);
+                    qs.f.setDelegate(future);
+                    done = true;
+                }
+            }
+        }
+    }
+
+    private synchronized <T> Future<T> executeNow(final Callable<T> c) {
+        return executor.submit(new Callable<T>() {
+            @Override public T call() throws Exception {
+                try {
+                    return c.call();
+                } finally {
+                    onEnd();
+                }
+            }});
+    }
+    
+    
+    private static class QueuedSubmission<T> {
+        final Callable<T> c;
+        final WrappingFuture<T> f;
+        
+        QueuedSubmission(Callable<T> c, WrappingFuture<T> f) {
+            this.c = c;
+            this.f = f;
+        }
+        
+        @Override
+        public String toString() {
+            return "QueuedSubmission["+c+"]@"+Integer.toHexString(System.identityHashCode(this));
+        }
+    }
+    
+    /**
+     * A future, where the task may not yet have been submitted to the real executor.
+     * It delegates to the real future if present, and otherwise waits for that to appear
+     */
+    private static class WrappingFuture<T> implements Future<T> {
+        private volatile Future<T> delegate;
+        private boolean cancelled;
+        
+        void setDelegate(Future<T> delegate) {
+            synchronized (this) {
+                this.delegate = delegate;
+                notifyAll();
+            }
+        }
+        
+        @Override public boolean cancel(boolean mayInterruptIfRunning) {
+            if (delegate != null) {
+                return delegate.cancel(mayInterruptIfRunning);
+            } else {
+                cancelled = true;
+                synchronized (this) {
+                    notifyAll();
+                }
+                return true;
+            }
+        }
+        
+        @Override public boolean isCancelled() {
+            if (delegate != null) {
+                return delegate.isCancelled();
+            } else {
+                return cancelled;
+            }
+        }
+        
+        @Override public boolean isDone() {
+            return (delegate != null) ? delegate.isDone() : cancelled;
+        }
+        
+        @Override public T get() throws CancellationException, ExecutionException, InterruptedException {
+            if (cancelled) {
+                throw new CancellationException();
+            } else if (delegate != null) {
+                return delegate.get();
+            } else {
+                synchronized (this) {
+                    while (delegate == null && !cancelled) {
+                        wait();
+                    }
+                }
+                return get();
+            }
+        }
+        
+        @Override public T get(long timeout, TimeUnit unit) throws CancellationException, ExecutionException, InterruptedException, TimeoutException {
+            long endtime = System.currentTimeMillis()+unit.toMillis(timeout);
+            
+            if (cancelled) {
+                throw new CancellationException();
+            } else if (delegate != null) {
+                return delegate.get(timeout, unit);
+            } else if (System.currentTimeMillis() >= endtime) {
+                throw new TimeoutException();
+            } else {
+                synchronized (this) {
+                    while (delegate == null && !cancelled && System.currentTimeMillis() < endtime) {
+                        long remaining = endtime - System.currentTimeMillis();
+                        if (remaining > 0) {
+                            wait(remaining);
+                        }
+                    }
+                }
+                long remaining = endtime - System.currentTimeMillis();
+                return get(remaining, TimeUnit.MILLISECONDS);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/TaskBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/TaskBuilder.java b/core/src/main/java/org/apache/brooklyn/core/util/task/TaskBuilder.java
new file mode 100644
index 0000000..b105a00
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/TaskBuilder.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.api.management.TaskAdaptable;
+import org.apache.brooklyn.api.management.TaskFactory;
+import org.apache.brooklyn.api.management.TaskQueueingContext;
+
+import brooklyn.util.JavaGroovyEquivalents;
+import brooklyn.util.collections.MutableList;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.collections.MutableSet;
+
+import com.google.common.collect.Iterables;
+
+/** Convenience for creating tasks; note that DynamicSequentialTask is the default */
+public class TaskBuilder<T> {
+
+    String name = null;
+    String description = null;
+    Callable<T> body = null;
+    Boolean swallowChildrenFailures = null;
+    List<TaskAdaptable<?>> children = MutableList.of();
+    Set<Object> tags = MutableSet.of();
+    Map<String,Object> flags = MutableMap.of();
+    Boolean dynamic = null;
+    boolean parallel = false;
+    
+    public static <T> TaskBuilder<T> builder() {
+        return new TaskBuilder<T>();
+    }
+    
+    public TaskBuilder<T> name(String name) {
+        this.name = name;
+        return this;
+    }
+    
+    public TaskBuilder<T> description(String description) {
+        this.description = description;
+        return this;
+    }
+    
+    /** whether task that is built has been explicitly specified to be a dynamic task 
+     * (ie a Task which is also a {@link TaskQueueingContext}
+     * whereby new tasks can be added after creation */
+    public TaskBuilder<T> dynamic(boolean dynamic) {
+        this.dynamic = dynamic;
+        return this;
+    }
+    
+    /** whether task that is built should be parallel; cannot (currently) also be dynamic */
+    public TaskBuilder<T> parallel(boolean parallel) {
+        this.parallel = parallel;
+        return this;
+    }
+    
+    public TaskBuilder<T> body(Callable<T> body) {
+        this.body = body;
+        return this;
+    }
+    
+    /** sets up a dynamic task not to fail even if children fail */
+    public TaskBuilder<T> swallowChildrenFailures(boolean swallowChildrenFailures) {
+        this.swallowChildrenFailures = swallowChildrenFailures;
+        return this;
+    }
+    
+    public TaskBuilder<T> body(Runnable body) {
+        this.body = JavaGroovyEquivalents.<T>toCallable(body);
+        return this;
+    }
+
+    /** adds a child to the given task; the semantics of how the child is executed is set using
+     * {@link #dynamic(boolean)} and {@link #parallel(boolean)} */
+    public TaskBuilder<T> add(TaskAdaptable<?> child) {
+        children.add(child);
+        return this;
+    }
+
+    public TaskBuilder<T> addAll(Iterable<? extends TaskAdaptable<?>> additionalChildren) {
+        Iterables.addAll(children, additionalChildren);
+        return this;
+    }
+
+    public TaskBuilder<T> add(TaskAdaptable<?>... additionalChildren) {
+        children.addAll(Arrays.asList(additionalChildren));
+        return this;
+    }
+
+    /** adds a tag to the given task */
+    public TaskBuilder<T> tag(Object tag) {
+        tags.add(tag);
+        return this;
+    }
+    
+    /** adds a flag to the given task */
+    public TaskBuilder<T> flag(String flag, Object value) {
+        flags.put(flag, value);
+        return this;
+    }
+
+    /** adds the given flags to the given task */
+    public TaskBuilder<T> flags(Map<String,Object> flags) {
+        this.flags.putAll(flags);
+        return this;
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public Task<T> build() {
+        MutableMap<String, Object> taskFlags = MutableMap.copyOf(flags);
+        if (name!=null) taskFlags.put("displayName", name);
+        if (description!=null) taskFlags.put("description", description);
+        if (!tags.isEmpty()) taskFlags.put("tags", tags);
+        
+        if (Boolean.FALSE.equals(dynamic) && children.isEmpty()) {
+            if (swallowChildrenFailures!=null)
+                throw new IllegalArgumentException("Cannot set swallowChildrenFailures for non-dynamic task: "+this);
+            return new BasicTask<T>(taskFlags, body);
+        }
+        
+        // prefer dynamic set unless (a) user has said not dynamic, or (b) it's parallel (since there is no dynamic parallel yet)
+        // dynamic has better cancel (will interrupt the thread) and callers can submit tasks flexibly;
+        // however dynamic uses an extra thread and task and is noisy for contexts which don't need it
+        if (Boolean.TRUE.equals(dynamic) || (dynamic==null && !parallel)) {
+            if (parallel)
+                throw new UnsupportedOperationException("No implementation of parallel dynamic aggregate task available");
+            DynamicSequentialTask<T> result = new DynamicSequentialTask<T>(taskFlags, body);
+            if (swallowChildrenFailures!=null && swallowChildrenFailures.booleanValue()) result.swallowChildrenFailures();
+            for (TaskAdaptable t: children)
+                result.queue(t.asTask());
+            return result;
+        }
+        
+        // T must be of type List<V> for these to be valid
+        if (body != null) {
+            throw new UnsupportedOperationException("No implementation of non-dynamic task with both body and children");
+        }
+        if (swallowChildrenFailures!=null) {
+            throw new IllegalArgumentException("Cannot set swallowChildrenFailures for non-dynamic task: "+this);
+        }
+        
+        if (parallel)
+            return new ParallelTask(taskFlags, children);
+        else
+            return new SequentialTask(taskFlags, children);
+    }
+
+    /** returns a a factory based on this builder */
+    public TaskFactory<Task<T>> buildFactory() {
+        return new TaskFactory<Task<T>>() {
+            public Task<T> newTask() {
+                return build();
+            }
+        };
+    }
+    
+    @Override
+    public String toString() {
+        return super.toString()+"["+name+"]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/TaskInternal.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/TaskInternal.java b/core/src/main/java/org/apache/brooklyn/core/util/task/TaskInternal.java
new file mode 100644
index 0000000..b4a6569
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/TaskInternal.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.brooklyn.api.management.ExecutionManager;
+import org.apache.brooklyn.api.management.Task;
+
+import brooklyn.util.time.Duration;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.ExecutionList;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * All tasks being passed to the {@link ExecutionManager} should implement this.
+ * Users are strongly encouraged to use (or extend) {@link BasicTask}, rather than
+ * implementing a task from scratch.
+ * 
+ * The methods on this interface will change in subsequent releases. Because this is
+ * marked as beta, the normal deprecation policy for these methods does not apply.
+ * 
+ * @author aled
+ */
+@Beta
+public interface TaskInternal<T> extends Task<T> {
+    
+    /** sets the internal future object used to record the association to a job submitted to an {@link ExecutorService} */
+    void initInternalFuture(ListenableFuture<T> result);
+
+    /** returns the underlying future where this task's results will come in; see {@link #initInternalFuture(ListenableFuture)} */
+    Future<T> getInternalFuture();
+    
+    /** if the job is queued for submission (e.g. by another task) it can indicate that fact (and time) here;
+     * note tasks can (and often are) submitted without any queueing, in which case this value may be -1 */
+    long getQueuedTimeUtc();
+    
+    boolean isQueuedOrSubmitted();
+    boolean isQueuedAndNotSubmitted();
+    boolean isQueued();
+
+    /** marks the task as queued for execution */
+    void markQueued();
+
+    boolean cancel();
+    
+    boolean blockUntilStarted(Duration timeout);
+
+    /** allows a task user to specify why a task is blocked; for use immediately before a blocking/wait,
+     * and typically cleared immediately afterwards; referenced by management api to inspect a task
+     * which is blocking
+     * <p>
+     * returns previous details, in case caller wishes to recall and restore it (e.g. if it is doing a sub-blocking)
+     */
+    String setBlockingDetails(String blockingDetails);
+
+    /** as {@link #setBlockingDetails(String)} but records a task which is blocking,
+     * for use e.g. in a gui to navigate to the current active subtask
+     * <p>
+     * returns previous blocking task, in case caller wishes to recall and restore it
+     */
+    Task<?> setBlockingTask(Task<?> blockingTask);
+    
+    void resetBlockingDetails();
+    
+    void resetBlockingTask();
+
+    /** returns a textual message giving details while the task is blocked */
+    String getBlockingDetails();
+    
+    /** returns a task that this task is blocked on */
+    Task<?> getBlockingTask();
+    
+    void setExtraStatusText(Object extraStatus);
+    
+    Object getExtraStatusText();
+
+    void runListeners();
+
+    void setEndTimeUtc(long val);
+
+    void setThread(Thread thread);
+
+    Callable<T> getJob();
+    
+    void setJob(Callable<T> job);
+
+    ExecutionList getListeners();
+
+    void setSubmitTimeUtc(long currentTimeMillis);
+
+    void setSubmittedByTask(Task<?> task);
+    
+    Set<Object> getMutableTags();
+
+    void setStartTimeUtc(long currentTimeMillis);
+
+    void applyTagModifier(Function<Set<Object>,Void> modifier);
+    
+    /** if a task is a proxy for another one (used mainly for internal tasks),
+     * this returns the "real" task represented by this one */
+    Task<?> getProxyTarget();
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/TaskScheduler.java b/core/src/main/java/org/apache/brooklyn/core/util/task/TaskScheduler.java
new file mode 100644
index 0000000..7c5d8a2
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/TaskScheduler.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.brooklyn.api.management.Task;
+
+/**
+ * The scheduler is an internal mechanism to decorate {@link Task}s.
+ *
+ * It can control how the tasks are scheduled for execution (e.g. single-threaded execution,
+ * prioritised, etc).
+ */
+public interface TaskScheduler {
+    
+    public void injectExecutor(ExecutorService executor);
+
+    /**
+     * Called by {@link BasicExecutionManager} to schedule tasks.
+     */
+    public <T> Future<T> submit(Callable<T> c);
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/TaskTags.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/TaskTags.java b/core/src/main/java/org/apache/brooklyn/core/util/task/TaskTags.java
new file mode 100644
index 0000000..e404e87
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/TaskTags.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.api.management.TaskAdaptable;
+
+import com.google.common.base.Function;
+
+public class TaskTags {
+
+    /** marks a task which is allowed to fail without failing his parent */
+    public static final String INESSENTIAL_TASK = "inessential";
+
+    /** marks a task which is a subtask of another */
+    public static final String SUB_TASK_TAG = "SUB-TASK";
+
+    public static void addTagDynamically(TaskAdaptable<?> task, final Object tag) {
+        ((BasicTask<?>)task.asTask()).applyTagModifier(new Function<Set<Object>, Void>() {
+            public Void apply(@Nullable Set<Object> input) {
+                input.add(tag);
+                return null;
+            }
+        });
+    }
+    
+    public static void addTagsDynamically(TaskAdaptable<?> task, final Object tag1, final Object ...tags) {
+        ((BasicTask<?>)task.asTask()).applyTagModifier(new Function<Set<Object>, Void>() {
+            public Void apply(@Nullable Set<Object> input) {
+                input.add(tag1);
+                for (Object tag: tags) input.add(tag);
+                return null;
+            }
+        });
+    }
+
+    
+    public static boolean isInessential(Task<?> task) {
+        return hasTag(task, INESSENTIAL_TASK);
+    }
+
+    public static boolean hasTag(Task<?> task, Object tag) {
+        return task.getTags().contains(tag);
+    }
+    
+    public static <U,V extends TaskAdaptable<U>> V markInessential(V task) {
+        addTagDynamically(task, INESSENTIAL_TASK);
+        return task;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/Tasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/Tasks.java b/core/src/main/java/org/apache/brooklyn/core/util/task/Tasks.java
new file mode 100644
index 0000000..d391a90
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/Tasks.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.api.management.ExecutionContext;
+import org.apache.brooklyn.api.management.HasTaskChildren;
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.api.management.TaskAdaptable;
+import org.apache.brooklyn.api.management.TaskFactory;
+import org.apache.brooklyn.api.management.TaskQueueingContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.exceptions.ReferenceWithError;
+import brooklyn.util.repeat.Repeater;
+import brooklyn.util.time.CountdownTimer;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Iterables;
+
+public class Tasks {
+    
+    private static final Logger log = LoggerFactory.getLogger(Tasks.class);
+    
+    /** convenience for setting "blocking details" on any task where the current thread is running;
+     * typically invoked prior to a wait, for transparency to a user;
+     * then invoked with 'null' just after the wait */
+    public static String setBlockingDetails(String description) {
+        Task<?> current = current();
+        if (current instanceof TaskInternal)
+            return ((TaskInternal<?>)current).setBlockingDetails(description);
+        return null;
+    }
+    public static void resetBlockingDetails() {
+        Task<?> current = current();
+        if (current instanceof TaskInternal)
+            ((TaskInternal<?>)current).resetBlockingDetails(); 
+    }
+    public static Task<?> setBlockingTask(Task<?> blocker) {
+        Task<?> current = current();
+        if (current instanceof TaskInternal)
+            return ((TaskInternal<?>)current).setBlockingTask(blocker);
+        return null;
+    }
+    public static void resetBlockingTask() {
+        Task<?> current = current();
+        if (current instanceof TaskInternal)
+            ((TaskInternal<?>)current).resetBlockingTask(); 
+    }
+    
+    /** convenience for setting "blocking details" on any task where the current thread is running,
+     * while the passed code is executed; often used from groovy as
+     * <pre>{@code withBlockingDetails("sleeping 5s") { Thread.sleep(5000); } }</pre>
+     * If code block is null, the description is set until further notice (not cleareed). */
+    @SuppressWarnings("rawtypes")
+    public static <T> T withBlockingDetails(String description, Callable<T> code) throws Exception {
+        Task current = current();
+        if (code==null) {
+            log.warn("legacy invocation of withBlockingDetails with null code block, ignoring");
+            return null;
+        }
+        String prevBlockingDetails = null;
+        if (current instanceof TaskInternal) {
+            prevBlockingDetails = ((TaskInternal)current).setBlockingDetails(description);
+        } 
+        try {
+            return code.call();
+        } finally {
+            if (current instanceof TaskInternal)
+                ((TaskInternal)current).setBlockingDetails(prevBlockingDetails); 
+        }
+    }
+
+    /** the {@link Task} where the current thread is executing, if executing in a Task, otherwise null;
+     * if the current task is a proxy, this returns the target of that proxy */
+    @SuppressWarnings("rawtypes")
+    public static Task current() { 
+        return getFinalProxyTarget(BasicExecutionManager.getPerThreadCurrentTask().get());
+    }
+
+    public static Task<?> getFinalProxyTarget(Task<?> task) {
+        if (task==null) return null;
+        Task<?> proxy = ((TaskInternal<?>)task).getProxyTarget();
+        if (proxy==null || proxy.equals(task)) return task;
+        return getFinalProxyTarget(proxy);
+    }
+    
+    /** creates a {@link ValueResolver} instance which allows significantly more customization than
+     * the various {@link #resolveValue(Object, Class, ExecutionContext)} methods here */
+    public static <T> ValueResolver<T> resolving(Object v, Class<T> type) {
+        return new ValueResolver<T>(v, type);
+    }
+
+    public static ValueResolver.ResolverBuilderPretype resolving(Object v) {
+        return new ValueResolver.ResolverBuilderPretype(v);
+    }
+
+    /** @see #resolveValue(Object, Class, ExecutionContext, String) */
+    public static <T> T resolveValue(Object v, Class<T> type, @Nullable ExecutionContext exec) throws ExecutionException, InterruptedException {
+        return new ValueResolver<T>(v, type).context(exec).get();
+    }
+    
+    /** attempt to resolve the given value as the given type, waiting on futures, submitting if necessary,
+     * and coercing as allowed by TypeCoercions;
+     * contextMessage (optional) will be displayed in status reports while it waits (e.g. the name of the config key being looked up).
+     * if no execution context supplied (null) this method will throw an exception if the object is an unsubmitted task */
+    public static <T> T resolveValue(Object v, Class<T> type, @Nullable ExecutionContext exec, String contextMessage) throws ExecutionException, InterruptedException {
+        return new ValueResolver<T>(v, type).context(exec).description(contextMessage).get();
+    }
+    
+    /**
+     * @see #resolveDeepValue(Object, Class, ExecutionContext, String)
+     */
+    public static Object resolveDeepValue(Object v, Class<?> type, ExecutionContext exec) throws ExecutionException, InterruptedException {
+        return resolveDeepValue(v, type, exec, null);
+    }
+
+    /**
+     * Resolves the given object, blocking on futures and coercing it to the given type. If the object is a 
+     * map or iterable (or a list of map of maps, etc, etc) then walks these maps/iterables to convert all of 
+     * their values to the given type. For example, the following will return a list containing a map with "1"="true":
+     * 
+     *   {@code Object result = resolveDeepValue(ImmutableList.of(ImmutableMap.of(1, true)), String.class, exec)} 
+     *
+     * To perform a deep conversion of futures contained within Iterables or Maps without coercion of each element,
+     * the type should normally be Object, not the type of the collection. This differs from
+     * {@link #resolveValue(Object, Class, ExecutionContext, String)} which will accept Map and Iterable
+     * as the required type.
+     */
+    public static <T> T resolveDeepValue(Object v, Class<T> type, ExecutionContext exec, String contextMessage) throws ExecutionException, InterruptedException {
+        return new ValueResolver<T>(v, type).context(exec).deep(true).description(contextMessage).get();
+    }
+
+    /** sets extra status details on the current task, if possible (otherwise does nothing).
+     * the extra status is presented in Task.getStatusDetails(true)
+     */
+    public static void setExtraStatusDetails(String notes) {
+        Task<?> current = current();
+        if (current instanceof TaskInternal)
+            ((TaskInternal<?>)current).setExtraStatusText(notes); 
+    }
+
+    public static <T> TaskBuilder<T> builder() {
+        return TaskBuilder.<T>builder();
+    }
+    
+    private static Task<?>[] asTasks(TaskAdaptable<?> ...tasks) {
+        Task<?>[] result = new Task<?>[tasks.length];
+        for (int i=0; i<tasks.length; i++)
+            result[i] = tasks[i].asTask();
+        return result;
+    }
+
+    public static Task<List<?>> parallel(TaskAdaptable<?> ...tasks) {
+        return parallelInternal("parallelised tasks", asTasks(tasks));
+    }
+    public static Task<List<?>> parallel(String name, TaskAdaptable<?> ...tasks) {
+        return parallelInternal(name, asTasks(tasks));
+    }
+    public static Task<List<?>> parallel(Iterable<? extends TaskAdaptable<?>> tasks) {
+        return parallel(asTasks(Iterables.toArray(tasks, TaskAdaptable.class)));
+    }
+    public static Task<List<?>> parallel(String name, Iterable<? extends TaskAdaptable<?>> tasks) {
+        return parallelInternal(name, asTasks(Iterables.toArray(tasks, TaskAdaptable.class)));
+    }
+    private static Task<List<?>> parallelInternal(String name, Task<?>[] tasks) {
+        return Tasks.<List<?>>builder().name(name).parallel(true).add(tasks).build();
+    }
+
+    public static Task<List<?>> sequential(TaskAdaptable<?> ...tasks) {
+        return sequentialInternal("sequential tasks", asTasks(tasks));
+    }
+    public static Task<List<?>> sequential(String name, TaskAdaptable<?> ...tasks) {
+        return sequentialInternal(name, asTasks(tasks));
+    }
+    public static TaskFactory<?> sequential(TaskFactory<?> ...taskFactories) {
+        return sequentialInternal("sequential tasks", taskFactories);
+    }
+    public static TaskFactory<?> sequential(String name, TaskFactory<?> ...taskFactories) {
+        return sequentialInternal(name, taskFactories);
+    }
+    public static Task<List<?>> sequential(List<? extends TaskAdaptable<?>> tasks) {
+        return sequential(asTasks(Iterables.toArray(tasks, TaskAdaptable.class)));
+    }
+    public static Task<List<?>> sequential(String name, List<? extends TaskAdaptable<?>> tasks) {
+        return sequential(name, asTasks(Iterables.toArray(tasks, TaskAdaptable.class)));
+    }
+    private static Task<List<?>> sequentialInternal(String name, Task<?>[] tasks) {
+        return Tasks.<List<?>>builder().name(name).parallel(false).add(tasks).build();
+    }
+    private static TaskFactory<?> sequentialInternal(final String name, final TaskFactory<?> ...taskFactories) {
+        return new TaskFactory<TaskAdaptable<?>>() {
+            @Override
+            public TaskAdaptable<?> newTask() {
+                TaskBuilder<List<?>> tb = Tasks.<List<?>>builder().name(name).parallel(false);
+                for (TaskFactory<?> tf: taskFactories)
+                    tb.add(tf.newTask().asTask());
+                return tb.build();
+            }
+        };
+    }
+
+    /** returns the first tag found on the given task which matches the given type, looking up the submission hierarachy if necessary */
+    @SuppressWarnings("unchecked")
+    public static <T> T tag(@Nullable Task<?> task, Class<T> type, boolean recurseHierarchy) {
+        // support null task to make it easier for callers to walk hierarchies
+        if (task==null) return null;
+        for (Object tag: task.getTags())
+            if (type.isInstance(tag)) return (T)tag;
+        if (!recurseHierarchy) return null;
+        return tag(task.getSubmittedByTask(), type, true);
+    }
+    
+    public static boolean isAncestorCancelled(Task<?> t) {
+        if (t==null) return false;
+        if (t.isCancelled()) return true;
+        return isAncestorCancelled(t.getSubmittedByTask());
+    }
+
+    public static boolean isQueued(TaskAdaptable<?> task) {
+        return ((TaskInternal<?>)task.asTask()).isQueued();
+    }
+
+    public static boolean isSubmitted(TaskAdaptable<?> task) {
+        return ((TaskInternal<?>)task.asTask()).isSubmitted();
+    }
+    
+    public static boolean isQueuedOrSubmitted(TaskAdaptable<?> task) {
+        return ((TaskInternal<?>)task.asTask()).isQueuedOrSubmitted();
+    }
+    
+    /**
+     * Adds the given task to the given context. Does not throw an exception if the addition fails.
+     * @return true if the task was added, false otherwise.
+     */
+    public static boolean tryQueueing(TaskQueueingContext adder, TaskAdaptable<?> task) {
+        if (task==null || isQueued(task))
+            return false;
+        try {
+            adder.queue(task.asTask());
+            return true;
+        } catch (Exception e) {
+            if (log.isDebugEnabled())
+                log.debug("Could not add task "+task+" at "+adder+": "+e);
+            return false;
+        }        
+    }
+    
+    /** see also {@link #resolving(Object)} which gives much more control about submission, timeout, etc */
+    public static <T> Supplier<T> supplier(final TaskAdaptable<T> task) {
+        return new Supplier<T>() {
+            @Override
+            public T get() {
+                return task.asTask().getUnchecked();
+            }
+        };
+    }
+    
+    /** return all children tasks of the given tasks, if it has children, else empty list */
+    public static Iterable<Task<?>> children(Task<?> task) {
+        if (task instanceof HasTaskChildren)
+            return ((HasTaskChildren)task).getChildren();
+        return Collections.emptyList();
+    }
+    
+    /** returns failed tasks */
+    public static Iterable<Task<?>> failed(Iterable<Task<?>> subtasks) {
+        return Iterables.filter(subtasks, new Predicate<Task<?>>() {
+            @Override
+            public boolean apply(Task<?> input) {
+                return input.isError();
+            }
+        });
+    }
+    
+    /** returns the task, its children, and all its children, and so on;
+     * @param root task whose descendants should be iterated
+     * @param parentFirst whether to put parents before children or after
+     */
+    public static Iterable<Task<?>> descendants(Task<?> root, final boolean parentFirst) {
+        Iterable<Task<?>> descs = Iterables.concat(Iterables.transform(Tasks.children(root), new Function<Task<?>,Iterable<Task<?>>>() {
+            @Override
+            public Iterable<Task<?>> apply(Task<?> input) {
+                return descendants(input, parentFirst);
+            }
+        }));
+        if (parentFirst) return Iterables.concat(Collections.singleton(root), descs);
+        else return Iterables.concat(descs, Collections.singleton(root));
+    }
+
+    /** returns the error thrown by the task if {@link Task#isError()}, or null if no error or not done */
+    public static Throwable getError(Task<?> t) {
+        if (t==null) return null;
+        if (!t.isDone()) return null;
+        if (t.isCancelled()) return new CancellationException();
+        try {
+            t.get();
+            return null;
+        } catch (Throwable error) {
+            // do not propagate as we are pretty much guaranteed above that it wasn't this
+            // thread which originally threw the error
+            return error;
+        }
+    }
+    public static Task<Void> fail(final String name, final Throwable optionalError) {
+        return Tasks.<Void>builder().dynamic(false).name(name).body(new Runnable() { public void run() { 
+            if (optionalError!=null) throw Exceptions.propagate(optionalError); else throw new RuntimeException("Failed: "+name);
+        } }).build();
+    }
+    public static Task<Void> warning(final String message, final Throwable optionalError) {
+        log.warn(message);
+        return TaskTags.markInessential(fail(message, optionalError));
+    }
+
+    /** marks the current task inessential; this mainly matters if the task is running in a parent
+     * {@link TaskQueueingContext} and we don't want the parent to fail if this task fails
+     * <p>
+     * no-op (silently ignored) if not in a task */
+    public static void markInessential() {
+        Task<?> task = Tasks.current();
+        if (task==null) {
+            TaskQueueingContext qc = DynamicTasks.getTaskQueuingContext();
+            if (qc!=null) task = qc.asTask();
+        }
+        if (task!=null) {
+            TaskTags.markInessential(task);
+        }
+    }
+    
+    /** causes failures in subtasks of the current task not to fail the parent;
+     * no-op if not in a {@link TaskQueueingContext}.
+     * <p>
+     * essentially like a {@link #markInessential()} on all tasks in the current 
+     * {@link TaskQueueingContext}, including tasks queued subsequently */
+    @Beta
+    public static void swallowChildrenFailures() {
+        Preconditions.checkNotNull(DynamicTasks.getTaskQueuingContext(), "Task queueing context required here");
+        TaskQueueingContext qc = DynamicTasks.getTaskQueuingContext();
+        if (qc!=null) {
+            qc.swallowChildrenFailures();
+        }
+    }
+
+    /** as {@link TaskTags#addTagDynamically(TaskAdaptable, Object)} but for current task, skipping if no current task */
+    public static void addTagDynamically(Object tag) {
+        Task<?> t = Tasks.current();
+        if (t!=null) TaskTags.addTagDynamically(t, tag);
+    }
+    
+    /** 
+     * Workaround for limitation described at {@link Task#cancel(boolean)};
+     * internal method used to allow callers to wait for underlying tasks to finished in the case of cancellation.
+     * <p> 
+     * It is irritating that {@link FutureTask} sync's object clears the runner thread, 
+     * so even if {@link BasicTask#getInternalFuture()} is used, there is no means of determining if the underlying object is done.
+     * The {@link Task#getEndTimeUtc()} seems the only way.
+     *  
+     * @return true if tasks ended; false if timed out
+     **/ 
+    @Beta
+    public static boolean blockUntilInternalTasksEnded(Task<?> t, Duration timeout) {
+        CountdownTimer timer = timeout.countdownTimer();
+        
+        if (t==null)
+            return true;
+        
+        if (t instanceof ScheduledTask) {
+            boolean result = ((ScheduledTask)t).blockUntilNextRunFinished(timer.getDurationRemaining());
+            if (!result) return false;
+        }
+
+        t.blockUntilEnded(timer.getDurationRemaining());
+        
+        while (true) {
+            if (t.getEndTimeUtc()>=0) return true;
+            // above should be sufficient; but just in case, trying the below
+            Thread tt = t.getThread();
+            if (t instanceof ScheduledTask) {
+                ((ScheduledTask)t).blockUntilNextRunFinished(timer.getDurationRemaining());
+                return true;
+            } else {
+                if (tt==null || !tt.isAlive()) {
+                    if (!t.isCancelled()) {
+                        // may happen for a cancelled task, interrupted after submit but before start
+                        log.warn("Internal task thread is dead or null ("+tt+") but task not ended: "+t.getEndTimeUtc()+" ("+t+")");
+                    }
+                    return true;
+                }
+            }
+            if (timer.isExpired())
+                return false;
+            Time.sleep(Repeater.DEFAULT_REAL_QUICK_PERIOD);
+        }
+    }
+    
+    /** returns true if either the current thread or the current task is interrupted/cancelled */
+    public static boolean isInterrupted() {
+        if (Thread.currentThread().isInterrupted()) return true;
+        Task<?> t = current();
+        if (t==null) return false;
+        return t.isCancelled();
+    }
+
+    private static class WaitForRepeaterCallable implements Callable<Boolean> {
+        protected Repeater repeater;
+        protected boolean requireTrue;
+
+        public WaitForRepeaterCallable(Repeater repeater, boolean requireTrue) {
+            this.repeater = repeater;
+            this.requireTrue = requireTrue;
+        }
+
+        @Override
+        public Boolean call() {
+            ReferenceWithError<Boolean> result;
+            Tasks.setBlockingDetails(repeater.getDescription());
+            try {
+               result = repeater.runKeepingError();
+            } finally {
+                Tasks.resetBlockingDetails();
+            }
+
+            if (Boolean.TRUE.equals(result.getWithoutError()))
+                return true;
+            if (result.hasError()) 
+                throw Exceptions.propagate(result.getError());
+            if (requireTrue)
+                throw new IllegalStateException("timeout - "+repeater.getDescription());
+            return false;
+        }
+    }
+
+    /** @return a {@link TaskBuilder} which tests whether the repeater terminates with success in its configured timeframe,
+     * returning true or false depending on whether repeater succeed */
+    public static TaskBuilder<Boolean> testing(Repeater repeater) {
+        return Tasks.<Boolean>builder().body(new WaitForRepeaterCallable(repeater, false))
+            .name("waiting for condition")
+            .description("Testing whether " + getTimeoutString(repeater) + ": "+repeater.getDescription());
+    }
+
+    /** @return a {@link TaskBuilder} which requires that the repeater terminate with success in its configured timeframe,
+     * throwing if it does not */
+    public static TaskBuilder<?> requiring(Repeater repeater) {
+        return Tasks.<Boolean>builder().body(new WaitForRepeaterCallable(repeater, true))
+            .name("waiting for condition")
+            .description("Requiring " + getTimeoutString(repeater) + ": " + repeater.getDescription());
+    }
+    
+    private static String getTimeoutString(Repeater repeater) {
+        Duration timeout = repeater.getTimeLimit();
+        if (timeout==null || Duration.PRACTICALLY_FOREVER.equals(timeout))
+            return "eventually";
+        return "in "+timeout;
+    }
+
+}