You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/18 13:00:34 UTC
[19/64] incubator-brooklyn git commit: [BROOKLYN-162] Refactor
package in ./core/util
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/ForwardingTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ForwardingTask.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ForwardingTask.java
new file mode 100644
index 0000000..794dea9
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ForwardingTask.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.brooklyn.api.management.Task;
+
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ForwardingObject;
+import com.google.common.util.concurrent.ExecutionList;
+import com.google.common.util.concurrent.ListenableFuture;
+
+public abstract class ForwardingTask<T> extends ForwardingObject implements TaskInternal<T> {
+
+ /** Constructor for use by subclasses. */
+ protected ForwardingTask() {}
+
+ @Override
+ protected abstract TaskInternal<T> delegate();
+
+ @Override
+ public void addListener(Runnable listener, Executor executor) {
+ delegate().addListener(listener, executor);
+ }
+
+ @Override
+ public boolean cancel(boolean arg0) {
+ return delegate().cancel(arg0);
+ }
+
+ @Override
+ public T get() throws InterruptedException, ExecutionException {
+ return delegate().get();
+ }
+
+ @Override
+ public T get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException {
+ return delegate().get(arg0, arg1);
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return delegate().isCancelled();
+ }
+
+ @Override
+ public boolean isDone() {
+ return delegate().isDone();
+ }
+
+ @Override
+ public Task<T> asTask() {
+ return delegate().asTask();
+ }
+
+ @Override
+ public String getId() {
+ return delegate().getId();
+ }
+
+ @Override
+ public Set<Object> getTags() {
+ return delegate().getTags();
+ }
+
+ @Override
+ public long getSubmitTimeUtc() {
+ return delegate().getSubmitTimeUtc();
+ }
+
+ @Override
+ public long getStartTimeUtc() {
+ return delegate().getStartTimeUtc();
+ }
+
+ @Override
+ public long getEndTimeUtc() {
+ return delegate().getEndTimeUtc();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return delegate().getDisplayName();
+ }
+
+ @Override
+ public String getDescription() {
+ return delegate().getDescription();
+ }
+
+ @Override
+ public Task<?> getSubmittedByTask() {
+ return delegate().getSubmittedByTask();
+ }
+
+ @Override
+ public Thread getThread() {
+ return delegate().getThread();
+ }
+
+ @Override
+ public boolean isSubmitted() {
+ return delegate().isSubmitted();
+ }
+
+ @Override
+ public boolean isBegun() {
+ return delegate().isBegun();
+ }
+
+ @Override
+ public boolean isError() {
+ return delegate().isError();
+ }
+
+ @Override
+ public void blockUntilStarted() {
+ delegate().blockUntilStarted();
+ }
+
+ @Override
+ public void blockUntilEnded() {
+ delegate().blockUntilEnded();
+ }
+
+ @Override
+ public boolean blockUntilEnded(Duration timeout) {
+ return delegate().blockUntilEnded(timeout);
+ }
+
+ @Override
+ public String getStatusSummary() {
+ return delegate().getStatusSummary();
+ }
+
+ @Override
+ public String getStatusDetail(boolean multiline) {
+ return delegate().getStatusDetail(multiline);
+ }
+
+ @Override
+ public T get(Duration duration) throws InterruptedException, ExecutionException, TimeoutException {
+ return delegate().get(duration);
+ }
+
+ @Override
+ public T getUnchecked() {
+ return delegate().getUnchecked();
+ }
+
+ @Override
+ public T getUnchecked(Duration duration) {
+ return delegate().getUnchecked(duration);
+ }
+
+ @Override
+ public void initInternalFuture(ListenableFuture<T> result) {
+ delegate().initInternalFuture(result);
+ }
+
+ @Override
+ public long getQueuedTimeUtc() {
+ return delegate().getQueuedTimeUtc();
+ }
+
+ @Override
+ public Future<T> getInternalFuture() {
+ return delegate().getInternalFuture();
+ }
+
+ @Override
+ public boolean isQueued() {
+ return delegate().isQueued();
+ }
+
+ @Override
+ public boolean isQueuedOrSubmitted() {
+ return delegate().isQueuedOrSubmitted();
+ }
+
+ @Override
+ public boolean isQueuedAndNotSubmitted() {
+ return delegate().isQueuedAndNotSubmitted();
+ }
+
+ @Override
+ public void markQueued() {
+ delegate().markQueued();
+ }
+
+ @Override
+ public boolean cancel() {
+ return delegate().cancel();
+ }
+
+ @Override
+ public boolean blockUntilStarted(Duration timeout) {
+ return delegate().blockUntilStarted(timeout);
+ }
+
+ @Override
+ public String setBlockingDetails(String blockingDetails) {
+ return delegate().setBlockingDetails(blockingDetails);
+ }
+
+ @Override
+ public Task<?> setBlockingTask(Task<?> blockingTask) {
+ return delegate().setBlockingTask(blockingTask);
+ }
+
+ @Override
+ public void resetBlockingDetails() {
+ delegate().resetBlockingDetails();
+ }
+
+ @Override
+ public void resetBlockingTask() {
+ delegate().resetBlockingTask();
+ }
+
+ @Override
+ public String getBlockingDetails() {
+ return delegate().getBlockingDetails();
+ }
+
+ @Override
+ public Task<?> getBlockingTask() {
+ return delegate().getBlockingTask();
+ }
+
+ @Override
+ public void setExtraStatusText(Object extraStatus) {
+ delegate().setExtraStatusText(extraStatus);
+ }
+
+ @Override
+ public Object getExtraStatusText() {
+ return delegate().getExtraStatusText();
+ }
+
+ @Override
+ public void runListeners() {
+ delegate().runListeners();
+ }
+
+ @Override
+ public void setEndTimeUtc(long val) {
+ delegate().setEndTimeUtc(val);
+ }
+
+ @Override
+ public void setThread(Thread thread) {
+ delegate().setThread(thread);
+ }
+
+ @Override
+ public Callable<T> getJob() {
+ return delegate().getJob();
+ }
+
+ @Override
+ public void setJob(Callable<T> job) {
+ delegate().setJob(job);
+ }
+
+ @Override
+ public ExecutionList getListeners() {
+ return delegate().getListeners();
+ }
+
+ @Override
+ public void setSubmitTimeUtc(long currentTimeMillis) {
+ delegate().setSubmitTimeUtc(currentTimeMillis);
+ }
+
+ @Override
+ public void setSubmittedByTask(Task<?> task) {
+ delegate().setSubmittedByTask(task);
+ }
+
+ @Override
+ public Set<Object> getMutableTags() {
+ return delegate().getMutableTags();
+ }
+
+ @Override
+ public void setStartTimeUtc(long currentTimeMillis) {
+ delegate().setStartTimeUtc(currentTimeMillis);
+ }
+
+ @Override
+ public void applyTagModifier(Function<Set<Object>, Void> modifier) {
+ delegate().applyTagModifier(modifier);
+ }
+
+ @Override
+ public Task<?> getProxyTarget() {
+ return delegate().getProxyTarget();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/ListenableForwardingFuture.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ListenableForwardingFuture.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ListenableForwardingFuture.java
new file mode 100644
index 0000000..bfe88b0
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ListenableForwardingFuture.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+
+import com.google.common.util.concurrent.ExecutionList;
+import com.google.common.util.concurrent.ForwardingFuture.SimpleForwardingFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/** Wraps a Future, making it a ListenableForwardingFuture, but with the caller having the resposibility to:
+ * <li> invoke the listeners on job completion (success or error)
+ * <li> invoke the listeners on cancel */
+public abstract class ListenableForwardingFuture<T> extends SimpleForwardingFuture<T> implements ListenableFuture<T> {
+
+ final ExecutionList listeners;
+
+ protected ListenableForwardingFuture(Future<T> delegate) {
+ super(delegate);
+ this.listeners = new ExecutionList();
+ }
+
+ protected ListenableForwardingFuture(Future<T> delegate, ExecutionList list) {
+ super(delegate);
+ this.listeners = list;
+ }
+
+ @Override
+ public void addListener(Runnable listener, Executor executor) {
+ listeners.add(listener, executor);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/ParallelTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ParallelTask.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ParallelTask.java
new file mode 100644
index 0000000..10da414
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ParallelTask.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.brooklyn.api.management.Task;
+
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.text.Strings;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * Runs {@link Task}s in parallel.
+ *
+ * No guarantees of order of starting the tasks, but the return value is a
+ * {@link List} of the return values of supplied tasks in the same
+ * order they were passed as arguments.
+ */
+public class ParallelTask<T> extends CompoundTask<T> {
+ public ParallelTask(Object... tasks) { super(tasks); }
+
+ public ParallelTask(Map<String,?> flags, Collection<? extends Object> tasks) { super(flags, tasks); }
+ public ParallelTask(Collection<? extends Object> tasks) { super(tasks); }
+
+ public ParallelTask(Map<String,?> flags, Iterable<? extends Object> tasks) { super(flags, ImmutableList.copyOf(tasks)); }
+ public ParallelTask(Iterable<? extends Object> tasks) { super(ImmutableList.copyOf(tasks)); }
+
+ @Override
+ protected List<T> runJobs() throws InterruptedException, ExecutionException {
+ setBlockingDetails("Executing "+
+ (children.size()==1 ? "1 child task" :
+ children.size()+" children tasks in parallel") );
+ for (Task<? extends T> task : children) {
+ submitIfNecessary(task);
+ }
+
+ List<T> result = Lists.newArrayList();
+ List<Exception> exceptions = Lists.newArrayList();
+ for (Task<? extends T> task : children) {
+ T x;
+ try {
+ x = task.get();
+ } catch (Exception e) {
+ Exceptions.propagateIfFatal(e);
+ if (TaskTags.isInessential(task)) {
+ // ignore exception as it's inessential
+ } else {
+ exceptions.add(e);
+ }
+ x = null;
+ }
+ result.add(x);
+ }
+
+ if (exceptions.isEmpty()) {
+ return result;
+ } else {
+ if (result.size()==1 && exceptions.size()==1)
+ throw Exceptions.propagate( exceptions.get(0) );
+ throw Exceptions.propagate(exceptions.size()+" of "+result.size()+" parallel child task"+Strings.s(result.size())+" failed", exceptions);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/ScheduledTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ScheduledTask.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ScheduledTask.java
new file mode 100644
index 0000000..5c4b208
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ScheduledTask.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import static brooklyn.util.GroovyJavaMethods.elvis;
+import static brooklyn.util.GroovyJavaMethods.truth;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.management.Task;
+
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.time.Duration;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Throwables;
+
+/**
+ * A task which runs with a fixed period.
+ * <p>
+ * Note that some termination logic, including {@link #addListener(Runnable, java.util.concurrent.Executor)},
+ * is not precisely defined.
+ */
+// TODO ScheduledTask is a very pragmatic implementation; would be nice to tighten,
+// reduce external assumptions about internal structure, and clarify "done" semantics
+public class ScheduledTask extends BasicTask {
+
+ final Callable<Task<?>> taskFactory;
+ /** initial delay before running, set as flag in constructor; defaults to 0 */
+ protected Duration delay;
+ /** time to wait between executions, or null if not to repeat (default), set as flag to constructor;
+ * this may be modified for subsequent submissions by a running task generated by the factory
+ * using getSubmittedByTask().setPeriod(Duration) */
+ protected Duration period = null;
+ /** optional, set as flag in constructor; defaults to null meaning no limit */
+ protected Integer maxIterations = null;
+
+ protected int runCount=0;
+ protected Task<?> recentRun, nextRun;
+
+ public int getRunCount() { return runCount; }
+ public ScheduledFuture<?> getNextScheduled() { return (ScheduledFuture<?>)internalFuture; }
+
+ public ScheduledTask(Callable<Task<?>> taskFactory) {
+ this(MutableMap.of(), taskFactory);
+ }
+
+ public ScheduledTask(final Task<?> task) {
+ this(MutableMap.of(), task);
+ }
+
+ public ScheduledTask(Map flags, final Task<?> task){
+ this(flags, new Callable<Task<?>>(){
+ @Override
+ public Task<?> call() throws Exception {
+ return task;
+ }});
+ }
+
+ public ScheduledTask(Map flags, Callable<Task<?>> taskFactory) {
+ super(flags);
+ this.taskFactory = taskFactory;
+
+ delay = Duration.of(elvis(flags.remove("delay"), 0));
+ period = Duration.of(elvis(flags.remove("period"), null));
+ maxIterations = elvis(flags.remove("maxIterations"), null);
+ }
+
+ public ScheduledTask delay(Duration d) {
+ this.delay = d;
+ return this;
+ }
+ public ScheduledTask delay(long val) {
+ return delay(Duration.millis(val));
+ }
+
+ public ScheduledTask period(Duration d) {
+ this.period = d;
+ return this;
+ }
+ public ScheduledTask period(long val) {
+ return period(Duration.millis(val));
+ }
+
+ public ScheduledTask maxIterations(int val) {
+ this.maxIterations = val;
+ return this;
+ }
+
+ public Callable<Task<?>> getTaskFactory() {
+ return taskFactory;
+ }
+
+ public Task<?> newTask() {
+ try {
+ return taskFactory.call();
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ protected String getActiveTaskStatusString(int verbosity) {
+ StringBuilder rv = new StringBuilder("Scheduler");
+ if (runCount>0) rv.append(", iteration "+(runCount+1));
+ if (recentRun!=null) rv.append(", last run "+
+ Duration.sinceUtc(recentRun.getStartTimeUtc())+" ms ago");
+ if (truth(getNextScheduled())) {
+ Duration untilNext = Duration.millis(getNextScheduled().getDelay(TimeUnit.MILLISECONDS));
+ if (untilNext.isPositive())
+ rv.append(", next in "+untilNext);
+ else
+ rv.append(", next imminent");
+ }
+ return rv.toString();
+ }
+
+ @Override
+ public boolean isDone() {
+ return isCancelled() || (maxIterations!=null && maxIterations <= runCount) || (period==null && nextRun!=null && nextRun.isDone());
+ }
+
+ public synchronized void blockUntilFirstScheduleStarted() {
+ // TODO Assumes that maxIterations is not negative!
+ while (true) {
+ if (isCancelled()) throw new CancellationException();
+ if (recentRun==null)
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ Throwables.propagate(e);
+ }
+ if (recentRun!=null) return;
+ }
+ }
+
+ public void blockUntilEnded() {
+ while (!isDone()) super.blockUntilEnded();
+ }
+
+ /** gets the value of the most recently run task */
+ public Object get() throws InterruptedException, ExecutionException {
+ blockUntilStarted();
+ blockUntilFirstScheduleStarted();
+ return (truth(recentRun)) ? recentRun.get() : internalFuture.get();
+ }
+
+ @Override
+ public synchronized boolean cancel(boolean mayInterrupt) {
+ boolean result = super.cancel(mayInterrupt);
+ if (nextRun!=null) {
+ nextRun.cancel(mayInterrupt);
+ notifyAll();
+ }
+ return result;
+ }
+
+ /** internal method used to allow callers to wait for underlying tasks to finished in the case of cancellation
+ * @param duration */
+ @Beta
+ public boolean blockUntilNextRunFinished(Duration timeout) {
+ return Tasks.blockUntilInternalTasksEnded(nextRun, timeout);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/SequentialTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/SequentialTask.java b/core/src/main/java/org/apache/brooklyn/core/util/task/SequentialTask.java
new file mode 100644
index 0000000..93ecdf6
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/SequentialTask.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.brooklyn.api.management.Task;
+
+import com.google.common.collect.ImmutableList;
+
+
+/** runs tasks in order, waiting for one to finish before starting the next; return value here is TBD;
+ * (currently is all the return values of individual tasks, but we
+ * might want some pipeline support and eventually only to return final value...) */
+public class SequentialTask<T> extends CompoundTask<T> {
+
+ public SequentialTask(Object... tasks) { super(tasks); }
+
+ public SequentialTask(Map<String,?> flags, Collection<? extends Object> tasks) { super(flags, tasks); }
+ public SequentialTask(Collection<? extends Object> tasks) { super(tasks); }
+
+ public SequentialTask(Map<String,?> flags, Iterable<? extends Object> tasks) { super(flags, ImmutableList.copyOf(tasks)); }
+ public SequentialTask(Iterable<? extends Object> tasks) { super(ImmutableList.copyOf(tasks)); }
+
+ protected List<T> runJobs() throws InterruptedException, ExecutionException {
+ setBlockingDetails("Executing "+
+ (children.size()==1 ? "1 child task" :
+ children.size()+" children tasks sequentially") );
+
+ List<T> result = new ArrayList<T>();
+ for (Task<? extends T> task : children) {
+ submitIfNecessary(task);
+ // throw exception (and cancel subsequent tasks) on error
+ result.add(task.get());
+ }
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/SingleThreadedScheduler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/SingleThreadedScheduler.java b/core/src/main/java/org/apache/brooklyn/core/util/task/SingleThreadedScheduler.java
new file mode 100644
index 0000000..2a5b51d
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/SingleThreadedScheduler.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.brooklyn.api.management.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Instances of this class ensures that {@link Task}s execute with in-order
+ * single-threaded semantics.
+ *
+ * Tasks can be presented through {@link #submit(Callable)}. The order of execution is the
+ * sumbission order.
+ * <p>
+ * This implementation does so by blocking on a {@link ConcurrentLinkedQueue}, <em>after</em>
+ * the task is started in a thread (and {@link Task#isBegun()} returns true), but (of course)
+ * <em>before</em> the {@link TaskInternal#getJob()} actually gets invoked.
+ */
+public class SingleThreadedScheduler implements TaskScheduler, CanSetName {
+ private static final Logger LOG = LoggerFactory.getLogger(SingleThreadedScheduler.class);
+
+ private final Queue<QueuedSubmission<?>> order = new ConcurrentLinkedQueue<QueuedSubmission<?>>();
+ private int queueSize = 0;
+ private final AtomicBoolean running = new AtomicBoolean(false);
+
+ private ExecutorService executor;
+
+ private String name;
+
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String toString() {
+ return name!=null ? "SingleThreadedScheduler["+name+"]" : super.toString();
+ }
+
+ @Override
+ public void injectExecutor(ExecutorService executor) {
+ this.executor = executor;
+ }
+
+ @Override
+ public synchronized <T> Future<T> submit(Callable<T> c) {
+ if (running.compareAndSet(false, true)) {
+ return executeNow(c);
+ } else {
+ WrappingFuture<T> f = new WrappingFuture<T>();
+ order.add(new QueuedSubmission<T>(c, f));
+ queueSize++;
+ if (queueSize>0 && (queueSize == 50 || (queueSize<=500 && (queueSize%100)==0) || (queueSize%1000)==0) && queueSize!=lastSizeWarn) {
+ LOG.warn("{} is backing up, {} tasks queued", this, queueSize);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Task queue backing up detail, queue "+this+"; task context is "+Tasks.current()+"; latest task is "+c+"; first task is "+order.peek());
+ }
+ lastSizeWarn = queueSize;
+ }
+ return f;
+ }
+ }
+ int lastSizeWarn = 0;
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private synchronized void onEnd() {
+ boolean done = false;
+ while (!done) {
+ if (order.isEmpty()) {
+ running.set(false);
+ done = true;
+ } else {
+ QueuedSubmission<?> qs = order.remove();
+ queueSize--;
+ if (!qs.f.isCancelled()) {
+ Future future = executeNow(qs.c);
+ qs.f.setDelegate(future);
+ done = true;
+ }
+ }
+ }
+ }
+
+ private synchronized <T> Future<T> executeNow(final Callable<T> c) {
+ return executor.submit(new Callable<T>() {
+ @Override public T call() throws Exception {
+ try {
+ return c.call();
+ } finally {
+ onEnd();
+ }
+ }});
+ }
+
+
+ private static class QueuedSubmission<T> {
+ final Callable<T> c;
+ final WrappingFuture<T> f;
+
+ QueuedSubmission(Callable<T> c, WrappingFuture<T> f) {
+ this.c = c;
+ this.f = f;
+ }
+
+ @Override
+ public String toString() {
+ return "QueuedSubmission["+c+"]@"+Integer.toHexString(System.identityHashCode(this));
+ }
+ }
+
+ /**
+ * A future, where the task may not yet have been submitted to the real executor.
+ * It delegates to the real future if present, and otherwise waits for that to appear
+ */
+ private static class WrappingFuture<T> implements Future<T> {
+ private volatile Future<T> delegate;
+ private boolean cancelled;
+
+ void setDelegate(Future<T> delegate) {
+ synchronized (this) {
+ this.delegate = delegate;
+ notifyAll();
+ }
+ }
+
+ @Override public boolean cancel(boolean mayInterruptIfRunning) {
+ if (delegate != null) {
+ return delegate.cancel(mayInterruptIfRunning);
+ } else {
+ cancelled = true;
+ synchronized (this) {
+ notifyAll();
+ }
+ return true;
+ }
+ }
+
+ @Override public boolean isCancelled() {
+ if (delegate != null) {
+ return delegate.isCancelled();
+ } else {
+ return cancelled;
+ }
+ }
+
+ @Override public boolean isDone() {
+ return (delegate != null) ? delegate.isDone() : cancelled;
+ }
+
+ @Override public T get() throws CancellationException, ExecutionException, InterruptedException {
+ if (cancelled) {
+ throw new CancellationException();
+ } else if (delegate != null) {
+ return delegate.get();
+ } else {
+ synchronized (this) {
+ while (delegate == null && !cancelled) {
+ wait();
+ }
+ }
+ return get();
+ }
+ }
+
+ @Override public T get(long timeout, TimeUnit unit) throws CancellationException, ExecutionException, InterruptedException, TimeoutException {
+ long endtime = System.currentTimeMillis()+unit.toMillis(timeout);
+
+ if (cancelled) {
+ throw new CancellationException();
+ } else if (delegate != null) {
+ return delegate.get(timeout, unit);
+ } else if (System.currentTimeMillis() >= endtime) {
+ throw new TimeoutException();
+ } else {
+ synchronized (this) {
+ while (delegate == null && !cancelled && System.currentTimeMillis() < endtime) {
+ long remaining = endtime - System.currentTimeMillis();
+ if (remaining > 0) {
+ wait(remaining);
+ }
+ }
+ }
+ long remaining = endtime - System.currentTimeMillis();
+ return get(remaining, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/TaskBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/TaskBuilder.java b/core/src/main/java/org/apache/brooklyn/core/util/task/TaskBuilder.java
new file mode 100644
index 0000000..b105a00
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/TaskBuilder.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.api.management.TaskAdaptable;
+import org.apache.brooklyn.api.management.TaskFactory;
+import org.apache.brooklyn.api.management.TaskQueueingContext;
+
+import brooklyn.util.JavaGroovyEquivalents;
+import brooklyn.util.collections.MutableList;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.collections.MutableSet;
+
+import com.google.common.collect.Iterables;
+
+/** Convenience for creating tasks; note that DynamicSequentialTask is the default */
+public class TaskBuilder<T> {
+
+ String name = null;
+ String description = null;
+ Callable<T> body = null;
+ Boolean swallowChildrenFailures = null;
+ List<TaskAdaptable<?>> children = MutableList.of();
+ Set<Object> tags = MutableSet.of();
+ Map<String,Object> flags = MutableMap.of();
+ Boolean dynamic = null;
+ boolean parallel = false;
+
+ public static <T> TaskBuilder<T> builder() {
+ return new TaskBuilder<T>();
+ }
+
+ public TaskBuilder<T> name(String name) {
+ this.name = name;
+ return this;
+ }
+
+ public TaskBuilder<T> description(String description) {
+ this.description = description;
+ return this;
+ }
+
+ /** whether task that is built has been explicitly specified to be a dynamic task
+ * (ie a Task which is also a {@link TaskQueueingContext}
+ * whereby new tasks can be added after creation */
+ public TaskBuilder<T> dynamic(boolean dynamic) {
+ this.dynamic = dynamic;
+ return this;
+ }
+
+ /** whether task that is built should be parallel; cannot (currently) also be dynamic */
+ public TaskBuilder<T> parallel(boolean parallel) {
+ this.parallel = parallel;
+ return this;
+ }
+
+ public TaskBuilder<T> body(Callable<T> body) {
+ this.body = body;
+ return this;
+ }
+
+ /** sets up a dynamic task not to fail even if children fail */
+ public TaskBuilder<T> swallowChildrenFailures(boolean swallowChildrenFailures) {
+ this.swallowChildrenFailures = swallowChildrenFailures;
+ return this;
+ }
+
+ public TaskBuilder<T> body(Runnable body) {
+ this.body = JavaGroovyEquivalents.<T>toCallable(body);
+ return this;
+ }
+
+ /** adds a child to the given task; the semantics of how the child is executed is set using
+ * {@link #dynamic(boolean)} and {@link #parallel(boolean)} */
+ public TaskBuilder<T> add(TaskAdaptable<?> child) {
+ children.add(child);
+ return this;
+ }
+
+ public TaskBuilder<T> addAll(Iterable<? extends TaskAdaptable<?>> additionalChildren) {
+ Iterables.addAll(children, additionalChildren);
+ return this;
+ }
+
+ public TaskBuilder<T> add(TaskAdaptable<?>... additionalChildren) {
+ children.addAll(Arrays.asList(additionalChildren));
+ return this;
+ }
+
+ /** adds a tag to the given task */
+ public TaskBuilder<T> tag(Object tag) {
+ tags.add(tag);
+ return this;
+ }
+
+ /** adds a flag to the given task */
+ public TaskBuilder<T> flag(String flag, Object value) {
+ flags.put(flag, value);
+ return this;
+ }
+
+ /** adds the given flags to the given task */
+ public TaskBuilder<T> flags(Map<String,Object> flags) {
+ this.flags.putAll(flags);
+ return this;
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public Task<T> build() {
+ MutableMap<String, Object> taskFlags = MutableMap.copyOf(flags);
+ if (name!=null) taskFlags.put("displayName", name);
+ if (description!=null) taskFlags.put("description", description);
+ if (!tags.isEmpty()) taskFlags.put("tags", tags);
+
+ if (Boolean.FALSE.equals(dynamic) && children.isEmpty()) {
+ if (swallowChildrenFailures!=null)
+ throw new IllegalArgumentException("Cannot set swallowChildrenFailures for non-dynamic task: "+this);
+ return new BasicTask<T>(taskFlags, body);
+ }
+
+ // prefer dynamic set unless (a) user has said not dynamic, or (b) it's parallel (since there is no dynamic parallel yet)
+ // dynamic has better cancel (will interrupt the thread) and callers can submit tasks flexibly;
+ // however dynamic uses an extra thread and task and is noisy for contexts which don't need it
+ if (Boolean.TRUE.equals(dynamic) || (dynamic==null && !parallel)) {
+ if (parallel)
+ throw new UnsupportedOperationException("No implementation of parallel dynamic aggregate task available");
+ DynamicSequentialTask<T> result = new DynamicSequentialTask<T>(taskFlags, body);
+ if (swallowChildrenFailures!=null && swallowChildrenFailures.booleanValue()) result.swallowChildrenFailures();
+ for (TaskAdaptable t: children)
+ result.queue(t.asTask());
+ return result;
+ }
+
+ // T must be of type List<V> for these to be valid
+ if (body != null) {
+ throw new UnsupportedOperationException("No implementation of non-dynamic task with both body and children");
+ }
+ if (swallowChildrenFailures!=null) {
+ throw new IllegalArgumentException("Cannot set swallowChildrenFailures for non-dynamic task: "+this);
+ }
+
+ if (parallel)
+ return new ParallelTask(taskFlags, children);
+ else
+ return new SequentialTask(taskFlags, children);
+ }
+
+ /** returns a a factory based on this builder */
+ public TaskFactory<Task<T>> buildFactory() {
+ return new TaskFactory<Task<T>>() {
+ public Task<T> newTask() {
+ return build();
+ }
+ };
+ }
+
+ @Override
+ public String toString() {
+ return super.toString()+"["+name+"]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/TaskInternal.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/TaskInternal.java b/core/src/main/java/org/apache/brooklyn/core/util/task/TaskInternal.java
new file mode 100644
index 0000000..b4a6569
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/TaskInternal.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.brooklyn.api.management.ExecutionManager;
+import org.apache.brooklyn.api.management.Task;
+
+import brooklyn.util.time.Duration;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.ExecutionList;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * All tasks being passed to the {@link ExecutionManager} should implement this.
+ * Users are strongly encouraged to use (or extend) {@link BasicTask}, rather than
+ * implementing a task from scratch.
+ *
+ * The methods on this interface will change in subsequent releases. Because this is
+ * marked as beta, the normal deprecation policy for these methods does not apply.
+ *
+ * @author aled
+ */
+@Beta
+public interface TaskInternal<T> extends Task<T> {
+
+ /** sets the internal future object used to record the association to a job submitted to an {@link ExecutorService} */
+ void initInternalFuture(ListenableFuture<T> result);
+
+ /** returns the underlying future where this task's results will come in; see {@link #initInternalFuture(ListenableFuture)} */
+ Future<T> getInternalFuture();
+
+ /** if the job is queued for submission (e.g. by another task) it can indicate that fact (and time) here;
+ * note tasks can (and often are) submitted without any queueing, in which case this value may be -1 */
+ long getQueuedTimeUtc();
+
+ boolean isQueuedOrSubmitted();
+ boolean isQueuedAndNotSubmitted();
+ boolean isQueued();
+
+ /** marks the task as queued for execution */
+ void markQueued();
+
+ boolean cancel();
+
+ boolean blockUntilStarted(Duration timeout);
+
+ /** allows a task user to specify why a task is blocked; for use immediately before a blocking/wait,
+ * and typically cleared immediately afterwards; referenced by management api to inspect a task
+ * which is blocking
+ * <p>
+ * returns previous details, in case caller wishes to recall and restore it (e.g. if it is doing a sub-blocking)
+ */
+ String setBlockingDetails(String blockingDetails);
+
+ /** as {@link #setBlockingDetails(String)} but records a task which is blocking,
+ * for use e.g. in a gui to navigate to the current active subtask
+ * <p>
+ * returns previous blocking task, in case caller wishes to recall and restore it
+ */
+ Task<?> setBlockingTask(Task<?> blockingTask);
+
+ void resetBlockingDetails();
+
+ void resetBlockingTask();
+
+ /** returns a textual message giving details while the task is blocked */
+ String getBlockingDetails();
+
+ /** returns a task that this task is blocked on */
+ Task<?> getBlockingTask();
+
+ void setExtraStatusText(Object extraStatus);
+
+ Object getExtraStatusText();
+
+ void runListeners();
+
+ void setEndTimeUtc(long val);
+
+ void setThread(Thread thread);
+
+ Callable<T> getJob();
+
+ void setJob(Callable<T> job);
+
+ ExecutionList getListeners();
+
+ void setSubmitTimeUtc(long currentTimeMillis);
+
+ void setSubmittedByTask(Task<?> task);
+
+ Set<Object> getMutableTags();
+
+ void setStartTimeUtc(long currentTimeMillis);
+
+ void applyTagModifier(Function<Set<Object>,Void> modifier);
+
+ /** if a task is a proxy for another one (used mainly for internal tasks),
+ * this returns the "real" task represented by this one */
+ Task<?> getProxyTarget();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/TaskScheduler.java b/core/src/main/java/org/apache/brooklyn/core/util/task/TaskScheduler.java
new file mode 100644
index 0000000..7c5d8a2
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/TaskScheduler.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.brooklyn.api.management.Task;
+
+/**
+ * The scheduler is an internal mechanism to decorate {@link Task}s.
+ *
+ * It can control how the tasks are scheduled for execution (e.g. single-threaded execution,
+ * prioritised, etc).
+ */
+public interface TaskScheduler {
+
+ public void injectExecutor(ExecutorService executor);
+
+ /**
+ * Called by {@link BasicExecutionManager} to schedule tasks.
+ */
+ public <T> Future<T> submit(Callable<T> c);
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/TaskTags.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/TaskTags.java b/core/src/main/java/org/apache/brooklyn/core/util/task/TaskTags.java
new file mode 100644
index 0000000..e404e87
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/TaskTags.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.api.management.TaskAdaptable;
+
+import com.google.common.base.Function;
+
+public class TaskTags {
+
+ /** marks a task which is allowed to fail without failing his parent */
+ public static final String INESSENTIAL_TASK = "inessential";
+
+ /** marks a task which is a subtask of another */
+ public static final String SUB_TASK_TAG = "SUB-TASK";
+
+ public static void addTagDynamically(TaskAdaptable<?> task, final Object tag) {
+ ((BasicTask<?>)task.asTask()).applyTagModifier(new Function<Set<Object>, Void>() {
+ public Void apply(@Nullable Set<Object> input) {
+ input.add(tag);
+ return null;
+ }
+ });
+ }
+
+ public static void addTagsDynamically(TaskAdaptable<?> task, final Object tag1, final Object ...tags) {
+ ((BasicTask<?>)task.asTask()).applyTagModifier(new Function<Set<Object>, Void>() {
+ public Void apply(@Nullable Set<Object> input) {
+ input.add(tag1);
+ for (Object tag: tags) input.add(tag);
+ return null;
+ }
+ });
+ }
+
+
+ public static boolean isInessential(Task<?> task) {
+ return hasTag(task, INESSENTIAL_TASK);
+ }
+
+ public static boolean hasTag(Task<?> task, Object tag) {
+ return task.getTags().contains(tag);
+ }
+
+ public static <U,V extends TaskAdaptable<U>> V markInessential(V task) {
+ addTagDynamically(task, INESSENTIAL_TASK);
+ return task;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/Tasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/Tasks.java b/core/src/main/java/org/apache/brooklyn/core/util/task/Tasks.java
new file mode 100644
index 0000000..d391a90
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/Tasks.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.api.management.ExecutionContext;
+import org.apache.brooklyn.api.management.HasTaskChildren;
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.api.management.TaskAdaptable;
+import org.apache.brooklyn.api.management.TaskFactory;
+import org.apache.brooklyn.api.management.TaskQueueingContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.exceptions.ReferenceWithError;
+import brooklyn.util.repeat.Repeater;
+import brooklyn.util.time.CountdownTimer;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Iterables;
+
+public class Tasks {
+
+ private static final Logger log = LoggerFactory.getLogger(Tasks.class);
+
+ /** convenience for setting "blocking details" on any task where the current thread is running;
+ * typically invoked prior to a wait, for transparency to a user;
+ * then invoked with 'null' just after the wait */
+ public static String setBlockingDetails(String description) {
+ Task<?> current = current();
+ if (current instanceof TaskInternal)
+ return ((TaskInternal<?>)current).setBlockingDetails(description);
+ return null;
+ }
+ public static void resetBlockingDetails() {
+ Task<?> current = current();
+ if (current instanceof TaskInternal)
+ ((TaskInternal<?>)current).resetBlockingDetails();
+ }
+ public static Task<?> setBlockingTask(Task<?> blocker) {
+ Task<?> current = current();
+ if (current instanceof TaskInternal)
+ return ((TaskInternal<?>)current).setBlockingTask(blocker);
+ return null;
+ }
+ public static void resetBlockingTask() {
+ Task<?> current = current();
+ if (current instanceof TaskInternal)
+ ((TaskInternal<?>)current).resetBlockingTask();
+ }
+
+ /** convenience for setting "blocking details" on any task where the current thread is running,
+ * while the passed code is executed; often used from groovy as
+ * <pre>{@code withBlockingDetails("sleeping 5s") { Thread.sleep(5000); } }</pre>
+ * If code block is null, the description is set until further notice (not cleareed). */
+ @SuppressWarnings("rawtypes")
+ public static <T> T withBlockingDetails(String description, Callable<T> code) throws Exception {
+ Task current = current();
+ if (code==null) {
+ log.warn("legacy invocation of withBlockingDetails with null code block, ignoring");
+ return null;
+ }
+ String prevBlockingDetails = null;
+ if (current instanceof TaskInternal) {
+ prevBlockingDetails = ((TaskInternal)current).setBlockingDetails(description);
+ }
+ try {
+ return code.call();
+ } finally {
+ if (current instanceof TaskInternal)
+ ((TaskInternal)current).setBlockingDetails(prevBlockingDetails);
+ }
+ }
+
+ /** the {@link Task} where the current thread is executing, if executing in a Task, otherwise null;
+ * if the current task is a proxy, this returns the target of that proxy */
+ @SuppressWarnings("rawtypes")
+ public static Task current() {
+ return getFinalProxyTarget(BasicExecutionManager.getPerThreadCurrentTask().get());
+ }
+
+ public static Task<?> getFinalProxyTarget(Task<?> task) {
+ if (task==null) return null;
+ Task<?> proxy = ((TaskInternal<?>)task).getProxyTarget();
+ if (proxy==null || proxy.equals(task)) return task;
+ return getFinalProxyTarget(proxy);
+ }
+
+ /** creates a {@link ValueResolver} instance which allows significantly more customization than
+ * the various {@link #resolveValue(Object, Class, ExecutionContext)} methods here */
+ public static <T> ValueResolver<T> resolving(Object v, Class<T> type) {
+ return new ValueResolver<T>(v, type);
+ }
+
+ public static ValueResolver.ResolverBuilderPretype resolving(Object v) {
+ return new ValueResolver.ResolverBuilderPretype(v);
+ }
+
+ /** @see #resolveValue(Object, Class, ExecutionContext, String) */
+ public static <T> T resolveValue(Object v, Class<T> type, @Nullable ExecutionContext exec) throws ExecutionException, InterruptedException {
+ return new ValueResolver<T>(v, type).context(exec).get();
+ }
+
+ /** attempt to resolve the given value as the given type, waiting on futures, submitting if necessary,
+ * and coercing as allowed by TypeCoercions;
+ * contextMessage (optional) will be displayed in status reports while it waits (e.g. the name of the config key being looked up).
+ * if no execution context supplied (null) this method will throw an exception if the object is an unsubmitted task */
+ public static <T> T resolveValue(Object v, Class<T> type, @Nullable ExecutionContext exec, String contextMessage) throws ExecutionException, InterruptedException {
+ return new ValueResolver<T>(v, type).context(exec).description(contextMessage).get();
+ }
+
+ /**
+ * @see #resolveDeepValue(Object, Class, ExecutionContext, String)
+ */
+ public static Object resolveDeepValue(Object v, Class<?> type, ExecutionContext exec) throws ExecutionException, InterruptedException {
+ return resolveDeepValue(v, type, exec, null);
+ }
+
+ /**
+ * Resolves the given object, blocking on futures and coercing it to the given type. If the object is a
+ * map or iterable (or a list of map of maps, etc, etc) then walks these maps/iterables to convert all of
+ * their values to the given type. For example, the following will return a list containing a map with "1"="true":
+ *
+ * {@code Object result = resolveDeepValue(ImmutableList.of(ImmutableMap.of(1, true)), String.class, exec)}
+ *
+ * To perform a deep conversion of futures contained within Iterables or Maps without coercion of each element,
+ * the type should normally be Object, not the type of the collection. This differs from
+ * {@link #resolveValue(Object, Class, ExecutionContext, String)} which will accept Map and Iterable
+ * as the required type.
+ */
+ public static <T> T resolveDeepValue(Object v, Class<T> type, ExecutionContext exec, String contextMessage) throws ExecutionException, InterruptedException {
+ return new ValueResolver<T>(v, type).context(exec).deep(true).description(contextMessage).get();
+ }
+
+ /** sets extra status details on the current task, if possible (otherwise does nothing).
+ * the extra status is presented in Task.getStatusDetails(true)
+ */
+ public static void setExtraStatusDetails(String notes) {
+ Task<?> current = current();
+ if (current instanceof TaskInternal)
+ ((TaskInternal<?>)current).setExtraStatusText(notes);
+ }
+
+ public static <T> TaskBuilder<T> builder() {
+ return TaskBuilder.<T>builder();
+ }
+
+ private static Task<?>[] asTasks(TaskAdaptable<?> ...tasks) {
+ Task<?>[] result = new Task<?>[tasks.length];
+ for (int i=0; i<tasks.length; i++)
+ result[i] = tasks[i].asTask();
+ return result;
+ }
+
+ public static Task<List<?>> parallel(TaskAdaptable<?> ...tasks) {
+ return parallelInternal("parallelised tasks", asTasks(tasks));
+ }
+ public static Task<List<?>> parallel(String name, TaskAdaptable<?> ...tasks) {
+ return parallelInternal(name, asTasks(tasks));
+ }
+ public static Task<List<?>> parallel(Iterable<? extends TaskAdaptable<?>> tasks) {
+ return parallel(asTasks(Iterables.toArray(tasks, TaskAdaptable.class)));
+ }
+ public static Task<List<?>> parallel(String name, Iterable<? extends TaskAdaptable<?>> tasks) {
+ return parallelInternal(name, asTasks(Iterables.toArray(tasks, TaskAdaptable.class)));
+ }
+ private static Task<List<?>> parallelInternal(String name, Task<?>[] tasks) {
+ return Tasks.<List<?>>builder().name(name).parallel(true).add(tasks).build();
+ }
+
+ public static Task<List<?>> sequential(TaskAdaptable<?> ...tasks) {
+ return sequentialInternal("sequential tasks", asTasks(tasks));
+ }
+ public static Task<List<?>> sequential(String name, TaskAdaptable<?> ...tasks) {
+ return sequentialInternal(name, asTasks(tasks));
+ }
+ public static TaskFactory<?> sequential(TaskFactory<?> ...taskFactories) {
+ return sequentialInternal("sequential tasks", taskFactories);
+ }
+ public static TaskFactory<?> sequential(String name, TaskFactory<?> ...taskFactories) {
+ return sequentialInternal(name, taskFactories);
+ }
+ public static Task<List<?>> sequential(List<? extends TaskAdaptable<?>> tasks) {
+ return sequential(asTasks(Iterables.toArray(tasks, TaskAdaptable.class)));
+ }
+ public static Task<List<?>> sequential(String name, List<? extends TaskAdaptable<?>> tasks) {
+ return sequential(name, asTasks(Iterables.toArray(tasks, TaskAdaptable.class)));
+ }
+ private static Task<List<?>> sequentialInternal(String name, Task<?>[] tasks) {
+ return Tasks.<List<?>>builder().name(name).parallel(false).add(tasks).build();
+ }
+ private static TaskFactory<?> sequentialInternal(final String name, final TaskFactory<?> ...taskFactories) {
+ return new TaskFactory<TaskAdaptable<?>>() {
+ @Override
+ public TaskAdaptable<?> newTask() {
+ TaskBuilder<List<?>> tb = Tasks.<List<?>>builder().name(name).parallel(false);
+ for (TaskFactory<?> tf: taskFactories)
+ tb.add(tf.newTask().asTask());
+ return tb.build();
+ }
+ };
+ }
+
+ /** returns the first tag found on the given task which matches the given type, looking up the submission hierarachy if necessary */
+ @SuppressWarnings("unchecked")
+ public static <T> T tag(@Nullable Task<?> task, Class<T> type, boolean recurseHierarchy) {
+ // support null task to make it easier for callers to walk hierarchies
+ if (task==null) return null;
+ for (Object tag: task.getTags())
+ if (type.isInstance(tag)) return (T)tag;
+ if (!recurseHierarchy) return null;
+ return tag(task.getSubmittedByTask(), type, true);
+ }
+
+ public static boolean isAncestorCancelled(Task<?> t) {
+ if (t==null) return false;
+ if (t.isCancelled()) return true;
+ return isAncestorCancelled(t.getSubmittedByTask());
+ }
+
+ public static boolean isQueued(TaskAdaptable<?> task) {
+ return ((TaskInternal<?>)task.asTask()).isQueued();
+ }
+
+ public static boolean isSubmitted(TaskAdaptable<?> task) {
+ return ((TaskInternal<?>)task.asTask()).isSubmitted();
+ }
+
+ public static boolean isQueuedOrSubmitted(TaskAdaptable<?> task) {
+ return ((TaskInternal<?>)task.asTask()).isQueuedOrSubmitted();
+ }
+
+ /**
+ * Adds the given task to the given context. Does not throw an exception if the addition fails.
+ * @return true if the task was added, false otherwise.
+ */
+ public static boolean tryQueueing(TaskQueueingContext adder, TaskAdaptable<?> task) {
+ if (task==null || isQueued(task))
+ return false;
+ try {
+ adder.queue(task.asTask());
+ return true;
+ } catch (Exception e) {
+ if (log.isDebugEnabled())
+ log.debug("Could not add task "+task+" at "+adder+": "+e);
+ return false;
+ }
+ }
+
+ /** see also {@link #resolving(Object)} which gives much more control about submission, timeout, etc */
+ public static <T> Supplier<T> supplier(final TaskAdaptable<T> task) {
+ return new Supplier<T>() {
+ @Override
+ public T get() {
+ return task.asTask().getUnchecked();
+ }
+ };
+ }
+
+ /** return all children tasks of the given tasks, if it has children, else empty list */
+ public static Iterable<Task<?>> children(Task<?> task) {
+ if (task instanceof HasTaskChildren)
+ return ((HasTaskChildren)task).getChildren();
+ return Collections.emptyList();
+ }
+
+ /** returns failed tasks */
+ public static Iterable<Task<?>> failed(Iterable<Task<?>> subtasks) {
+ return Iterables.filter(subtasks, new Predicate<Task<?>>() {
+ @Override
+ public boolean apply(Task<?> input) {
+ return input.isError();
+ }
+ });
+ }
+
+ /** returns the task, its children, and all its children, and so on;
+ * @param root task whose descendants should be iterated
+ * @param parentFirst whether to put parents before children or after
+ */
+ public static Iterable<Task<?>> descendants(Task<?> root, final boolean parentFirst) {
+ Iterable<Task<?>> descs = Iterables.concat(Iterables.transform(Tasks.children(root), new Function<Task<?>,Iterable<Task<?>>>() {
+ @Override
+ public Iterable<Task<?>> apply(Task<?> input) {
+ return descendants(input, parentFirst);
+ }
+ }));
+ if (parentFirst) return Iterables.concat(Collections.singleton(root), descs);
+ else return Iterables.concat(descs, Collections.singleton(root));
+ }
+
+ /** returns the error thrown by the task if {@link Task#isError()}, or null if no error or not done */
+ public static Throwable getError(Task<?> t) {
+ if (t==null) return null;
+ if (!t.isDone()) return null;
+ if (t.isCancelled()) return new CancellationException();
+ try {
+ t.get();
+ return null;
+ } catch (Throwable error) {
+ // do not propagate as we are pretty much guaranteed above that it wasn't this
+ // thread which originally threw the error
+ return error;
+ }
+ }
+ public static Task<Void> fail(final String name, final Throwable optionalError) {
+ return Tasks.<Void>builder().dynamic(false).name(name).body(new Runnable() { public void run() {
+ if (optionalError!=null) throw Exceptions.propagate(optionalError); else throw new RuntimeException("Failed: "+name);
+ } }).build();
+ }
+ public static Task<Void> warning(final String message, final Throwable optionalError) {
+ log.warn(message);
+ return TaskTags.markInessential(fail(message, optionalError));
+ }
+
+ /** marks the current task inessential; this mainly matters if the task is running in a parent
+ * {@link TaskQueueingContext} and we don't want the parent to fail if this task fails
+ * <p>
+ * no-op (silently ignored) if not in a task */
+ public static void markInessential() {
+ Task<?> task = Tasks.current();
+ if (task==null) {
+ TaskQueueingContext qc = DynamicTasks.getTaskQueuingContext();
+ if (qc!=null) task = qc.asTask();
+ }
+ if (task!=null) {
+ TaskTags.markInessential(task);
+ }
+ }
+
+ /** causes failures in subtasks of the current task not to fail the parent;
+ * no-op if not in a {@link TaskQueueingContext}.
+ * <p>
+ * essentially like a {@link #markInessential()} on all tasks in the current
+ * {@link TaskQueueingContext}, including tasks queued subsequently */
+ @Beta
+ public static void swallowChildrenFailures() {
+ Preconditions.checkNotNull(DynamicTasks.getTaskQueuingContext(), "Task queueing context required here");
+ TaskQueueingContext qc = DynamicTasks.getTaskQueuingContext();
+ if (qc!=null) {
+ qc.swallowChildrenFailures();
+ }
+ }
+
+ /** as {@link TaskTags#addTagDynamically(TaskAdaptable, Object)} but for current task, skipping if no current task */
+ public static void addTagDynamically(Object tag) {
+ Task<?> t = Tasks.current();
+ if (t!=null) TaskTags.addTagDynamically(t, tag);
+ }
+
+ /**
+ * Workaround for limitation described at {@link Task#cancel(boolean)};
+ * internal method used to allow callers to wait for underlying tasks to finished in the case of cancellation.
+ * <p>
+ * It is irritating that {@link FutureTask} sync's object clears the runner thread,
+ * so even if {@link BasicTask#getInternalFuture()} is used, there is no means of determining if the underlying object is done.
+ * The {@link Task#getEndTimeUtc()} seems the only way.
+ *
+ * @return true if tasks ended; false if timed out
+ **/
+ @Beta
+ public static boolean blockUntilInternalTasksEnded(Task<?> t, Duration timeout) {
+ CountdownTimer timer = timeout.countdownTimer();
+
+ if (t==null)
+ return true;
+
+ if (t instanceof ScheduledTask) {
+ boolean result = ((ScheduledTask)t).blockUntilNextRunFinished(timer.getDurationRemaining());
+ if (!result) return false;
+ }
+
+ t.blockUntilEnded(timer.getDurationRemaining());
+
+ while (true) {
+ if (t.getEndTimeUtc()>=0) return true;
+ // above should be sufficient; but just in case, trying the below
+ Thread tt = t.getThread();
+ if (t instanceof ScheduledTask) {
+ ((ScheduledTask)t).blockUntilNextRunFinished(timer.getDurationRemaining());
+ return true;
+ } else {
+ if (tt==null || !tt.isAlive()) {
+ if (!t.isCancelled()) {
+ // may happen for a cancelled task, interrupted after submit but before start
+ log.warn("Internal task thread is dead or null ("+tt+") but task not ended: "+t.getEndTimeUtc()+" ("+t+")");
+ }
+ return true;
+ }
+ }
+ if (timer.isExpired())
+ return false;
+ Time.sleep(Repeater.DEFAULT_REAL_QUICK_PERIOD);
+ }
+ }
+
+ /** returns true if either the current thread or the current task is interrupted/cancelled */
+ public static boolean isInterrupted() {
+ if (Thread.currentThread().isInterrupted()) return true;
+ Task<?> t = current();
+ if (t==null) return false;
+ return t.isCancelled();
+ }
+
+ private static class WaitForRepeaterCallable implements Callable<Boolean> {
+ protected Repeater repeater;
+ protected boolean requireTrue;
+
+ public WaitForRepeaterCallable(Repeater repeater, boolean requireTrue) {
+ this.repeater = repeater;
+ this.requireTrue = requireTrue;
+ }
+
+ @Override
+ public Boolean call() {
+ ReferenceWithError<Boolean> result;
+ Tasks.setBlockingDetails(repeater.getDescription());
+ try {
+ result = repeater.runKeepingError();
+ } finally {
+ Tasks.resetBlockingDetails();
+ }
+
+ if (Boolean.TRUE.equals(result.getWithoutError()))
+ return true;
+ if (result.hasError())
+ throw Exceptions.propagate(result.getError());
+ if (requireTrue)
+ throw new IllegalStateException("timeout - "+repeater.getDescription());
+ return false;
+ }
+ }
+
+ /** @return a {@link TaskBuilder} which tests whether the repeater terminates with success in its configured timeframe,
+ * returning true or false depending on whether repeater succeed */
+ public static TaskBuilder<Boolean> testing(Repeater repeater) {
+ return Tasks.<Boolean>builder().body(new WaitForRepeaterCallable(repeater, false))
+ .name("waiting for condition")
+ .description("Testing whether " + getTimeoutString(repeater) + ": "+repeater.getDescription());
+ }
+
+ /** @return a {@link TaskBuilder} which requires that the repeater terminate with success in its configured timeframe,
+ * throwing if it does not */
+ public static TaskBuilder<?> requiring(Repeater repeater) {
+ return Tasks.<Boolean>builder().body(new WaitForRepeaterCallable(repeater, true))
+ .name("waiting for condition")
+ .description("Requiring " + getTimeoutString(repeater) + ": " + repeater.getDescription());
+ }
+
+ private static String getTimeoutString(Repeater repeater) {
+ Duration timeout = repeater.getTimeLimit();
+ if (timeout==null || Duration.PRACTICALLY_FOREVER.equals(timeout))
+ return "eventually";
+ return "in "+timeout;
+ }
+
+}