You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by sv...@apache.org on 2016/11/02 10:06:22 UTC
[1/6] brooklyn-server git commit: PR #390: incorporate review comments
Repository: brooklyn-server
Updated Branches:
refs/heads/master c4cc0d1bd -> 4fab5b1a8
PR #390: incorporate review comments
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/cee60cc8
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/cee60cc8
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/cee60cc8
Branch: refs/heads/master
Commit: cee60cc84510aebb548cecc9c2a3e65e96635406
Parents: 1cd2a09
Author: Aled Sage <al...@gmail.com>
Authored: Tue Nov 1 11:05:45 2016 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Nov 1 11:06:02 2016 +0000
----------------------------------------------------------------------
.../spi/dsl/methods/BrooklynDslCommon.java | 40 +++++--------
.../brooklyn/spi/dsl/methods/DslComponent.java | 23 +++++--
.../brooklyn/camp/brooklyn/dsl/DslTest.java | 63 ++++++++++++--------
.../util/core/task/ImmediateSupplier.java | 2 +-
.../brooklyn/util/core/task/ValueResolver.java | 7 +--
.../core/entity/EntitySubscriptionTest.java | 19 +++++-
.../util/core/task/ValueResolverTest.java | 45 +++++++++++++-
7 files changed, 137 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cee60cc8/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java
index d92bf46..f508db9 100644
--- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java
+++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java
@@ -414,17 +414,7 @@ public class BrooklynDslCommon {
@Override
public Maybe<Object> getImmediately() {
- Class<?> type = this.type;
- if (type == null) {
- EntityInternal entity = entity();
- try {
- type = new ClassLoaderUtils(BrooklynDslCommon.class, entity).loadClass(typeName);
- } catch (ClassNotFoundException e) {
- throw Exceptions.propagate(e);
- }
- }
- final Class<?> clazz = type;
-
+ final Class<?> clazz = getOrLoadType();
final ExecutionContext executionContext = ((EntityInternal)entity()).getExecutionContext();
// Marker exception that one of our component-parts cannot yet be resolved -
@@ -465,17 +455,7 @@ public class BrooklynDslCommon {
@Override
public Task<Object> newTask() {
- Class<?> type = this.type;
- if (type == null) {
- EntityInternal entity = entity();
- try {
- type = new ClassLoaderUtils(BrooklynDslCommon.class, entity).loadClass(typeName);
- } catch (ClassNotFoundException e) {
- throw Exceptions.propagate(e);
- }
- }
- final Class<?> clazz = type;
-
+ final Class<?> clazz = getOrLoadType();
final ExecutionContext executionContext = ((EntityInternal)entity()).getExecutionContext();
final Function<Object, Object> resolver = new Function<Object, Object>() {
@@ -508,6 +488,19 @@ public class BrooklynDslCommon {
.build();
}
+ protected Class<?> getOrLoadType() {
+ Class<?> type = this.type;
+ if (type == null) {
+ EntityInternal entity = entity();
+ try {
+ type = new ClassLoaderUtils(BrooklynDslCommon.class, entity).loadClass(typeName);
+ } catch (ClassNotFoundException e) {
+ throw Exceptions.propagate(e);
+ }
+ }
+ return type;
+ }
+
public static <T> T create(Class<T> type, List<?> constructorArgs, Map<String,?> fields, Map<String,?> config) {
try {
T bean = Reflections.invokeConstructorFromArgs(type, constructorArgs.toArray()).get();
@@ -603,8 +596,7 @@ public class BrooklynDslCommon {
.body(new Callable<Object>() {
@Override
public Object call() throws Exception {
- ManagementContextInternal managementContext = DslExternal.managementContext();
- return managementContext.getExternalConfigProviderRegistry().getConfig(providerName, key);
+ return getImmediately().get();
}
})
.build();
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cee60cc8/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
index b134450..cded853 100644
--- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
+++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
@@ -88,7 +88,7 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
}
// ---------------------------
-
+
@Override
public final Maybe<Entity> getImmediately() {
try {
@@ -236,7 +236,11 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
@Override
public Maybe<Object> getImmediately() {
- return Maybe.<Object>of(component.get().getId());
+ Maybe<Entity> targetEntityMaybe = component.getImmediately();
+ if (targetEntityMaybe.isAbsent()) return Maybe.absent("Target entity not available");
+ Entity targetEntity = targetEntityMaybe.get();
+
+ return Maybe.<Object>of(targetEntity.getId());
}
@Override
@@ -277,7 +281,10 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
@Override
public final Maybe<Object> getImmediately() {
- Entity targetEntity = component.getImmediately().get();
+ Maybe<Entity> targetEntityMaybe = component.getImmediately();
+ if (targetEntityMaybe.isAbsent()) return Maybe.absent("Target entity not available");
+ Entity targetEntity = targetEntityMaybe.get();
+
AttributeSensor<?> targetSensor = (AttributeSensor<?>) targetEntity.getEntityType().getSensor(sensorName);
if (targetSensor == null) {
targetSensor = Sensors.newSensor(Object.class, sensorName);
@@ -331,7 +338,10 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
@Override
public final Maybe<Object> getImmediately() {
- Entity targetEntity = component.get();
+ Maybe<Entity> targetEntityMaybe = component.getImmediately();
+ if (targetEntityMaybe.isAbsent()) return Maybe.absent("Target entity not available");
+ Entity targetEntity = targetEntityMaybe.get();
+
return Maybe.of(targetEntity.getConfig(ConfigKeys.newConfigKey(Object.class, keyName)));
}
@@ -394,7 +404,10 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
if (si instanceof Sensor) {
return Maybe.<Sensor<?>>of((Sensor<?>)si);
} else if (si instanceof String) {
- Entity targetEntity = component.get();
+ Maybe<Entity> targetEntityMaybe = component.getImmediately();
+ if (targetEntityMaybe.isAbsent()) return Maybe.absent("Target entity not available");
+ Entity targetEntity = targetEntityMaybe.get();
+
Sensor<?> result = null;
if (targetEntity!=null) {
result = targetEntity.getEntityType().getSensor((String)si);
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cee60cc8/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java
index d27e6c2..8db3720 100644
--- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java
+++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java
@@ -48,6 +48,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@@ -79,7 +80,7 @@ public class DslTest extends BrooklynAppUnitTestSupport {
@Test
public void testAttributeWhenReadyEmptyDoesNotBlock() throws Exception {
BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.attributeWhenReady(TestApplication.MY_ATTRIBUTE.getName());
- Maybe<?> actualValue = execDslRealRealyQuick(dsl, TestApplication.MY_ATTRIBUTE.getType(), app);
+ Maybe<?> actualValue = execDslRealRealQuick(dsl, TestApplication.MY_ATTRIBUTE.getType(), app);
assertTrue(actualValue.isAbsent());
}
@@ -100,7 +101,7 @@ public class DslTest extends BrooklynAppUnitTestSupport {
public void testAttributeWhenReadyBlocksUntilReady() throws Exception {
// Fewer iterations, because there is a sleep each time
BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.attributeWhenReady(TestEntity.NAME.getName());
- new AttributeWhenReadyTestWorker(app, TestEntity.NAME, dsl).eventually(true).resolverIterations(2).run();
+ new AttributeWhenReadyTestWorker(app, TestEntity.NAME, dsl).satisfiedAsynchronously(true).resolverIterations(2).run();
}
@Test(groups="Integration")
@@ -165,7 +166,7 @@ public class DslTest extends BrooklynAppUnitTestSupport {
protected final Class<?> type;
protected EntitySpec<TestEntity> childSpec = EntitySpec.create(TestEntity.class);
protected int resolverIterations = MANY_RESOLVER_ITERATIONS;
- protected boolean eventually = false;
+ protected boolean satisfiedAsynchronously = false;
private boolean wrapInTaskForImmediately = true;
public DslTestWorker(TestApplication parent, BrooklynDslDeferredSupplier<?> dsl, Class<?> type) {
@@ -179,8 +180,8 @@ public class DslTest extends BrooklynAppUnitTestSupport {
return this;
}
- public DslTestWorker eventually(boolean val) {
- eventually = val;
+ public DslTestWorker satisfiedAsynchronously(boolean val) {
+ satisfiedAsynchronously = val;
return this;
}
@@ -191,13 +192,10 @@ public class DslTest extends BrooklynAppUnitTestSupport {
@Override
public void run() {
- TestEntity entity = parent.addChild(childSpec);
- for (int i = 0; i < resolverIterations; i++) {
- preResolve(entity);
- Maybe<?> eventualValue = execDslEventually(dsl, type, entity, Duration.FIVE_SECONDS);//FIXME ONE_MINUTE);
- postResolve(entity, eventualValue);
-
- if (!eventually) {
+ try {
+ TestEntity entity = parent.addChild(childSpec);
+ for (int i = 0; i < resolverIterations; i++) {
+ // Call dsl.getImmediately()
preResolve(entity);
Maybe<?> immediateValue;
try {
@@ -205,21 +203,29 @@ public class DslTest extends BrooklynAppUnitTestSupport {
} catch (Exception e) {
throw Exceptions.propagate(e);
}
- postResolve(entity, immediateValue);
+ postResolve(entity, immediateValue, true);
+
+ // Call dsl.get()
+ preResolve(entity);
+ Maybe<?> eventualValue = execDslEventually(dsl, type, entity, Duration.ONE_MINUTE);
+ postResolve(entity, eventualValue, false);
}
+ } catch (Exception e) {
+ Exceptions.propagate(e);
}
}
- protected void preResolve(TestEntity entity) {
+ protected void preResolve(TestEntity entity) throws Exception {
}
- protected void postResolve(TestEntity entity, Maybe<?> actualValue) {
+ protected void postResolve(TestEntity entity, Maybe<?> actualValue, boolean isImmediate) throws Exception {
}
}
private class AttributeWhenReadyTestWorker extends DslTestWorker {
private AttributeSensor<String> sensor;
private String expectedValue;
+ private ListenableScheduledFuture<?> future;
public AttributeWhenReadyTestWorker(TestApplication parent, AttributeSensor<String> sensor, BrooklynDslDeferredSupplier<?> dsl) {
super(parent, dsl, sensor.getType());
@@ -234,21 +240,28 @@ public class DslTest extends BrooklynAppUnitTestSupport {
entity.sensors().set(sensor, expectedValue);
}
};
- if (eventually) {
- executor.schedule(job, random.nextInt(20), TimeUnit.MILLISECONDS);
+ if (satisfiedAsynchronously) {
+ future = executor.schedule(job, random.nextInt(20), TimeUnit.MILLISECONDS);
} else {
job.run();
}
}
@Override
- protected void postResolve(TestEntity entity, Maybe<?> actualValue) {
- assertEquals(actualValue.get(), expectedValue);
+ protected void postResolve(TestEntity entity, Maybe<?> actualValue, boolean isImmediate) throws Exception {
+ if (satisfiedAsynchronously && isImmediate) {
+ // We accept a maybe.absent if we called getImmediately when satisfiedAsynchronously
+ assertTrue(actualValue.isAbsent() || expectedValue.equals(actualValue.get()), "actual="+actualValue+"; expected="+expectedValue);
+ } else {
+ assertEquals(actualValue.get(), expectedValue);
+ }
- if (eventually) {
- // Reset sensor - otherwise if run in a loop the old value will be picked up, before our execute sets the new value
- entity.sensors().set(sensor, null);
+ if (future != null) {
+ future.get(Asserts.DEFAULT_LONG_TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+ future = null;
}
+ // Reset sensor - otherwise if run in a loop the old value will be picked up, before our execute sets the new value
+ entity.sensors().set(sensor, null);
}
}
@@ -263,7 +276,7 @@ public class DslTest extends BrooklynAppUnitTestSupport {
}
@Override
- protected void postResolve(TestEntity entity, Maybe<?> actualValue) {
+ protected void postResolve(TestEntity entity, Maybe<?> actualValue, boolean isImmediate) {
assertEquals(actualValue.get(), entity);
}
@@ -275,7 +288,7 @@ public class DslTest extends BrooklynAppUnitTestSupport {
}
@Override
- protected void postResolve(TestEntity entity, Maybe<?> actualValue) {
+ protected void postResolve(TestEntity entity, Maybe<?> actualValue, boolean isImmediate) {
assertEquals(actualValue.get(), parent);
}
}
@@ -304,7 +317,7 @@ public class DslTest extends BrooklynAppUnitTestSupport {
}
}
- static Maybe<?> execDslRealRealyQuick(BrooklynDslDeferredSupplier<?> dsl, Class<?> type, Entity context) {
+ static Maybe<?> execDslRealRealQuick(BrooklynDslDeferredSupplier<?> dsl, Class<?> type, Entity context) {
return execDslEventually(dsl, type, context, ValueResolver.REAL_REAL_QUICK_WAIT);
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cee60cc8/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java
index 03a1da6..ac0aae4 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java
@@ -30,7 +30,7 @@ public interface ImmediateSupplier<T> {
* Indicates that we are unable to get the value immediately, because that is not supported
* (e.g. because the supplier is composed of sub-tasks that do not support {@link ImmediateSupplier}.
*/
- public static class ImmediateUnsupportedException extends RuntimeException {
+ public static class ImmediateUnsupportedException extends UnsupportedOperationException {
private static final long serialVersionUID = -7942339715007942797L;
public ImmediateUnsupportedException(String message) {
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cee60cc8/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
index 10fd665..4c3cbe2 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
@@ -379,12 +379,9 @@ public class ValueResolver<T> implements DeferredSupplier<T> {
.tagIfNotNull(BrooklynTaskTags.getTargetOrContextEntityTag(Tasks.current()));
if (isTransientTask) tb.tag(BrooklynTaskTags.TRANSIENT_TASK_TAG);
+ // Note that immediate resolution is handled by using ImmediateSupplier (using an instanceof check),
+ // so that it executes in the current thread instead of using task execution.
Task<Object> vt = exec.submit(tb.build());
- // TODO to handle immediate resolution, it would be nice to be able to submit
- // so it executes in the current thread,
- // or put a marker in the target thread or task while it is running that the task
- // should never wait on anything other than another value being resolved
- // (though either could recurse infinitely)
Maybe<Object> vm = Durations.get(vt, timer);
vt.cancel(true);
if (vm.isAbsent()) return (Maybe<T>)vm;
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cee60cc8/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
index a16576c..ea91f36 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
@@ -34,6 +34,7 @@ import org.apache.brooklyn.core.test.policy.TestEnricher;
import org.apache.brooklyn.core.test.policy.TestPolicy;
import org.apache.brooklyn.entity.group.BasicGroup;
import org.apache.brooklyn.test.Asserts;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -76,6 +77,23 @@ public class EntitySubscriptionTest extends BrooklynAppUnitTestSupport {
app.start(ImmutableList.of(loc));
}
+ @AfterMethod(alwaysRun=true)
+ @Override
+ public void tearDown() throws Exception {
+ try {
+ super.tearDown();
+ } finally {
+ loc = null;
+ entity = null;
+ observedEntity = null;
+ observedChildEntity = null;
+ observedGroup = null;
+ observedMemberEntity = null;
+ otherEntity = null;
+ listener = null;
+ }
+ }
+
@Test
public void testSubscriptionReceivesEvents() {
entity.subscriptions().subscribe(observedEntity, TestEntity.SEQUENCE, listener);
@@ -298,7 +316,6 @@ public class EntitySubscriptionTest extends BrooklynAppUnitTestSupport {
entity.subscriptions().subscribeToChildren(observedEntity, TestEntity.SEQUENCE, listener);
observedChildEntity.sensors().set(TestEntity.SEQUENCE, 123);
assertListenerCalledOnceWithContextEntityEventually(listener, entity);
- listener.clearEvents();
}
@Test
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cee60cc8/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
index 43cd0ba..b13d2b5 100644
--- a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
+++ b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
@@ -59,13 +59,56 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
Assert.assertEquals(result.get(), "foo");
}
- public void testNoExecutionContextOnCompleted() {
+ public void testCompletedTaskReturnsResultImmediately() {
+ // Call ValueResolver.getMaybe() from this thread, which has no execution context.
+ // However, the task has already been submitted and we have waited for it to complete.
+ // Therefore the ValueResolver can simply check for task.isDone() and return its result immediately.
Task<String> t = newSleepTask(Duration.ZERO, "foo");
app.getExecutionContext().submit(t).getUnchecked();
Maybe<String> result = Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe();
Assert.assertEquals(result.get(), "foo");
}
+ public void testUnsubmittedTaskWhenNoExecutionContextFails() {
+ // ValueResolver.getMaybe() is called with no execution context. Therefore it will not execute the task.
+ Task<String> t = newSleepTask(Duration.ZERO, "foo");
+ Maybe<String> result = Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe();
+
+ Assert.assertTrue(result.isAbsent(), "result="+result);
+ Exception exception = ((Maybe.Absent<?>)result).getException();
+ Assert.assertTrue(exception.toString().contains("no execution context available"), "exception="+exception);
+ }
+
+ public void testUnsubmittedTaskWithExecutionContextExecutesAndReturns() {
+ // ValueResolver.getMaybe() is called in app's execution context. Therefore it will execute the task.
+ final Task<String> t = newSleepTask(Duration.ZERO, "foo");
+
+ Maybe<String> result = app.getExecutionContext()
+ .submit(new Callable<Maybe<String> >() {
+ public Maybe<String> call() throws Exception {
+ return Tasks.resolving(t).as(String.class).timeout(Asserts.DEFAULT_LONG_TIMEOUT).getMaybe();
+ }})
+ .getUnchecked();
+
+ Assert.assertEquals(result.get(), "foo");
+ }
+
+ public void testUnsubmittedTaskWithExecutionContextExecutesAndTimesOut() {
+ // ValueResolver.getMaybe() is called in app's execution context. Therefore it will execute the task.
+ final Task<String> t = newSleepTask(Duration.ONE_MINUTE, "foo");
+
+ Maybe<String> result = app.getExecutionContext()
+ .submit(new Callable<Maybe<String> >() {
+ public Maybe<String> call() throws Exception {
+ return Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe();
+ }})
+ .getUnchecked();
+
+ Assert.assertTrue(result.isAbsent(), "result="+result);
+ Exception exception = ((Maybe.Absent<?>)result).getException();
+ Assert.assertTrue(exception.toString().contains("not completed when immediate completion requested"), "exception="+exception);
+ }
+
public void testSwallowError() {
ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(app).swallowExceptions();
assertMaybeIsAbsent(result);
[2/6] brooklyn-server git commit: Subscription callbacks: task has contextEntity tag
Posted by sv...@apache.org.
Subscription callbacks: task has contextEntity tag
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/3f37e657
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/3f37e657
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/3f37e657
Branch: refs/heads/master
Commit: 3f37e657a3efd0b1b025008ab1070713b9b8badc
Parents: c4cc0d1
Author: Aled Sage <al...@gmail.com>
Authored: Thu Oct 13 22:46:57 2016 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Nov 1 11:06:02 2016 +0000
----------------------------------------------------------------------
.../internal/AbstractManagementContext.java | 6 +-
.../mgmt/internal/BasicSubscriptionContext.java | 3 +-
.../mgmt/internal/LocalSubscriptionManager.java | 35 +++++----
.../core/mgmt/internal/Subscription.java | 1 +
.../core/entity/EntitySubscriptionTest.java | 76 ++++++++++++++++----
.../entity/RecordingSensorEventListener.java | 16 ++++-
6 files changed, 109 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/3f37e657/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
index ea84f3c..b57e6eb 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
@@ -66,6 +66,7 @@ import org.apache.brooklyn.core.internal.storage.impl.BrooklynStorageImpl;
import org.apache.brooklyn.core.internal.storage.impl.inmemory.InMemoryDataGridFactory;
import org.apache.brooklyn.core.location.BasicLocationRegistry;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity;
import org.apache.brooklyn.core.mgmt.classloading.BrooklynClassLoadingContextSequential;
import org.apache.brooklyn.core.mgmt.classloading.JavaBrooklynClassLoadingContext;
import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
@@ -87,6 +88,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
public abstract class AbstractManagementContext implements ManagementContextInternal {
@@ -277,7 +280,8 @@ public abstract class AbstractManagementContext implements ManagementContextInte
@Override
public SubscriptionContext getSubscriptionContext(Entity e) {
// BSC is a thin wrapper around SM so fine to create a new one here
- return new BasicSubscriptionContext(getSubscriptionManager(), e);
+ Map<String, ?> flags = ImmutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(e)));
+ return new BasicSubscriptionContext(flags, getSubscriptionManager(), e);
}
@Override
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/3f37e657/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
index 57d4712..2c6ab65 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import groovy.lang.Closure;
@@ -55,7 +56,7 @@ public class BasicSubscriptionContext implements SubscriptionContext {
private final Map<String,Object> flags;
public BasicSubscriptionContext(SubscriptionManager manager, Object subscriber) {
- this(Collections.<String,Object>emptyMap(), manager, subscriber);
+ this(ImmutableMap.<String, Object>of(), manager, subscriber);
}
public BasicSubscriptionContext(Map<String, ?> flags, SubscriptionManager manager, Object subscriber) {
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/3f37e657/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index f9606f8..fd15cf3 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -21,12 +21,12 @@ package org.apache.brooklyn.core.mgmt.internal;
import static org.apache.brooklyn.util.JavaGroovyEquivalents.elvis;
import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth;
import static org.apache.brooklyn.util.JavaGroovyEquivalents.join;
-import static org.apache.brooklyn.util.JavaGroovyEquivalents.mapOf;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -43,6 +43,7 @@ import org.apache.brooklyn.api.sensor.SensorEvent;
import org.apache.brooklyn.api.sensor.SensorEventListener;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.sensor.BasicSensorEvent;
+import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.task.BasicExecutionManager;
import org.apache.brooklyn.util.core.task.SingleThreadedScheduler;
@@ -53,6 +54,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Predicate;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimaps;
/**
@@ -97,6 +99,12 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
Entity producer = s.producer;
Sensor<T> sensor= s.sensor;
s.subscriber = getSubscriber(flags, s);
+ if (flags.containsKey("tags") || flags.containsKey("tag")) {
+ Iterable<?> tags = (Iterable<?>) flags.get("tags");
+ Object tag = flags.get("tag");
+ s.subscriberExtraExecTags = (tag == null) ? tags : (tags == null ? ImmutableList.of(tag) : MutableList.builder().addAll(tags).add(tag).build());
+ }
+
if (flags.containsKey("subscriberExecutionManagerTag")) {
s.subscriberExecutionManagerTag = flags.remove("subscriberExecutionManagerTag");
s.subscriberExecutionManagerTagSupplied = true;
@@ -130,8 +138,13 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
LOG.warn("Cannot notifyOfInitialValue for subscription with non-attribute sensor: "+s);
} else {
if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {} to {}", new Object[] {s.producer, s.sensor, s});
- Map<String, Object> tagsMap = MutableMap.of("tag", s.subscriberExecutionManagerTag);
- em.submit(tagsMap, new Runnable() {
+ List<Object> tags = MutableList.builder()
+ .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags)
+ .add(s.subscriberExecutionManagerTag)
+ .build()
+ .asUnmodifiable();
+ Map<String, ?> execFlags = MutableMap.of("tags", tags);
+ em.submit(execFlags, new Runnable() {
@Override
public String toString() {
return "LSM.publishInitialValue("+s.producer+", "+s.sensor+")";
@@ -222,16 +235,14 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
continue;
final Subscription sAtClosureCreation = s;
-// Set<Object> tags = MutableSet.of();
-// if (s.subscriberExecutionManagerTag!=null) tags.add(s.subscriberExecutionManagerTag);
-// if (event.getSource()!=null) tags.add(BrooklynTaskTags.tagForContextEntity(event.getSource()));
-// Map<String, Object> tagsMap = mapOf("tags", (Object)tags);
- // use code above, instead of line below, if we want subscription deliveries associated with the entity;
- // that will cause them to be cancelled when the entity is unmanaged
- // (not sure that is useful, and likely NOT worth the expense, but it might be...) -Alex Oct 2014
- Map<String, Object> tagsMap = mapOf("tag", s.subscriberExecutionManagerTag);
+ List<Object> tags = MutableList.builder()
+ .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags)
+ .add(s.subscriberExecutionManagerTag)
+ .build()
+ .asUnmodifiable();
+ Map<String, ?> execFlags = MutableMap.of("tags", tags);
- em.submit(tagsMap, new Runnable() {
+ em.submit(execFlags, new Runnable() {
@Override
public String toString() {
return "LSM.publish("+event+")";
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/3f37e657/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java
index 66706a1..5e71701 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java
@@ -37,6 +37,7 @@ class Subscription<T> implements SubscriptionHandle {
public Object subscriberExecutionManagerTag;
/** whether the tag was supplied by user, in which case we should not clear execution semantics */
public boolean subscriberExecutionManagerTagSupplied;
+ public Iterable<?> subscriberExtraExecTags;
public final Entity producer;
public final Sensor<T> sensor;
public final SensorEventListener<? super T> listener;
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/3f37e657/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
index 3d61a99..a16576c 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
@@ -20,16 +20,20 @@ package org.apache.brooklyn.core.entity;
import static org.testng.Assert.assertEquals;
-import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.api.sensor.EnricherSpec;
import org.apache.brooklyn.core.location.SimulatedLocation;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.sensor.BasicSensorEvent;
-import org.apache.brooklyn.core.test.entity.TestApplication;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.core.test.policy.TestEnricher;
+import org.apache.brooklyn.core.test.policy.TestPolicy;
import org.apache.brooklyn.entity.group.BasicGroup;
import org.apache.brooklyn.test.Asserts;
-import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -37,14 +41,13 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
-public class EntitySubscriptionTest {
+public class EntitySubscriptionTest extends BrooklynAppUnitTestSupport {
// TODO Duplication between this and PolicySubscriptionTest
private static final long SHORT_WAIT_MS = 100;
private SimulatedLocation loc;
- private TestApplication app;
private TestEntity entity;
private TestEntity observedEntity;
private BasicGroup observedGroup;
@@ -52,10 +55,11 @@ public class EntitySubscriptionTest {
private TestEntity observedMemberEntity;
private TestEntity otherEntity;
private RecordingSensorEventListener<Object> listener;
-
+
@BeforeMethod(alwaysRun=true)
- public void setUp() {
- app = TestApplication.Factory.newManagedInstanceForTests();
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
loc = app.newSimulatedLocation();
entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
observedEntity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
@@ -72,11 +76,6 @@ public class EntitySubscriptionTest {
app.start(ImmutableList.of(loc));
}
- @AfterMethod(alwaysRun=true)
- public void tearDown() {
- if (app != null) Entities.destroyAll(app.getManagementContext());
- }
-
@Test
public void testSubscriptionReceivesEvents() {
entity.subscriptions().subscribe(observedEntity, TestEntity.SEQUENCE, listener);
@@ -280,4 +279,55 @@ public class EntitySubscriptionTest {
entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), null, TestEntity.NAME, listener);
entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), null, null, listener);
}
+
+ @Test
+ public void testContextEntityOnSubscriptionCallbackTask() {
+ observedEntity.sensors().set(TestEntity.NAME, "myval");
+ entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), observedEntity, TestEntity.NAME, listener);
+
+ // notify-of-initial-value should give us our entity
+ assertListenerCalledOnceWithContextEntityEventually(listener, entity);
+ listener.clearEvents();
+
+ // as should subsequent events
+ observedEntity.sensors().set(TestEntity.NAME, "myval2");
+ assertListenerCalledOnceWithContextEntityEventually(listener, entity);
+ listener.clearEvents();
+
+ // same for subscribing to children: context should be the subscriber
+ entity.subscriptions().subscribeToChildren(observedEntity, TestEntity.SEQUENCE, listener);
+ observedChildEntity.sensors().set(TestEntity.SEQUENCE, 123);
+ assertListenerCalledOnceWithContextEntityEventually(listener, entity);
+ listener.clearEvents();
+ }
+
+ @Test
+ public void testContextEntityOnPolicySubscriptionCallbackTask() {
+ TestPolicy policy = entity.policies().add(PolicySpec.create(TestPolicy.class));
+ policy.subscriptions().subscribe(observedEntity, TestEntity.NAME, listener);
+
+ observedEntity.sensors().set(TestEntity.NAME, "myval");
+ assertListenerCalledOnceWithContextEntityEventually(listener, entity);
+ }
+
+ @Test
+ public void testContextEntityOnEnricherSubscriptionCallbackTask() {
+ TestEnricher enricher = entity.enrichers().add(EnricherSpec.create(TestEnricher.class));
+ enricher.subscriptions().subscribe(observedEntity, TestEntity.NAME, listener);
+
+ observedEntity.sensors().set(TestEntity.NAME, "myval");
+ assertListenerCalledOnceWithContextEntityEventually(listener, entity);
+ }
+
+ protected void assertListenerCalledEventually(final RecordingSensorEventListener<?> listener, final int expectedEventCount) {
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ assertEquals(listener.getEvents().size(), expectedEventCount);
+ }});
+ }
+
+ protected void assertListenerCalledOnceWithContextEntityEventually(final RecordingSensorEventListener<?> listener, final Entity expectedContext) {
+ assertListenerCalledEventually(listener, 1); // waits until exactly one event has been recorded
+ assertEquals(BrooklynTaskTags.getContextEntity(Iterables.getOnlyElement(listener.getTasks())), expectedContext); // compare with the caller-supplied entity, not the 'entity' field
+ }
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/3f37e657/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java b/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java
index 44920ed..f5384b2 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java
@@ -25,8 +25,11 @@ import java.util.Iterator;
import java.util.List;
import java.util.Objects;
+import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.sensor.SensorEvent;
import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.core.task.Tasks;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
@@ -41,6 +44,8 @@ import com.google.common.primitives.Longs;
public class RecordingSensorEventListener<T> implements SensorEventListener<T>, Iterable<SensorEvent<T>> {
private final List<SensorEvent<T>> events = Lists.newCopyOnWriteArrayList();
+ private final List<Task<?>> tasks = Lists.newCopyOnWriteArrayList();
+
private final boolean suppressDuplicates;
private T lastValue;
@@ -56,6 +61,7 @@ public class RecordingSensorEventListener<T> implements SensorEventListener<T>,
public void onEvent(SensorEvent<T> event) {
if (!suppressDuplicates || events.isEmpty() || !Objects.equals(lastValue, event.getValue())) {
events.add(event);
+ tasks.add(Tasks.current());
lastValue = event.getValue();
}
}
@@ -68,6 +74,13 @@ public class RecordingSensorEventListener<T> implements SensorEventListener<T>,
}
/**
+ * The {@link Tasks#current()} for each call to {@link #onEvent(SensorEvent)}
+ */
+ public List<Task<?>> getTasks() {
+ return MutableList.copyOf(tasks).asUnmodifiable();
+ }
+
+ /**
* @return A live read-only view of recorded events.
*/
public Iterable<T> getEventValues() {
@@ -89,7 +102,8 @@ public class RecordingSensorEventListener<T> implements SensorEventListener<T>,
* Clears all events recorded by the listener.
*/
public void clearEvents() {
- this.events.clear();
+ events.clear();
+ tasks.clear();
lastValue = null;
}
[5/6] brooklyn-server git commit: More use of
Tasks.resolving.immediately
Posted by sv...@apache.org.
More use of Tasks.resolving.immediately
Similar to BROOKLYN-356, use more efficient thread usage and avoid
premature timeouts if threads are starved.
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/0ded2cbd
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/0ded2cbd
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/0ded2cbd
Branch: refs/heads/master
Commit: 0ded2cbdd8233101a44961f1f6389809c01b4f2a
Parents: 8601d21
Author: Aled Sage <al...@gmail.com>
Authored: Tue Nov 1 19:56:59 2016 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Nov 1 20:50:58 2016 +0000
----------------------------------------------------------------------
.../java/org/apache/brooklyn/enricher/stock/Propagator.java | 6 +++---
.../org/apache/brooklyn/enricher/stock/reducer/Reducer.java | 6 +++++-
.../brooklyn/rest/resources/AbstractBrooklynRestResource.java | 7 +++++--
.../apache/brooklyn/rest/resources/EntityConfigResource.java | 2 +-
.../org/apache/brooklyn/rest/resources/SensorResource.java | 2 +-
.../apache/brooklyn/rest/transform/EffectorTransformer.java | 7 +++++--
6 files changed, 20 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0ded2cbd/core/src/main/java/org/apache/brooklyn/enricher/stock/Propagator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Propagator.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Propagator.java
index a8f9f97..76d3cba 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/Propagator.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Propagator.java
@@ -220,8 +220,8 @@ public class Propagator extends AbstractEnricher implements SensorEventListener<
for (Map.Entry<?,?> entry : mapping.entrySet()) {
Object keyO = entry.getKey();
Object valueO = entry.getValue();
- Sensor<?> key = Tasks.resolving(keyO).as(Sensor.class).timeout(ValueResolver.REAL_QUICK_WAIT).context(producer).get();
- Sensor<?> value = Tasks.resolving(valueO).as(Sensor.class).timeout(ValueResolver.REAL_QUICK_WAIT).context(producer).get();
+ Sensor<?> key = Tasks.resolving(keyO).as(Sensor.class).immediately(true).context(producer).get();
+ Sensor<?> value = Tasks.resolving(valueO).as(Sensor.class).immediately(true).context(producer).get();
result.put(key, value);
}
return result;
@@ -233,7 +233,7 @@ public class Propagator extends AbstractEnricher implements SensorEventListener<
}
List<Sensor<?>> result = MutableList.of();
for (Object sensorO : sensors) {
- Sensor<?> sensor = Tasks.resolving(sensorO).as(Sensor.class).timeout(ValueResolver.REAL_QUICK_WAIT).context(producer).get();
+ Sensor<?> sensor = Tasks.resolving(sensorO).as(Sensor.class).immediately(true).context(producer).get();
result.add(sensor);
}
return result;
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0ded2cbd/core/src/main/java/org/apache/brooklyn/enricher/stock/reducer/Reducer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/reducer/Reducer.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/reducer/Reducer.java
index 558d50a..d2189ac 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/reducer/Reducer.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/reducer/Reducer.java
@@ -78,7 +78,11 @@ public class Reducer extends AbstractEnricher implements SensorEventListener<Obj
List<AttributeSensor<?>> sensorListTemp = Lists.newArrayList();
for (Object sensorO : getConfig(SOURCE_SENSORS)) {
- AttributeSensor<?> sensor = Tasks.resolving(sensorO).as(AttributeSensor.class).timeout(ValueResolver.REAL_QUICK_WAIT).context(producer).get();
+ AttributeSensor<?> sensor = Tasks.resolving(sensorO)
+ .as(AttributeSensor.class)
+ .immediately(true)
+ .context(producer)
+ .get();
Optional<? extends Sensor<?>> foundSensor = Iterables.tryFind(sensorListTemp,
SensorPredicates.nameEqualTo(sensor.getName()));
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0ded2cbd/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/AbstractBrooklynRestResource.java
----------------------------------------------------------------------
diff --git a/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/AbstractBrooklynRestResource.java b/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/AbstractBrooklynRestResource.java
index 2c47866..02e856f 100644
--- a/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/AbstractBrooklynRestResource.java
+++ b/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/AbstractBrooklynRestResource.java
@@ -105,6 +105,7 @@ public abstract class AbstractBrooklynRestResource {
private @Nullable Entity entity;
private @Nullable Duration timeout;
private @Nullable Object rendererHintSource;
+ private @Nullable Boolean immediately;
public static RestValueResolver resolving(Object v) { return new RestValueResolver(v); }
@@ -130,10 +131,11 @@ public abstract class AbstractBrooklynRestResource {
public RestValueResolver raw(Boolean raw) { this.raw = raw; return this; }
public RestValueResolver context(Entity entity) { this.entity = entity; return this; }
public RestValueResolver timeout(Duration timeout) { this.timeout = timeout; return this; }
+ public RestValueResolver immediately(boolean immediately) { this.immediately = immediately; return this; }
public RestValueResolver renderAs(Object rendererHintSource) { this.rendererHintSource = rendererHintSource; return this; }
public Object resolve() {
- Object valueResult = getImmediateValue(valueToResolve, entity, timeout);
+ Object valueResult = getImmediateValue(valueToResolve, entity, immediately, timeout);
if (valueResult==UNRESOLVED) valueResult = valueToResolve;
if (rendererHintSource!=null && Boolean.FALSE.equals(raw)) {
valueResult = RendererHints.applyDisplayValueHintUnchecked(rendererHintSource, valueResult);
@@ -143,11 +145,12 @@ public abstract class AbstractBrooklynRestResource {
private static Object UNRESOLVED = "UNRESOLVED".toCharArray();
- private static Object getImmediateValue(Object value, @Nullable Entity context, @Nullable Duration timeout) {
+ private static Object getImmediateValue(Object value, @Nullable Entity context, @Nullable Boolean immediately, @Nullable Duration timeout) {
return Tasks.resolving(value)
.as(Object.class)
.defaultValue(UNRESOLVED)
.timeout(timeout)
+ .immediately(immediately == null ? false : immediately.booleanValue())
.context(context)
.swallowExceptions()
.get();
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0ded2cbd/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/EntityConfigResource.java
----------------------------------------------------------------------
diff --git a/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/EntityConfigResource.java b/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/EntityConfigResource.java
index 5a2ac0d..fb9dc32 100644
--- a/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/EntityConfigResource.java
+++ b/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/EntityConfigResource.java
@@ -151,7 +151,7 @@ public class EntityConfigResource extends AbstractBrooklynRestResource implement
}
Object value = ((EntityInternal)entity).config().getRaw(ck).orNull();
- return resolving(value).preferJson(preferJson).asJerseyOutermostReturnValue(true).raw(raw).context(entity).timeout(ValueResolver.PRETTY_QUICK_WAIT).renderAs(ck).resolve();
+ return resolving(value).preferJson(preferJson).asJerseyOutermostReturnValue(true).raw(raw).context(entity).immediately(true).renderAs(ck).resolve();
}
private ConfigKey<?> findConfig(Entity entity, String configKeyName) {
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0ded2cbd/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/SensorResource.java
----------------------------------------------------------------------
diff --git a/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/SensorResource.java b/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/SensorResource.java
index 750b7de..c51043e 100644
--- a/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/SensorResource.java
+++ b/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/SensorResource.java
@@ -113,7 +113,7 @@ public class SensorResource extends AbstractBrooklynRestResource implements Sens
}
Object value = entity.getAttribute(sensor);
- return resolving(value).preferJson(preferJson).asJerseyOutermostReturnValue(true).raw(raw).context(entity).timeout(ValueResolver.PRETTY_QUICK_WAIT).renderAs(sensor).resolve();
+ return resolving(value).preferJson(preferJson).asJerseyOutermostReturnValue(true).raw(raw).context(entity).immediately(true).renderAs(sensor).resolve();
}
@Override
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0ded2cbd/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/transform/EffectorTransformer.java
----------------------------------------------------------------------
diff --git a/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/transform/EffectorTransformer.java b/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/transform/EffectorTransformer.java
index 9304008..6987b5a 100644
--- a/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/transform/EffectorTransformer.java
+++ b/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/transform/EffectorTransformer.java
@@ -80,8 +80,11 @@ public class EffectorTransformer {
@SuppressWarnings({ "unchecked", "rawtypes" })
protected static EffectorSummary.ParameterSummary<?> parameterSummary(Entity entity, ParameterType<?> parameterType) {
try {
- Maybe<?> defaultValue = Tasks.resolving(parameterType.getDefaultValue()).as(parameterType.getParameterClass())
- .context(entity).timeout(ValueResolver.REAL_QUICK_WAIT).getMaybe();
+ Maybe<?> defaultValue = Tasks.resolving(parameterType.getDefaultValue())
+ .as(parameterType.getParameterClass())
+ .context(entity)
+ .immediately(true)
+ .getMaybe();
return new ParameterSummary(parameterType.getName(), parameterType.getParameterClassName(),
parameterType.getDescription(),
WebResourceUtils.getValueForDisplay(defaultValue.orNull(), true, false),
[4/6] brooklyn-server git commit: PR #390: incorporate more review
comments
Posted by sv...@apache.org.
PR #390: incorporate more review comments
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/8601d219
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/8601d219
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/8601d219
Branch: refs/heads/master
Commit: 8601d219f4a5bec4ab751257b7c022545bfdd92b
Parents: cee60cc
Author: Aled Sage <al...@gmail.com>
Authored: Tue Nov 1 17:48:46 2016 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Nov 1 20:50:58 2016 +0000
----------------------------------------------------------------------
.../spi/dsl/methods/BrooklynDslCommon.java | 2 +
.../brooklyn/spi/dsl/methods/DslComponent.java | 62 ++++++---
.../brooklyn/camp/brooklyn/dsl/DslTest.java | 134 ++++++++++++++++++-
.../AbstractConfigurationSupportInternal.java | 2 +-
.../util/core/task/ValueResolverTest.java | 25 ++--
5 files changed, 191 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/8601d219/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java
index f508db9..8fb48cf 100644
--- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java
+++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java
@@ -584,6 +584,8 @@ public class BrooklynDslCommon {
@Override
public final Maybe<Object> getImmediately() {
+ // Note this call to getConfig() is different from entity.getConfig.
+ // We expect it to not block waiting for other entities.
ManagementContextInternal managementContext = DslExternal.managementContext();
return Maybe.<Object>of(managementContext.getExternalConfigProviderRegistry().getConfig(providerName, key));
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/8601d219/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
index cded853..720dfae 100644
--- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
+++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
@@ -37,6 +37,7 @@ import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.mgmt.internal.EntityManagerInternal;
import org.apache.brooklyn.core.sensor.DependentConfiguration;
import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.util.core.task.ImmediateSupplier;
import org.apache.brooklyn.util.core.task.TaskBuilder;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
@@ -91,11 +92,7 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
@Override
public final Maybe<Entity> getImmediately() {
- try {
- return Maybe.of(new EntityInScopeFinder(scopeComponent, scope, componentId).call());
- } catch (Exception e) {
- throw Exceptions.propagate(e);
- }
+ return new EntityInScopeFinder(scopeComponent, scope, componentId).getImmediately();
}
@Override
@@ -108,7 +105,7 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
.build();
}
- protected static class EntityInScopeFinder implements Callable<Entity> {
+ protected static class EntityInScopeFinder implements Callable<Entity>, ImmediateSupplier<Entity> {
protected final DslComponent scopeComponent;
protected final Scope scope;
protected final String componentId;
@@ -119,33 +116,55 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
this.componentId = componentId;
}
- protected EntityInternal getEntity() {
- if (scopeComponent!=null) {
- return (EntityInternal)scopeComponent.get();
- } else {
- return entity();
+ @Override
+ public Maybe<Entity> getImmediately() {
+ try {
+ return callImpl(true);
+ } catch (Exception e) {
+ throw Exceptions.propagate(e);
}
}
-
+
@Override
public Entity call() throws Exception {
+ return callImpl(false).get();
+ }
+
+ protected Maybe<Entity> getEntity(boolean immediate) {
+ if (scopeComponent != null) {
+ if (immediate) {
+ return scopeComponent.getImmediately();
+ } else {
+ return Maybe.of(scopeComponent.get());
+ }
+ } else {
+ return Maybe.<Entity>of(entity());
+ }
+ }
+
+ protected Maybe<Entity> callImpl(boolean immediate) throws Exception {
+ Maybe<Entity> entityMaybe = getEntity(immediate);
+ if (immediate && entityMaybe.isAbsent()) {
+ return entityMaybe;
+ }
+ EntityInternal entity = (EntityInternal) entityMaybe.get();
+
Iterable<Entity> entitiesToSearch = null;
- EntityInternal entity = getEntity();
Predicate<Entity> notSelfPredicate = Predicates.not(Predicates.<Entity>equalTo(entity));
switch (scope) {
case THIS:
- return entity;
+ return Maybe.<Entity>of(entity);
case PARENT:
- return entity.getParent();
+ return Maybe.<Entity>of(entity.getParent());
case GLOBAL:
entitiesToSearch = ((EntityManagerInternal)entity.getManagementContext().getEntityManager())
.getAllEntitiesInApplication( entity().getApplication() );
break;
case ROOT:
- return entity.getApplication();
+ return Maybe.<Entity>of(entity.getApplication());
case SCOPE_ROOT:
- return Entities.catalogItemScopeRoot(entity);
+ return Maybe.<Entity>of(Entities.catalogItemScopeRoot(entity));
case DESCENDANT:
entitiesToSearch = Entities.descendantsWithoutSelf(entity);
break;
@@ -165,8 +184,9 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
Optional<Entity> result = Iterables.tryFind(entitiesToSearch, EntityPredicates.configEqualTo(BrooklynCampConstants.PLAN_ID, componentId));
- if (result.isPresent())
- return result.get();
+ if (result.isPresent()) {
+ return Maybe.of(result.get());
+ }
// TODO may want to block and repeat on new entities joining?
throw new NoSuchElementException("No entity matching id " + componentId+
@@ -340,9 +360,9 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
public final Maybe<Object> getImmediately() {
Maybe<Entity> targetEntityMaybe = component.getImmediately();
if (targetEntityMaybe.isAbsent()) return Maybe.absent("Target entity not available");
- Entity targetEntity = targetEntityMaybe.get();
+ EntityInternal targetEntity = (EntityInternal) targetEntityMaybe.get();
- return Maybe.of(targetEntity.getConfig(ConfigKeys.newConfigKey(Object.class, keyName)));
+ return targetEntity.config().getNonBlocking(ConfigKeys.newConfigKey(Object.class, keyName));
}
@Override
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/8601d219/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java
index 8db3720..69a0aee 100644
--- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java
+++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java
@@ -32,6 +32,8 @@ import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.camp.brooklyn.spi.dsl.BrooklynDslDeferredSupplier;
import org.apache.brooklyn.camp.brooklyn.spi.dsl.methods.BrooklynDslCommon;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
import org.apache.brooklyn.core.test.entity.TestApplication;
@@ -47,6 +49,8 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
@@ -116,6 +120,74 @@ public class DslTest extends BrooklynAppUnitTestSupport {
}
@Test
+ public void testConfig() throws Exception {
+ ConfigKey<String> configKey = ConfigKeys.newStringConfigKey("testConfig");
+ BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.config(configKey.getName());
+ new ConfigTestWorker(app, configKey, dsl).run();
+ }
+
+ @Test
+ public void testConfigWithDsl() throws Exception {
+ ConfigKey<?> configKey = ConfigKeys.newConfigKey(Entity.class, "testConfig");
+ BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.config(configKey.getName());
+ Supplier<ConfigValuePair> valueSupplier = new Supplier<ConfigValuePair>() {
+ @Override public ConfigValuePair get() {
+ return new ConfigValuePair(BrooklynDslCommon.root(), app);
+ }
+ };
+ new ConfigTestWorker(app, configKey, valueSupplier, dsl).run();
+ }
+
+ @Test
+ public void testConfigWithDslNotReadyImmediately() throws Exception {
+ final ConfigKey<String> configKey = ConfigKeys.newStringConfigKey("testConfig");
+ BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.config(configKey.getName());
+ Function<Entity, ConfigValuePair> valueSupplier = new Function<Entity, ConfigValuePair>() {
+ private ListenableScheduledFuture<?> future;
+ @Override
+ public ConfigValuePair apply(final Entity entity) {
+ try {
+ // If executed in a loop, then wait for previous call's future to complete.
+ // If previous assertion used getImmediately, then it won't have waited for the future to complete.
+ if (future != null) {
+ future.get(Asserts.DEFAULT_LONG_TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+ future = null;
+ }
+
+ // Reset sensor - otherwise if run in a loop the old value will be picked up, before our execute sets the new value
+ entity.sensors().set(TestApplication.MY_ATTRIBUTE, null);
+
+ final String expectedValue = Identifiers.makeRandomId(10);
+ Runnable job = new Runnable() {
+ public void run() {
+ entity.sensors().set(TestApplication.MY_ATTRIBUTE, expectedValue);
+ }
+ };
+ future = executor.schedule(job, random.nextInt(20), TimeUnit.MILLISECONDS);
+
+ BrooklynDslDeferredSupplier<?> attributeDsl = BrooklynDslCommon.attributeWhenReady(TestApplication.MY_ATTRIBUTE.getName());
+ return new ConfigValuePair(attributeDsl, expectedValue);
+
+ } catch (Exception e) {
+ throw Exceptions.propagate(e);
+ }
+ }
+ };
+ new ConfigTestWorker(app, configKey, valueSupplier, dsl).satisfiedAsynchronously(true).resolverIterations(2).run();
+ }
+
+ @Test
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public void testConfigImmediatelyDoesNotBlock() throws Exception {
+ ConfigKey<String> configKey = ConfigKeys.newStringConfigKey("testConfig");
+ BrooklynDslDeferredSupplier<?> attributeDsl = BrooklynDslCommon.attributeWhenReady(TestApplication.MY_ATTRIBUTE.getName());
+ app.config().set((ConfigKey)configKey, attributeDsl); // ugly cast because val is DSL, resolving to a string
+ BrooklynDslDeferredSupplier<?> configDsl = BrooklynDslCommon.config(configKey.getName());
+ Maybe<?> actualValue = execDslImmediately(configDsl, configKey.getType(), app, true);
+ assertTrue(actualValue.isAbsent());
+ }
+
+ @Test
public void testSelf() throws Exception {
BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.self();
new SelfTestWorker(app, dsl).run();
@@ -263,7 +335,6 @@ public class DslTest extends BrooklynAppUnitTestSupport {
// Reset sensor - otherwise if run in a loop the old value will be picked up, before our execute sets the new value
entity.sensors().set(sensor, null);
}
-
}
private static class SelfTestWorker extends DslTestWorker {
@@ -292,6 +363,67 @@ public class DslTest extends BrooklynAppUnitTestSupport {
assertEquals(actualValue.get(), parent);
}
}
+
+ private class ConfigTestWorker extends DslTestWorker {
+ private ConfigKey<?> config;
+ private Object expectedValue;
+ private Function<? super Entity, ConfigValuePair> valueFunction;
+
+ public ConfigTestWorker(TestApplication parent, ConfigKey<?> config, BrooklynDslDeferredSupplier<?> dsl) {
+ this(parent, config, newRandomConfigValueSupplier(), dsl);
+ }
+
+ public ConfigTestWorker(TestApplication parent, ConfigKey<?> config, Supplier<ConfigValuePair> valueSupplier, BrooklynDslDeferredSupplier<?> dsl) {
+ this(parent, config, Functions.forSupplier(valueSupplier), dsl);
+ }
+
+ public ConfigTestWorker(TestApplication parent, ConfigKey<?> config, Function<? super Entity, ConfigValuePair> valueFunction, BrooklynDslDeferredSupplier<?> dsl) {
+ super(parent, dsl, config.getType());
+ this.config = config;
+ this.valueFunction = valueFunction;
+ }
+
+ @Override
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ protected void preResolve(final TestEntity entity) {
+ ConfigValuePair pair = valueFunction.apply(entity);
+ expectedValue = pair.expectedResolvedVal;
+ entity.config().set((ConfigKey)config, pair.configVal); // nasty cast, because val might be a DSL
+ }
+
+ @Override
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ protected void postResolve(TestEntity entity, Maybe<?> actualValue, boolean isImmediate) throws Exception {
+ if (satisfiedAsynchronously && isImmediate) {
+ // An absent Maybe is acceptable when getImmediately was used and the value is only satisfied asynchronously
+ assertTrue(actualValue.isAbsent() || expectedValue.equals(actualValue.get()), "actual="+actualValue+"; expected="+expectedValue);
+ } else {
+ assertEquals(actualValue.get(), expectedValue);
+ }
+
+ // Reset config - otherwise if run in a loop the old value will be picked up, before our execute sets the new value
+ entity.config().set((ConfigKey)config, (Object)null); // ugly cast from ConfigKey<?>
+ }
+ }
+
+ static class ConfigValuePair {
+ public final Object configVal;
+ public final Object expectedResolvedVal;
+
+ public ConfigValuePair(Object configVal, Object expectedResolvedVal) {
+ this.configVal = configVal;
+ this.expectedResolvedVal = expectedResolvedVal;
+ }
+ }
+
+ private static Supplier<ConfigValuePair> newRandomConfigValueSupplier() {
+ return new Supplier<ConfigValuePair>() {
+ @Override public ConfigValuePair get() {
+ String val = Identifiers.makeRandomId(10);
+ return new ConfigValuePair(val, val);
+ }
+ };
+ }
static Maybe<?> execDslImmediately(final BrooklynDslDeferredSupplier<?> dsl, final Class<?> type, final Entity context, boolean execInTask) throws Exception {
// Exec'ing immediately will call DSL in current thread. It needs to find the context entity,
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/8601d219/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java
index 61dc513..3465ecf 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java
@@ -139,7 +139,7 @@ public abstract class AbstractConfigurationSupportInternal implements BrooklynOb
Object resolved = Tasks.resolving(unresolved)
.as(Object.class)
.defaultValue(marker)
- .timeout(ValueResolver.REAL_REAL_QUICK_WAIT)
+ .immediately(true)
.context(getContext())
.swallowExceptions()
.get();
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/8601d219/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
index b13d2b5..b7e9085 100644
--- a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
+++ b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
@@ -60,29 +60,31 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
}
public void testCompletedTaskReturnsResultImmediately() {
- // Call ValueResolver.getMaybe() from this thread, which has no execution context.
- // However, the task has already been submitted and we have waited for it to complete.
- // Therefore the ValueResolver can simply check for task.isDone() and return its result immediately.
Task<String> t = newSleepTask(Duration.ZERO, "foo");
app.getExecutionContext().submit(t).getUnchecked();
+
+ // Below, we call ValueResolver.getMaybe() from this thread, which has no execution context.
+ // However, the task has already been submitted and we have waited for it to complete.
+ // Therefore the ValueResolver can simply check for task.isDone() and return its result immediately.
Maybe<String> result = Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe();
Assert.assertEquals(result.get(), "foo");
}
public void testUnsubmittedTaskWhenNoExecutionContextFails() {
- // ValueResolver.getMaybe() is called with no execution context. Therefore it will not execute the task.
Task<String> t = newSleepTask(Duration.ZERO, "foo");
+
+ // Below, we call ValueResolver.getMaybe() with no execution context. Therefore it will not execute the task.
Maybe<String> result = Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe();
Assert.assertTrue(result.isAbsent(), "result="+result);
- Exception exception = ((Maybe.Absent<?>)result).getException();
+ Exception exception = Maybe.getException(result);
Assert.assertTrue(exception.toString().contains("no execution context available"), "exception="+exception);
}
public void testUnsubmittedTaskWithExecutionContextExecutesAndReturns() {
- // ValueResolver.getMaybe() is called in app's execution context. Therefore it will execute the task.
final Task<String> t = newSleepTask(Duration.ZERO, "foo");
+ // Below, we call ValueResolver.getMaybe() in app's execution context. Therefore it will execute the task.
Maybe<String> result = app.getExecutionContext()
.submit(new Callable<Maybe<String> >() {
public Maybe<String> call() throws Exception {
@@ -94,9 +96,10 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
}
public void testUnsubmittedTaskWithExecutionContextExecutesAndTimesOut() {
- // ValueResolver.getMaybe() is called in app's execution context. Therefore it will execute the task.
final Task<String> t = newSleepTask(Duration.ONE_MINUTE, "foo");
+ // Below, we call ValueResolver.getMaybe() in app's execution context. Therefore it will execute the task.
+ // However, it will quickly time out, as the task will not have completed.
Maybe<String> result = app.getExecutionContext()
.submit(new Callable<Maybe<String> >() {
public Maybe<String> call() throws Exception {
@@ -105,7 +108,7 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
.getUnchecked();
Assert.assertTrue(result.isAbsent(), "result="+result);
- Exception exception = ((Maybe.Absent<?>)result).getException();
+ Exception exception = Maybe.getException(result);
Assert.assertTrue(exception.toString().contains("not completed when immediate completion requested"), "exception="+exception);
}
@@ -140,11 +143,11 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
assertContainsCallingMethod(callInfo.stackTrace, "testGetImmediately");
}
- public void testGetImmediateSupplierWithTimeoutUsesBlocking() {
+ public void testImmediateSupplierWithTimeoutUsesBlocking() {
MyImmediateAndDeferredSupplier supplier = new MyImmediateAndDeferredSupplier();
CallInfo callInfo = Tasks.resolving(supplier).as(CallInfo.class).context(app).timeout(Asserts.DEFAULT_LONG_TIMEOUT).get();
assertNotNull(callInfo.task);
- assertNotContainsCallingMethod(callInfo.stackTrace, "testGetImmediately");
+ assertNotContainsCallingMethod(callInfo.stackTrace, "testImmediateSupplierWithTimeoutUsesBlocking");
}
public void testGetImmediatelyInTask() throws Exception {
@@ -167,7 +170,7 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
CallInfo callInfo = Tasks.resolving(supplier).as(CallInfo.class).context(app).immediately(true).get();
assertNotNull(callInfo.task);
assertEquals(BrooklynTaskTags.getContextEntity(callInfo.task), app);
- assertNotContainsCallingMethod(callInfo.stackTrace, "testGetImmediately");
+ assertNotContainsCallingMethod(callInfo.stackTrace, "testGetImmediatelyFallsBackToDeferredCallInTask");
}
private static class MyImmediateAndDeferredSupplier implements ImmediateSupplier<CallInfo>, DeferredSupplier<CallInfo> {
[3/6] brooklyn-server git commit: BROOKLYN-356: threading for DSL task
Posted by sv...@apache.org.
BROOKLYN-356: threading for DSL task
Adds ImmediateSupplier, so can fetch Maybe<?> of DSL value immediately,
in current thread (rather than invoking in task with a timeout).
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/1cd2a091
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/1cd2a091
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/1cd2a091
Branch: refs/heads/master
Commit: 1cd2a09198d3fa962f08676abc2fea1fe3d499da
Parents: 3f37e65
Author: Aled Sage <al...@gmail.com>
Authored: Tue Oct 18 19:08:18 2016 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Nov 1 11:06:02 2016 +0000
----------------------------------------------------------------------
.../brooklyn/api/mgmt/ManagementContext.java | 6 +-
.../spi/dsl/BrooklynDslDeferredSupplier.java | 4 +-
.../spi/dsl/methods/BrooklynDslCommon.java | 78 +++++++-
.../brooklyn/spi/dsl/methods/DslComponent.java | 62 +++++-
.../brooklyn/camp/brooklyn/dsl/DslTest.java | 192 +++++++++++++++----
.../TransformerEnricherWithDslTest.java | 9 +-
.../core/sensor/DependentConfiguration.java | 87 +++++++--
.../brooklyn/enricher/stock/Transformer.java | 3 +-
.../util/core/task/ImmediateSupplier.java | 50 +++++
.../brooklyn/util/core/task/ValueResolver.java | 27 ++-
.../brooklyn/util/core/task/TasksTest.java | 2 +-
.../util/core/task/ValueResolverTest.java | 187 ++++++++++++++----
12 files changed, 600 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1cd2a091/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
index cabadee..4931be7 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
@@ -154,7 +154,11 @@ public interface ManagementContext {
/**
* Returns a {@link SubscriptionContext} instance representing subscriptions
* (from the {@link SubscriptionManager}) associated with this entity, and capable
- * of conveniently subscribing on behalf of that entity
+ * of conveniently subscribing on behalf of that entity.
+ *
+ * For subscriptions made using this {@link SubscriptionContext}, the calls to
+ * {@link org.apache.brooklyn.api.sensor.SensorEventListener#onEvent(org.apache.brooklyn.api.sensor.SensorEvent)}
+ * will be made in a task that has the {@code CONTEXT_ENTITY} tag set to this entity (see BrooklynTaskTags).
*/
SubscriptionContext getSubscriptionContext(Entity entity);
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1cd2a091/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
index 0225bbb..86f1c82 100644
--- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
+++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
@@ -34,8 +34,10 @@ import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal;
import org.apache.brooklyn.util.core.task.BasicExecutionContext;
import org.apache.brooklyn.util.core.task.DeferredSupplier;
+import org.apache.brooklyn.util.core.task.ImmediateSupplier;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.guava.Maybe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,7 +66,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
* use {@code synchronized} because that is not interruptible - if someone tries to get the value
* and interrupts after a short wait, then we must release the lock immediately and return.
**/
-public abstract class BrooklynDslDeferredSupplier<T> implements DeferredSupplier<T>, TaskFactory<Task<T>>, Serializable {
+public abstract class BrooklynDslDeferredSupplier<T> implements DeferredSupplier<T>, ImmediateSupplier<T>, TaskFactory<Task<T>>, Serializable {
private static final long serialVersionUID = -8789624905412198233L;
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1cd2a091/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java
index a4bedf7..d92bf46 100644
--- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java
+++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java
@@ -39,6 +39,7 @@ import org.apache.brooklyn.camp.brooklyn.spi.creation.BrooklynYamlTypeInstantiat
import org.apache.brooklyn.camp.brooklyn.spi.creation.EntitySpecConfiguration;
import org.apache.brooklyn.camp.brooklyn.spi.dsl.BrooklynDslDeferredSupplier;
import org.apache.brooklyn.camp.brooklyn.spi.dsl.methods.DslComponent.Scope;
+import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.config.external.ExternalConfigSupplier;
import org.apache.brooklyn.core.entity.EntityDynamicType;
import org.apache.brooklyn.core.entity.EntityInternal;
@@ -53,7 +54,9 @@ import org.apache.brooklyn.util.core.ClassLoaderUtils;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.flags.FlagUtils;
import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.core.task.ValueResolver;
import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.javalang.Reflections;
import org.apache.brooklyn.util.text.StringEscapes.JavaStringEscapes;
import org.apache.brooklyn.util.text.Strings;
@@ -252,6 +255,11 @@ public class BrooklynDslCommon {
}
@Override
+ public final Maybe<String> getImmediately() {
+ return DependentConfiguration.formatStringImmediately(pattern, args);
+ }
+
+ @Override
public Task<String> newTask() {
return DependentConfiguration.formatString(pattern, args);
}
@@ -295,6 +303,11 @@ public class BrooklynDslCommon {
}
@Override
+ public Maybe<String> getImmediately() {
+ return DependentConfiguration.regexReplacementImmediately(source, pattern, replacement);
+ }
+
+ @Override
public Task<String> newTask() {
return DependentConfiguration.regexReplacement(source, pattern, replacement);
}
@@ -398,6 +411,58 @@ public class BrooklynDslCommon {
this.config = MutableMap.copyOf(config);
}
+
+ @Override
+ public Maybe<Object> getImmediately() {
+ Class<?> type = this.type;
+ if (type == null) {
+ EntityInternal entity = entity();
+ try {
+ type = new ClassLoaderUtils(BrooklynDslCommon.class, entity).loadClass(typeName);
+ } catch (ClassNotFoundException e) {
+ throw Exceptions.propagate(e);
+ }
+ }
+ final Class<?> clazz = type;
+
+ final ExecutionContext executionContext = ((EntityInternal)entity()).getExecutionContext();
+
+ // Marker exception that one of our component-parts cannot yet be resolved -
+ // throwing and catching this allows us to abort fast.
+ // A bit messy to use exceptions in normal control flow, but this allows the Maps util methods to be used.
+ @SuppressWarnings("serial")
+ class UnavailableException extends RuntimeException {
+ }
+
+ final Function<Object, Object> resolver = new Function<Object, Object>() {
+ @Override public Object apply(Object value) {
+ Maybe<Object> result = Tasks.resolving(value, Object.class).context(executionContext).deep(true).immediately(true).getMaybe();
+ if (result.isAbsent()) {
+ throw new UnavailableException();
+ } else {
+ return result.get();
+ }
+ }
+ };
+
+ try {
+ Map<String, Object> resolvedFields = MutableMap.copyOf(Maps.transformValues(fields, resolver));
+ Map<String, Object> resolvedConfig = MutableMap.copyOf(Maps.transformValues(config, resolver));
+ List<Object> resolvedConstructorArgs = MutableList.copyOf(Lists.transform(constructorArgs, resolver));
+ List<Object> resolvedFactoryMethodArgs = MutableList.copyOf(Lists.transform(factoryMethodArgs, resolver));
+
+ Object result;
+ if (factoryMethodName == null) {
+ result = create(clazz, resolvedConstructorArgs, resolvedFields, resolvedConfig);
+ } else {
+ result = create(clazz, factoryMethodName, resolvedFactoryMethodArgs, resolvedFields, resolvedConfig);
+ }
+ return Maybe.of(result);
+ } catch (UnavailableException e) {
+ return Maybe.absent();
+ }
+ }
+
@Override
public Task<Object> newTask() {
Class<?> type = this.type;
@@ -463,7 +528,7 @@ public class BrooklynDslCommon {
}
}
- public static Object create(Class<?> type, String factoryMethodName, List<Object> factoryMethodArgs, Map<String,?> fields, Map<String,?> config) {
+ public static Object create(Class<?> type, String factoryMethodName, List<?> factoryMethodArgs, Map<String,?> fields, Map<String,?> config) {
try {
Object bean = Reflections.invokeMethodFromArgs(type, factoryMethodName, factoryMethodArgs).get();
BeanUtils.populate(bean, fields);
@@ -525,6 +590,12 @@ public class BrooklynDslCommon {
}
@Override
+ public final Maybe<Object> getImmediately() {
+ ManagementContextInternal managementContext = DslExternal.managementContext();
+ return Maybe.<Object>of(managementContext.getExternalConfigProviderRegistry().getConfig(providerName, key));
+ }
+
+ @Override
public Task<Object> newTask() {
return Tasks.<Object>builder()
.displayName("resolving external configuration: '" + key + "' from provider '" + providerName + "'")
@@ -597,6 +668,11 @@ public class BrooklynDslCommon {
}
@Override
+ public Maybe<Function<String, String>> getImmediately() {
+ return DependentConfiguration.regexReplacementImmediately(pattern, replacement);
+ }
+
+ @Override
public Task<Function<String, String>> newTask() {
return DependentConfiguration.regexReplacement(pattern, replacement);
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1cd2a091/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
index 0d3321b..b134450 100644
--- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
+++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
@@ -39,6 +39,8 @@ import org.apache.brooklyn.core.sensor.DependentConfiguration;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.util.core.task.TaskBuilder;
import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.groovy.GroovyJavaMethods;
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.text.StringEscapes.JavaStringEscapes;
@@ -88,6 +90,15 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
// ---------------------------
@Override
+ public final Maybe<Entity> getImmediately() {
+ try {
+ return Maybe.of(new EntityInScopeFinder(scopeComponent, scope, componentId).call());
+ } catch (Exception e) {
+ throw Exceptions.propagate(e);
+ }
+ }
+
+ @Override
public Task<Entity> newTask() {
return TaskBuilder.<Entity>builder()
.displayName(toString())
@@ -224,6 +235,11 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
}
@Override
+ public Maybe<Object> getImmediately() {
+ return Maybe.<Object>of(component.get().getId());
+ }
+
+ @Override
public Task<Object> newTask() {
Entity targetEntity = component.get();
return Tasks.create("identity", Callables.<Object>returning(targetEntity.getId()));
@@ -259,6 +275,17 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
this.sensorName = sensorName;
}
+ @Override
+ public final Maybe<Object> getImmediately() {
+ Entity targetEntity = component.getImmediately().get();
+ AttributeSensor<?> targetSensor = (AttributeSensor<?>) targetEntity.getEntityType().getSensor(sensorName);
+ if (targetSensor == null) {
+ targetSensor = Sensors.newSensor(Object.class, sensorName);
+ }
+ Object result = targetEntity.sensors().get(targetSensor);
+ return GroovyJavaMethods.truth(result) ? Maybe.of(result) : Maybe.absent();
+ }
+
@SuppressWarnings("unchecked")
@Override
public Task<Object> newTask() {
@@ -303,6 +330,12 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
}
@Override
+ public final Maybe<Object> getImmediately() {
+ Entity targetEntity = component.get();
+ return Maybe.of(targetEntity.getConfig(ConfigKeys.newConfigKey(Object.class, keyName)));
+ }
+
+ @Override
public Task<Object> newTask() {
return Tasks.builder()
.displayName("retrieving config for "+keyName)
@@ -353,6 +386,33 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
}
@Override
+ public Maybe<Sensor<?>> getImmediately() {
+ return getImmediately(sensorName, false);
+ }
+
+ protected Maybe<Sensor<?>> getImmediately(Object si, boolean resolved) {
+ if (si instanceof Sensor) {
+ return Maybe.<Sensor<?>>of((Sensor<?>)si);
+ } else if (si instanceof String) {
+ Entity targetEntity = component.get();
+ Sensor<?> result = null;
+ if (targetEntity!=null) {
+ result = targetEntity.getEntityType().getSensor((String)si);
+ }
+ if (result!=null) return Maybe.<Sensor<?>>of(result);
+ return Maybe.<Sensor<?>>of(Sensors.newSensor(Object.class, (String)si));
+ }
+ if (!resolved) {
+ // attempt to resolve, and recurse
+ final ExecutionContext executionContext = ((EntityInternal)entity()).getExecutionContext();
+ Maybe<Object> resolvedSi = Tasks.resolving(si, Object.class).deep(true).immediately(true).context(executionContext).getMaybe();
+ if (resolvedSi.isAbsent()) return Maybe.absent();
+ return getImmediately(resolvedSi.get(), true);
+ }
+ throw new IllegalStateException("Cannot resolve '"+sensorName+"' as a sensor (got type "+(si == null ? "null" : si.getClass().getName()+")"));
+ }
+
+ @Override
public Task<Sensor<?>> newTask() {
return Tasks.<Sensor<?>>builder()
.displayName("looking up sensor for "+sensorName)
@@ -380,7 +440,7 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
final ExecutionContext executionContext = ((EntityInternal)entity()).getExecutionContext();
return resolve(Tasks.resolveDeepValue(si, Object.class, executionContext), true);
}
- throw new IllegalStateException("Cannot resolve '"+sensorName+"' as a sensor");
+ throw new IllegalStateException("Cannot resolve '"+sensorName+"' as a sensor (got type "+(si == null ? "null" : si.getClass().getName()+")"));
}})
.build();
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1cd2a091/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java
index 29cc7d2..d27e6c2 100644
--- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java
+++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java
@@ -15,11 +15,16 @@
*/
package org.apache.brooklyn.camp.brooklyn.dsl;
+import static com.google.common.base.Preconditions.checkNotNull;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntitySpec;
@@ -27,42 +32,79 @@ import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.camp.brooklyn.spi.dsl.BrooklynDslDeferredSupplier;
import org.apache.brooklyn.camp.brooklyn.spi.dsl.methods.BrooklynDslCommon;
+import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
import org.apache.brooklyn.core.test.entity.TestApplication;
import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.test.Asserts;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.core.task.ValueResolver;
+import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.text.Identifiers;
import org.apache.brooklyn.util.time.Duration;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
public class DslTest extends BrooklynAppUnitTestSupport {
private static final int MAX_PARALLEL_RESOLVERS = 50;
- private static final int RESOLVER_ITERATIONS = 1000;
+ private static final int MANY_RESOLVER_ITERATIONS = 100;
+
+ private ListeningScheduledExecutorService executor;
+ private Random random = new Random();
+
+ @BeforeMethod(alwaysRun=true)
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor());
+ }
+
+ @AfterMethod(alwaysRun=true)
+ @Override
+ public void tearDown() throws Exception {
+ try {
+ if (executor != null) executor.shutdownNow();
+ } finally {
+ super.tearDown();
+ }
+ }
+
+ @Test
+ public void testAttributeWhenReadyEmptyDoesNotBlock() throws Exception {
+ BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.attributeWhenReady(TestApplication.MY_ATTRIBUTE.getName());
+ Maybe<?> actualValue = execDslRealRealyQuick(dsl, TestApplication.MY_ATTRIBUTE.getType(), app);
+ assertTrue(actualValue.isAbsent());
+ }
@Test
- public void testAttributeWhenReadyEmptyDoesNotBlock() {
+ public void testAttributeWhenReadyEmptyImmediatelyDoesNotBlock() throws Exception {
BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.attributeWhenReady(TestApplication.MY_ATTRIBUTE.getName());
- Maybe<? super String> actualValue = Tasks.resolving(dsl).as(TestEntity.NAME.getType())
- .context(app)
- .description("Computing sensor "+TestEntity.NAME+" from "+dsl)
- .timeout(ValueResolver.REAL_REAL_QUICK_WAIT)
- .getMaybe();
+ Maybe<?> actualValue = execDslImmediately(dsl, TestApplication.MY_ATTRIBUTE.getType(), app, true);
assertTrue(actualValue.isAbsent());
}
@Test
- public void testAttributeWhenReady() {
+ public void testAttributeWhenReady() throws Exception {
BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.attributeWhenReady(TestEntity.NAME.getName());
new AttributeWhenReadyTestWorker(app, TestEntity.NAME, dsl).run();
}
+ @Test
+ public void testAttributeWhenReadyBlocksUntilReady() throws Exception {
+ // Fewer iterations, because there is a sleep each time
+ BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.attributeWhenReady(TestEntity.NAME.getName());
+ new AttributeWhenReadyTestWorker(app, TestEntity.NAME, dsl).eventually(true).resolverIterations(2).run();
+ }
+
@Test(groups="Integration")
- public void testAttributeWhenReadyConcurrent() {
+ public void testAttributeWhenReadyConcurrent() throws Exception {
final BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.attributeWhenReady(TestEntity.NAME.getName());
runConcurrentWorker(new Supplier<Runnable>() {
@Override
@@ -73,13 +115,13 @@ public class DslTest extends BrooklynAppUnitTestSupport {
}
@Test
- public void testSelf() {
+ public void testSelf() throws Exception {
BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.self();
new SelfTestWorker(app, dsl).run();
}
@Test(groups="Integration")
- public void testSelfConcurrent() {
+ public void testSelfConcurrent() throws Exception {
final BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.self();
runConcurrentWorker(new Supplier<Runnable>() {
@Override
@@ -90,13 +132,13 @@ public class DslTest extends BrooklynAppUnitTestSupport {
}
@Test
- public void testParent() {
+ public void testParent() throws Exception {
BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.parent();
new ParentTestWorker(app, dsl).run();
}
@Test(groups="Integration")
- public void testParentConcurrent() {
+ public void testParentConcurrent() throws Exception {
final BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.parent();
runConcurrentWorker(new Supplier<Runnable>() {
@Override
@@ -106,7 +148,7 @@ public class DslTest extends BrooklynAppUnitTestSupport {
});
}
- public void runConcurrentWorker(Supplier<Runnable> taskSupplier) {
+ protected void runConcurrentWorker(Supplier<Runnable> taskSupplier) {
Collection<Task<?>> results = new ArrayList<>();
for (int i = 0; i < MAX_PARALLEL_RESOLVERS; i++) {
Task<?> result = mgmt.getExecutionManager().submit(taskSupplier.get());
@@ -118,39 +160,64 @@ public class DslTest extends BrooklynAppUnitTestSupport {
}
private static class DslTestWorker implements Runnable {
- protected TestApplication parent;
- protected BrooklynDslDeferredSupplier<?> dsl;
+ protected final TestApplication parent;
+ protected final BrooklynDslDeferredSupplier<?> dsl;
+ protected final Class<?> type;
protected EntitySpec<TestEntity> childSpec = EntitySpec.create(TestEntity.class);
- protected Class<?> type;
-
+ protected int resolverIterations = MANY_RESOLVER_ITERATIONS;
+ protected boolean eventually = false;
+ private boolean wrapInTaskForImmediately = true;
+
public DslTestWorker(TestApplication parent, BrooklynDslDeferredSupplier<?> dsl, Class<?> type) {
- this.parent = parent;
- this.dsl = dsl;
- this.type = type;
+ this.parent = checkNotNull(parent, "parent");
+ this.dsl = checkNotNull(dsl, "dsl");
+ this.type = checkNotNull(type, "type");
}
+ public DslTestWorker resolverIterations(int val) {
+ resolverIterations = val;
+ return this;
+ }
+
+ public DslTestWorker eventually(boolean val) {
+ eventually = val;
+ return this;
+ }
+
+ public DslTestWorker wrapInTaskForImmediately(boolean val) {
+ wrapInTaskForImmediately = val;
+ return this;
+ }
+
@Override
public void run() {
- TestEntity entity = parent.createAndManageChild(childSpec);
- for (int i = 0; i < RESOLVER_ITERATIONS; i++) {
+ TestEntity entity = parent.addChild(childSpec);
+ for (int i = 0; i < resolverIterations; i++) {
preResolve(entity);
- Maybe<?> actualValue = Tasks.resolving(dsl).as(type)
- .context(entity)
- .description("Computing sensor "+type+" from "+dsl)
- .timeout(Duration.ONE_MINUTE)
- .getMaybe();
- postResolve(actualValue);
+ Maybe<?> eventualValue = execDslEventually(dsl, type, entity, Duration.FIVE_SECONDS);//FIXME ONE_MINUTE);
+ postResolve(entity, eventualValue);
+
+ if (!eventually) {
+ preResolve(entity);
+ Maybe<?> immediateValue;
+ try {
+ immediateValue = execDslImmediately(dsl, type, entity, wrapInTaskForImmediately);
+ } catch (Exception e) {
+ throw Exceptions.propagate(e);
+ }
+ postResolve(entity, immediateValue);
+ }
}
}
protected void preResolve(TestEntity entity) {
}
- protected void postResolve(Maybe<?> actualValue) {
+ protected void postResolve(TestEntity entity, Maybe<?> actualValue) {
}
}
- private static class AttributeWhenReadyTestWorker extends DslTestWorker {
+ private class AttributeWhenReadyTestWorker extends DslTestWorker {
private AttributeSensor<String> sensor;
private String expectedValue;
@@ -160,33 +227,43 @@ public class DslTest extends BrooklynAppUnitTestSupport {
}
@Override
- protected void preResolve(TestEntity entity) {
+ protected void preResolve(final TestEntity entity) {
expectedValue = Identifiers.makeRandomId(10);
- entity.sensors().set(sensor, expectedValue);
+ Runnable job = new Runnable() {
+ public void run() {
+ entity.sensors().set(sensor, expectedValue);
+ }
+ };
+ if (eventually) {
+ executor.schedule(job, random.nextInt(20), TimeUnit.MILLISECONDS);
+ } else {
+ job.run();
+ }
}
-
@Override
- protected void postResolve(Maybe<?> actualValue) {
+ protected void postResolve(TestEntity entity, Maybe<?> actualValue) {
assertEquals(actualValue.get(), expectedValue);
+
+ if (eventually) {
+ // Reset sensor - otherwise, if run in a loop, the old value will be picked up before our scheduled job sets the new value
+ entity.sensors().set(sensor, null);
+ }
}
}
private static class SelfTestWorker extends DslTestWorker {
- private TestEntity entity;
-
public SelfTestWorker(TestApplication parent, BrooklynDslDeferredSupplier<?> dsl) {
super(parent, dsl, Entity.class);
}
@Override
protected void preResolve(TestEntity entity) {
- this.entity = entity;
}
@Override
- protected void postResolve(Maybe<?> actualValue) {
+ protected void postResolve(TestEntity entity, Maybe<?> actualValue) {
assertEquals(actualValue.get(), entity);
}
@@ -198,9 +275,44 @@ public class DslTest extends BrooklynAppUnitTestSupport {
}
@Override
- protected void postResolve(Maybe<?> actualValue) {
+ protected void postResolve(TestEntity entity, Maybe<?> actualValue) {
assertEquals(actualValue.get(), parent);
}
}
+ static Maybe<?> execDslImmediately(final BrooklynDslDeferredSupplier<?> dsl, final Class<?> type, final Entity context, boolean execInTask) throws Exception {
+ // Exec'ing immediately will call DSL in current thread. It needs to find the context entity,
+ // and does this using BrooklynTaskTags.getTargetOrContextEntity(Tasks.current()).
+ // If we are not in a task executed by the context entity, then this lookup will fail.
+ Callable<Maybe<?>> job = new Callable<Maybe<?>>() {
+ public Maybe<?> call() throws Exception {
+ return Tasks.resolving(dsl).as(type)
+ .context(context)
+ .description("Computing "+dsl)
+ .immediately(true)
+ .getMaybe();
+ }
+ };
+ if (execInTask) {
+ Task<Maybe<?>> task = ((EntityInternal)context).getExecutionContext().submit(job);
+ task.get(Asserts.DEFAULT_LONG_TIMEOUT);
+ assertTrue(task.isDone());
+ return task.get();
+
+ } else {
+ return job.call();
+ }
+ }
+
+ static Maybe<?> execDslRealRealyQuick(BrooklynDslDeferredSupplier<?> dsl, Class<?> type, Entity context) {
+ return execDslEventually(dsl, type, context, ValueResolver.REAL_REAL_QUICK_WAIT);
+ }
+
+ static Maybe<?> execDslEventually(BrooklynDslDeferredSupplier<?> dsl, Class<?> type, Entity context, Duration timeout) {
+ return Tasks.resolving(dsl).as(type)
+ .context(context)
+ .description("Computing "+dsl)
+ .timeout(timeout)
+ .getMaybe();
+ }
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1cd2a091/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/enricher/TransformerEnricherWithDslTest.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/enricher/TransformerEnricherWithDslTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/enricher/TransformerEnricherWithDslTest.java
index 2f4465a..3a62016 100644
--- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/enricher/TransformerEnricherWithDslTest.java
+++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/enricher/TransformerEnricherWithDslTest.java
@@ -33,16 +33,17 @@ public class TransformerEnricherWithDslTest extends BrooklynAppUnitTestSupport {
int START_PORT = 10000;
- @Test(groups="Broken")
+ @Test
// See https://issues.apache.org/jira/browse/BROOKLYN-356
public void testTransformerResolvesResolvableValues() {
- testTransformerResolvesResolvableValues(START_PORT, 200);
+ LOG.info("Starting 100 iterations of testTransformerResolvesResolvableValues");
+ testTransformerResolvesResolvableValues(START_PORT, 100);
}
- @Test(groups={"Integration", "Broken"}, invocationCount=10)
+ @Test(groups={"Integration"}, invocationCount=10)
// See https://issues.apache.org/jira/browse/BROOKLYN-356
public void testTransformerResolvesResolvableValuesIntegration() {
- LOG.info("Starting 1000 iterations");
+ LOG.info("Starting 1000 iterations of testTransformerResolvesResolvableValues");
testTransformerResolvesResolvableValues(START_PORT, 1000);
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1cd2a091/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java b/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
index 868bdc1..1263226 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.api.mgmt.ExecutionContext;
import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
import org.apache.brooklyn.api.mgmt.Task;
@@ -55,6 +56,7 @@ import org.apache.brooklyn.util.core.task.BasicExecutionContext;
import org.apache.brooklyn.util.core.task.BasicTask;
import org.apache.brooklyn.util.core.task.DeferredSupplier;
import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.core.task.ImmediateSupplier;
import org.apache.brooklyn.util.core.task.ParallelTask;
import org.apache.brooklyn.util.core.task.TaskInternal;
import org.apache.brooklyn.util.core.task.Tasks;
@@ -67,6 +69,7 @@ import org.apache.brooklyn.util.groovy.GroovyJavaMethods;
import org.apache.brooklyn.util.guava.Functionals;
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.text.StringFunctions;
+import org.apache.brooklyn.util.text.StringFunctions.RegexReplacer;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.CountdownTimer;
import org.apache.brooklyn.util.time.Duration;
@@ -471,22 +474,67 @@ public class DependentConfiguration {
return transformMultiple(
MutableMap.<String,String>of("displayName", "formatting '"+spec+"' with "+taskArgs.size()+" task"+(taskArgs.size()!=1?"s":"")),
- new Function<List<Object>, String>() {
- @Override public String apply(List<Object> input) {
- Iterator<?> tri = input.iterator();
- Object[] vv = new Object[args.length];
- int i=0;
- for (Object arg : args) {
- if (arg instanceof TaskAdaptable || arg instanceof TaskFactory) vv[i] = tri.next();
- else if (arg instanceof DeferredSupplier) vv[i] = ((DeferredSupplier<?>) arg).get();
- else vv[i] = arg;
- i++;
- }
- return String.format(spec, vv);
- }},
+ new Function<List<Object>, String>() {
+ @Override public String apply(List<Object> input) {
+ Iterator<?> tri = input.iterator();
+ Object[] vv = new Object[args.length];
+ int i=0;
+ for (Object arg : args) {
+ if (arg instanceof TaskAdaptable || arg instanceof TaskFactory) vv[i] = tri.next();
+ else if (arg instanceof DeferredSupplier) vv[i] = ((DeferredSupplier<?>) arg).get();
+ else vv[i] = arg;
+ i++;
+ }
+ return String.format(spec, vv);
+ }},
taskArgs);
}
+ /**
+ * @throws ImmediateSupplier.ImmediateUnsupportedException if this cannot be evaluated in a timely manner
+ */
+ public static Maybe<String> formatStringImmediately(final String spec, final Object ...args) {
+ List<Object> resolvedArgs = Lists.newArrayList();
+ for (Object arg : args) {
+ Maybe<?> argVal = resolveImmediately(arg);
+ if (argVal.isAbsent()) return Maybe.absent();
+ resolvedArgs.add(argVal.get());
+ }
+
+ return Maybe.of(String.format(spec, resolvedArgs.toArray()));
+ }
+
+ protected static <T> Maybe<?> resolveImmediately(Object val) {
+ if (val instanceof ImmediateSupplier<?>) {
+ return ((ImmediateSupplier<?>)val).getImmediately();
+ } else if (val instanceof TaskAdaptable) {
+ throw new ImmediateSupplier.ImmediateUnsupportedException("Cannot immediately resolve value "+val);
+ } else if (val instanceof TaskFactory) {
+ throw new ImmediateSupplier.ImmediateUnsupportedException("Cannot immediately resolve value "+val);
+ } else if (val instanceof DeferredSupplier<?>) {
+ throw new ImmediateSupplier.ImmediateUnsupportedException("Cannot immediately resolve value "+val);
+ } else {
+ return Maybe.of(val);
+ }
+ }
+
+ public static Maybe<String> regexReplacementImmediately(Object source, Object pattern, Object replacement) {
+ Maybe<?> resolvedSource = resolveImmediately(source);
+ if (resolvedSource.isAbsent()) return Maybe.absent();
+ String resolvedSourceStr = String.valueOf(resolvedSource.get());
+
+ Maybe<?> resolvedPattern = resolveImmediately(pattern);
+ if (resolvedPattern.isAbsent()) return Maybe.absent();
+ String resolvedPatternStr = String.valueOf(resolvedPattern.get());
+
+ Maybe<?> resolvedReplacement = resolveImmediately(replacement);
+ if (resolvedReplacement.isAbsent()) return Maybe.absent();
+ String resolvedReplacementStr = String.valueOf(resolvedReplacement.get());
+
+ String result = new StringFunctions.RegexReplacer(resolvedPatternStr, resolvedReplacementStr).apply(resolvedSourceStr);
+ return Maybe.of(result);
+ }
+
public static Task<String> regexReplacement(Object source, Object pattern, Object replacement) {
List<TaskAdaptable<Object>> taskArgs = getTaskAdaptable(source, pattern, replacement);
Function<List<Object>, String> transformer = new RegexTransformerString(source, pattern, replacement);
@@ -497,6 +545,19 @@ public class DependentConfiguration {
);
}
+ public static Maybe<Function<String, String>> regexReplacementImmediately(Object pattern, Object replacement) {
+ Maybe<?> resolvedPattern = resolveImmediately(pattern);
+ if (resolvedPattern.isAbsent()) return Maybe.absent();
+ String resolvedPatternStr = String.valueOf(resolvedPattern.get());
+
+ Maybe<?> resolvedReplacement = resolveImmediately(replacement);
+ if (resolvedReplacement.isAbsent()) return Maybe.absent();
+ String resolvedReplacementStr = String.valueOf(resolvedReplacement.get());
+
+ RegexReplacer result = new StringFunctions.RegexReplacer(resolvedPatternStr, resolvedReplacementStr);
+ return Maybe.<Function<String, String>>of(result);
+ }
+
public static Task<Function<String, String>> regexReplacement(Object pattern, Object replacement) {
List<TaskAdaptable<Object>> taskArgs = getTaskAdaptable(pattern, replacement);
Function<List<Object>, Function<String, String>> transformer = new RegexTransformerFunction(pattern, replacement);
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1cd2a091/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java
index 9c4e657..2cc4d75 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java
@@ -25,7 +25,6 @@ import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.util.collections.MutableSet;
import org.apache.brooklyn.util.core.task.Tasks;
-import org.apache.brooklyn.util.core.task.ValueResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,7 +89,7 @@ public class Transformer<T,U> extends AbstractTransformer<T,U> {
return (U) Tasks.resolving(targetValueRaw).as(targetSensor.getType())
.context(entity)
.description("Computing sensor "+targetSensor+" from "+targetValueRaw)
- .timeout(ValueResolver.NON_BLOCKING_WAIT)
+ .immediately(true)
.getMaybe().orNull();
}
public String toString() {
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1cd2a091/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java
new file mode 100644
index 0000000..03a1da6
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.util.core.task;
+
+import org.apache.brooklyn.util.guava.Maybe;
+
+/**
+ * A class that supplies objects of a single type, without blocking for any significant length
+ * of time.
+ */
+public interface ImmediateSupplier<T> {
+
+ /**
+ * Indicates that we are unable to get the value immediately, because that is not supported
+ * (e.g. because the supplier is composed of sub-tasks that do not support {@link ImmediateSupplier}).
+ */
+ public static class ImmediateUnsupportedException extends RuntimeException {
+ private static final long serialVersionUID = -7942339715007942797L;
+
+ public ImmediateUnsupportedException(String message) {
+ super(message);
+ }
+ public ImmediateUnsupportedException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * Gets the value promptly, or returns {@link Maybe#absent()} if the value is not yet available.
+ *
+ * @throws ImmediateUnsupportedException if the value cannot be determined immediately
+ */
+ Maybe<T> getImmediately();
+}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1cd2a091/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
index 2942b23..10fd665 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
@@ -109,6 +109,7 @@ public class ValueResolver<T> implements DeferredSupplier<T> {
Boolean embedResolutionInTask;
/** timeout on execution, if possible, or if embedResolutionInTask is true */
Duration timeout;
+ boolean immediately;
boolean isTransientTask = true;
T defaultValue = null;
@@ -142,6 +143,7 @@ public class ValueResolver<T> implements DeferredSupplier<T> {
parentOriginalValue = parent.getOriginalValue();
timeout = parent.timeout;
+ immediately = parent.immediately;
parentTimer = parent.parentTimer;
if (parentTimer!=null && parentTimer.isExpired())
expired = true;
@@ -250,7 +252,18 @@ public class ValueResolver<T> implements DeferredSupplier<T> {
this.timeout = timeout;
return this;
}
-
+
+ /**
+ * Whether the value should be resolved immediately (and if not available immediately,
+ * return absent).
+ */
+ @Beta
+ public ValueResolver<T> immediately(boolean val) {
+ this.immediately = val;
+ if (timeout == null) timeout = ValueResolver.NON_BLOCKING_WAIT;
+ return this;
+ }
+
protected void checkTypeNotNull() {
if (type==null)
throw new NullPointerException("type must be set to resolve, for '"+value+"'"+(description!=null ? ", "+description : ""));
@@ -300,6 +313,18 @@ public class ValueResolver<T> implements DeferredSupplier<T> {
return Maybe.of((T) v);
try {
+ if (immediately && v instanceof ImmediateSupplier) {
+ final ImmediateSupplier<?> supplier = (ImmediateSupplier<?>) v;
+ try {
+ Maybe<?> result = supplier.getImmediately();
+
+ // Recurse: need to ensure returned value is cast, etc
+ return (result.isPresent()) ? new ValueResolver(result.get(), type, this).getMaybe() : Maybe.<T>absent();
+ } catch (ImmediateSupplier.ImmediateUnsupportedException e) {
+ log.debug("Unable to resolve-immediately for "+description+" ("+v+"); falling back to executing with timeout", e);
+ }
+ }
+
//if it's a task or a future, we wait for the task to complete
if (v instanceof TaskAdaptable<?>) {
//if it's a task, we make sure it is submitted
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1cd2a091/core/src/test/java/org/apache/brooklyn/util/core/task/TasksTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/TasksTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/TasksTest.java
index 074c14a..e1952c2 100644
--- a/core/src/test/java/org/apache/brooklyn/util/core/task/TasksTest.java
+++ b/core/src/test/java/org/apache/brooklyn/util/core/task/TasksTest.java
@@ -121,7 +121,7 @@ public class TasksTest extends BrooklynAppUnitTestSupport {
Task<Object> t = Tasks.builder().body(Functionals.callable(EntityFunctions.config(TestEntity.CONF_OBJECT), app)).build();
ValueResolver<Object> v = Tasks.resolving(t).as(Object.class).context(app);
- ValueResolverTest.assertThrowsOnMaybe(v);
+ ValueResolverTest.assertThrowsOnGetMaybe(v);
ValueResolverTest.assertThrowsOnGet(v);
v.swallowExceptions();
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1cd2a091/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
index 6c5e990..43cd0ba 100644
--- a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
+++ b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
@@ -18,10 +18,18 @@
*/
package org.apache.brooklyn.util.core.task;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.fail;
+
+import java.util.Arrays;
import java.util.concurrent.Callable;
import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.test.Asserts;
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.time.Duration;
import org.apache.brooklyn.util.time.Time;
@@ -41,6 +49,125 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
super.setUp();
}
+ public void testTimeoutZero() {
+ Maybe<String> result = Tasks.resolving(newSleepTask(Duration.TEN_SECONDS, "foo")).as(String.class).context(app).timeout(Duration.ZERO).getMaybe();
+ Assert.assertFalse(result.isPresent());
+ }
+
+ public void testTimeoutBig() {
+ Maybe<String> result = Tasks.resolving(newSleepTask(Duration.ZERO, "foo")).as(String.class).context(app).timeout(Duration.TEN_SECONDS).getMaybe();
+ Assert.assertEquals(result.get(), "foo");
+ }
+
+ public void testNoExecutionContextOnCompleted() {
+ Task<String> t = newSleepTask(Duration.ZERO, "foo");
+ app.getExecutionContext().submit(t).getUnchecked();
+ Maybe<String> result = Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe();
+ Assert.assertEquals(result.get(), "foo");
+ }
+
+ public void testSwallowError() {
+ ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(app).swallowExceptions();
+ assertMaybeIsAbsent(result);
+ assertThrowsOnGet(result);
+ }
+
+ public void testDontSwallowError() {
+ ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(app);
+ assertThrowsOnGetMaybe(result);
+ assertThrowsOnGet(result);
+ }
+
+ public void testDefaultWhenSwallowError() {
+ ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(app).swallowExceptions().defaultValue("foo");
+ assertMaybeIsAbsent(result);
+ Assert.assertEquals(result.get(), "foo");
+ }
+
+ public void testDefaultBeforeDelayAndError() {
+ ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.TEN_SECONDS)).as(String.class).context(app).timeout(Duration.ZERO).defaultValue("foo");
+ assertMaybeIsAbsent(result);
+ Assert.assertEquals(result.get(), "foo");
+ }
+
+ public void testGetImmediately() {
+ MyImmediateAndDeferredSupplier supplier = new MyImmediateAndDeferredSupplier();
+ CallInfo callInfo = Tasks.resolving(supplier).as(CallInfo.class).context(app).immediately(true).get();
+ assertNull(callInfo.task);
+ assertContainsCallingMethod(callInfo.stackTrace, "testGetImmediately");
+ }
+
+ public void testGetImmediateSupplierWithTimeoutUsesBlocking() {
+ MyImmediateAndDeferredSupplier supplier = new MyImmediateAndDeferredSupplier();
+ CallInfo callInfo = Tasks.resolving(supplier).as(CallInfo.class).context(app).timeout(Asserts.DEFAULT_LONG_TIMEOUT).get();
+ assertNotNull(callInfo.task);
+ assertNotContainsCallingMethod(callInfo.stackTrace, "testGetImmediately");
+ }
+
+ public void testGetImmediatelyInTask() throws Exception {
+ final MyImmediateAndDeferredSupplier supplier = new MyImmediateAndDeferredSupplier();
+ Task<CallInfo> task = app.getExecutionContext().submit(new Callable<CallInfo>() {
+ public CallInfo call() {
+ return myUniquelyNamedMethod();
+ }
+ private CallInfo myUniquelyNamedMethod() {
+ return Tasks.resolving(supplier).as(CallInfo.class).immediately(true).get();
+ }
+ });
+ CallInfo callInfo = task.get();
+ assertEquals(callInfo.task, task);
+ assertContainsCallingMethod(callInfo.stackTrace, "myUniquelyNamedMethod");
+ }
+
+ public void testGetImmediatelyFallsBackToDeferredCallInTask() throws Exception {
+ final MyImmediateAndDeferredSupplier supplier = new MyImmediateAndDeferredSupplier(true);
+ CallInfo callInfo = Tasks.resolving(supplier).as(CallInfo.class).context(app).immediately(true).get();
+ assertNotNull(callInfo.task);
+ assertEquals(BrooklynTaskTags.getContextEntity(callInfo.task), app);
+ assertNotContainsCallingMethod(callInfo.stackTrace, "testGetImmediately");
+ }
+
+ private static class MyImmediateAndDeferredSupplier implements ImmediateSupplier<CallInfo>, DeferredSupplier<CallInfo> {
+ private final boolean failImmediately;
+
+ public MyImmediateAndDeferredSupplier() {
+ this(false);
+ }
+
+ public MyImmediateAndDeferredSupplier(boolean simulateImmediateUnsupported) {
+ this.failImmediately = simulateImmediateUnsupported;
+ }
+
+ @Override
+ public Maybe<CallInfo> getImmediately() {
+ if (failImmediately) {
+ throw new ImmediateSupplier.ImmediateUnsupportedException("Simulate immediate unsupported");
+ } else {
+ return Maybe.of(CallInfo.newInstance());
+ }
+ }
+ @Override
+ public CallInfo get() {
+ return CallInfo.newInstance();
+ }
+ }
+
+ private static class CallInfo {
+ final StackTraceElement[] stackTrace;
+ final Task<?> task;
+
+ public static CallInfo newInstance() {
+ Exception e = new Exception("for stacktrace");
+ e.fillInStackTrace();
+ return new CallInfo(e.getStackTrace(), (Task<?>) Tasks.current());
+ }
+
+ CallInfo(StackTraceElement[] stackTrace, Task<?> task) {
+ this.stackTrace = stackTrace;
+ this.task = task;
+ }
+ }
+
public static final Task<String> newSleepTask(final Duration timeout, final String result) {
return Tasks.<String>builder().body(new Callable<String>() {
public String call() {
@@ -59,24 +186,7 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
).build();
}
- public void testTimeoutZero() {
- Maybe<String> result = Tasks.resolving(newSleepTask(Duration.TEN_SECONDS, "foo")).as(String.class).context(app).timeout(Duration.ZERO).getMaybe();
- Assert.assertFalse(result.isPresent());
- }
-
- public void testTimeoutBig() {
- Maybe<String> result = Tasks.resolving(newSleepTask(Duration.ZERO, "foo")).as(String.class).context(app).timeout(Duration.TEN_SECONDS).getMaybe();
- Assert.assertEquals(result.get(), "foo");
- }
-
- public void testNoExecutionContextOnCompleted() {
- Task<String> t = newSleepTask(Duration.ZERO, "foo");
- app.getExecutionContext().submit(t).getUnchecked();
- Maybe<String> result = Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe();
- Assert.assertEquals(result.get(), "foo");
- }
-
- public static Throwable assertThrowsOnMaybe(ValueResolver<?> result) {
+ public static Exception assertThrowsOnGetMaybe(ValueResolver<?> result) {
try {
result = result.clone();
result.getMaybe();
@@ -84,7 +194,8 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
return null;
} catch (Exception e) { return e; }
}
- public static Throwable assertThrowsOnGet(ValueResolver<?> result) {
+
+ public static Exception assertThrowsOnGet(ValueResolver<?> result) {
result = result.clone();
try {
result.get();
@@ -92,6 +203,7 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
return null;
} catch (Exception e) { return e; }
}
+
public static <T> Maybe<T> assertMaybeIsAbsent(ValueResolver<T> result) {
result = result.clone();
Maybe<T> maybe = result.getMaybe();
@@ -99,29 +211,20 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
return maybe;
}
- public void testSwallowError() {
- ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(app).swallowExceptions();
- assertMaybeIsAbsent(result);
- assertThrowsOnGet(result);
- }
-
-
- public void testDontSwallowError() {
- ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(app);
- assertThrowsOnMaybe(result);
- assertThrowsOnGet(result);
- }
-
- public void testDefaultWhenSwallowError() {
- ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(app).swallowExceptions().defaultValue("foo");
- assertMaybeIsAbsent(result);
- Assert.assertEquals(result.get(), "foo");
+ private void assertContainsCallingMethod(StackTraceElement[] stackTrace, String expectedMethod) {
+ for (StackTraceElement element : stackTrace) {
+ if (expectedMethod.equals(element.getMethodName())) {
+ return;
+ }
+ }
+ fail("Method "+expectedMethod+" not found: "+Arrays.toString(stackTrace));
}
-
- public void testDefaultBeforeDelayAndError() {
- ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.TEN_SECONDS)).as(String.class).context(app).timeout(Duration.ZERO).defaultValue("foo");
- assertMaybeIsAbsent(result);
- Assert.assertEquals(result.get(), "foo");
+
+ private void assertNotContainsCallingMethod(StackTraceElement[] stackTrace, String notExpectedMethod) {
+ for (StackTraceElement element : stackTrace) {
+ if (notExpectedMethod.equals(element.getMethodName())) {
+ fail("Method "+notExpectedMethod+" not expected: "+Arrays.toString(stackTrace));
+ }
+ }
}
-
}
[6/6] brooklyn-server git commit: Closes #390
Posted by sv...@apache.org.
Closes #390
BROOKLYN-356: fix race in transformer by adding getImmediately
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/4fab5b1a
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/4fab5b1a
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/4fab5b1a
Branch: refs/heads/master
Commit: 4fab5b1a800ca2a1f5689ec04c5731808da2f200
Parents: c4cc0d1 0ded2cb
Author: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Authored: Wed Nov 2 12:06:05 2016 +0200
Committer: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Committed: Wed Nov 2 12:06:05 2016 +0200
----------------------------------------------------------------------
.../brooklyn/api/mgmt/ManagementContext.java | 6 +-
.../spi/dsl/BrooklynDslDeferredSupplier.java | 4 +-
.../spi/dsl/methods/BrooklynDslCommon.java | 96 +++++-
.../brooklyn/spi/dsl/methods/DslComponent.java | 125 ++++++-
.../brooklyn/camp/brooklyn/dsl/DslTest.java | 345 ++++++++++++++++---
.../TransformerEnricherWithDslTest.java | 9 +-
.../internal/AbstractManagementContext.java | 6 +-
.../mgmt/internal/BasicSubscriptionContext.java | 3 +-
.../mgmt/internal/LocalSubscriptionManager.java | 35 +-
.../core/mgmt/internal/Subscription.java | 1 +
.../AbstractConfigurationSupportInternal.java | 2 +-
.../core/sensor/DependentConfiguration.java | 87 ++++-
.../brooklyn/enricher/stock/Propagator.java | 6 +-
.../brooklyn/enricher/stock/Transformer.java | 3 +-
.../enricher/stock/reducer/Reducer.java | 6 +-
.../util/core/task/ImmediateSupplier.java | 50 +++
.../brooklyn/util/core/task/ValueResolver.java | 34 +-
.../core/entity/EntitySubscriptionTest.java | 85 ++++-
.../entity/RecordingSensorEventListener.java | 16 +-
.../brooklyn/util/core/task/TasksTest.java | 2 +-
.../util/core/task/ValueResolverTest.java | 231 ++++++++++---
.../resources/AbstractBrooklynRestResource.java | 7 +-
.../rest/resources/EntityConfigResource.java | 2 +-
.../brooklyn/rest/resources/SensorResource.java | 2 +-
.../rest/transform/EffectorTransformer.java | 7 +-
25 files changed, 993 insertions(+), 177 deletions(-)
----------------------------------------------------------------------