You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/18 13:00:45 UTC
[30/64] incubator-brooklyn git commit: [BROOKLYN-162] Refactor
package in ./core/util
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/ForwardingTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/ForwardingTask.java b/core/src/main/java/brooklyn/util/task/ForwardingTask.java
deleted file mode 100644
index 3bc3427..0000000
--- a/core/src/main/java/brooklyn/util/task/ForwardingTask.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.brooklyn.api.management.Task;
-
-import brooklyn.util.time.Duration;
-
-import com.google.common.base.Function;
-import com.google.common.collect.ForwardingObject;
-import com.google.common.util.concurrent.ExecutionList;
-import com.google.common.util.concurrent.ListenableFuture;
-
-public abstract class ForwardingTask<T> extends ForwardingObject implements TaskInternal<T> {
-
- /** Constructor for use by subclasses. */
- protected ForwardingTask() {}
-
- @Override
- protected abstract TaskInternal<T> delegate();
-
- @Override
- public void addListener(Runnable listener, Executor executor) {
- delegate().addListener(listener, executor);
- }
-
- @Override
- public boolean cancel(boolean arg0) {
- return delegate().cancel(arg0);
- }
-
- @Override
- public T get() throws InterruptedException, ExecutionException {
- return delegate().get();
- }
-
- @Override
- public T get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException {
- return delegate().get(arg0, arg1);
- }
-
- @Override
- public boolean isCancelled() {
- return delegate().isCancelled();
- }
-
- @Override
- public boolean isDone() {
- return delegate().isDone();
- }
-
- @Override
- public Task<T> asTask() {
- return delegate().asTask();
- }
-
- @Override
- public String getId() {
- return delegate().getId();
- }
-
- @Override
- public Set<Object> getTags() {
- return delegate().getTags();
- }
-
- @Override
- public long getSubmitTimeUtc() {
- return delegate().getSubmitTimeUtc();
- }
-
- @Override
- public long getStartTimeUtc() {
- return delegate().getStartTimeUtc();
- }
-
- @Override
- public long getEndTimeUtc() {
- return delegate().getEndTimeUtc();
- }
-
- @Override
- public String getDisplayName() {
- return delegate().getDisplayName();
- }
-
- @Override
- public String getDescription() {
- return delegate().getDescription();
- }
-
- @Override
- public Task<?> getSubmittedByTask() {
- return delegate().getSubmittedByTask();
- }
-
- @Override
- public Thread getThread() {
- return delegate().getThread();
- }
-
- @Override
- public boolean isSubmitted() {
- return delegate().isSubmitted();
- }
-
- @Override
- public boolean isBegun() {
- return delegate().isBegun();
- }
-
- @Override
- public boolean isError() {
- return delegate().isError();
- }
-
- @Override
- public void blockUntilStarted() {
- delegate().blockUntilStarted();
- }
-
- @Override
- public void blockUntilEnded() {
- delegate().blockUntilEnded();
- }
-
- @Override
- public boolean blockUntilEnded(Duration timeout) {
- return delegate().blockUntilEnded(timeout);
- }
-
- @Override
- public String getStatusSummary() {
- return delegate().getStatusSummary();
- }
-
- @Override
- public String getStatusDetail(boolean multiline) {
- return delegate().getStatusDetail(multiline);
- }
-
- @Override
- public T get(Duration duration) throws InterruptedException, ExecutionException, TimeoutException {
- return delegate().get(duration);
- }
-
- @Override
- public T getUnchecked() {
- return delegate().getUnchecked();
- }
-
- @Override
- public T getUnchecked(Duration duration) {
- return delegate().getUnchecked(duration);
- }
-
- @Override
- public void initInternalFuture(ListenableFuture<T> result) {
- delegate().initInternalFuture(result);
- }
-
- @Override
- public long getQueuedTimeUtc() {
- return delegate().getQueuedTimeUtc();
- }
-
- @Override
- public Future<T> getInternalFuture() {
- return delegate().getInternalFuture();
- }
-
- @Override
- public boolean isQueued() {
- return delegate().isQueued();
- }
-
- @Override
- public boolean isQueuedOrSubmitted() {
- return delegate().isQueuedOrSubmitted();
- }
-
- @Override
- public boolean isQueuedAndNotSubmitted() {
- return delegate().isQueuedAndNotSubmitted();
- }
-
- @Override
- public void markQueued() {
- delegate().markQueued();
- }
-
- @Override
- public boolean cancel() {
- return delegate().cancel();
- }
-
- @Override
- public boolean blockUntilStarted(Duration timeout) {
- return delegate().blockUntilStarted(timeout);
- }
-
- @Override
- public String setBlockingDetails(String blockingDetails) {
- return delegate().setBlockingDetails(blockingDetails);
- }
-
- @Override
- public Task<?> setBlockingTask(Task<?> blockingTask) {
- return delegate().setBlockingTask(blockingTask);
- }
-
- @Override
- public void resetBlockingDetails() {
- delegate().resetBlockingDetails();
- }
-
- @Override
- public void resetBlockingTask() {
- delegate().resetBlockingTask();
- }
-
- @Override
- public String getBlockingDetails() {
- return delegate().getBlockingDetails();
- }
-
- @Override
- public Task<?> getBlockingTask() {
- return delegate().getBlockingTask();
- }
-
- @Override
- public void setExtraStatusText(Object extraStatus) {
- delegate().setExtraStatusText(extraStatus);
- }
-
- @Override
- public Object getExtraStatusText() {
- return delegate().getExtraStatusText();
- }
-
- @Override
- public void runListeners() {
- delegate().runListeners();
- }
-
- @Override
- public void setEndTimeUtc(long val) {
- delegate().setEndTimeUtc(val);
- }
-
- @Override
- public void setThread(Thread thread) {
- delegate().setThread(thread);
- }
-
- @Override
- public Callable<T> getJob() {
- return delegate().getJob();
- }
-
- @Override
- public void setJob(Callable<T> job) {
- delegate().setJob(job);
- }
-
- @Override
- public ExecutionList getListeners() {
- return delegate().getListeners();
- }
-
- @Override
- public void setSubmitTimeUtc(long currentTimeMillis) {
- delegate().setSubmitTimeUtc(currentTimeMillis);
- }
-
- @Override
- public void setSubmittedByTask(Task<?> task) {
- delegate().setSubmittedByTask(task);
- }
-
- @Override
- public Set<Object> getMutableTags() {
- return delegate().getMutableTags();
- }
-
- @Override
- public void setStartTimeUtc(long currentTimeMillis) {
- delegate().setStartTimeUtc(currentTimeMillis);
- }
-
- @Override
- public void applyTagModifier(Function<Set<Object>, Void> modifier) {
- delegate().applyTagModifier(modifier);
- }
-
- @Override
- public Task<?> getProxyTarget() {
- return delegate().getProxyTarget();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/ListenableForwardingFuture.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/ListenableForwardingFuture.java b/core/src/main/java/brooklyn/util/task/ListenableForwardingFuture.java
deleted file mode 100644
index 8111332..0000000
--- a/core/src/main/java/brooklyn/util/task/ListenableForwardingFuture.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-
-import com.google.common.util.concurrent.ExecutionList;
-import com.google.common.util.concurrent.ForwardingFuture.SimpleForwardingFuture;
-import com.google.common.util.concurrent.ListenableFuture;
-
-/** Wraps a Future, making it a ListenableForwardingFuture, but with the caller having the resposibility to:
- * <li> invoke the listeners on job completion (success or error)
- * <li> invoke the listeners on cancel */
-public abstract class ListenableForwardingFuture<T> extends SimpleForwardingFuture<T> implements ListenableFuture<T> {
-
- final ExecutionList listeners;
-
- protected ListenableForwardingFuture(Future<T> delegate) {
- super(delegate);
- this.listeners = new ExecutionList();
- }
-
- protected ListenableForwardingFuture(Future<T> delegate, ExecutionList list) {
- super(delegate);
- this.listeners = list;
- }
-
- @Override
- public void addListener(Runnable listener, Executor executor) {
- listeners.add(listener, executor);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/ParallelTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/ParallelTask.java b/core/src/main/java/brooklyn/util/task/ParallelTask.java
deleted file mode 100644
index d6e65ab..0000000
--- a/core/src/main/java/brooklyn/util/task/ParallelTask.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.brooklyn.api.management.Task;
-
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.text.Strings;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-/**
- * Runs {@link Task}s in parallel.
- *
- * No guarantees of order of starting the tasks, but the return value is a
- * {@link List} of the return values of supplied tasks in the same
- * order they were passed as arguments.
- */
-public class ParallelTask<T> extends CompoundTask<T> {
- public ParallelTask(Object... tasks) { super(tasks); }
-
- public ParallelTask(Map<String,?> flags, Collection<? extends Object> tasks) { super(flags, tasks); }
- public ParallelTask(Collection<? extends Object> tasks) { super(tasks); }
-
- public ParallelTask(Map<String,?> flags, Iterable<? extends Object> tasks) { super(flags, ImmutableList.copyOf(tasks)); }
- public ParallelTask(Iterable<? extends Object> tasks) { super(ImmutableList.copyOf(tasks)); }
-
- @Override
- protected List<T> runJobs() throws InterruptedException, ExecutionException {
- setBlockingDetails("Executing "+
- (children.size()==1 ? "1 child task" :
- children.size()+" children tasks in parallel") );
- for (Task<? extends T> task : children) {
- submitIfNecessary(task);
- }
-
- List<T> result = Lists.newArrayList();
- List<Exception> exceptions = Lists.newArrayList();
- for (Task<? extends T> task : children) {
- T x;
- try {
- x = task.get();
- } catch (Exception e) {
- Exceptions.propagateIfFatal(e);
- if (TaskTags.isInessential(task)) {
- // ignore exception as it's inessential
- } else {
- exceptions.add(e);
- }
- x = null;
- }
- result.add(x);
- }
-
- if (exceptions.isEmpty()) {
- return result;
- } else {
- if (result.size()==1 && exceptions.size()==1)
- throw Exceptions.propagate( exceptions.get(0) );
- throw Exceptions.propagate(exceptions.size()+" of "+result.size()+" parallel child task"+Strings.s(result.size())+" failed", exceptions);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/ScheduledTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/ScheduledTask.java b/core/src/main/java/brooklyn/util/task/ScheduledTask.java
deleted file mode 100644
index eabff49..0000000
--- a/core/src/main/java/brooklyn/util/task/ScheduledTask.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import static brooklyn.util.GroovyJavaMethods.elvis;
-import static brooklyn.util.GroovyJavaMethods.truth;
-
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.brooklyn.api.management.Task;
-
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.time.Duration;
-
-import com.google.common.annotations.Beta;
-import com.google.common.base.Throwables;
-
-/**
- * A task which runs with a fixed period.
- * <p>
- * Note that some termination logic, including {@link #addListener(Runnable, java.util.concurrent.Executor)},
- * is not precisely defined.
- */
-// TODO ScheduledTask is a very pragmatic implementation; would be nice to tighten,
-// reduce external assumptions about internal structure, and clarify "done" semantics
-public class ScheduledTask extends BasicTask {
-
- final Callable<Task<?>> taskFactory;
- /** initial delay before running, set as flag in constructor; defaults to 0 */
- protected Duration delay;
- /** time to wait between executions, or null if not to repeat (default), set as flag to constructor;
- * this may be modified for subsequent submissions by a running task generated by the factory
- * using getSubmittedByTask().setPeriod(Duration) */
- protected Duration period = null;
- /** optional, set as flag in constructor; defaults to null meaning no limit */
- protected Integer maxIterations = null;
-
- protected int runCount=0;
- protected Task<?> recentRun, nextRun;
-
- public int getRunCount() { return runCount; }
- public ScheduledFuture<?> getNextScheduled() { return (ScheduledFuture<?>)internalFuture; }
-
- public ScheduledTask(Callable<Task<?>> taskFactory) {
- this(MutableMap.of(), taskFactory);
- }
-
- public ScheduledTask(final Task<?> task) {
- this(MutableMap.of(), task);
- }
-
- public ScheduledTask(Map flags, final Task<?> task){
- this(flags, new Callable<Task<?>>(){
- @Override
- public Task<?> call() throws Exception {
- return task;
- }});
- }
-
- public ScheduledTask(Map flags, Callable<Task<?>> taskFactory) {
- super(flags);
- this.taskFactory = taskFactory;
-
- delay = Duration.of(elvis(flags.remove("delay"), 0));
- period = Duration.of(elvis(flags.remove("period"), null));
- maxIterations = elvis(flags.remove("maxIterations"), null);
- }
-
- public ScheduledTask delay(Duration d) {
- this.delay = d;
- return this;
- }
- public ScheduledTask delay(long val) {
- return delay(Duration.millis(val));
- }
-
- public ScheduledTask period(Duration d) {
- this.period = d;
- return this;
- }
- public ScheduledTask period(long val) {
- return period(Duration.millis(val));
- }
-
- public ScheduledTask maxIterations(int val) {
- this.maxIterations = val;
- return this;
- }
-
- public Callable<Task<?>> getTaskFactory() {
- return taskFactory;
- }
-
- public Task<?> newTask() {
- try {
- return taskFactory.call();
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
- protected String getActiveTaskStatusString(int verbosity) {
- StringBuilder rv = new StringBuilder("Scheduler");
- if (runCount>0) rv.append(", iteration "+(runCount+1));
- if (recentRun!=null) rv.append(", last run "+
- Duration.sinceUtc(recentRun.getStartTimeUtc())+" ms ago");
- if (truth(getNextScheduled())) {
- Duration untilNext = Duration.millis(getNextScheduled().getDelay(TimeUnit.MILLISECONDS));
- if (untilNext.isPositive())
- rv.append(", next in "+untilNext);
- else
- rv.append(", next imminent");
- }
- return rv.toString();
- }
-
- @Override
- public boolean isDone() {
- return isCancelled() || (maxIterations!=null && maxIterations <= runCount) || (period==null && nextRun!=null && nextRun.isDone());
- }
-
- public synchronized void blockUntilFirstScheduleStarted() {
- // TODO Assumes that maxIterations is not negative!
- while (true) {
- if (isCancelled()) throw new CancellationException();
- if (recentRun==null)
- try {
- wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- Throwables.propagate(e);
- }
- if (recentRun!=null) return;
- }
- }
-
- public void blockUntilEnded() {
- while (!isDone()) super.blockUntilEnded();
- }
-
- /** gets the value of the most recently run task */
- public Object get() throws InterruptedException, ExecutionException {
- blockUntilStarted();
- blockUntilFirstScheduleStarted();
- return (truth(recentRun)) ? recentRun.get() : internalFuture.get();
- }
-
- @Override
- public synchronized boolean cancel(boolean mayInterrupt) {
- boolean result = super.cancel(mayInterrupt);
- if (nextRun!=null) {
- nextRun.cancel(mayInterrupt);
- notifyAll();
- }
- return result;
- }
-
- /** internal method used to allow callers to wait for underlying tasks to finished in the case of cancellation
- * @param duration */
- @Beta
- public boolean blockUntilNextRunFinished(Duration timeout) {
- return Tasks.blockUntilInternalTasksEnded(nextRun, timeout);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/SequentialTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/SequentialTask.java b/core/src/main/java/brooklyn/util/task/SequentialTask.java
deleted file mode 100644
index e739eb0..0000000
--- a/core/src/main/java/brooklyn/util/task/SequentialTask.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.brooklyn.api.management.Task;
-
-import com.google.common.collect.ImmutableList;
-
-
-/** runs tasks in order, waiting for one to finish before starting the next; return value here is TBD;
- * (currently is all the return values of individual tasks, but we
- * might want some pipeline support and eventually only to return final value...) */
-public class SequentialTask<T> extends CompoundTask<T> {
-
- public SequentialTask(Object... tasks) { super(tasks); }
-
- public SequentialTask(Map<String,?> flags, Collection<? extends Object> tasks) { super(flags, tasks); }
- public SequentialTask(Collection<? extends Object> tasks) { super(tasks); }
-
- public SequentialTask(Map<String,?> flags, Iterable<? extends Object> tasks) { super(flags, ImmutableList.copyOf(tasks)); }
- public SequentialTask(Iterable<? extends Object> tasks) { super(ImmutableList.copyOf(tasks)); }
-
- protected List<T> runJobs() throws InterruptedException, ExecutionException {
- setBlockingDetails("Executing "+
- (children.size()==1 ? "1 child task" :
- children.size()+" children tasks sequentially") );
-
- List<T> result = new ArrayList<T>();
- for (Task<? extends T> task : children) {
- submitIfNecessary(task);
- // throw exception (and cancel subsequent tasks) on error
- result.add(task.get());
- }
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java b/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java
deleted file mode 100644
index a48bac8..0000000
--- a/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.Queue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.brooklyn.api.management.Task;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Instances of this class ensures that {@link Task}s execute with in-order
- * single-threaded semantics.
- *
- * Tasks can be presented through {@link #submit(Callable)}. The order of execution is the
- * sumbission order.
- * <p>
- * This implementation does so by blocking on a {@link ConcurrentLinkedQueue}, <em>after</em>
- * the task is started in a thread (and {@link Task#isBegun()} returns true), but (of course)
- * <em>before</em> the {@link TaskInternal#getJob()} actually gets invoked.
- */
-public class SingleThreadedScheduler implements TaskScheduler, CanSetName {
- private static final Logger LOG = LoggerFactory.getLogger(SingleThreadedScheduler.class);
-
- private final Queue<QueuedSubmission<?>> order = new ConcurrentLinkedQueue<QueuedSubmission<?>>();
- private int queueSize = 0;
- private final AtomicBoolean running = new AtomicBoolean(false);
-
- private ExecutorService executor;
-
- private String name;
-
- @Override
- public void setName(String name) {
- this.name = name;
- }
-
- @Override
- public String toString() {
- return name!=null ? "SingleThreadedScheduler["+name+"]" : super.toString();
- }
-
- @Override
- public void injectExecutor(ExecutorService executor) {
- this.executor = executor;
- }
-
- @Override
- public synchronized <T> Future<T> submit(Callable<T> c) {
- if (running.compareAndSet(false, true)) {
- return executeNow(c);
- } else {
- WrappingFuture<T> f = new WrappingFuture<T>();
- order.add(new QueuedSubmission<T>(c, f));
- queueSize++;
- if (queueSize>0 && (queueSize == 50 || (queueSize<=500 && (queueSize%100)==0) || (queueSize%1000)==0) && queueSize!=lastSizeWarn) {
- LOG.warn("{} is backing up, {} tasks queued", this, queueSize);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Task queue backing up detail, queue "+this+"; task context is "+Tasks.current()+"; latest task is "+c+"; first task is "+order.peek());
- }
- lastSizeWarn = queueSize;
- }
- return f;
- }
- }
- int lastSizeWarn = 0;
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- private synchronized void onEnd() {
- boolean done = false;
- while (!done) {
- if (order.isEmpty()) {
- running.set(false);
- done = true;
- } else {
- QueuedSubmission<?> qs = order.remove();
- queueSize--;
- if (!qs.f.isCancelled()) {
- Future future = executeNow(qs.c);
- qs.f.setDelegate(future);
- done = true;
- }
- }
- }
- }
-
- private synchronized <T> Future<T> executeNow(final Callable<T> c) {
- return executor.submit(new Callable<T>() {
- @Override public T call() throws Exception {
- try {
- return c.call();
- } finally {
- onEnd();
- }
- }});
- }
-
-
- private static class QueuedSubmission<T> {
- final Callable<T> c;
- final WrappingFuture<T> f;
-
- QueuedSubmission(Callable<T> c, WrappingFuture<T> f) {
- this.c = c;
- this.f = f;
- }
-
- @Override
- public String toString() {
- return "QueuedSubmission["+c+"]@"+Integer.toHexString(System.identityHashCode(this));
- }
- }
-
- /**
- * A future, where the task may not yet have been submitted to the real executor.
- * It delegates to the real future if present, and otherwise waits for that to appear
- */
- private static class WrappingFuture<T> implements Future<T> {
- private volatile Future<T> delegate;
- private boolean cancelled;
-
- void setDelegate(Future<T> delegate) {
- synchronized (this) {
- this.delegate = delegate;
- notifyAll();
- }
- }
-
- @Override public boolean cancel(boolean mayInterruptIfRunning) {
- if (delegate != null) {
- return delegate.cancel(mayInterruptIfRunning);
- } else {
- cancelled = true;
- synchronized (this) {
- notifyAll();
- }
- return true;
- }
- }
-
- @Override public boolean isCancelled() {
- if (delegate != null) {
- return delegate.isCancelled();
- } else {
- return cancelled;
- }
- }
-
- @Override public boolean isDone() {
- return (delegate != null) ? delegate.isDone() : cancelled;
- }
-
- @Override public T get() throws CancellationException, ExecutionException, InterruptedException {
- if (cancelled) {
- throw new CancellationException();
- } else if (delegate != null) {
- return delegate.get();
- } else {
- synchronized (this) {
- while (delegate == null && !cancelled) {
- wait();
- }
- }
- return get();
- }
- }
-
- @Override public T get(long timeout, TimeUnit unit) throws CancellationException, ExecutionException, InterruptedException, TimeoutException {
- long endtime = System.currentTimeMillis()+unit.toMillis(timeout);
-
- if (cancelled) {
- throw new CancellationException();
- } else if (delegate != null) {
- return delegate.get(timeout, unit);
- } else if (System.currentTimeMillis() >= endtime) {
- throw new TimeoutException();
- } else {
- synchronized (this) {
- while (delegate == null && !cancelled && System.currentTimeMillis() < endtime) {
- long remaining = endtime - System.currentTimeMillis();
- if (remaining > 0) {
- wait(remaining);
- }
- }
- }
- long remaining = endtime - System.currentTimeMillis();
- return get(remaining, TimeUnit.MILLISECONDS);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/TaskBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/TaskBuilder.java b/core/src/main/java/brooklyn/util/task/TaskBuilder.java
deleted file mode 100644
index ecd4d4f..0000000
--- a/core/src/main/java/brooklyn/util/task/TaskBuilder.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-
-import org.apache.brooklyn.api.management.Task;
-import org.apache.brooklyn.api.management.TaskAdaptable;
-import org.apache.brooklyn.api.management.TaskFactory;
-import org.apache.brooklyn.api.management.TaskQueueingContext;
-
-import brooklyn.util.JavaGroovyEquivalents;
-import brooklyn.util.collections.MutableList;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.collections.MutableSet;
-
-import com.google.common.collect.Iterables;
-
-/** Convenience for creating tasks; note that DynamicSequentialTask is the default */
-public class TaskBuilder<T> {
-
- String name = null;
- String description = null;
- Callable<T> body = null;
- Boolean swallowChildrenFailures = null;
- List<TaskAdaptable<?>> children = MutableList.of();
- Set<Object> tags = MutableSet.of();
- Map<String,Object> flags = MutableMap.of();
- Boolean dynamic = null;
- boolean parallel = false;
-
- public static <T> TaskBuilder<T> builder() {
- return new TaskBuilder<T>();
- }
-
- public TaskBuilder<T> name(String name) {
- this.name = name;
- return this;
- }
-
- public TaskBuilder<T> description(String description) {
- this.description = description;
- return this;
- }
-
- /** whether task that is built has been explicitly specified to be a dynamic task
- * (ie a Task which is also a {@link TaskQueueingContext}
- * whereby new tasks can be added after creation */
- public TaskBuilder<T> dynamic(boolean dynamic) {
- this.dynamic = dynamic;
- return this;
- }
-
- /** whether task that is built should be parallel; cannot (currently) also be dynamic */
- public TaskBuilder<T> parallel(boolean parallel) {
- this.parallel = parallel;
- return this;
- }
-
- public TaskBuilder<T> body(Callable<T> body) {
- this.body = body;
- return this;
- }
-
- /** sets up a dynamic task not to fail even if children fail */
- public TaskBuilder<T> swallowChildrenFailures(boolean swallowChildrenFailures) {
- this.swallowChildrenFailures = swallowChildrenFailures;
- return this;
- }
-
- public TaskBuilder<T> body(Runnable body) {
- this.body = JavaGroovyEquivalents.<T>toCallable(body);
- return this;
- }
-
- /** adds a child to the given task; the semantics of how the child is executed is set using
- * {@link #dynamic(boolean)} and {@link #parallel(boolean)} */
- public TaskBuilder<T> add(TaskAdaptable<?> child) {
- children.add(child);
- return this;
- }
-
- public TaskBuilder<T> addAll(Iterable<? extends TaskAdaptable<?>> additionalChildren) {
- Iterables.addAll(children, additionalChildren);
- return this;
- }
-
- public TaskBuilder<T> add(TaskAdaptable<?>... additionalChildren) {
- children.addAll(Arrays.asList(additionalChildren));
- return this;
- }
-
- /** adds a tag to the given task */
- public TaskBuilder<T> tag(Object tag) {
- tags.add(tag);
- return this;
- }
-
- /** adds a flag to the given task */
- public TaskBuilder<T> flag(String flag, Object value) {
- flags.put(flag, value);
- return this;
- }
-
- /** adds the given flags to the given task */
- public TaskBuilder<T> flags(Map<String,Object> flags) {
- this.flags.putAll(flags);
- return this;
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public Task<T> build() {
- MutableMap<String, Object> taskFlags = MutableMap.copyOf(flags);
- if (name!=null) taskFlags.put("displayName", name);
- if (description!=null) taskFlags.put("description", description);
- if (!tags.isEmpty()) taskFlags.put("tags", tags);
-
- if (Boolean.FALSE.equals(dynamic) && children.isEmpty()) {
- if (swallowChildrenFailures!=null)
- throw new IllegalArgumentException("Cannot set swallowChildrenFailures for non-dynamic task: "+this);
- return new BasicTask<T>(taskFlags, body);
- }
-
- // prefer dynamic set unless (a) user has said not dynamic, or (b) it's parallel (since there is no dynamic parallel yet)
- // dynamic has better cancel (will interrupt the thread) and callers can submit tasks flexibly;
- // however dynamic uses an extra thread and task and is noisy for contexts which don't need it
- if (Boolean.TRUE.equals(dynamic) || (dynamic==null && !parallel)) {
- if (parallel)
- throw new UnsupportedOperationException("No implementation of parallel dynamic aggregate task available");
- DynamicSequentialTask<T> result = new DynamicSequentialTask<T>(taskFlags, body);
- if (swallowChildrenFailures!=null && swallowChildrenFailures.booleanValue()) result.swallowChildrenFailures();
- for (TaskAdaptable t: children)
- result.queue(t.asTask());
- return result;
- }
-
- // T must be of type List<V> for these to be valid
- if (body != null) {
- throw new UnsupportedOperationException("No implementation of non-dynamic task with both body and children");
- }
- if (swallowChildrenFailures!=null) {
- throw new IllegalArgumentException("Cannot set swallowChildrenFailures for non-dynamic task: "+this);
- }
-
- if (parallel)
- return new ParallelTask(taskFlags, children);
- else
- return new SequentialTask(taskFlags, children);
- }
-
- /** returns a a factory based on this builder */
- public TaskFactory<Task<T>> buildFactory() {
- return new TaskFactory<Task<T>>() {
- public Task<T> newTask() {
- return build();
- }
- };
- }
-
- @Override
- public String toString() {
- return super.toString()+"["+name+"]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/TaskInternal.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/TaskInternal.java b/core/src/main/java/brooklyn/util/task/TaskInternal.java
deleted file mode 100644
index 51dbddb..0000000
--- a/core/src/main/java/brooklyn/util/task/TaskInternal.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-
-import org.apache.brooklyn.api.management.ExecutionManager;
-import org.apache.brooklyn.api.management.Task;
-
-import brooklyn.util.time.Duration;
-
-import com.google.common.annotations.Beta;
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.ExecutionList;
-import com.google.common.util.concurrent.ListenableFuture;
-
-/**
- * All tasks being passed to the {@link ExecutionManager} should implement this.
- * Users are strongly encouraged to use (or extend) {@link BasicTask}, rather than
- * implementing a task from scratch.
- *
- * The methods on this interface will change in subsequent releases. Because this is
- * marked as beta, the normal deprecation policy for these methods does not apply.
- *
- * @author aled
- */
-@Beta
-public interface TaskInternal<T> extends Task<T> {
-
- /** sets the internal future object used to record the association to a job submitted to an {@link ExecutorService} */
- void initInternalFuture(ListenableFuture<T> result);
-
- /** returns the underlying future where this task's results will come in; see {@link #initInternalFuture(ListenableFuture)} */
- Future<T> getInternalFuture();
-
- /** if the job is queued for submission (e.g. by another task) it can indicate that fact (and time) here;
- * note tasks can (and often are) submitted without any queueing, in which case this value may be -1 */
- long getQueuedTimeUtc();
-
- boolean isQueuedOrSubmitted();
- boolean isQueuedAndNotSubmitted();
- boolean isQueued();
-
- /** marks the task as queued for execution */
- void markQueued();
-
- boolean cancel();
-
- boolean blockUntilStarted(Duration timeout);
-
- /** allows a task user to specify why a task is blocked; for use immediately before a blocking/wait,
- * and typically cleared immediately afterwards; referenced by management api to inspect a task
- * which is blocking
- * <p>
- * returns previous details, in case caller wishes to recall and restore it (e.g. if it is doing a sub-blocking)
- */
- String setBlockingDetails(String blockingDetails);
-
- /** as {@link #setBlockingDetails(String)} but records a task which is blocking,
- * for use e.g. in a gui to navigate to the current active subtask
- * <p>
- * returns previous blocking task, in case caller wishes to recall and restore it
- */
- Task<?> setBlockingTask(Task<?> blockingTask);
-
- void resetBlockingDetails();
-
- void resetBlockingTask();
-
- /** returns a textual message giving details while the task is blocked */
- String getBlockingDetails();
-
- /** returns a task that this task is blocked on */
- Task<?> getBlockingTask();
-
- void setExtraStatusText(Object extraStatus);
-
- Object getExtraStatusText();
-
- void runListeners();
-
- void setEndTimeUtc(long val);
-
- void setThread(Thread thread);
-
- Callable<T> getJob();
-
- void setJob(Callable<T> job);
-
- ExecutionList getListeners();
-
- void setSubmitTimeUtc(long currentTimeMillis);
-
- void setSubmittedByTask(Task<?> task);
-
- Set<Object> getMutableTags();
-
- void setStartTimeUtc(long currentTimeMillis);
-
- void applyTagModifier(Function<Set<Object>,Void> modifier);
-
- /** if a task is a proxy for another one (used mainly for internal tasks),
- * this returns the "real" task represented by this one */
- Task<?> getProxyTarget();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/TaskScheduler.java b/core/src/main/java/brooklyn/util/task/TaskScheduler.java
deleted file mode 100644
index a10e63a..0000000
--- a/core/src/main/java/brooklyn/util/task/TaskScheduler.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-
-import org.apache.brooklyn.api.management.Task;
-
-/**
- * The scheduler is an internal mechanism to decorate {@link Task}s.
- *
- * It can control how the tasks are scheduled for execution (e.g. single-threaded execution,
- * prioritised, etc).
- */
-public interface TaskScheduler {
-
- public void injectExecutor(ExecutorService executor);
-
- /**
- * Called by {@link BasicExecutionManager} to schedule tasks.
- */
- public <T> Future<T> submit(Callable<T> c);
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/TaskTags.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/TaskTags.java b/core/src/main/java/brooklyn/util/task/TaskTags.java
deleted file mode 100644
index a9da252..0000000
--- a/core/src/main/java/brooklyn/util/task/TaskTags.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-import org.apache.brooklyn.api.management.Task;
-import org.apache.brooklyn.api.management.TaskAdaptable;
-
-import com.google.common.base.Function;
-
-public class TaskTags {
-
- /** marks a task which is allowed to fail without failing his parent */
- public static final String INESSENTIAL_TASK = "inessential";
-
- /** marks a task which is a subtask of another */
- public static final String SUB_TASK_TAG = "SUB-TASK";
-
- public static void addTagDynamically(TaskAdaptable<?> task, final Object tag) {
- ((BasicTask<?>)task.asTask()).applyTagModifier(new Function<Set<Object>, Void>() {
- public Void apply(@Nullable Set<Object> input) {
- input.add(tag);
- return null;
- }
- });
- }
-
- public static void addTagsDynamically(TaskAdaptable<?> task, final Object tag1, final Object ...tags) {
- ((BasicTask<?>)task.asTask()).applyTagModifier(new Function<Set<Object>, Void>() {
- public Void apply(@Nullable Set<Object> input) {
- input.add(tag1);
- for (Object tag: tags) input.add(tag);
- return null;
- }
- });
- }
-
-
- public static boolean isInessential(Task<?> task) {
- return hasTag(task, INESSENTIAL_TASK);
- }
-
- public static boolean hasTag(Task<?> task, Object tag) {
- return task.getTags().contains(tag);
- }
-
- public static <U,V extends TaskAdaptable<U>> V markInessential(V task) {
- addTagDynamically(task, INESSENTIAL_TASK);
- return task;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/Tasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/Tasks.java b/core/src/main/java/brooklyn/util/task/Tasks.java
deleted file mode 100644
index c25dd19..0000000
--- a/core/src/main/java/brooklyn/util/task/Tasks.java
+++ /dev/null
@@ -1,488 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.FutureTask;
-
-import javax.annotation.Nullable;
-
-import org.apache.brooklyn.api.management.ExecutionContext;
-import org.apache.brooklyn.api.management.HasTaskChildren;
-import org.apache.brooklyn.api.management.Task;
-import org.apache.brooklyn.api.management.TaskAdaptable;
-import org.apache.brooklyn.api.management.TaskFactory;
-import org.apache.brooklyn.api.management.TaskQueueingContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.exceptions.ReferenceWithError;
-import brooklyn.util.repeat.Repeater;
-import brooklyn.util.time.CountdownTimer;
-import brooklyn.util.time.Duration;
-import brooklyn.util.time.Time;
-
-import com.google.common.annotations.Beta;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Iterables;
-
-public class Tasks {
-
- private static final Logger log = LoggerFactory.getLogger(Tasks.class);
-
- /** convenience for setting "blocking details" on any task where the current thread is running;
- * typically invoked prior to a wait, for transparency to a user;
- * then invoked with 'null' just after the wait */
- public static String setBlockingDetails(String description) {
- Task<?> current = current();
- if (current instanceof TaskInternal)
- return ((TaskInternal<?>)current).setBlockingDetails(description);
- return null;
- }
- public static void resetBlockingDetails() {
- Task<?> current = current();
- if (current instanceof TaskInternal)
- ((TaskInternal<?>)current).resetBlockingDetails();
- }
- public static Task<?> setBlockingTask(Task<?> blocker) {
- Task<?> current = current();
- if (current instanceof TaskInternal)
- return ((TaskInternal<?>)current).setBlockingTask(blocker);
- return null;
- }
- public static void resetBlockingTask() {
- Task<?> current = current();
- if (current instanceof TaskInternal)
- ((TaskInternal<?>)current).resetBlockingTask();
- }
-
- /** convenience for setting "blocking details" on any task where the current thread is running,
- * while the passed code is executed; often used from groovy as
- * <pre>{@code withBlockingDetails("sleeping 5s") { Thread.sleep(5000); } }</pre>
- * If code block is null, the description is set until further notice (not cleareed). */
- @SuppressWarnings("rawtypes")
- public static <T> T withBlockingDetails(String description, Callable<T> code) throws Exception {
- Task current = current();
- if (code==null) {
- log.warn("legacy invocation of withBlockingDetails with null code block, ignoring");
- return null;
- }
- String prevBlockingDetails = null;
- if (current instanceof TaskInternal) {
- prevBlockingDetails = ((TaskInternal)current).setBlockingDetails(description);
- }
- try {
- return code.call();
- } finally {
- if (current instanceof TaskInternal)
- ((TaskInternal)current).setBlockingDetails(prevBlockingDetails);
- }
- }
-
- /** the {@link Task} where the current thread is executing, if executing in a Task, otherwise null;
- * if the current task is a proxy, this returns the target of that proxy */
- @SuppressWarnings("rawtypes")
- public static Task current() {
- return getFinalProxyTarget(BasicExecutionManager.getPerThreadCurrentTask().get());
- }
-
- public static Task<?> getFinalProxyTarget(Task<?> task) {
- if (task==null) return null;
- Task<?> proxy = ((TaskInternal<?>)task).getProxyTarget();
- if (proxy==null || proxy.equals(task)) return task;
- return getFinalProxyTarget(proxy);
- }
-
- /** creates a {@link ValueResolver} instance which allows significantly more customization than
- * the various {@link #resolveValue(Object, Class, ExecutionContext)} methods here */
- public static <T> ValueResolver<T> resolving(Object v, Class<T> type) {
- return new ValueResolver<T>(v, type);
- }
-
- public static ValueResolver.ResolverBuilderPretype resolving(Object v) {
- return new ValueResolver.ResolverBuilderPretype(v);
- }
-
- /** @see #resolveValue(Object, Class, ExecutionContext, String) */
- public static <T> T resolveValue(Object v, Class<T> type, @Nullable ExecutionContext exec) throws ExecutionException, InterruptedException {
- return new ValueResolver<T>(v, type).context(exec).get();
- }
-
- /** attempt to resolve the given value as the given type, waiting on futures, submitting if necessary,
- * and coercing as allowed by TypeCoercions;
- * contextMessage (optional) will be displayed in status reports while it waits (e.g. the name of the config key being looked up).
- * if no execution context supplied (null) this method will throw an exception if the object is an unsubmitted task */
- public static <T> T resolveValue(Object v, Class<T> type, @Nullable ExecutionContext exec, String contextMessage) throws ExecutionException, InterruptedException {
- return new ValueResolver<T>(v, type).context(exec).description(contextMessage).get();
- }
-
- /**
- * @see #resolveDeepValue(Object, Class, ExecutionContext, String)
- */
- public static Object resolveDeepValue(Object v, Class<?> type, ExecutionContext exec) throws ExecutionException, InterruptedException {
- return resolveDeepValue(v, type, exec, null);
- }
-
- /**
- * Resolves the given object, blocking on futures and coercing it to the given type. If the object is a
- * map or iterable (or a list of map of maps, etc, etc) then walks these maps/iterables to convert all of
- * their values to the given type. For example, the following will return a list containing a map with "1"="true":
- *
- * {@code Object result = resolveDeepValue(ImmutableList.of(ImmutableMap.of(1, true)), String.class, exec)}
- *
- * To perform a deep conversion of futures contained within Iterables or Maps without coercion of each element,
- * the type should normally be Object, not the type of the collection. This differs from
- * {@link #resolveValue(Object, Class, ExecutionContext, String)} which will accept Map and Iterable
- * as the required type.
- */
- public static <T> T resolveDeepValue(Object v, Class<T> type, ExecutionContext exec, String contextMessage) throws ExecutionException, InterruptedException {
- return new ValueResolver<T>(v, type).context(exec).deep(true).description(contextMessage).get();
- }
-
- /** sets extra status details on the current task, if possible (otherwise does nothing).
- * the extra status is presented in Task.getStatusDetails(true)
- */
- public static void setExtraStatusDetails(String notes) {
- Task<?> current = current();
- if (current instanceof TaskInternal)
- ((TaskInternal<?>)current).setExtraStatusText(notes);
- }
-
- public static <T> TaskBuilder<T> builder() {
- return TaskBuilder.<T>builder();
- }
-
- private static Task<?>[] asTasks(TaskAdaptable<?> ...tasks) {
- Task<?>[] result = new Task<?>[tasks.length];
- for (int i=0; i<tasks.length; i++)
- result[i] = tasks[i].asTask();
- return result;
- }
-
- public static Task<List<?>> parallel(TaskAdaptable<?> ...tasks) {
- return parallelInternal("parallelised tasks", asTasks(tasks));
- }
- public static Task<List<?>> parallel(String name, TaskAdaptable<?> ...tasks) {
- return parallelInternal(name, asTasks(tasks));
- }
- public static Task<List<?>> parallel(Iterable<? extends TaskAdaptable<?>> tasks) {
- return parallel(asTasks(Iterables.toArray(tasks, TaskAdaptable.class)));
- }
- public static Task<List<?>> parallel(String name, Iterable<? extends TaskAdaptable<?>> tasks) {
- return parallelInternal(name, asTasks(Iterables.toArray(tasks, TaskAdaptable.class)));
- }
- private static Task<List<?>> parallelInternal(String name, Task<?>[] tasks) {
- return Tasks.<List<?>>builder().name(name).parallel(true).add(tasks).build();
- }
-
- public static Task<List<?>> sequential(TaskAdaptable<?> ...tasks) {
- return sequentialInternal("sequential tasks", asTasks(tasks));
- }
- public static Task<List<?>> sequential(String name, TaskAdaptable<?> ...tasks) {
- return sequentialInternal(name, asTasks(tasks));
- }
- public static TaskFactory<?> sequential(TaskFactory<?> ...taskFactories) {
- return sequentialInternal("sequential tasks", taskFactories);
- }
- public static TaskFactory<?> sequential(String name, TaskFactory<?> ...taskFactories) {
- return sequentialInternal(name, taskFactories);
- }
- public static Task<List<?>> sequential(List<? extends TaskAdaptable<?>> tasks) {
- return sequential(asTasks(Iterables.toArray(tasks, TaskAdaptable.class)));
- }
- public static Task<List<?>> sequential(String name, List<? extends TaskAdaptable<?>> tasks) {
- return sequential(name, asTasks(Iterables.toArray(tasks, TaskAdaptable.class)));
- }
- private static Task<List<?>> sequentialInternal(String name, Task<?>[] tasks) {
- return Tasks.<List<?>>builder().name(name).parallel(false).add(tasks).build();
- }
- private static TaskFactory<?> sequentialInternal(final String name, final TaskFactory<?> ...taskFactories) {
- return new TaskFactory<TaskAdaptable<?>>() {
- @Override
- public TaskAdaptable<?> newTask() {
- TaskBuilder<List<?>> tb = Tasks.<List<?>>builder().name(name).parallel(false);
- for (TaskFactory<?> tf: taskFactories)
- tb.add(tf.newTask().asTask());
- return tb.build();
- }
- };
- }
-
- /** returns the first tag found on the given task which matches the given type, looking up the submission hierarachy if necessary */
- @SuppressWarnings("unchecked")
- public static <T> T tag(@Nullable Task<?> task, Class<T> type, boolean recurseHierarchy) {
- // support null task to make it easier for callers to walk hierarchies
- if (task==null) return null;
- for (Object tag: task.getTags())
- if (type.isInstance(tag)) return (T)tag;
- if (!recurseHierarchy) return null;
- return tag(task.getSubmittedByTask(), type, true);
- }
-
- public static boolean isAncestorCancelled(Task<?> t) {
- if (t==null) return false;
- if (t.isCancelled()) return true;
- return isAncestorCancelled(t.getSubmittedByTask());
- }
-
- public static boolean isQueued(TaskAdaptable<?> task) {
- return ((TaskInternal<?>)task.asTask()).isQueued();
- }
-
- public static boolean isSubmitted(TaskAdaptable<?> task) {
- return ((TaskInternal<?>)task.asTask()).isSubmitted();
- }
-
- public static boolean isQueuedOrSubmitted(TaskAdaptable<?> task) {
- return ((TaskInternal<?>)task.asTask()).isQueuedOrSubmitted();
- }
-
- /**
- * Adds the given task to the given context. Does not throw an exception if the addition fails.
- * @return true if the task was added, false otherwise.
- */
- public static boolean tryQueueing(TaskQueueingContext adder, TaskAdaptable<?> task) {
- if (task==null || isQueued(task))
- return false;
- try {
- adder.queue(task.asTask());
- return true;
- } catch (Exception e) {
- if (log.isDebugEnabled())
- log.debug("Could not add task "+task+" at "+adder+": "+e);
- return false;
- }
- }
-
- /** see also {@link #resolving(Object)} which gives much more control about submission, timeout, etc */
- public static <T> Supplier<T> supplier(final TaskAdaptable<T> task) {
- return new Supplier<T>() {
- @Override
- public T get() {
- return task.asTask().getUnchecked();
- }
- };
- }
-
- /** return all children tasks of the given tasks, if it has children, else empty list */
- public static Iterable<Task<?>> children(Task<?> task) {
- if (task instanceof HasTaskChildren)
- return ((HasTaskChildren)task).getChildren();
- return Collections.emptyList();
- }
-
- /** returns failed tasks */
- public static Iterable<Task<?>> failed(Iterable<Task<?>> subtasks) {
- return Iterables.filter(subtasks, new Predicate<Task<?>>() {
- @Override
- public boolean apply(Task<?> input) {
- return input.isError();
- }
- });
- }
-
- /** returns the task, its children, and all its children, and so on;
- * @param root task whose descendants should be iterated
- * @param parentFirst whether to put parents before children or after
- */
- public static Iterable<Task<?>> descendants(Task<?> root, final boolean parentFirst) {
- Iterable<Task<?>> descs = Iterables.concat(Iterables.transform(Tasks.children(root), new Function<Task<?>,Iterable<Task<?>>>() {
- @Override
- public Iterable<Task<?>> apply(Task<?> input) {
- return descendants(input, parentFirst);
- }
- }));
- if (parentFirst) return Iterables.concat(Collections.singleton(root), descs);
- else return Iterables.concat(descs, Collections.singleton(root));
- }
-
- /** returns the error thrown by the task if {@link Task#isError()}, or null if no error or not done */
- public static Throwable getError(Task<?> t) {
- if (t==null) return null;
- if (!t.isDone()) return null;
- if (t.isCancelled()) return new CancellationException();
- try {
- t.get();
- return null;
- } catch (Throwable error) {
- // do not propagate as we are pretty much guaranteed above that it wasn't this
- // thread which originally threw the error
- return error;
- }
- }
- public static Task<Void> fail(final String name, final Throwable optionalError) {
- return Tasks.<Void>builder().dynamic(false).name(name).body(new Runnable() { public void run() {
- if (optionalError!=null) throw Exceptions.propagate(optionalError); else throw new RuntimeException("Failed: "+name);
- } }).build();
- }
- public static Task<Void> warning(final String message, final Throwable optionalError) {
- log.warn(message);
- return TaskTags.markInessential(fail(message, optionalError));
- }
-
- /** marks the current task inessential; this mainly matters if the task is running in a parent
- * {@link TaskQueueingContext} and we don't want the parent to fail if this task fails
- * <p>
- * no-op (silently ignored) if not in a task */
- public static void markInessential() {
- Task<?> task = Tasks.current();
- if (task==null) {
- TaskQueueingContext qc = DynamicTasks.getTaskQueuingContext();
- if (qc!=null) task = qc.asTask();
- }
- if (task!=null) {
- TaskTags.markInessential(task);
- }
- }
-
- /** causes failures in subtasks of the current task not to fail the parent;
- * no-op if not in a {@link TaskQueueingContext}.
- * <p>
- * essentially like a {@link #markInessential()} on all tasks in the current
- * {@link TaskQueueingContext}, including tasks queued subsequently */
- @Beta
- public static void swallowChildrenFailures() {
- Preconditions.checkNotNull(DynamicTasks.getTaskQueuingContext(), "Task queueing context required here");
- TaskQueueingContext qc = DynamicTasks.getTaskQueuingContext();
- if (qc!=null) {
- qc.swallowChildrenFailures();
- }
- }
-
- /** as {@link TaskTags#addTagDynamically(TaskAdaptable, Object)} but for current task, skipping if no current task */
- public static void addTagDynamically(Object tag) {
- Task<?> t = Tasks.current();
- if (t!=null) TaskTags.addTagDynamically(t, tag);
- }
-
- /**
- * Workaround for limitation described at {@link Task#cancel(boolean)};
- * internal method used to allow callers to wait for underlying tasks to finished in the case of cancellation.
- * <p>
- * It is irritating that {@link FutureTask} sync's object clears the runner thread,
- * so even if {@link BasicTask#getInternalFuture()} is used, there is no means of determining if the underlying object is done.
- * The {@link Task#getEndTimeUtc()} seems the only way.
- *
- * @return true if tasks ended; false if timed out
- **/
- @Beta
- public static boolean blockUntilInternalTasksEnded(Task<?> t, Duration timeout) {
- CountdownTimer timer = timeout.countdownTimer();
-
- if (t==null)
- return true;
-
- if (t instanceof ScheduledTask) {
- boolean result = ((ScheduledTask)t).blockUntilNextRunFinished(timer.getDurationRemaining());
- if (!result) return false;
- }
-
- t.blockUntilEnded(timer.getDurationRemaining());
-
- while (true) {
- if (t.getEndTimeUtc()>=0) return true;
- // above should be sufficient; but just in case, trying the below
- Thread tt = t.getThread();
- if (t instanceof ScheduledTask) {
- ((ScheduledTask)t).blockUntilNextRunFinished(timer.getDurationRemaining());
- return true;
- } else {
- if (tt==null || !tt.isAlive()) {
- if (!t.isCancelled()) {
- // may happen for a cancelled task, interrupted after submit but before start
- log.warn("Internal task thread is dead or null ("+tt+") but task not ended: "+t.getEndTimeUtc()+" ("+t+")");
- }
- return true;
- }
- }
- if (timer.isExpired())
- return false;
- Time.sleep(Repeater.DEFAULT_REAL_QUICK_PERIOD);
- }
- }
-
- /** returns true if either the current thread or the current task is interrupted/cancelled */
- public static boolean isInterrupted() {
- if (Thread.currentThread().isInterrupted()) return true;
- Task<?> t = current();
- if (t==null) return false;
- return t.isCancelled();
- }
-
- private static class WaitForRepeaterCallable implements Callable<Boolean> {
- protected Repeater repeater;
- protected boolean requireTrue;
-
- public WaitForRepeaterCallable(Repeater repeater, boolean requireTrue) {
- this.repeater = repeater;
- this.requireTrue = requireTrue;
- }
-
- @Override
- public Boolean call() {
- ReferenceWithError<Boolean> result;
- Tasks.setBlockingDetails(repeater.getDescription());
- try {
- result = repeater.runKeepingError();
- } finally {
- Tasks.resetBlockingDetails();
- }
-
- if (Boolean.TRUE.equals(result.getWithoutError()))
- return true;
- if (result.hasError())
- throw Exceptions.propagate(result.getError());
- if (requireTrue)
- throw new IllegalStateException("timeout - "+repeater.getDescription());
- return false;
- }
- }
-
- /** @return a {@link TaskBuilder} which tests whether the repeater terminates with success in its configured timeframe,
- * returning true or false depending on whether repeater succeed */
- public static TaskBuilder<Boolean> testing(Repeater repeater) {
- return Tasks.<Boolean>builder().body(new WaitForRepeaterCallable(repeater, false))
- .name("waiting for condition")
- .description("Testing whether " + getTimeoutString(repeater) + ": "+repeater.getDescription());
- }
-
- /** @return a {@link TaskBuilder} which requires that the repeater terminate with success in its configured timeframe,
- * throwing if it does not */
- public static TaskBuilder<?> requiring(Repeater repeater) {
- return Tasks.<Boolean>builder().body(new WaitForRepeaterCallable(repeater, true))
- .name("waiting for condition")
- .description("Requiring " + getTimeoutString(repeater) + ": " + repeater.getDescription());
- }
-
- private static String getTimeoutString(Repeater repeater) {
- Duration timeout = repeater.getTimeLimit();
- if (timeout==null || Duration.PRACTICALLY_FOREVER.equals(timeout))
- return "eventually";
- return "in "+timeout;
- }
-
-}