You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/18 13:00:45 UTC

[30/64] incubator-brooklyn git commit: [BROOKLYN-162] Refactor package in ./core/util

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/ForwardingTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/ForwardingTask.java b/core/src/main/java/brooklyn/util/task/ForwardingTask.java
deleted file mode 100644
index 3bc3427..0000000
--- a/core/src/main/java/brooklyn/util/task/ForwardingTask.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.brooklyn.api.management.Task;
-
-import brooklyn.util.time.Duration;
-
-import com.google.common.base.Function;
-import com.google.common.collect.ForwardingObject;
-import com.google.common.util.concurrent.ExecutionList;
-import com.google.common.util.concurrent.ListenableFuture;
-
-public abstract class ForwardingTask<T> extends ForwardingObject implements TaskInternal<T> {
-
-    /** Constructor for use by subclasses. */
-    protected ForwardingTask() {}
-
-    @Override
-    protected abstract TaskInternal<T> delegate();
-
-    @Override
-    public void addListener(Runnable listener, Executor executor) {
-        delegate().addListener(listener, executor);
-    }
-
-    @Override
-    public boolean cancel(boolean arg0) {
-        return delegate().cancel(arg0);
-    }
-
-    @Override
-    public T get() throws InterruptedException, ExecutionException {
-        return delegate().get();
-    }
-
-    @Override
-    public T get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException {
-        return delegate().get(arg0, arg1);
-    }
-
-    @Override
-    public boolean isCancelled() {
-        return delegate().isCancelled();
-    }
-
-    @Override
-    public boolean isDone() {
-        return delegate().isDone();
-    }
-
-    @Override
-    public Task<T> asTask() {
-        return delegate().asTask();
-    }
-
-    @Override
-    public String getId() {
-        return delegate().getId();
-    }
-
-    @Override
-    public Set<Object> getTags() {
-        return delegate().getTags();
-    }
-
-    @Override
-    public long getSubmitTimeUtc() {
-        return delegate().getSubmitTimeUtc();
-    }
-
-    @Override
-    public long getStartTimeUtc() {
-        return delegate().getStartTimeUtc();
-    }
-
-    @Override
-    public long getEndTimeUtc() {
-        return delegate().getEndTimeUtc();
-    }
-
-    @Override
-    public String getDisplayName() {
-        return delegate().getDisplayName();
-    }
-
-    @Override
-    public String getDescription() {
-        return delegate().getDescription();
-    }
-
-    @Override
-    public Task<?> getSubmittedByTask() {
-        return delegate().getSubmittedByTask();
-    }
-
-    @Override
-    public Thread getThread() {
-        return delegate().getThread();
-    }
-
-    @Override
-    public boolean isSubmitted() {
-        return delegate().isSubmitted();
-    }
-
-    @Override
-    public boolean isBegun() {
-        return delegate().isBegun();
-    }
-
-    @Override
-    public boolean isError() {
-        return delegate().isError();
-    }
-
-    @Override
-    public void blockUntilStarted() {
-        delegate().blockUntilStarted();
-    }
-
-    @Override
-    public void blockUntilEnded() {
-        delegate().blockUntilEnded();
-    }
-
-    @Override
-    public boolean blockUntilEnded(Duration timeout) {
-        return delegate().blockUntilEnded(timeout);
-    }
-
-    @Override
-    public String getStatusSummary() {
-        return delegate().getStatusSummary();
-    }
-
-    @Override
-    public String getStatusDetail(boolean multiline) {
-        return delegate().getStatusDetail(multiline);
-    }
-
-    @Override
-    public T get(Duration duration) throws InterruptedException, ExecutionException, TimeoutException {
-        return delegate().get(duration);
-    }
-
-    @Override
-    public T getUnchecked() {
-        return delegate().getUnchecked();
-    }
-
-    @Override
-    public T getUnchecked(Duration duration) {
-        return delegate().getUnchecked(duration);
-    }
-
-    @Override
-    public void initInternalFuture(ListenableFuture<T> result) {
-        delegate().initInternalFuture(result);
-    }
-
-    @Override
-    public long getQueuedTimeUtc() {
-        return delegate().getQueuedTimeUtc();
-    }
-
-    @Override
-    public Future<T> getInternalFuture() {
-        return delegate().getInternalFuture();
-    }
-
-    @Override
-    public boolean isQueued() {
-        return delegate().isQueued();
-    }
-
-    @Override
-    public boolean isQueuedOrSubmitted() {
-        return delegate().isQueuedOrSubmitted();
-    }
-
-    @Override
-    public boolean isQueuedAndNotSubmitted() {
-        return delegate().isQueuedAndNotSubmitted();
-    }
-
-    @Override
-    public void markQueued() {
-        delegate().markQueued();
-    }
-
-    @Override
-    public boolean cancel() {
-        return delegate().cancel();
-    }
-
-    @Override
-    public boolean blockUntilStarted(Duration timeout) {
-        return delegate().blockUntilStarted(timeout);
-    }
-
-    @Override
-    public String setBlockingDetails(String blockingDetails) {
-        return delegate().setBlockingDetails(blockingDetails);
-    }
-
-    @Override
-    public Task<?> setBlockingTask(Task<?> blockingTask) {
-        return delegate().setBlockingTask(blockingTask);
-    }
-
-    @Override
-    public void resetBlockingDetails() {
-        delegate().resetBlockingDetails();
-    }
-
-    @Override
-    public void resetBlockingTask() {
-        delegate().resetBlockingTask();
-    }
-
-    @Override
-    public String getBlockingDetails() {
-        return delegate().getBlockingDetails();
-    }
-
-    @Override
-    public Task<?> getBlockingTask() {
-        return delegate().getBlockingTask();
-    }
-
-    @Override
-    public void setExtraStatusText(Object extraStatus) {
-        delegate().setExtraStatusText(extraStatus);
-    }
-
-    @Override
-    public Object getExtraStatusText() {
-        return delegate().getExtraStatusText();
-    }
-
-    @Override
-    public void runListeners() {
-        delegate().runListeners();
-    }
-
-    @Override
-    public void setEndTimeUtc(long val) {
-        delegate().setEndTimeUtc(val);
-    }
-
-    @Override
-    public void setThread(Thread thread) {
-        delegate().setThread(thread);
-    }
-
-    @Override
-    public Callable<T> getJob() {
-        return delegate().getJob();
-    }
-
-    @Override
-    public void setJob(Callable<T> job) {
-        delegate().setJob(job);
-    }
-
-    @Override
-    public ExecutionList getListeners() {
-        return delegate().getListeners();
-    }
-
-    @Override
-    public void setSubmitTimeUtc(long currentTimeMillis) {
-        delegate().setSubmitTimeUtc(currentTimeMillis);
-    }
-
-    @Override
-    public void setSubmittedByTask(Task<?> task) {
-        delegate().setSubmittedByTask(task);
-    }
-
-    @Override
-    public Set<Object> getMutableTags() {
-        return delegate().getMutableTags();
-    }
-
-    @Override
-    public void setStartTimeUtc(long currentTimeMillis) {
-        delegate().setStartTimeUtc(currentTimeMillis);
-    }
-
-    @Override
-    public void applyTagModifier(Function<Set<Object>, Void> modifier) {
-        delegate().applyTagModifier(modifier);
-    }
-    
-    @Override
-    public Task<?> getProxyTarget() {
-        return delegate().getProxyTarget();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/ListenableForwardingFuture.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/ListenableForwardingFuture.java b/core/src/main/java/brooklyn/util/task/ListenableForwardingFuture.java
deleted file mode 100644
index 8111332..0000000
--- a/core/src/main/java/brooklyn/util/task/ListenableForwardingFuture.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-
-import com.google.common.util.concurrent.ExecutionList;
-import com.google.common.util.concurrent.ForwardingFuture.SimpleForwardingFuture;
-import com.google.common.util.concurrent.ListenableFuture;
-
-/** Wraps a Future, making it a ListenableForwardingFuture, but with the caller having the responsibility to:
- * <li> invoke the listeners on job completion (success or error)
- * <li> invoke the listeners on cancel */
-public abstract class ListenableForwardingFuture<T> extends SimpleForwardingFuture<T> implements ListenableFuture<T> {
-
-    final ExecutionList listeners;
-    
-    protected ListenableForwardingFuture(Future<T> delegate) {
-        super(delegate);
-        this.listeners = new ExecutionList();
-    }
-
-    protected ListenableForwardingFuture(Future<T> delegate, ExecutionList list) {
-        super(delegate);
-        this.listeners = list;
-    }
-
-    @Override
-    public void addListener(Runnable listener, Executor executor) {
-        listeners.add(listener, executor);
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/ParallelTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/ParallelTask.java b/core/src/main/java/brooklyn/util/task/ParallelTask.java
deleted file mode 100644
index d6e65ab..0000000
--- a/core/src/main/java/brooklyn/util/task/ParallelTask.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.brooklyn.api.management.Task;
-
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.text.Strings;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-/**
- * Runs {@link Task}s in parallel.
- *
- * No guarantees of order of starting the tasks, but the return value is a
- * {@link List} of the return values of supplied tasks in the same
- * order they were passed as arguments.
- */
-public class ParallelTask<T> extends CompoundTask<T> {
-    public ParallelTask(Object... tasks) { super(tasks); }
-    
-    public ParallelTask(Map<String,?> flags, Collection<? extends Object> tasks) { super(flags, tasks); }
-    public ParallelTask(Collection<? extends Object> tasks) { super(tasks); }
-    
-    public ParallelTask(Map<String,?> flags, Iterable<? extends Object> tasks) { super(flags, ImmutableList.copyOf(tasks)); }
-    public ParallelTask(Iterable<? extends Object> tasks) { super(ImmutableList.copyOf(tasks)); }
-
-    @Override
-    protected List<T> runJobs() throws InterruptedException, ExecutionException {
-        setBlockingDetails("Executing "+
-                (children.size()==1 ? "1 child task" :
-                children.size()+" children tasks in parallel") );
-        for (Task<? extends T> task : children) {
-            submitIfNecessary(task);
-        }
-
-        List<T> result = Lists.newArrayList();
-        List<Exception> exceptions = Lists.newArrayList();
-        for (Task<? extends T> task : children) {
-            T x;
-            try {
-                x = task.get();
-            } catch (Exception e) {
-                Exceptions.propagateIfFatal(e);
-                if (TaskTags.isInessential(task)) {
-                    // ignore exception as it's inessential
-                } else {
-                    exceptions.add(e);
-                }
-                x = null;
-            }
-            result.add(x);
-        }
-        
-        if (exceptions.isEmpty()) {
-            return result;
-        } else {
-            if (result.size()==1 && exceptions.size()==1)
-                throw Exceptions.propagate( exceptions.get(0) );
-            throw Exceptions.propagate(exceptions.size()+" of "+result.size()+" parallel child task"+Strings.s(result.size())+" failed", exceptions);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/ScheduledTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/ScheduledTask.java b/core/src/main/java/brooklyn/util/task/ScheduledTask.java
deleted file mode 100644
index eabff49..0000000
--- a/core/src/main/java/brooklyn/util/task/ScheduledTask.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import static brooklyn.util.GroovyJavaMethods.elvis;
-import static brooklyn.util.GroovyJavaMethods.truth;
-
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.brooklyn.api.management.Task;
-
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.time.Duration;
-
-import com.google.common.annotations.Beta;
-import com.google.common.base.Throwables;
-
-/**
- * A task which runs with a fixed period.
- * <p>
- * Note that some termination logic, including {@link #addListener(Runnable, java.util.concurrent.Executor)},
- * is not precisely defined. 
- */
-// TODO ScheduledTask is a very pragmatic implementation; would be nice to tighten, 
-// reduce external assumptions about internal structure, and clarify "done" semantics
-public class ScheduledTask extends BasicTask {
-    
-    final Callable<Task<?>> taskFactory;
-    /** initial delay before running, set as flag in constructor; defaults to 0 */
-    protected Duration delay;
-    /** time to wait between executions, or null if not to repeat (default), set as flag to constructor;
-     * this may be modified for subsequent submissions by a running task generated by the factory 
-     * using getSubmittedByTask().setPeriod(Duration) */
-    protected Duration period = null;
-    /** optional, set as flag in constructor; defaults to null meaning no limit */
-    protected Integer maxIterations = null;
-    
-    protected int runCount=0;
-    protected Task<?> recentRun, nextRun;
-
-    public int getRunCount() { return runCount; }
-    public ScheduledFuture<?> getNextScheduled() { return (ScheduledFuture<?>)internalFuture; }
-
-    public ScheduledTask(Callable<Task<?>> taskFactory) {
-        this(MutableMap.of(), taskFactory);
-    }
-
-    public ScheduledTask(final Task<?> task) {
-        this(MutableMap.of(), task);
-    }
-
-    public ScheduledTask(Map flags, final Task<?> task){
-        this(flags, new Callable<Task<?>>(){
-            @Override
-            public Task<?> call() throws Exception {
-                return task;
-            }});
-    }
-
-    public ScheduledTask(Map flags, Callable<Task<?>> taskFactory) {
-        super(flags);
-        this.taskFactory = taskFactory;
-        
-        delay = Duration.of(elvis(flags.remove("delay"), 0));
-        period = Duration.of(elvis(flags.remove("period"), null));
-        maxIterations = elvis(flags.remove("maxIterations"), null);
-    }
-    
-    public ScheduledTask delay(Duration d) {
-        this.delay = d;
-        return this;
-    }
-    public ScheduledTask delay(long val) {
-        return delay(Duration.millis(val));
-    }
-
-    public ScheduledTask period(Duration d) {
-        this.period = d;
-        return this;
-    }
-    public ScheduledTask period(long val) {
-        return period(Duration.millis(val));
-    }
-
-    public ScheduledTask maxIterations(int val) {
-        this.maxIterations = val;
-        return this;
-    }
-
-    public Callable<Task<?>> getTaskFactory() {
-        return taskFactory;
-    }
-
-    public Task<?> newTask() {
-        try {
-            return taskFactory.call();
-        } catch (Exception e) {
-            throw Throwables.propagate(e);
-        }
-    }
-    
-    protected String getActiveTaskStatusString(int verbosity) {
-        StringBuilder rv = new StringBuilder("Scheduler");
-        if (runCount>0) rv.append(", iteration "+(runCount+1));
-        if (recentRun!=null) rv.append(", last run "+
-            Duration.sinceUtc(recentRun.getStartTimeUtc())+" ms ago");
-        if (truth(getNextScheduled())) {
-            Duration untilNext = Duration.millis(getNextScheduled().getDelay(TimeUnit.MILLISECONDS));
-            if (untilNext.isPositive())
-                rv.append(", next in "+untilNext);
-            else 
-                rv.append(", next imminent");
-        }
-        return rv.toString();
-    }
-    
-    @Override
-    public boolean isDone() {
-        return isCancelled() || (maxIterations!=null && maxIterations <= runCount) || (period==null && nextRun!=null && nextRun.isDone());
-    }
-    
-    public synchronized void blockUntilFirstScheduleStarted() {
-        // TODO Assumes that maxIterations is not negative!
-        while (true) {
-            if (isCancelled()) throw new CancellationException();
-            if (recentRun==null)
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    Throwables.propagate(e);
-                }
-            if (recentRun!=null) return;
-        }
-    }
-    
-    public void blockUntilEnded() {
-        while (!isDone()) super.blockUntilEnded();
-    }
-
-    /** gets the value of the most recently run task */
-    public Object get() throws InterruptedException, ExecutionException {
-        blockUntilStarted();
-        blockUntilFirstScheduleStarted();
-        return (truth(recentRun)) ? recentRun.get() : internalFuture.get();
-    }
-    
-    @Override
-    public synchronized boolean cancel(boolean mayInterrupt) {
-        boolean result = super.cancel(mayInterrupt);
-        if (nextRun!=null) {
-            nextRun.cancel(mayInterrupt);
-            notifyAll();
-        }
-        return result;
-    }
-    
-    /** internal method used to allow callers to wait for underlying tasks to finish in the case of cancellation
-     * @param timeout */
-    @Beta
-    public boolean blockUntilNextRunFinished(Duration timeout) {
-        return Tasks.blockUntilInternalTasksEnded(nextRun, timeout);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/SequentialTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/SequentialTask.java b/core/src/main/java/brooklyn/util/task/SequentialTask.java
deleted file mode 100644
index e739eb0..0000000
--- a/core/src/main/java/brooklyn/util/task/SequentialTask.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.brooklyn.api.management.Task;
-
-import com.google.common.collect.ImmutableList;
-
-
-/** Runs tasks in order, waiting for one to finish before starting the next. The return value is TBD
- * (currently it is all the return values of the individual tasks, but we
- * might want some pipeline support and eventually to return only the final value). */
-public class SequentialTask<T> extends CompoundTask<T> {
-
-    public SequentialTask(Object... tasks) { super(tasks); }
-    
-    public SequentialTask(Map<String,?> flags, Collection<? extends Object> tasks) { super(flags, tasks); }
-    public SequentialTask(Collection<? extends Object> tasks) { super(tasks); }
-    
-    public SequentialTask(Map<String,?> flags, Iterable<? extends Object> tasks) { super(flags, ImmutableList.copyOf(tasks)); }
-    public SequentialTask(Iterable<? extends Object> tasks) { super(ImmutableList.copyOf(tasks)); }
-    
-    protected List<T> runJobs() throws InterruptedException, ExecutionException {
-        setBlockingDetails("Executing "+
-                (children.size()==1 ? "1 child task" :
-                children.size()+" children tasks sequentially") );
-
-        List<T> result = new ArrayList<T>();
-        for (Task<? extends T> task : children) {
-            submitIfNecessary(task);
-            // throw exception (and cancel subsequent tasks) on error
-            result.add(task.get());
-        }
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java b/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java
deleted file mode 100644
index a48bac8..0000000
--- a/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.Queue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.brooklyn.api.management.Task;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Instances of this class ensure that {@link Task}s execute with in-order
- * single-threaded semantics.
- *
- * Tasks can be presented through {@link #submit(Callable)}. The order of execution is the
- * submission order.
- * <p>
- * This implementation does so by blocking on a {@link ConcurrentLinkedQueue}, <em>after</em>
- * the task is started in a thread (and {@link Task#isBegun()} returns true), but (of course)
- * <em>before</em> the {@link TaskInternal#getJob()} actually gets invoked.
- */
-public class SingleThreadedScheduler implements TaskScheduler, CanSetName {
-    private static final Logger LOG = LoggerFactory.getLogger(SingleThreadedScheduler.class);
-    
-    private final Queue<QueuedSubmission<?>> order = new ConcurrentLinkedQueue<QueuedSubmission<?>>();
-    private int queueSize = 0;
-    private final AtomicBoolean running = new AtomicBoolean(false);
-    
-    private ExecutorService executor;
-
-    private String name;
-    
-    @Override
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    @Override
-    public String toString() {
-        return name!=null ? "SingleThreadedScheduler["+name+"]" : super.toString();
-    }
-    
-    @Override
-    public void injectExecutor(ExecutorService executor) {
-        this.executor = executor;
-    }
-
-    @Override
-    public synchronized <T> Future<T> submit(Callable<T> c) {
-        if (running.compareAndSet(false, true)) {
-            return executeNow(c);
-        } else {
-            WrappingFuture<T> f = new WrappingFuture<T>();
-            order.add(new QueuedSubmission<T>(c, f));
-            queueSize++;
-            if (queueSize>0 && (queueSize == 50 || (queueSize<=500 && (queueSize%100)==0) || (queueSize%1000)==0) && queueSize!=lastSizeWarn) {
-                LOG.warn("{} is backing up, {} tasks queued", this, queueSize);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Task queue backing up detail, queue "+this+"; task context is "+Tasks.current()+"; latest task is "+c+"; first task is "+order.peek());
-                }
-                lastSizeWarn = queueSize;
-            }
-            return f;
-        }
-    }
-    int lastSizeWarn = 0;
-
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    private synchronized void onEnd() {
-        boolean done = false;
-        while (!done) {
-            if (order.isEmpty()) {
-                running.set(false);
-                done = true;
-            } else {
-                QueuedSubmission<?> qs = order.remove();
-                queueSize--;
-                if (!qs.f.isCancelled()) {
-                    Future future = executeNow(qs.c);
-                    qs.f.setDelegate(future);
-                    done = true;
-                }
-            }
-        }
-    }
-
-    private synchronized <T> Future<T> executeNow(final Callable<T> c) {
-        return executor.submit(new Callable<T>() {
-            @Override public T call() throws Exception {
-                try {
-                    return c.call();
-                } finally {
-                    onEnd();
-                }
-            }});
-    }
-    
-    
-    private static class QueuedSubmission<T> {
-        final Callable<T> c;
-        final WrappingFuture<T> f;
-        
-        QueuedSubmission(Callable<T> c, WrappingFuture<T> f) {
-            this.c = c;
-            this.f = f;
-        }
-        
-        @Override
-        public String toString() {
-            return "QueuedSubmission["+c+"]@"+Integer.toHexString(System.identityHashCode(this));
-        }
-    }
-    
-    /**
-     * A future, where the task may not yet have been submitted to the real executor.
-     * It delegates to the real future if present, and otherwise waits for that to appear
-     */
-    private static class WrappingFuture<T> implements Future<T> {
-        private volatile Future<T> delegate;
-        private boolean cancelled;
-        
-        void setDelegate(Future<T> delegate) {
-            synchronized (this) {
-                this.delegate = delegate;
-                notifyAll();
-            }
-        }
-        
-        @Override public boolean cancel(boolean mayInterruptIfRunning) {
-            if (delegate != null) {
-                return delegate.cancel(mayInterruptIfRunning);
-            } else {
-                cancelled = true;
-                synchronized (this) {
-                    notifyAll();
-                }
-                return true;
-            }
-        }
-        
-        @Override public boolean isCancelled() {
-            if (delegate != null) {
-                return delegate.isCancelled();
-            } else {
-                return cancelled;
-            }
-        }
-        
-        @Override public boolean isDone() {
-            return (delegate != null) ? delegate.isDone() : cancelled;
-        }
-        
-        @Override public T get() throws CancellationException, ExecutionException, InterruptedException {
-            if (cancelled) {
-                throw new CancellationException();
-            } else if (delegate != null) {
-                return delegate.get();
-            } else {
-                synchronized (this) {
-                    while (delegate == null && !cancelled) {
-                        wait();
-                    }
-                }
-                return get();
-            }
-        }
-        
-        @Override public T get(long timeout, TimeUnit unit) throws CancellationException, ExecutionException, InterruptedException, TimeoutException {
-            long endtime = System.currentTimeMillis()+unit.toMillis(timeout);
-            
-            if (cancelled) {
-                throw new CancellationException();
-            } else if (delegate != null) {
-                return delegate.get(timeout, unit);
-            } else if (System.currentTimeMillis() >= endtime) {
-                throw new TimeoutException();
-            } else {
-                synchronized (this) {
-                    while (delegate == null && !cancelled && System.currentTimeMillis() < endtime) {
-                        long remaining = endtime - System.currentTimeMillis();
-                        if (remaining > 0) {
-                            wait(remaining);
-                        }
-                    }
-                }
-                long remaining = endtime - System.currentTimeMillis();
-                return get(remaining, TimeUnit.MILLISECONDS);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/TaskBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/TaskBuilder.java b/core/src/main/java/brooklyn/util/task/TaskBuilder.java
deleted file mode 100644
index ecd4d4f..0000000
--- a/core/src/main/java/brooklyn/util/task/TaskBuilder.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-
-import org.apache.brooklyn.api.management.Task;
-import org.apache.brooklyn.api.management.TaskAdaptable;
-import org.apache.brooklyn.api.management.TaskFactory;
-import org.apache.brooklyn.api.management.TaskQueueingContext;
-
-import brooklyn.util.JavaGroovyEquivalents;
-import brooklyn.util.collections.MutableList;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.collections.MutableSet;
-
-import com.google.common.collect.Iterables;
-
-/** Convenience for creating tasks; note that DynamicSequentialTask is the default */
-public class TaskBuilder<T> {
-
-    String name = null;
-    String description = null;
-    Callable<T> body = null;
-    Boolean swallowChildrenFailures = null; // null means not specified; build() rejects any non-null value for non-dynamic tasks
-    List<TaskAdaptable<?>> children = MutableList.of();
-    Set<Object> tags = MutableSet.of();
-    Map<String,Object> flags = MutableMap.of();
-    Boolean dynamic = null; // null means not specified; build() then picks dynamic unless parallel is set
-    boolean parallel = false;
-    
-    public static <T> TaskBuilder<T> builder() {
-        return new TaskBuilder<T>();
-    }
-    
-    public TaskBuilder<T> name(String name) {
-        this.name = name;
-        return this;
-    }
-    
-    public TaskBuilder<T> description(String description) {
-        this.description = description;
-        return this;
-    }
-    
-    /** whether task that is built has been explicitly specified to be a dynamic task 
-     * (ie a Task which is also a {@link TaskQueueingContext},
-     * whereby new tasks can be added after creation) */
-    public TaskBuilder<T> dynamic(boolean dynamic) {
-        this.dynamic = dynamic;
-        return this;
-    }
-    
-    /** whether task that is built should be parallel; cannot (currently) also be dynamic */
-    public TaskBuilder<T> parallel(boolean parallel) {
-        this.parallel = parallel;
-        return this;
-    }
-    
-    public TaskBuilder<T> body(Callable<T> body) {
-        this.body = body;
-        return this;
-    }
-    
-    /** sets up a dynamic task not to fail even if children fail */
-    public TaskBuilder<T> swallowChildrenFailures(boolean swallowChildrenFailures) {
-        this.swallowChildrenFailures = swallowChildrenFailures;
-        return this;
-    }
-    
-    public TaskBuilder<T> body(Runnable body) {
-        this.body = JavaGroovyEquivalents.<T>toCallable(body);
-        return this;
-    }
-
-    /** adds a child to the given task; the semantics of how the child is executed is set using
-     * {@link #dynamic(boolean)} and {@link #parallel(boolean)} */
-    public TaskBuilder<T> add(TaskAdaptable<?> child) {
-        children.add(child);
-        return this;
-    }
-
-    public TaskBuilder<T> addAll(Iterable<? extends TaskAdaptable<?>> additionalChildren) {
-        Iterables.addAll(children, additionalChildren);
-        return this;
-    }
-
-    public TaskBuilder<T> add(TaskAdaptable<?>... additionalChildren) {
-        children.addAll(Arrays.asList(additionalChildren));
-        return this;
-    }
-
-    /** adds a tag to the given task */
-    public TaskBuilder<T> tag(Object tag) {
-        tags.add(tag);
-        return this;
-    }
-    
-    /** adds a flag to the given task */
-    public TaskBuilder<T> flag(String flag, Object value) {
-        flags.put(flag, value);
-        return this;
-    }
-
-    /** adds the given flags to the given task */
-    public TaskBuilder<T> flags(Map<String,Object> flags) {
-        this.flags.putAll(flags);
-        return this;
-    }
-
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    public Task<T> build() {
-        MutableMap<String, Object> taskFlags = MutableMap.copyOf(flags);
-        if (name!=null) taskFlags.put("displayName", name);
-        if (description!=null) taskFlags.put("description", description);
-        if (!tags.isEmpty()) taskFlags.put("tags", tags);
-        
-        if (Boolean.FALSE.equals(dynamic) && children.isEmpty()) { // explicitly non-dynamic with no children: a plain BasicTask
-            if (swallowChildrenFailures!=null)
-                throw new IllegalArgumentException("Cannot set swallowChildrenFailures for non-dynamic task: "+this);
-            return new BasicTask<T>(taskFlags, body);
-        }
-        
-        // prefer dynamic set unless (a) user has said not dynamic, or (b) it's parallel (since there is no dynamic parallel yet)
-        // dynamic has better cancel (will interrupt the thread) and callers can submit tasks flexibly;
-        // however dynamic uses an extra thread and task and is noisy for contexts which don't need it
-        if (Boolean.TRUE.equals(dynamic) || (dynamic==null && !parallel)) {
-            if (parallel)
-                throw new UnsupportedOperationException("No implementation of parallel dynamic aggregate task available");
-            DynamicSequentialTask<T> result = new DynamicSequentialTask<T>(taskFlags, body);
-            if (swallowChildrenFailures!=null && swallowChildrenFailures.booleanValue()) result.swallowChildrenFailures();
-            for (TaskAdaptable t: children)
-                result.queue(t.asTask());
-            return result;
-        }
-        
-        // T must be of type List<V> for these to be valid
-        if (body != null) {
-            throw new UnsupportedOperationException("No implementation of non-dynamic task with both body and children");
-        }
-        if (swallowChildrenFailures!=null) {
-            throw new IllegalArgumentException("Cannot set swallowChildrenFailures for non-dynamic task: "+this);
-        }
-        
-        if (parallel)
-            return new ParallelTask(taskFlags, children);
-        else
-            return new SequentialTask(taskFlags, children);
-    }
-
-    /** returns a factory based on this builder; each {@code newTask()} call invokes {@link #build()} */
-    public TaskFactory<Task<T>> buildFactory() {
-        return new TaskFactory<Task<T>>() {
-            public Task<T> newTask() {
-                return build();
-            }
-        };
-    }
-    
-    @Override
-    public String toString() {
-        return super.toString()+"["+name+"]";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/TaskInternal.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/TaskInternal.java b/core/src/main/java/brooklyn/util/task/TaskInternal.java
deleted file mode 100644
index 51dbddb..0000000
--- a/core/src/main/java/brooklyn/util/task/TaskInternal.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-
-import org.apache.brooklyn.api.management.ExecutionManager;
-import org.apache.brooklyn.api.management.Task;
-
-import brooklyn.util.time.Duration;
-
-import com.google.common.annotations.Beta;
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.ExecutionList;
-import com.google.common.util.concurrent.ListenableFuture;
-
-/**
- * All tasks being passed to the {@link ExecutionManager} should implement this.
- * Users are strongly encouraged to use (or extend) {@link BasicTask}, rather than
- * implementing a task from scratch.
- * 
- * The methods on this interface will change in subsequent releases. Because this is
- * marked as beta, the normal deprecation policy for these methods does not apply.
- * 
- * @author aled
- */
-@Beta
-public interface TaskInternal<T> extends Task<T> {
-    
-    /** sets the internal future object used to record the association to a job submitted to an {@link ExecutorService} */
-    void initInternalFuture(ListenableFuture<T> result);
-
-    /** returns the underlying future where this task's results will come in; see {@link #initInternalFuture(ListenableFuture)} */
-    Future<T> getInternalFuture();
-    
-    /** if the job is queued for submission (e.g. by another task) it can indicate that fact (and time) here;
-     * note tasks can be (and often are) submitted without any queueing, in which case this value may be -1 */
-    long getQueuedTimeUtc();
-    
-    boolean isQueuedOrSubmitted();
-    boolean isQueuedAndNotSubmitted();
-    boolean isQueued();
-
-    /** marks the task as queued for execution */
-    void markQueued();
-
-    boolean cancel();
-    
-    boolean blockUntilStarted(Duration timeout);
-
-    /** allows a task user to specify why a task is blocked; for use immediately before a blocking/wait,
-     * and typically cleared immediately afterwards; referenced by management api to inspect a task
-     * which is blocking
-     * <p>
-     * returns previous details, in case caller wishes to recall and restore it (e.g. if it is doing a sub-blocking)
-     */
-    String setBlockingDetails(String blockingDetails);
-
-    /** as {@link #setBlockingDetails(String)} but records a task which is blocking,
-     * for use e.g. in a gui to navigate to the current active subtask
-     * <p>
-     * returns previous blocking task, in case caller wishes to recall and restore it
-     */
-    Task<?> setBlockingTask(Task<?> blockingTask);
-    
-    void resetBlockingDetails();
-    
-    void resetBlockingTask();
-
-    /** returns a textual message giving details while the task is blocked */
-    String getBlockingDetails();
-    
-    /** returns a task that this task is blocked on */
-    Task<?> getBlockingTask();
-    
-    void setExtraStatusText(Object extraStatus);
-    
-    Object getExtraStatusText();
-
-    void runListeners();
-
-    void setEndTimeUtc(long val);
-
-    void setThread(Thread thread);
-
-    Callable<T> getJob();
-    
-    void setJob(Callable<T> job);
-
-    ExecutionList getListeners();
-
-    void setSubmitTimeUtc(long currentTimeMillis);
-
-    void setSubmittedByTask(Task<?> task);
-    
-    Set<Object> getMutableTags();
-
-    void setStartTimeUtc(long currentTimeMillis);
-
-    void applyTagModifier(Function<Set<Object>,Void> modifier);
-    
-    /** if a task is a proxy for another one (used mainly for internal tasks),
-     * this returns the "real" task represented by this one */
-    Task<?> getProxyTarget();
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/TaskScheduler.java b/core/src/main/java/brooklyn/util/task/TaskScheduler.java
deleted file mode 100644
index a10e63a..0000000
--- a/core/src/main/java/brooklyn/util/task/TaskScheduler.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-
-import org.apache.brooklyn.api.management.Task;
-
-/**
- * The scheduler is an internal mechanism to decorate {@link Task}s.
- *
- * It can control how the tasks are scheduled for execution (e.g. single-threaded execution,
- * prioritised, etc).
- */
-public interface TaskScheduler {
-    
-    void injectExecutor(ExecutorService executor);
-
-    /**
-     * Invoked by {@link BasicExecutionManager} when it has a task to schedule.
-     */
-    <T> Future<T> submit(Callable<T> c);
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/TaskTags.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/TaskTags.java b/core/src/main/java/brooklyn/util/task/TaskTags.java
deleted file mode 100644
index a9da252..0000000
--- a/core/src/main/java/brooklyn/util/task/TaskTags.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-import org.apache.brooklyn.api.management.Task;
-import org.apache.brooklyn.api.management.TaskAdaptable;
-
-import com.google.common.base.Function;
-
-public class TaskTags {
-
-    /** marks a task which is allowed to fail without failing its parent */
-    public static final String INESSENTIAL_TASK = "inessential";
-
-    /** marks a task which is a subtask of another */
-    public static final String SUB_TASK_TAG = "SUB-TASK";
-    /** adds the tag via {@link TaskInternal#applyTagModifier(Function)}; throws ClassCastException if the task is not a {@link TaskInternal} */
-    public static void addTagDynamically(TaskAdaptable<?> task, final Object tag) {
-        ((TaskInternal<?>)task.asTask()).applyTagModifier(new Function<Set<Object>, Void>() {
-            public Void apply(@Nullable Set<Object> input) {
-                input.add(tag);
-                return null;
-            }
-        });
-    }
-    /** as {@link #addTagDynamically(TaskAdaptable, Object)} but adds tag1 and then each of tags within a single modifier call */
-    public static void addTagsDynamically(TaskAdaptable<?> task, final Object tag1, final Object ...tags) {
-        ((TaskInternal<?>)task.asTask()).applyTagModifier(new Function<Set<Object>, Void>() {
-            public Void apply(@Nullable Set<Object> input) {
-                input.add(tag1);
-                for (Object tag: tags) input.add(tag);
-                return null;
-            }
-        });
-    }
-
-    /** whether the task carries the {@link #INESSENTIAL_TASK} tag */
-    public static boolean isInessential(Task<?> task) {
-        return hasTag(task, INESSENTIAL_TASK);
-    }
-    /** whether the task's tags contain the given tag; the task must not be null */
-    public static boolean hasTag(Task<?> task, Object tag) {
-        return task.getTags().contains(tag);
-    }
-    /** tags the task with {@link #INESSENTIAL_TASK} and returns the same instance, for fluent use */
-    public static <U,V extends TaskAdaptable<U>> V markInessential(V task) {
-        addTagDynamically(task, INESSENTIAL_TASK);
-        return task;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/Tasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/Tasks.java b/core/src/main/java/brooklyn/util/task/Tasks.java
deleted file mode 100644
index c25dd19..0000000
--- a/core/src/main/java/brooklyn/util/task/Tasks.java
+++ /dev/null
@@ -1,488 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.FutureTask;
-
-import javax.annotation.Nullable;
-
-import org.apache.brooklyn.api.management.ExecutionContext;
-import org.apache.brooklyn.api.management.HasTaskChildren;
-import org.apache.brooklyn.api.management.Task;
-import org.apache.brooklyn.api.management.TaskAdaptable;
-import org.apache.brooklyn.api.management.TaskFactory;
-import org.apache.brooklyn.api.management.TaskQueueingContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.exceptions.ReferenceWithError;
-import brooklyn.util.repeat.Repeater;
-import brooklyn.util.time.CountdownTimer;
-import brooklyn.util.time.Duration;
-import brooklyn.util.time.Time;
-
-import com.google.common.annotations.Beta;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Iterables;
-
-public class Tasks {
-    
-    private static final Logger log = LoggerFactory.getLogger(Tasks.class);
-    
-    /** convenience for setting "blocking details" on any task where the current thread is running;
-     * typically invoked prior to a wait, for transparency to a user;
-     * then invoked with 'null' just after the wait */
-    public static String setBlockingDetails(String description) {
-        Task<?> running = current();
-        // outside a task, or on a task which is not a TaskInternal, there is nothing to record
-        if (!(running instanceof TaskInternal)) return null;
-        return ((TaskInternal<?>)running).setBlockingDetails(description);
-    }
-    public static void resetBlockingDetails() {
-        Task<?> running = current();
-        if (!(running instanceof TaskInternal)) return; // nothing to clear unless running in a TaskInternal
-        ((TaskInternal<?>)running).resetBlockingDetails();
-    }
-    public static Task<?> setBlockingTask(Task<?> blocker) {
-        Task<?> running = current();
-        // only a TaskInternal tracks a blocking task; otherwise there is no previous value to hand back
-        if (!(running instanceof TaskInternal)) return null;
-        return ((TaskInternal<?>)running).setBlockingTask(blocker);
-    }
-    public static void resetBlockingTask() {
-        Task<?> running = current();
-        if (!(running instanceof TaskInternal)) return; // nothing to clear unless running in a TaskInternal
-        ((TaskInternal<?>)running).resetBlockingTask();
-    }
-    
-    /** convenience for setting "blocking details" on any task where the current thread is running,
-     * while the passed code is executed; often used from groovy as
-     * <pre>{@code withBlockingDetails("sleeping 5s") { Thread.sleep(5000); } }</pre>
-     * If code block is null, a warning is logged and the call is ignored (nothing is set and null is returned). */
-    @SuppressWarnings("rawtypes")
-    public static <T> T withBlockingDetails(String description, Callable<T> code) throws Exception {
-        Task running = current();
-        // only a TaskInternal can record blocking details; for anything else the code is simply run
-        TaskInternal tracker = (running instanceof TaskInternal) ? (TaskInternal) running : null;
-        if (code==null) {
-            log.warn("legacy invocation of withBlockingDetails with null code block, ignoring");
-            return null;
-        }
-        String previous = null;
-        if (tracker != null) previous = tracker.setBlockingDetails(description);
-        try {
-            return code.call();
-        } finally {
-            // put back whatever details were being reported before this call
-            if (tracker != null) tracker.setBlockingDetails(previous);
-        }
-    }
-
-    /** the {@link Task} where the current thread is executing, if executing in a Task, otherwise null;
-     * if the current task is a proxy, this returns the target of that proxy */
-    @SuppressWarnings("rawtypes")
-    public static Task current() { 
-        return getFinalProxyTarget(BasicExecutionManager.getPerThreadCurrentTask().get());
-    }
-
-    public static Task<?> getFinalProxyTarget(Task<?> task) {
-        // follows getProxyTarget() links until a task with no proxy target (or which targets itself) is reached
-        if (!(task instanceof TaskInternal)) return task; // null, or a plain Task: cannot be a proxy, so do not fail on the cast
-        Task<?> proxy = ((TaskInternal<?>)task).getProxyTarget();
-        if (proxy==null || proxy.equals(task)) return task;
-        return getFinalProxyTarget(proxy);
-    }
-    
-    /** creates a {@link ValueResolver} instance which allows significantly more customization than
-     * the various {@link #resolveValue(Object, Class, ExecutionContext)} methods here */
-    public static <T> ValueResolver<T> resolving(Object v, Class<T> type) {
-        return new ValueResolver<T>(v, type);
-    }
-
-    public static ValueResolver.ResolverBuilderPretype resolving(Object v) {
-        return new ValueResolver.ResolverBuilderPretype(v);
-    }
-
-    /** @see #resolveValue(Object, Class, ExecutionContext, String) */
-    public static <T> T resolveValue(Object v, Class<T> type, @Nullable ExecutionContext exec) throws ExecutionException, InterruptedException {
-        return new ValueResolver<T>(v, type).context(exec).get();
-    }
-    
-    /** attempt to resolve the given value as the given type, waiting on futures, submitting if necessary,
-     * and coercing as allowed by TypeCoercions;
-     * contextMessage (optional) will be displayed in status reports while it waits (e.g. the name of the config key being looked up).
-     * if no execution context supplied (null) this method will throw an exception if the object is an unsubmitted task */
-    public static <T> T resolveValue(Object v, Class<T> type, @Nullable ExecutionContext exec, String contextMessage) throws ExecutionException, InterruptedException {
-        return new ValueResolver<T>(v, type).context(exec).description(contextMessage).get();
-    }
-    
-    /**
-     * @see #resolveDeepValue(Object, Class, ExecutionContext, String)
-     */
-    public static Object resolveDeepValue(Object v, Class<?> type, ExecutionContext exec) throws ExecutionException, InterruptedException {
-        return resolveDeepValue(v, type, exec, null);
-    }
-
-    /**
-     * Resolves the given object, blocking on futures and coercing it to the given type. If the object is a 
-     * map or iterable (or a list of maps of maps, etc, etc) then walks these maps/iterables to convert all of 
-     * their values to the given type. For example, the following will return a list containing a map with "1"="true":
-     * 
-     *   {@code Object result = resolveDeepValue(ImmutableList.of(ImmutableMap.of(1, true)), String.class, exec)} 
-     *
-     * To perform a deep conversion of futures contained within Iterables or Maps without coercion of each element,
-     * the type should normally be Object, not the type of the collection. This differs from
-     * {@link #resolveValue(Object, Class, ExecutionContext, String)} which will accept Map and Iterable
-     * as the required type.
-     */
-    public static <T> T resolveDeepValue(Object v, Class<T> type, ExecutionContext exec, String contextMessage) throws ExecutionException, InterruptedException {
-        return new ValueResolver<T>(v, type).context(exec).deep(true).description(contextMessage).get();
-    }
-
-    /** sets extra status details on the current task, if possible (otherwise does nothing).
-     * the extra status is presented in Task.getStatusDetails(true)
-     */
-    public static void setExtraStatusDetails(String notes) {
-        Task<?> running = current();
-        if (!(running instanceof TaskInternal)) return; // not possible here, so silently do nothing
-        ((TaskInternal<?>)running).setExtraStatusText(notes);
-    }
-
-    public static <T> TaskBuilder<T> builder() {
-        return TaskBuilder.<T>builder();
-    }
-    
-    private static Task<?>[] asTasks(TaskAdaptable<?> ...tasks) {
-        Task<?>[] result = new Task<?>[tasks.length];
-        for (int i=0; i<tasks.length; i++)
-            result[i] = tasks[i].asTask();
-        return result;
-    }
-
-    public static Task<List<?>> parallel(TaskAdaptable<?> ...tasks) {
-        return parallelInternal("parallelised tasks", asTasks(tasks));
-    }
-    public static Task<List<?>> parallel(String name, TaskAdaptable<?> ...tasks) {
-        return parallelInternal(name, asTasks(tasks));
-    }
-    public static Task<List<?>> parallel(Iterable<? extends TaskAdaptable<?>> tasks) {
-        return parallel(asTasks(Iterables.toArray(tasks, TaskAdaptable.class)));
-    }
-    public static Task<List<?>> parallel(String name, Iterable<? extends TaskAdaptable<?>> tasks) {
-        return parallelInternal(name, asTasks(Iterables.toArray(tasks, TaskAdaptable.class)));
-    }
-    private static Task<List<?>> parallelInternal(String name, Task<?>[] tasks) {
-        return Tasks.<List<?>>builder().name(name).parallel(true).add(tasks).build();
-    }
-
-    public static Task<List<?>> sequential(TaskAdaptable<?> ...tasks) {
-        return sequentialInternal("sequential tasks", asTasks(tasks));
-    }
-    public static Task<List<?>> sequential(String name, TaskAdaptable<?> ...tasks) {
-        return sequentialInternal(name, asTasks(tasks));
-    }
-    public static TaskFactory<?> sequential(TaskFactory<?> ...taskFactories) {
-        return sequentialInternal("sequential tasks", taskFactories);
-    }
-    public static TaskFactory<?> sequential(String name, TaskFactory<?> ...taskFactories) {
-        return sequentialInternal(name, taskFactories);
-    }
-    public static Task<List<?>> sequential(List<? extends TaskAdaptable<?>> tasks) {
-        return sequential(asTasks(Iterables.toArray(tasks, TaskAdaptable.class)));
-    }
-    public static Task<List<?>> sequential(String name, List<? extends TaskAdaptable<?>> tasks) {
-        return sequential(name, asTasks(Iterables.toArray(tasks, TaskAdaptable.class)));
-    }
-    private static Task<List<?>> sequentialInternal(String name, Task<?>[] tasks) {
-        return Tasks.<List<?>>builder().name(name).parallel(false).add(tasks).build();
-    }
-    private static TaskFactory<?> sequentialInternal(final String name, final TaskFactory<?> ...taskFactories) {
-        return new TaskFactory<TaskAdaptable<?>>() {
-            @Override
-            public TaskAdaptable<?> newTask() {
-                TaskBuilder<List<?>> tb = Tasks.<List<?>>builder().name(name).parallel(false);
-                for (TaskFactory<?> tf: taskFactories)
-                    tb.add(tf.newTask().asTask());
-                return tb.build();
-            }
-        };
-    }
-
-    /** returns the first tag found on the given task which matches the given type, looking up the submission hierarchy if necessary */
-    @SuppressWarnings("unchecked")
-    public static <T> T tag(@Nullable Task<?> task, Class<T> type, boolean recurseHierarchy) {
-        // walk from the task up through its submitters; a null task simply ends the walk,
-        // which also makes it easy for callers to pass in the result of a hierarchy lookup
-        for (Task<?> t = task; t != null; t = recurseHierarchy ? t.getSubmittedByTask() : null)
-            for (Object candidate : t.getTags())
-                if (type.isInstance(candidate)) return (T) candidate;
-        return null;
-    }
-    
-    public static boolean isAncestorCancelled(Task<?> t) {
-        for (Task<?> ancestor = t; ancestor != null; ancestor = ancestor.getSubmittedByTask())
-            if (ancestor.isCancelled()) return true; // the task itself counts, as does any submitter above it
-        return false;
-    }
-
-    public static boolean isQueued(TaskAdaptable<?> task) {
-        return ((TaskInternal<?>)task.asTask()).isQueued();
-    }
-
-    public static boolean isSubmitted(TaskAdaptable<?> task) {
-        return ((TaskInternal<?>)task.asTask()).isSubmitted();
-    }
-    
-    public static boolean isQueuedOrSubmitted(TaskAdaptable<?> task) {
-        return ((TaskInternal<?>)task.asTask()).isQueuedOrSubmitted();
-    }
-    
-    /**
-     * Adds the given task to the given context. Does not throw an exception if the addition fails.
-     * @return true if the task was added, false otherwise.
-     */
-    public static boolean tryQueueing(TaskQueueingContext adder, TaskAdaptable<?> task) {
-        // a missing task, or one which is already queued, is never added
-        if (task==null || isQueued(task)) return false;
-        try {
-            adder.queue(task.asTask());
-        } catch (Exception e) {
-            // best effort by contract: note it at debug level and report failure to the caller
-            if (log.isDebugEnabled()) log.debug("Could not add task "+task+" at "+adder+": "+e);
-            return false;
-        }
-        return true;
-    }
-    
-    /** see also {@link #resolving(Object)} which gives much more control about submission, timeout, etc */
-    public static <T> Supplier<T> supplier(final TaskAdaptable<T> task) {
-        return new Supplier<T>() {
-            @Override
-            public T get() {
-                return task.asTask().getUnchecked();
-            }
-        };
-    }
-    
-    /** return all children tasks of the given tasks, if it has children, else empty list */
-    public static Iterable<Task<?>> children(Task<?> task) {
-        if (task instanceof HasTaskChildren)
-            return ((HasTaskChildren)task).getChildren();
-        return Collections.emptyList();
-    }
-    
-    /** returns failed tasks */
-    public static Iterable<Task<?>> failed(Iterable<Task<?>> subtasks) {
-        return Iterables.filter(subtasks, new Predicate<Task<?>>() {
-            @Override
-            public boolean apply(Task<?> input) {
-                return input.isError();
-            }
-        });
-    }
-    
-    /** returns the task, its children, and all their children, and so on;
-     * @param root task whose descendants should be iterated
-     * @param parentFirst whether to put parents before children or after
-     */
-    public static Iterable<Task<?>> descendants(Task<?> root, final boolean parentFirst) {
-        // expands one child into that child's own descendant sequence (recursive step)
-        Function<Task<?>,Iterable<Task<?>>> expandChild = new Function<Task<?>,Iterable<Task<?>>>() {
-            @Override
-            public Iterable<Task<?>> apply(Task<?> child) {
-                return descendants(child, parentFirst);
-            }
-        };
-        Iterable<Iterable<Task<?>>> perChild = Iterables.transform(Tasks.children(root), expandChild);
-        Iterable<Task<?>> descs = Iterables.concat(perChild);
-        if (parentFirst) {
-            return Iterables.concat(Collections.singleton(root), descs);
-        } else {
-            return Iterables.concat(descs, Collections.singleton(root));
-        }
-    }
-
-    /** returns the error thrown by the task if {@link Task#isError()}, or null if no error or not done */
-    public static Throwable getError(Task<?> t) {
-        // nothing to report for a missing task or one that has not completed yet
-        if (t==null || !t.isDone()) {
-            return null;
-        }
-        if (t.isCancelled()) {
-            return new CancellationException();
-        }
-        Throwable failure = null;
-        try {
-            t.get();
-        } catch (Throwable error) {
-            // do not propagate as we are pretty much guaranteed above that it wasn't this
-            // thread which originally threw the error
-            failure = error;
-        }
-        return failure;
-    }
-    /** creates a task, built with {@code dynamic(false)} and the given name, whose body always throws:
-     * the given error via {@code Exceptions.propagate} when one is supplied,
-     * otherwise a {@link RuntimeException} with message {@code "Failed: "+name} */
-    public static Task<Void> fail(final String name, final Throwable optionalError) {
-        Runnable alwaysThrows = new Runnable() {
-            @Override
-            public void run() {
-                if (optionalError==null) {
-                    throw new RuntimeException("Failed: "+name);
-                }
-                throw Exceptions.propagate(optionalError);
-            }
-        };
-        return Tasks.<Void>builder().dynamic(false).name(name).body(alwaysThrows).build();
-    }
-    /** logs the message at warn level immediately (when this method is called, not when the task runs)
-     * and returns a {@link #fail(String, Throwable)} task passed through {@link TaskTags#markInessential} */
-    public static Task<Void> warning(final String message, final Throwable optionalError) {
-        log.warn(message);
-        Task<Void> failing = fail(message, optionalError);
-        return TaskTags.markInessential(failing);
-    }
-
-    /** marks the current task inessential; this mainly matters if the task is running in a parent
-     * {@link TaskQueueingContext} and we don't want the parent to fail if this task fails
-     * <p>
-     * no-op (silently ignored) if not in a task */
-    public static void markInessential() {
-        Task<?> target = Tasks.current();
-        if (target==null) {
-            // no current task: fall back to the task behind the queueing context, if there is one
-            TaskQueueingContext context = DynamicTasks.getTaskQueuingContext();
-            if (context==null) {
-                return;
-            }
-            target = context.asTask();
-        }
-        if (target==null) {
-            return;
-        }
-        TaskTags.markInessential(target);
-    }
-    
-    /** causes failures in subtasks of the current task not to fail the parent;
-     * throws {@link NullPointerException} if not in a {@link TaskQueueingContext}.
-     * <p>
-     * essentially like a {@link #markInessential()} on all tasks in the current 
-     * {@link TaskQueueingContext}, including tasks queued subsequently */
-    @Beta
-    public static void swallowChildrenFailures() {
-        // checkNotNull returns its argument, so a single lookup both validates and yields the context;
-        // a missing context fails here with a NullPointerException carrying the message below
-        TaskQueueingContext qc = Preconditions.checkNotNull(
-            DynamicTasks.getTaskQueuingContext(), "Task queueing context required here");
-        qc.swallowChildrenFailures();
-    }
-
-    /** as {@link TaskTags#addTagDynamically(TaskAdaptable, Object)} but for current task, skipping if no current task */
-    public static void addTagDynamically(Object tag) {
-        Task<?> running = Tasks.current();
-        if (running==null) {
-            // no current task: nothing to tag
-            return;
-        }
-        TaskTags.addTagDynamically(running, tag);
-    }
-    
-    /** 
-     * Workaround for limitation described at {@link Task#cancel(boolean)};
-     * internal method used to allow callers to wait for underlying tasks to finish in the case of cancellation.
-     * <p> 
-     * It is irritating that {@link FutureTask} sync's object clears the runner thread, 
-     * so even if {@link BasicTask#getInternalFuture()} is used, there is no means of determining if the underlying object is done.
-     * The {@link Task#getEndTimeUtc()} seems the only way.
-     *  
-     * @return true if tasks ended; false if timed out
-     **/ 
-    @Beta
-    public static boolean blockUntilInternalTasksEnded(Task<?> t, Duration timeout) {
-        CountdownTimer timer = timeout.countdownTimer(); // one timer supplies the remaining budget to every wait below; runs before the null check on t, so timeout must be non-null
-        
-        if (t==null)
-            return true; // no task: treated as already ended
-        
-        if (t instanceof ScheduledTask) {
-            boolean result = ((ScheduledTask)t).blockUntilNextRunFinished(timer.getDurationRemaining());
-            if (!result) return false; // a false result here is reported to the caller as a timeout
-        }
-
-        t.blockUntilEnded(timer.getDurationRemaining()); // any return value is ignored; the loop below decides the outcome
-        
-        while (true) {
-            if (t.getEndTimeUtc()>=0) return true; // an end time >= 0 is taken as proof that the task has ended
-            // above should be sufficient; but just in case, trying the below
-            Thread tt = t.getThread();
-            if (t instanceof ScheduledTask) {
-                ((ScheduledTask)t).blockUntilNextRunFinished(timer.getDurationRemaining());
-                return true; // NOTE(review): the wait's result is discarded here, unlike the first check above -- confirm that returning true after a timed-out wait is intended
-            } else {
-                if (tt==null || !tt.isAlive()) {
-                    if (!t.isCancelled()) {
-                        // may happen for a cancelled task, interrupted after submit but before start
-                        log.warn("Internal task thread is dead or null ("+tt+") but task not ended: "+t.getEndTimeUtc()+" ("+t+")");
-                    }
-                    return true; // no live thread is left to wait for, so report the task as ended
-                }
-            }
-            if (timer.isExpired())
-                return false; // time budget exhausted while the task thread is still alive
-            Time.sleep(Repeater.DEFAULT_REAL_QUICK_PERIOD); // pause before polling again
-        }
-    }
-    
-    /** returns true if either the current thread or the current task is interrupted/cancelled */
-    public static boolean isInterrupted() {
-        // the thread's own interrupt flag wins; the current task is only consulted otherwise
-        if (Thread.currentThread().isInterrupted()) {
-            return true;
-        }
-        Task<?> running = current();
-        return running!=null && running.isCancelled();
-    }
-
-    /** runs a {@link Repeater} and converts its outcome to a boolean:
-     * true when the repeater's value is {@link Boolean#TRUE}; otherwise any error recorded by the repeater is propagated;
-     * otherwise either false is returned or, if {@code requireTrue} is set, an {@link IllegalStateException} is thrown */
-    private static class WaitForRepeaterCallable implements Callable<Boolean> {
-        protected Repeater repeater;
-        protected boolean requireTrue;
-
-        public WaitForRepeaterCallable(Repeater repeater, boolean requireTrue) {
-            this.repeater = repeater;
-            this.requireTrue = requireTrue;
-        }
-
-        @Override
-        public Boolean call() {
-            ReferenceWithError<Boolean> outcome;
-            // publish the repeater's description as blocking details for the duration of the run only
-            Tasks.setBlockingDetails(repeater.getDescription());
-            try {
-                outcome = repeater.runKeepingError();
-            } finally {
-                Tasks.resetBlockingDetails();
-            }
-
-            boolean succeeded = Boolean.TRUE.equals(outcome.getWithoutError());
-            if (succeeded) {
-                return true;
-            }
-            if (outcome.hasError()) {
-                throw Exceptions.propagate(outcome.getError());
-            }
-            if (!requireTrue) {
-                return false;
-            }
-            throw new IllegalStateException("timeout - "+repeater.getDescription());
-        }
-    }
-
-    /** @return a {@link TaskBuilder} which tests whether the repeater terminates with success in its configured timeframe,
-     * returning true or false depending on whether the repeater succeeds */
-    public static TaskBuilder<Boolean> testing(Repeater repeater) {
-        // requireTrue=false: an unsuccessful repeater yields false instead of throwing
-        Callable<Boolean> check = new WaitForRepeaterCallable(repeater, false);
-        return Tasks.<Boolean>builder()
-            .body(check)
-            .name("waiting for condition")
-            .description("Testing whether " + getTimeoutString(repeater) + ": "+repeater.getDescription());
-    }
-
-    /** @return a {@link TaskBuilder} which requires that the repeater terminate with success in its configured timeframe,
-     * throwing if it does not */
-    public static TaskBuilder<?> requiring(Repeater repeater) {
-        // requireTrue=true: an unsuccessful repeater makes the task body throw
-        Callable<Boolean> check = new WaitForRepeaterCallable(repeater, true);
-        return Tasks.<Boolean>builder()
-            .body(check)
-            .name("waiting for condition")
-            .description("Requiring " + getTimeoutString(repeater) + ": " + repeater.getDescription());
-    }
-    
-    private static String getTimeoutString(Repeater repeater) {
-        Duration limit = repeater.getTimeLimit();
-        // a missing limit and PRACTICALLY_FOREVER are both worded as "eventually"
-        boolean unbounded = limit==null || Duration.PRACTICALLY_FOREVER.equals(limit);
-        return unbounded ? "eventually" : "in "+limit;
-    }
-
-}