You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2017/03/02 17:01:57 UTC
[13/19] brooklyn-server git commit: ensure TaskFactory items
evaluated immediately don't leak long-running tasks
ensure TaskFactory items evaluated immediately don't leak long-running tasks
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/cd3d4864
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/cd3d4864
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/cd3d4864
Branch: refs/heads/master
Commit: cd3d4864aa2a59a18f28997313ca07bc9185fd62
Parents: 0aa29ef
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Sat Feb 18 16:31:26 2017 +0000
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Sat Feb 18 16:31:26 2017 +0000
----------------------------------------------------------------------
.../brooklyn/api/mgmt/ExecutionContext.java | 4 +-
.../util/core/task/BasicExecutionContext.java | 16 +++-
.../task/InterruptingImmediateSupplier.java | 26 ++++--
.../brooklyn/util/core/task/ValueResolver.java | 88 ++++++++++++++------
.../util/core/task/ValueResolverTest.java | 51 ++++++++++++
5 files changed, 151 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cd3d4864/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
index f8a963a..344907a 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
@@ -75,10 +75,10 @@ public interface ExecutionContext extends Executor {
* tricks to make it look like it is in a sub-thread, and will attempt to be non-blocking but
* if needed they may block.
* <p>
- * Supports {@link Callable} and {@link Runnable} targets to be evaluated with "immediate" semantics.
+ * Supports {@link Callable} and {@link Runnable} and some {@link Task} targets to be evaluated with "immediate" semantics.
*/
// TODO reference ImmediateSupplier when that class is moved to utils project
@Beta
- <T> Maybe<T> getImmediately(Object callableOrSupplier);
+ <T> Maybe<T> getImmediately(Object callableOrSupplierOrTask);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cd3d4864/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
index 6c69509..0799607 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
@@ -41,6 +41,7 @@ import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity;
import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.task.ImmediateSupplier.ImmediateUnsupportedException;
import org.apache.brooklyn.util.guava.Maybe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,7 +104,20 @@ public class BasicExecutionContext extends AbstractExecutionContext {
@SuppressWarnings("unchecked")
@Override
public <T> Maybe<T> getImmediately(Object callableOrSupplier) {
- BasicTask<?> fakeTaskForContext = new BasicTask<Object>(MutableMap.of("displayName", "immediate evaluation"));
+ BasicTask<?> fakeTaskForContext;
+ if (callableOrSupplier instanceof BasicTask) {
+ fakeTaskForContext = (BasicTask<?>)callableOrSupplier;
+ if (fakeTaskForContext.isQueuedOrSubmitted()) {
+ if (fakeTaskForContext.isDone()) {
+ return Maybe.of((T)fakeTaskForContext.getUnchecked());
+ } else {
+ throw new ImmediateUnsupportedException("Task is in progress and incomplete: "+fakeTaskForContext);
+ }
+ }
+ callableOrSupplier = fakeTaskForContext.getJob();
+ } else {
+ fakeTaskForContext = new BasicTask<Object>(MutableMap.of("displayName", "immediate evaluation"));
+ }
fakeTaskForContext.tags.addAll(tags);
fakeTaskForContext.tags.add(BrooklynTaskTags.IMMEDIATE_TASK_TAG);
fakeTaskForContext.tags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG);
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cd3d4864/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java b/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java
index a92a641..84b1bb4 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java
@@ -22,6 +22,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.exceptions.ReferenceWithError;
import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
import org.apache.brooklyn.util.guava.Maybe;
@@ -72,12 +73,16 @@ public class InterruptingImmediateSupplier<T> implements ImmediateSupplier<T>, D
return nestedSupplier.get();
}
- @SuppressWarnings("unchecked")
public static <T> InterruptingImmediateSupplier<T> of(final Object o) {
+ return InterruptingImmediateSupplier.<T>ofSafe(o).get();
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <T> ReferenceWithError<InterruptingImmediateSupplier<T>> ofSafe(final Object o) {
if (o instanceof Supplier) {
- return new InterruptingImmediateSupplier<T>((Supplier<T>)o);
+ return ReferenceWithError.newInstanceWithoutError(new InterruptingImmediateSupplier<T>((Supplier<T>)o));
} else if (o instanceof Callable) {
- return new InterruptingImmediateSupplier<T>(new Supplier<T>() {
+ return ReferenceWithError.newInstanceWithoutError(new InterruptingImmediateSupplier<T>(new Supplier<T>() {
@Override
public T get() {
try {
@@ -86,18 +91,25 @@ public class InterruptingImmediateSupplier<T> implements ImmediateSupplier<T>, D
throw Exceptions.propagate(e);
}
}
- });
+ }));
} else if (o instanceof Runnable) {
- return new InterruptingImmediateSupplier<T>(new Supplier<T>() {
+ return ReferenceWithError.newInstanceWithoutError(new InterruptingImmediateSupplier<T>(new Supplier<T>() {
@Override
public T get() {
((Runnable)o).run();
return null;
}
- });
+ }));
} else {
- throw new UnsupportedOperationException("Type "+o.getClass()+" not supported as InterruptingImmediateSupplier (instance "+o+")");
+ return ReferenceWithError.newInstanceThrowingError(null, new InterruptingImmediateSupplierNotSupportedForObject(o));
}
}
+ public static class InterruptingImmediateSupplierNotSupportedForObject extends UnsupportedOperationException {
+ private static final long serialVersionUID = 307517409005386500L;
+
+ public InterruptingImmediateSupplierNotSupportedForObject(Object o) {
+ super("Type "+o.getClass()+" not supported as InterruptingImmediateSupplier (instance "+o+")");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cd3d4864/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
index 6644a9a..f8cb91b 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
@@ -361,37 +361,59 @@ public class ValueResolver<T> implements DeferredSupplier<T>, Iterable<Maybe<Obj
return Maybe.of((T) v);
try {
- if (isEvaluatingImmediately() && v instanceof ImmediateSupplier) {
- final ImmediateSupplier<Object> supplier = (ImmediateSupplier<Object>) v;
- try {
- Maybe<Object> result = exec.getImmediately(supplier);
-
- // Recurse: need to ensure returned value is cast, etc
- return (result.isPresent())
- ? recursive
- ? new ValueResolver(result.get(), type, this).getMaybe()
- : result
- : result;
- } catch (ImmediateSupplier.ImmediateUnsupportedException e) {
- log.debug("Unable to resolve-immediately for "+description+" ("+v+"); falling back to executing with timeout", e);
+ boolean allowImmediateExecution = false;
+ boolean bailOutAfterImmediateExecution = false;
+
+ if (v instanceof ImmediateSupplier) {
+ allowImmediateExecution = true;
+
+ } else {
+ if ((v instanceof TaskFactory<?>) && !(v instanceof DeferredSupplier)) {
+ v = ((TaskFactory<?>)v).newTask();
+ allowImmediateExecution = true;
+ bailOutAfterImmediateExecution = true;
+ BrooklynTaskTags.setTransient(((TaskAdaptable<?>)v).asTask());
+ if (isEvaluatingImmediately()) {
+ // not needed if executing immediately
+ BrooklynTaskTags.addTagDynamically( ((TaskAdaptable<?>)v).asTask(), BrooklynTaskTags.IMMEDIATE_TASK_TAG );
+ }
+ }
+
+ //if it's a task or a future, we wait for the task to complete
+ if (v instanceof TaskAdaptable<?>) {
+ v = ((TaskAdaptable<?>) v).asTask();
}
}
- // TODO if evaluating immediately should use a new ExecutionContext.submitImmediate(...)
- // and sets a timeout but which wraps a task but does not spawn a new thread
-
- if ((v instanceof TaskFactory<?>) && !(v instanceof DeferredSupplier)) {
- v = ((TaskFactory<?>)v).newTask();
- BrooklynTaskTags.setTransient(((TaskAdaptable<?>)v).asTask());
- if (isEvaluatingImmediately()) {
- BrooklynTaskTags.addTagDynamically( ((TaskAdaptable<?>)v).asTask(), BrooklynTaskTags.IMMEDIATE_TASK_TAG );
+ if (allowImmediateExecution && isEvaluatingImmediately()) {
+ // TODO could allow for everything, when evaluating immediately -- but if the target isn't safe to run again
+ // then we have to fail if immediate didn't work; to avoid breaking semantics we only do that for a few cases;
+ // might be nice to get to the point where we can break those semantics however,
+ // ie weakening what getImmediate supports and making it be non-blocking, so that bailOut=true is the default.
+ // if: v instanceof TaskFactory -- it is safe, it's a new API (but it is currently the only one supported);
+ // more than safe, we have to do it -- or add code here to cancel tasks -- because it spawns new tasks
+ // (other objects passed through here don't get cancelled, because other things might try again later;
+ // ie a task or future passed in here might naturally be long-running so cancelling is wrong,
+ // but with a task factory generated task it would leak if we submitted and didn't cancel!)
+ // if: v instanceof ImmediateSupplier -- it probably is safe to change to bailOut = true ?
+ // if: v instanceof Task or other things -- it currently isn't safe, there are places where
+ // we expect to getImmediate on things which don't support it nicely,
+ // and we rely on the blocking-short-wait behaviour, e.g. QuorumChecks in ConfigYamlTest
+ try {
+ Maybe<T> result = execImmediate(exec, v);
+ if (result!=null) return result;
+ if (bailOutAfterImmediateExecution) {
+ throw new ImmediateSupplier.ImmediateUnsupportedException("Cannot get immediately: "+v);
+ }
+ } catch (InterruptingImmediateSupplier.InterruptingImmediateSupplierNotSupportedForObject o) {
+ // ignore, continue below
+ log.debug("Unable to resolve-immediately for "+description+" ("+v+", wrong type "+v.getClass()+"); falling back to executing with timeout");
}
}
- //if it's a task or a future, we wait for the task to complete
- if (v instanceof TaskAdaptable<?>) {
+ if (v instanceof Task) {
//if it's a task, we make sure it is submitted
- Task<?> task = ((TaskAdaptable<?>) v).asTask();
+ Task<?> task = (Task<?>) v;
if (!task.isSubmitted()) {
if (exec==null) {
return Maybe.absent("Value for unsubmitted task '"+getDescription()+"' requested but no execution context available");
@@ -537,6 +559,24 @@ public class ValueResolver<T> implements DeferredSupplier<T>, Iterable<Maybe<Obj
}
}
+ protected Maybe<T> execImmediate(ExecutionContext exec, Object immediateSupplierOrImmediateTask) {
+ Maybe<T> result;
+ try {
+ result = exec.getImmediately(immediateSupplierOrImmediateTask);
+ } catch (ImmediateSupplier.ImmediateUnsupportedException e) {
+ return null;
+ }
+ // let InterruptingImmediateSupplier.InterruptingImmediateSupplierNotSupportedForObject
+ // be thrown, and caller who cares will catch that to know it can continue
+
+ // Recurse: need to ensure returned value is cast, etc
+ return (result.isPresent())
+ ? recursive
+ ? new ValueResolver<T>(result.get(), type, this).getMaybe()
+ : result
+ : result;
+ }
+
protected String getDescription() {
return description!=null ? description : ""+value;
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cd3d4864/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
index 358f39d..550d475 100644
--- a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
+++ b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
@@ -24,8 +24,11 @@ import static org.testng.Assert.fail;
import java.util.Arrays;
import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.mgmt.TaskAdaptable;
+import org.apache.brooklyn.api.mgmt.TaskFactory;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
import org.apache.brooklyn.test.Asserts;
@@ -36,6 +39,8 @@ import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import com.google.common.util.concurrent.Callables;
+
/**
* see also {@link TasksTest} for more tests
*/
@@ -219,6 +224,52 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
assertEquals(result.getClass(), FailingImmediateAndDeferredSupplier.class);
}
+ public void testTaskFactoryGet() {
+ TaskFactory<TaskAdaptable<String>> taskFactory = new TaskFactory<TaskAdaptable<String>>() {
+ @Override public TaskAdaptable<String> newTask() {
+ return new BasicTask<>(Callables.returning("myval"));
+ }
+ };
+ String result = Tasks.resolving(taskFactory).as(String.class).context(app).get();
+ assertEquals(result, "myval");
+ }
+
+ public void testTaskFactoryGetImmediately() {
+ TaskFactory<TaskAdaptable<String>> taskFactory = new TaskFactory<TaskAdaptable<String>>() {
+ @Override public TaskAdaptable<String> newTask() {
+ return new BasicTask<>(Callables.returning("myval"));
+ }
+ };
+ String result = Tasks.resolving(taskFactory).as(String.class).context(app).immediately(true).get();
+ assertEquals(result, "myval");
+ }
+
+ public void testTaskFactoryGetImmediatelyDoesNotBlock() {
+ final AtomicBoolean executing = new AtomicBoolean();
+ TaskFactory<TaskAdaptable<String>> taskFactory = new TaskFactory<TaskAdaptable<String>>() {
+ @Override public TaskAdaptable<String> newTask() {
+ return new BasicTask<>(new Callable<String>() {
+ public String call() {
+ executing.set(true);
+ try {
+ Time.sleep(Duration.ONE_MINUTE);
+ return "myval";
+ } finally {
+ executing.set(false);
+ }
+ }});
+ }
+ };
+ Maybe<String> result = Tasks.resolving(taskFactory).as(String.class).context(app).immediately(true).getMaybe();
+ Asserts.assertTrue(result.isAbsent(), "result="+result);
+ // the call below by default times out after 30s while the task above is still running
+ Asserts.succeedsEventually(new Runnable() {
+ public void run() {
+ Asserts.assertFalse(executing.get());
+ }
+ });
+ }
+
private static class MyImmediateAndDeferredSupplier implements ImmediateSupplier<CallInfo>, DeferredSupplier<CallInfo> {
private final boolean failImmediately;