You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by sv...@apache.org on 2016/11/02 10:06:22 UTC

[1/6] brooklyn-server git commit: PR #390: incorporate review comments

Repository: brooklyn-server
Updated Branches:
  refs/heads/master c4cc0d1bd -> 4fab5b1a8


PR #390: incorporate review comments

Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/cee60cc8
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/cee60cc8
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/cee60cc8

Branch: refs/heads/master
Commit: cee60cc84510aebb548cecc9c2a3e65e96635406
Parents: 1cd2a09
Author: Aled Sage <al...@gmail.com>
Authored: Tue Nov 1 11:05:45 2016 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Nov 1 11:06:02 2016 +0000

----------------------------------------------------------------------
 .../spi/dsl/methods/BrooklynDslCommon.java      | 40 +++++--------
 .../brooklyn/spi/dsl/methods/DslComponent.java  | 23 +++++--
 .../brooklyn/camp/brooklyn/dsl/DslTest.java     | 63 ++++++++++++--------
 .../util/core/task/ImmediateSupplier.java       |  2 +-
 .../brooklyn/util/core/task/ValueResolver.java  |  7 +--
 .../core/entity/EntitySubscriptionTest.java     | 19 +++++-
 .../util/core/task/ValueResolverTest.java       | 45 +++++++++++++-
 7 files changed, 137 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cee60cc8/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java
index d92bf46..f508db9 100644
--- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java
+++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java
@@ -414,17 +414,7 @@ public class BrooklynDslCommon {
 
         @Override
         public Maybe<Object> getImmediately() {
-            Class<?> type = this.type;
-            if (type == null) {
-                EntityInternal entity = entity();
-                try {
-                    type = new ClassLoaderUtils(BrooklynDslCommon.class, entity).loadClass(typeName);
-                } catch (ClassNotFoundException e) {
-                    throw Exceptions.propagate(e);
-                }
-            }
-            final Class<?> clazz = type;
-
+            final Class<?> clazz = getOrLoadType();
             final ExecutionContext executionContext = ((EntityInternal)entity()).getExecutionContext();
 
             // Marker exception that one of our component-parts cannot yet be resolved - 
@@ -465,17 +455,7 @@ public class BrooklynDslCommon {
         
         @Override
         public Task<Object> newTask() {
-            Class<?> type = this.type;
-            if (type == null) {
-                EntityInternal entity = entity();
-                try {
-                    type = new ClassLoaderUtils(BrooklynDslCommon.class, entity).loadClass(typeName);
-                } catch (ClassNotFoundException e) {
-                    throw Exceptions.propagate(e);
-                }
-            }
-            final Class<?> clazz = type;
-
+            final Class<?> clazz = getOrLoadType();
             final ExecutionContext executionContext = ((EntityInternal)entity()).getExecutionContext();
             
             final Function<Object, Object> resolver = new Function<Object, Object>() {
@@ -508,6 +488,19 @@ public class BrooklynDslCommon {
                     .build();
         }
 
+        protected Class<?> getOrLoadType() {
+            Class<?> type = this.type;
+            if (type == null) {
+                EntityInternal entity = entity();
+                try {
+                    type = new ClassLoaderUtils(BrooklynDslCommon.class, entity).loadClass(typeName);
+                } catch (ClassNotFoundException e) {
+                    throw Exceptions.propagate(e);
+                }
+            }
+            return type;
+        }
+        
         public static <T> T create(Class<T> type, List<?> constructorArgs, Map<String,?> fields, Map<String,?> config) {
             try {
                 T bean = Reflections.invokeConstructorFromArgs(type, constructorArgs.toArray()).get();
@@ -603,8 +596,7 @@ public class BrooklynDslCommon {
                 .body(new Callable<Object>() {
                     @Override
                     public Object call() throws Exception {
-                        ManagementContextInternal managementContext = DslExternal.managementContext();
-                        return managementContext.getExternalConfigProviderRegistry().getConfig(providerName, key);
+                        return getImmediately().get();
                     }
                 })
                 .build();

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cee60cc8/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
index b134450..cded853 100644
--- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
+++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
@@ -88,7 +88,7 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
     }
 
     // ---------------------------
-    
+
     @Override
     public final Maybe<Entity> getImmediately() {
         try {
@@ -236,7 +236,11 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
 
         @Override
         public Maybe<Object> getImmediately() {
-            return Maybe.<Object>of(component.get().getId());
+            Maybe<Entity> targetEntityMaybe = component.getImmediately();
+            if (targetEntityMaybe.isAbsent()) return Maybe.absent("Target entity not available");
+            Entity targetEntity = targetEntityMaybe.get();
+
+            return Maybe.<Object>of(targetEntity.getId());
         }
         
         @Override
@@ -277,7 +281,10 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
 
         @Override
         public final Maybe<Object> getImmediately() {
-            Entity targetEntity = component.getImmediately().get();
+            Maybe<Entity> targetEntityMaybe = component.getImmediately();
+            if (targetEntityMaybe.isAbsent()) return Maybe.absent("Target entity not available");
+            Entity targetEntity = targetEntityMaybe.get();
+
             AttributeSensor<?> targetSensor = (AttributeSensor<?>) targetEntity.getEntityType().getSensor(sensorName);
             if (targetSensor == null) {
                 targetSensor = Sensors.newSensor(Object.class, sensorName);
@@ -331,7 +338,10 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
 
         @Override
         public final Maybe<Object> getImmediately() {
-            Entity targetEntity = component.get();
+            Maybe<Entity> targetEntityMaybe = component.getImmediately();
+            if (targetEntityMaybe.isAbsent()) return Maybe.absent("Target entity not available");
+            Entity targetEntity = targetEntityMaybe.get();
+            
             return Maybe.of(targetEntity.getConfig(ConfigKeys.newConfigKey(Object.class, keyName)));
         }
 
@@ -394,7 +404,10 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
             if (si instanceof Sensor) {
                 return Maybe.<Sensor<?>>of((Sensor<?>)si);
             } else if (si instanceof String) {
-                Entity targetEntity = component.get();
+                Maybe<Entity> targetEntityMaybe = component.getImmediately();
+                if (targetEntityMaybe.isAbsent()) return Maybe.absent("Target entity not available");
+                Entity targetEntity = targetEntityMaybe.get();
+
                 Sensor<?> result = null;
                 if (targetEntity!=null) {
                     result = targetEntity.getEntityType().getSensor((String)si);

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cee60cc8/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java
index d27e6c2..8db3720 100644
--- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java
+++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java
@@ -48,6 +48,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.ListenableScheduledFuture;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 
@@ -79,7 +80,7 @@ public class DslTest extends BrooklynAppUnitTestSupport {
     @Test
     public void testAttributeWhenReadyEmptyDoesNotBlock() throws Exception {
         BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.attributeWhenReady(TestApplication.MY_ATTRIBUTE.getName());
-        Maybe<?> actualValue = execDslRealRealyQuick(dsl, TestApplication.MY_ATTRIBUTE.getType(), app);
+        Maybe<?> actualValue = execDslRealRealQuick(dsl, TestApplication.MY_ATTRIBUTE.getType(), app);
         assertTrue(actualValue.isAbsent());
     }
 
@@ -100,7 +101,7 @@ public class DslTest extends BrooklynAppUnitTestSupport {
     public void testAttributeWhenReadyBlocksUntilReady() throws Exception {
         // Fewer iterations, because there is a sleep each time
         BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.attributeWhenReady(TestEntity.NAME.getName());
-        new AttributeWhenReadyTestWorker(app, TestEntity.NAME, dsl).eventually(true).resolverIterations(2).run();
+        new AttributeWhenReadyTestWorker(app, TestEntity.NAME, dsl).satisfiedAsynchronously(true).resolverIterations(2).run();
     }
 
     @Test(groups="Integration")
@@ -165,7 +166,7 @@ public class DslTest extends BrooklynAppUnitTestSupport {
         protected final Class<?> type;
         protected EntitySpec<TestEntity> childSpec = EntitySpec.create(TestEntity.class);
         protected int resolverIterations = MANY_RESOLVER_ITERATIONS;
-        protected boolean eventually = false;
+        protected boolean satisfiedAsynchronously = false;
         private boolean wrapInTaskForImmediately = true;
         
         public DslTestWorker(TestApplication parent, BrooklynDslDeferredSupplier<?> dsl, Class<?> type) {
@@ -179,8 +180,8 @@ public class DslTest extends BrooklynAppUnitTestSupport {
             return this;
         }
         
-        public DslTestWorker eventually(boolean val) {
-            eventually = val;
+        public DslTestWorker satisfiedAsynchronously(boolean val) {
+            satisfiedAsynchronously = val;
             return this;
         }
         
@@ -191,13 +192,10 @@ public class DslTest extends BrooklynAppUnitTestSupport {
         
         @Override
         public void run() {
-            TestEntity entity = parent.addChild(childSpec);
-            for (int i = 0; i < resolverIterations; i++) {
-                preResolve(entity);
-                Maybe<?> eventualValue = execDslEventually(dsl, type, entity, Duration.FIVE_SECONDS);//FIXME ONE_MINUTE);
-                postResolve(entity, eventualValue);
-
-                if (!eventually) {
+            try {
+                TestEntity entity = parent.addChild(childSpec);
+                for (int i = 0; i < resolverIterations; i++) {
+                    // Call dsl.getImmediately()
                     preResolve(entity);
                     Maybe<?> immediateValue;
                     try {
@@ -205,21 +203,29 @@ public class DslTest extends BrooklynAppUnitTestSupport {
                     } catch (Exception e) {
                         throw Exceptions.propagate(e);
                     }
-                    postResolve(entity, immediateValue);
+                    postResolve(entity, immediateValue, true);
+                    
+                    // Call dsl.get()
+                    preResolve(entity);
+                    Maybe<?> eventualValue = execDslEventually(dsl, type, entity, Duration.ONE_MINUTE);
+                    postResolve(entity, eventualValue, false);
                 }
+            } catch (Exception e) {
+                Exceptions.propagate(e);
             }
         }
 
-        protected void preResolve(TestEntity entity) {
+        protected void preResolve(TestEntity entity) throws Exception {
         }
 
-        protected void postResolve(TestEntity entity, Maybe<?> actualValue) {
+        protected void postResolve(TestEntity entity, Maybe<?> actualValue, boolean isImmediate) throws Exception {
         }
     }
 
     private class AttributeWhenReadyTestWorker extends DslTestWorker {
         private AttributeSensor<String> sensor;
         private String expectedValue;
+        private ListenableScheduledFuture<?> future;
 
         public AttributeWhenReadyTestWorker(TestApplication parent, AttributeSensor<String> sensor, BrooklynDslDeferredSupplier<?> dsl) {
             super(parent, dsl, sensor.getType());
@@ -234,21 +240,28 @@ public class DslTest extends BrooklynAppUnitTestSupport {
                     entity.sensors().set(sensor, expectedValue);
                 }
             };
-            if (eventually) {
-                executor.schedule(job, random.nextInt(20), TimeUnit.MILLISECONDS);
+            if (satisfiedAsynchronously) {
+                future = executor.schedule(job, random.nextInt(20), TimeUnit.MILLISECONDS);
             } else {
                 job.run();
             }
         }
 
         @Override
-        protected void postResolve(TestEntity entity, Maybe<?> actualValue) {
-            assertEquals(actualValue.get(), expectedValue);
+        protected void postResolve(TestEntity entity, Maybe<?> actualValue, boolean isImmediate) throws Exception {
+            if (satisfiedAsynchronously && isImmediate) {
+                // We accept a maybe.absent if we called getImmediately when satisfiedAsynchronously
+                assertTrue(actualValue.isAbsent() || expectedValue.equals(actualValue.get()), "actual="+actualValue+"; expected="+expectedValue);
+            } else {
+                assertEquals(actualValue.get(), expectedValue);
+            }
             
-            if (eventually) {
-                // Reset sensor - otherwise if run in a loop the old value will be picked up, before our execute sets the new value
-                entity.sensors().set(sensor, null);
+            if (future != null) {
+                future.get(Asserts.DEFAULT_LONG_TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+                future = null;
             }
+            // Reset sensor - otherwise if run in a loop the old value will be picked up, before our execute sets the new value
+            entity.sensors().set(sensor, null);
         }
 
     }
@@ -263,7 +276,7 @@ public class DslTest extends BrooklynAppUnitTestSupport {
         }
 
         @Override
-        protected void postResolve(TestEntity entity, Maybe<?> actualValue) {
+        protected void postResolve(TestEntity entity, Maybe<?> actualValue, boolean isImmediate) {
             assertEquals(actualValue.get(), entity);
         }
 
@@ -275,7 +288,7 @@ public class DslTest extends BrooklynAppUnitTestSupport {
         }
 
         @Override
-        protected void postResolve(TestEntity entity, Maybe<?> actualValue) {
+        protected void postResolve(TestEntity entity, Maybe<?> actualValue, boolean isImmediate) {
             assertEquals(actualValue.get(), parent);
         }
     }
@@ -304,7 +317,7 @@ public class DslTest extends BrooklynAppUnitTestSupport {
         }
     }
     
-    static Maybe<?> execDslRealRealyQuick(BrooklynDslDeferredSupplier<?> dsl, Class<?> type, Entity context) {
+    static Maybe<?> execDslRealRealQuick(BrooklynDslDeferredSupplier<?> dsl, Class<?> type, Entity context) {
         return execDslEventually(dsl, type, context, ValueResolver.REAL_REAL_QUICK_WAIT);
     }
     

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cee60cc8/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java
index 03a1da6..ac0aae4 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java
@@ -30,7 +30,7 @@ public interface ImmediateSupplier<T> {
      * Indicates that we are unable to get the value immediately, because that is not supported
      * (e.g. because the supplier is composed of sub-tasks that do not support {@link ImmediateSupplier}.  
      */
-    public static class ImmediateUnsupportedException extends RuntimeException {
+    public static class ImmediateUnsupportedException extends UnsupportedOperationException {
         private static final long serialVersionUID = -7942339715007942797L;
         
         public ImmediateUnsupportedException(String message) {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cee60cc8/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
index 10fd665..4c3cbe2 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
@@ -379,12 +379,9 @@ public class ValueResolver<T> implements DeferredSupplier<T> {
                             .tagIfNotNull(BrooklynTaskTags.getTargetOrContextEntityTag(Tasks.current()));
                     if (isTransientTask) tb.tag(BrooklynTaskTags.TRANSIENT_TASK_TAG);
                     
+                    // Note that immediate resolution is handled by using ImmediateSupplier (using an instanceof check), 
+                    // so that it executes in the current thread instead of using task execution.
                     Task<Object> vt = exec.submit(tb.build());
-                    // TODO to handle immediate resolution, it would be nice to be able to submit 
-                    // so it executes in the current thread,
-                    // or put a marker in the target thread or task while it is running that the task 
-                    // should never wait on anything other than another value being resolved 
-                    // (though either could recurse infinitely) 
                     Maybe<Object> vm = Durations.get(vt, timer);
                     vt.cancel(true);
                     if (vm.isAbsent()) return (Maybe<T>)vm;

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cee60cc8/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
index a16576c..ea91f36 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
@@ -34,6 +34,7 @@ import org.apache.brooklyn.core.test.policy.TestEnricher;
 import org.apache.brooklyn.core.test.policy.TestPolicy;
 import org.apache.brooklyn.entity.group.BasicGroup;
 import org.apache.brooklyn.test.Asserts;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -76,6 +77,23 @@ public class EntitySubscriptionTest extends BrooklynAppUnitTestSupport {
         app.start(ImmutableList.of(loc));
     }
     
+    @AfterMethod(alwaysRun=true)
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            loc = null;
+            entity = null;
+            observedEntity = null;
+            observedChildEntity = null;
+            observedGroup = null;
+            observedMemberEntity = null;
+            otherEntity = null;
+            listener = null;
+        }
+    }
+    
     @Test
     public void testSubscriptionReceivesEvents() {
         entity.subscriptions().subscribe(observedEntity, TestEntity.SEQUENCE, listener);
@@ -298,7 +316,6 @@ public class EntitySubscriptionTest extends BrooklynAppUnitTestSupport {
         entity.subscriptions().subscribeToChildren(observedEntity, TestEntity.SEQUENCE, listener);
         observedChildEntity.sensors().set(TestEntity.SEQUENCE, 123);
         assertListenerCalledOnceWithContextEntityEventually(listener, entity);
-        listener.clearEvents();
     }
     
     @Test

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cee60cc8/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
index 43cd0ba..b13d2b5 100644
--- a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
+++ b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
@@ -59,13 +59,56 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
         Assert.assertEquals(result.get(), "foo");
     }
 
-    public void testNoExecutionContextOnCompleted() {
+    public void testCompletedTaskReturnsResultImmediately() {
+        // Call ValueResolver.getMaybe() from this thread, which has no execution context.
+        // However, the task has already been submitted and we have waited for it to complete.
+        // Therefore the ValueResolver can simply check for task.isDone() and return its result immediately.
         Task<String> t = newSleepTask(Duration.ZERO, "foo");
         app.getExecutionContext().submit(t).getUnchecked();
         Maybe<String> result = Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe();
         Assert.assertEquals(result.get(), "foo");
     }
 
+    public void testUnsubmittedTaskWhenNoExecutionContextFails() {
+        // ValueResolver.getMaybe() is called with no execution context. Therefore it will not execute the task.
+        Task<String> t = newSleepTask(Duration.ZERO, "foo");
+        Maybe<String> result = Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe();
+        
+        Assert.assertTrue(result.isAbsent(), "result="+result);
+        Exception exception = ((Maybe.Absent<?>)result).getException();
+        Assert.assertTrue(exception.toString().contains("no execution context available"), "exception="+exception);
+    }
+
+    public void testUnsubmittedTaskWithExecutionContextExecutesAndReturns() {
+        // ValueResolver.getMaybe() is called in app's execution context. Therefore it will execute the task.
+        final Task<String> t = newSleepTask(Duration.ZERO, "foo");
+        
+        Maybe<String>  result = app.getExecutionContext()
+                .submit(new Callable<Maybe<String> >() {
+                    public Maybe<String>  call() throws Exception {
+                        return Tasks.resolving(t).as(String.class).timeout(Asserts.DEFAULT_LONG_TIMEOUT).getMaybe();
+                    }})
+                .getUnchecked();
+        
+        Assert.assertEquals(result.get(), "foo");
+    }
+
+    public void testUnsubmittedTaskWithExecutionContextExecutesAndTimesOut() {
+        // ValueResolver.getMaybe() is called in app's execution context. Therefore it will execute the task.
+        final Task<String> t = newSleepTask(Duration.ONE_MINUTE, "foo");
+        
+        Maybe<String>  result = app.getExecutionContext()
+                .submit(new Callable<Maybe<String> >() {
+                    public Maybe<String>  call() throws Exception {
+                        return Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe();
+                    }})
+                .getUnchecked();
+        
+        Assert.assertTrue(result.isAbsent(), "result="+result);
+        Exception exception = ((Maybe.Absent<?>)result).getException();
+        Assert.assertTrue(exception.toString().contains("not completed when immediate completion requested"), "exception="+exception);
+    }
+
     public void testSwallowError() {
         ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(app).swallowExceptions();
         assertMaybeIsAbsent(result);


[2/6] brooklyn-server git commit: Subscription callbacks: task has contextEntity tag

Posted by sv...@apache.org.
Subscription callbacks: task has contextEntity tag

Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/3f37e657
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/3f37e657
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/3f37e657

Branch: refs/heads/master
Commit: 3f37e657a3efd0b1b025008ab1070713b9b8badc
Parents: c4cc0d1
Author: Aled Sage <al...@gmail.com>
Authored: Thu Oct 13 22:46:57 2016 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Nov 1 11:06:02 2016 +0000

----------------------------------------------------------------------
 .../internal/AbstractManagementContext.java     |  6 +-
 .../mgmt/internal/BasicSubscriptionContext.java |  3 +-
 .../mgmt/internal/LocalSubscriptionManager.java | 35 +++++----
 .../core/mgmt/internal/Subscription.java        |  1 +
 .../core/entity/EntitySubscriptionTest.java     | 76 ++++++++++++++++----
 .../entity/RecordingSensorEventListener.java    | 16 ++++-
 6 files changed, 109 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/3f37e657/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
index ea84f3c..b57e6eb 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
@@ -66,6 +66,7 @@ import org.apache.brooklyn.core.internal.storage.impl.BrooklynStorageImpl;
 import org.apache.brooklyn.core.internal.storage.impl.inmemory.InMemoryDataGridFactory;
 import org.apache.brooklyn.core.location.BasicLocationRegistry;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity;
 import org.apache.brooklyn.core.mgmt.classloading.BrooklynClassLoadingContextSequential;
 import org.apache.brooklyn.core.mgmt.classloading.JavaBrooklynClassLoadingContext;
 import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
@@ -87,6 +88,8 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Function;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
 public abstract class AbstractManagementContext implements ManagementContextInternal {
@@ -277,7 +280,8 @@ public abstract class AbstractManagementContext implements ManagementContextInte
     @Override
     public SubscriptionContext getSubscriptionContext(Entity e) {
         // BSC is a thin wrapper around SM so fine to create a new one here
-        return new BasicSubscriptionContext(getSubscriptionManager(), e);
+        Map<String, ?> flags = ImmutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(e)));
+        return new BasicSubscriptionContext(flags, getSubscriptionManager(), e);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/3f37e657/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
index 57d4712..2c6ab65 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 
 import groovy.lang.Closure;
@@ -55,7 +56,7 @@ public class BasicSubscriptionContext implements SubscriptionContext {
     private final Map<String,Object> flags;
 
     public BasicSubscriptionContext(SubscriptionManager manager, Object subscriber) {
-        this(Collections.<String,Object>emptyMap(), manager, subscriber);
+        this(ImmutableMap.<String, Object>of(), manager, subscriber);
     }
     
     public BasicSubscriptionContext(Map<String, ?> flags, SubscriptionManager manager, Object subscriber) {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/3f37e657/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index f9606f8..fd15cf3 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -21,12 +21,12 @@ package org.apache.brooklyn.core.mgmt.internal;
 import static org.apache.brooklyn.util.JavaGroovyEquivalents.elvis;
 import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth;
 import static org.apache.brooklyn.util.JavaGroovyEquivalents.join;
-import static org.apache.brooklyn.util.JavaGroovyEquivalents.mapOf;
 
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -43,6 +43,7 @@ import org.apache.brooklyn.api.sensor.SensorEvent;
 import org.apache.brooklyn.api.sensor.SensorEventListener;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.sensor.BasicSensorEvent;
+import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.task.BasicExecutionManager;
 import org.apache.brooklyn.util.core.task.SingleThreadedScheduler;
@@ -53,6 +54,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Multimaps;
 
 /**
@@ -97,6 +99,12 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
         Entity producer = s.producer;
         Sensor<T> sensor= s.sensor;
         s.subscriber = getSubscriber(flags, s);
+        if (flags.containsKey("tags") || flags.containsKey("tag")) {
+            Iterable<?> tags = (Iterable<?>) flags.get("tags");
+            Object tag = flags.get("tag");
+            s.subscriberExtraExecTags = (tag == null) ? tags : (tags == null ? ImmutableList.of(tag) : MutableList.builder().addAll(tags).add(tag).build());
+        }
+
         if (flags.containsKey("subscriberExecutionManagerTag")) {
             s.subscriberExecutionManagerTag = flags.remove("subscriberExecutionManagerTag");
             s.subscriberExecutionManagerTagSupplied = true;
@@ -130,8 +138,13 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
                 LOG.warn("Cannot notifyOfInitialValue for subscription with non-attribute sensor: "+s);
             } else {
                 if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {} to {}", new Object[] {s.producer, s.sensor, s});
-                Map<String, Object> tagsMap = MutableMap.of("tag", s.subscriberExecutionManagerTag);
-                em.submit(tagsMap, new Runnable() {
+                List<Object> tags = MutableList.builder()
+                        .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags)
+                        .add(s.subscriberExecutionManagerTag)
+                        .build()
+                        .asUnmodifiable();
+                Map<String, ?> execFlags = MutableMap.of("tags", tags);
+                em.submit(execFlags, new Runnable() {
                     @Override
                     public String toString() {
                         return "LSM.publishInitialValue("+s.producer+", "+s.sensor+")";
@@ -222,16 +235,14 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
                     continue;
                 final Subscription sAtClosureCreation = s;
                 
-//                Set<Object> tags = MutableSet.of();
-//                if (s.subscriberExecutionManagerTag!=null) tags.add(s.subscriberExecutionManagerTag);
-//                if (event.getSource()!=null) tags.add(BrooklynTaskTags.tagForContextEntity(event.getSource()));
-//                Map<String, Object> tagsMap = mapOf("tags", (Object)tags);
-                // use code above, instead of line below, if we want subscription deliveries associated with the entity;
-                // that will cause them to be cancelled when the entity is unmanaged
-                // (not sure that is useful, and likely NOT worth the expense, but it might be...) -Alex Oct 2014
-                Map<String, Object> tagsMap = mapOf("tag", s.subscriberExecutionManagerTag);
+                List<Object> tags = MutableList.builder()
+                        .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags)
+                        .add(s.subscriberExecutionManagerTag)
+                        .build()
+                        .asUnmodifiable();
+                Map<String, ?> execFlags = MutableMap.of("tags", tags);
                 
-                em.submit(tagsMap, new Runnable() {
+                em.submit(execFlags, new Runnable() {
                     @Override
                     public String toString() {
                         return "LSM.publish("+event+")";

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/3f37e657/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java
index 66706a1..5e71701 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java
@@ -37,6 +37,7 @@ class Subscription<T> implements SubscriptionHandle {
     public Object subscriberExecutionManagerTag;
     /** whether the tag was supplied by user, in which case we should not clear execution semantics */
     public boolean subscriberExecutionManagerTagSupplied;
+    public Iterable<?> subscriberExtraExecTags;
     public final Entity producer;
     public final Sensor<T> sensor;
     public final SensorEventListener<? super T> listener;

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/3f37e657/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
index 3d61a99..a16576c 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
@@ -20,16 +20,20 @@ package org.apache.brooklyn.core.entity;
 
 import static org.testng.Assert.assertEquals;
 
-import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntitySpec;
 import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.api.sensor.EnricherSpec;
 import org.apache.brooklyn.core.location.SimulatedLocation;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.sensor.BasicSensorEvent;
-import org.apache.brooklyn.core.test.entity.TestApplication;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.core.test.policy.TestEnricher;
+import org.apache.brooklyn.core.test.policy.TestPolicy;
 import org.apache.brooklyn.entity.group.BasicGroup;
 import org.apache.brooklyn.test.Asserts;
-import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -37,14 +41,13 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 
-public class EntitySubscriptionTest {
+public class EntitySubscriptionTest extends BrooklynAppUnitTestSupport {
 
     // TODO Duplication between this and PolicySubscriptionTest
     
     private static final long SHORT_WAIT_MS = 100;
 
     private SimulatedLocation loc;
-    private TestApplication app;
     private TestEntity entity;
     private TestEntity observedEntity;
     private BasicGroup observedGroup;
@@ -52,10 +55,11 @@ public class EntitySubscriptionTest {
     private TestEntity observedMemberEntity;
     private TestEntity otherEntity;
     private RecordingSensorEventListener<Object> listener;
-    
+
     @BeforeMethod(alwaysRun=true)
-    public void setUp() {
-        app = TestApplication.Factory.newManagedInstanceForTests();
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
         loc = app.newSimulatedLocation();
         entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
         observedEntity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
@@ -72,11 +76,6 @@ public class EntitySubscriptionTest {
         app.start(ImmutableList.of(loc));
     }
     
-    @AfterMethod(alwaysRun=true)
-    public void tearDown() {
-        if (app != null) Entities.destroyAll(app.getManagementContext());
-    }
-    
     @Test
     public void testSubscriptionReceivesEvents() {
         entity.subscriptions().subscribe(observedEntity, TestEntity.SEQUENCE, listener);
@@ -280,4 +279,55 @@ public class EntitySubscriptionTest {
         entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), null, TestEntity.NAME, listener);
         entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), null, null, listener);
     }
+    
+    @Test
+    public void testContextEntityOnSubscriptionCallbackTask() {
+        observedEntity.sensors().set(TestEntity.NAME, "myval");
+        entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), observedEntity, TestEntity.NAME, listener);
+        
+        // notify-of-initial-value should give us our entity
+        assertListenerCalledOnceWithContextEntityEventually(listener, entity);
+        listener.clearEvents();
+        
+        // as should subsequent events
+        observedEntity.sensors().set(TestEntity.NAME, "myval2");
+        assertListenerCalledOnceWithContextEntityEventually(listener, entity);
+        listener.clearEvents();
+
+        // same for subscribing to children: context should be the subscriber
+        entity.subscriptions().subscribeToChildren(observedEntity, TestEntity.SEQUENCE, listener);
+        observedChildEntity.sensors().set(TestEntity.SEQUENCE, 123);
+        assertListenerCalledOnceWithContextEntityEventually(listener, entity);
+        listener.clearEvents();
+    }
+    
+    @Test
+    public void testContextEntityOnPolicySubscriptionCallbackTask() {
+        TestPolicy policy = entity.policies().add(PolicySpec.create(TestPolicy.class));
+        policy.subscriptions().subscribe(observedEntity, TestEntity.NAME, listener);
+
+        observedEntity.sensors().set(TestEntity.NAME, "myval");
+        assertListenerCalledOnceWithContextEntityEventually(listener, entity);
+    }
+    
+    @Test
+    public void testContextEntityOnEnricherSubscriptionCallbackTask() {
+        TestEnricher enricher = entity.enrichers().add(EnricherSpec.create(TestEnricher.class));
+        enricher.subscriptions().subscribe(observedEntity, TestEntity.NAME, listener);
+
+        observedEntity.sensors().set(TestEntity.NAME, "myval");
+        assertListenerCalledOnceWithContextEntityEventually(listener, entity);
+    }
+    
+    protected void assertListenerCalledEventually(final RecordingSensorEventListener<?> listener, final int expectedEventCount) {
+        Asserts.succeedsEventually(new Runnable() {
+            @Override public void run() {
+                assertEquals(listener.getEvents().size(), expectedEventCount);
+            }});
+    }
+    
+    protected void assertListenerCalledOnceWithContextEntityEventually(final RecordingSensorEventListener<?> listener, final Entity expectedContext) {
+        assertListenerCalledEventually(listener, 1);
+        assertEquals(BrooklynTaskTags.getContextEntity(Iterables.getOnlyElement(listener.getTasks())), entity);
+    }
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/3f37e657/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java b/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java
index 44920ed..f5384b2 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java
@@ -25,8 +25,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 
+import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.sensor.SensorEvent;
 import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.core.task.Tasks;
 
 import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
@@ -41,6 +44,8 @@ import com.google.common.primitives.Longs;
 public class RecordingSensorEventListener<T> implements SensorEventListener<T>, Iterable<SensorEvent<T>> {
 
     private final List<SensorEvent<T>> events = Lists.newCopyOnWriteArrayList();
+    private final List<Task<?>> tasks = Lists.newCopyOnWriteArrayList();
+
     private final boolean suppressDuplicates;
     private T lastValue;
 
@@ -56,6 +61,7 @@ public class RecordingSensorEventListener<T> implements SensorEventListener<T>,
     public void onEvent(SensorEvent<T> event) {
         if (!suppressDuplicates || events.isEmpty() || !Objects.equals(lastValue, event.getValue())) {
             events.add(event);
+            tasks.add(Tasks.current());
             lastValue = event.getValue();
         }
     }
@@ -68,6 +74,13 @@ public class RecordingSensorEventListener<T> implements SensorEventListener<T>,
     }
 
     /**
+     * The {@link {@link Tasks#current()} for each call to {@link #onEvent(SensorEvent)}
+     */
+    public List<Task<?>> getTasks() {
+        return MutableList.copyOf(tasks).asUnmodifiable();
+    }
+
+    /**
      * @return A live read-only view of recorded events.
      */
     public Iterable<T> getEventValues() {
@@ -89,7 +102,8 @@ public class RecordingSensorEventListener<T> implements SensorEventListener<T>,
      * Clears all events recorded by the listener.
      */
     public void clearEvents() {
-        this.events.clear();
+        events.clear();
+        tasks.clear();
         lastValue = null;
     }
 


[5/6] brooklyn-server git commit: More use of Tasks.resolving.immediately

Posted by sv...@apache.org.
More use of Tasks.resolving.immediately

Similar to BROOKLYN-356, use more efficient thread usage and avoid
premature timeouts if threads are starved.

Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/0ded2cbd
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/0ded2cbd
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/0ded2cbd

Branch: refs/heads/master
Commit: 0ded2cbdd8233101a44961f1f6389809c01b4f2a
Parents: 8601d21
Author: Aled Sage <al...@gmail.com>
Authored: Tue Nov 1 19:56:59 2016 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Nov 1 20:50:58 2016 +0000

----------------------------------------------------------------------
 .../java/org/apache/brooklyn/enricher/stock/Propagator.java   | 6 +++---
 .../org/apache/brooklyn/enricher/stock/reducer/Reducer.java   | 6 +++++-
 .../brooklyn/rest/resources/AbstractBrooklynRestResource.java | 7 +++++--
 .../apache/brooklyn/rest/resources/EntityConfigResource.java  | 2 +-
 .../org/apache/brooklyn/rest/resources/SensorResource.java    | 2 +-
 .../apache/brooklyn/rest/transform/EffectorTransformer.java   | 7 +++++--
 6 files changed, 20 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0ded2cbd/core/src/main/java/org/apache/brooklyn/enricher/stock/Propagator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Propagator.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Propagator.java
index a8f9f97..76d3cba 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/Propagator.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Propagator.java
@@ -220,8 +220,8 @@ public class Propagator extends AbstractEnricher implements SensorEventListener<
         for (Map.Entry<?,?> entry : mapping.entrySet()) {
             Object keyO = entry.getKey();
             Object valueO = entry.getValue();
-            Sensor<?> key = Tasks.resolving(keyO).as(Sensor.class).timeout(ValueResolver.REAL_QUICK_WAIT).context(producer).get();
-            Sensor<?> value = Tasks.resolving(valueO).as(Sensor.class).timeout(ValueResolver.REAL_QUICK_WAIT).context(producer).get();
+            Sensor<?> key = Tasks.resolving(keyO).as(Sensor.class).immediately(true).context(producer).get();
+            Sensor<?> value = Tasks.resolving(valueO).as(Sensor.class).immediately(true).context(producer).get();
             result.put(key, value);
         }
         return result;
@@ -233,7 +233,7 @@ public class Propagator extends AbstractEnricher implements SensorEventListener<
         }
         List<Sensor<?>> result = MutableList.of();
         for (Object sensorO : sensors) {
-            Sensor<?> sensor = Tasks.resolving(sensorO).as(Sensor.class).timeout(ValueResolver.REAL_QUICK_WAIT).context(producer).get();
+            Sensor<?> sensor = Tasks.resolving(sensorO).as(Sensor.class).immediately(true).context(producer).get();
             result.add(sensor);
         }
         return result;

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0ded2cbd/core/src/main/java/org/apache/brooklyn/enricher/stock/reducer/Reducer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/reducer/Reducer.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/reducer/Reducer.java
index 558d50a..d2189ac 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/reducer/Reducer.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/reducer/Reducer.java
@@ -78,7 +78,11 @@ public class Reducer extends AbstractEnricher implements SensorEventListener<Obj
         List<AttributeSensor<?>> sensorListTemp = Lists.newArrayList();
 
         for (Object sensorO : getConfig(SOURCE_SENSORS)) {
-            AttributeSensor<?> sensor = Tasks.resolving(sensorO).as(AttributeSensor.class).timeout(ValueResolver.REAL_QUICK_WAIT).context(producer).get();
+            AttributeSensor<?> sensor = Tasks.resolving(sensorO)
+                    .as(AttributeSensor.class)
+                    .immediately(true)
+                    .context(producer)
+                    .get();
             Optional<? extends Sensor<?>> foundSensor = Iterables.tryFind(sensorListTemp, 
                     SensorPredicates.nameEqualTo(sensor.getName()));
             

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0ded2cbd/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/AbstractBrooklynRestResource.java
----------------------------------------------------------------------
diff --git a/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/AbstractBrooklynRestResource.java b/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/AbstractBrooklynRestResource.java
index 2c47866..02e856f 100644
--- a/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/AbstractBrooklynRestResource.java
+++ b/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/AbstractBrooklynRestResource.java
@@ -105,6 +105,7 @@ public abstract class AbstractBrooklynRestResource {
         private @Nullable Entity entity;
         private @Nullable Duration timeout;
         private @Nullable Object rendererHintSource;
+        private @Nullable Boolean immediately;
         
         public static RestValueResolver resolving(Object v) { return new RestValueResolver(v); }
         
@@ -130,10 +131,11 @@ public abstract class AbstractBrooklynRestResource {
         public RestValueResolver raw(Boolean raw) { this.raw = raw; return this; }
         public RestValueResolver context(Entity entity) { this.entity = entity; return this; }
         public RestValueResolver timeout(Duration timeout) { this.timeout = timeout; return this; }
+        public RestValueResolver immediately(boolean immediately) { this.immediately = immediately; return this; }
         public RestValueResolver renderAs(Object rendererHintSource) { this.rendererHintSource = rendererHintSource; return this; }
 
         public Object resolve() {
-            Object valueResult = getImmediateValue(valueToResolve, entity, timeout);
+            Object valueResult = getImmediateValue(valueToResolve, entity, immediately, timeout);
             if (valueResult==UNRESOLVED) valueResult = valueToResolve;
             if (rendererHintSource!=null && Boolean.FALSE.equals(raw)) {
                 valueResult = RendererHints.applyDisplayValueHintUnchecked(rendererHintSource, valueResult);
@@ -143,11 +145,12 @@ public abstract class AbstractBrooklynRestResource {
         
         private static Object UNRESOLVED = "UNRESOLVED".toCharArray();
         
-        private static Object getImmediateValue(Object value, @Nullable Entity context, @Nullable Duration timeout) {
+        private static Object getImmediateValue(Object value, @Nullable Entity context, @Nullable Boolean immediately, @Nullable Duration timeout) {
             return Tasks.resolving(value)
                     .as(Object.class)
                     .defaultValue(UNRESOLVED)
                     .timeout(timeout)
+                    .immediately(immediately == null ? false : immediately.booleanValue())
                     .context(context)
                     .swallowExceptions()
                     .get();

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0ded2cbd/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/EntityConfigResource.java
----------------------------------------------------------------------
diff --git a/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/EntityConfigResource.java b/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/EntityConfigResource.java
index 5a2ac0d..fb9dc32 100644
--- a/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/EntityConfigResource.java
+++ b/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/EntityConfigResource.java
@@ -151,7 +151,7 @@ public class EntityConfigResource extends AbstractBrooklynRestResource implement
         }
         
         Object value = ((EntityInternal)entity).config().getRaw(ck).orNull();
-        return resolving(value).preferJson(preferJson).asJerseyOutermostReturnValue(true).raw(raw).context(entity).timeout(ValueResolver.PRETTY_QUICK_WAIT).renderAs(ck).resolve();
+        return resolving(value).preferJson(preferJson).asJerseyOutermostReturnValue(true).raw(raw).context(entity).immediately(true).renderAs(ck).resolve();
     }
 
     private ConfigKey<?> findConfig(Entity entity, String configKeyName) {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0ded2cbd/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/SensorResource.java
----------------------------------------------------------------------
diff --git a/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/SensorResource.java b/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/SensorResource.java
index 750b7de..c51043e 100644
--- a/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/SensorResource.java
+++ b/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/SensorResource.java
@@ -113,7 +113,7 @@ public class SensorResource extends AbstractBrooklynRestResource implements Sens
         }
         
         Object value = entity.getAttribute(sensor);
-        return resolving(value).preferJson(preferJson).asJerseyOutermostReturnValue(true).raw(raw).context(entity).timeout(ValueResolver.PRETTY_QUICK_WAIT).renderAs(sensor).resolve();
+        return resolving(value).preferJson(preferJson).asJerseyOutermostReturnValue(true).raw(raw).context(entity).immediately(true).renderAs(sensor).resolve();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/0ded2cbd/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/transform/EffectorTransformer.java
----------------------------------------------------------------------
diff --git a/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/transform/EffectorTransformer.java b/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/transform/EffectorTransformer.java
index 9304008..6987b5a 100644
--- a/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/transform/EffectorTransformer.java
+++ b/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/transform/EffectorTransformer.java
@@ -80,8 +80,11 @@ public class EffectorTransformer {
     @SuppressWarnings({ "unchecked", "rawtypes" })
     protected static EffectorSummary.ParameterSummary<?> parameterSummary(Entity entity, ParameterType<?> parameterType) {
         try {
-            Maybe<?> defaultValue = Tasks.resolving(parameterType.getDefaultValue()).as(parameterType.getParameterClass())
-                .context(entity).timeout(ValueResolver.REAL_QUICK_WAIT).getMaybe();
+            Maybe<?> defaultValue = Tasks.resolving(parameterType.getDefaultValue())
+                    .as(parameterType.getParameterClass())
+                    .context(entity)
+                    .immediately(true)
+                    .getMaybe();
             return new ParameterSummary(parameterType.getName(), parameterType.getParameterClassName(), 
                 parameterType.getDescription(), 
                 WebResourceUtils.getValueForDisplay(defaultValue.orNull(), true, false),


[4/6] brooklyn-server git commit: PR #390: incorporate more review comments

Posted by sv...@apache.org.
PR #390: incorporate more review comments


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/8601d219
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/8601d219
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/8601d219

Branch: refs/heads/master
Commit: 8601d219f4a5bec4ab751257b7c022545bfdd92b
Parents: cee60cc
Author: Aled Sage <al...@gmail.com>
Authored: Tue Nov 1 17:48:46 2016 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Nov 1 20:50:58 2016 +0000

----------------------------------------------------------------------
 .../spi/dsl/methods/BrooklynDslCommon.java      |   2 +
 .../brooklyn/spi/dsl/methods/DslComponent.java  |  62 ++++++---
 .../brooklyn/camp/brooklyn/dsl/DslTest.java     | 134 ++++++++++++++++++-
 .../AbstractConfigurationSupportInternal.java   |   2 +-
 .../util/core/task/ValueResolverTest.java       |  25 ++--
 5 files changed, 191 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/8601d219/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java
index f508db9..8fb48cf 100644
--- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java
+++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java
@@ -584,6 +584,8 @@ public class BrooklynDslCommon {
 
         @Override
         public final Maybe<Object> getImmediately() {
+            // Note this call to getConfig() is different from entity.getConfig.
+            // We expect it to not block waiting for other entities.
             ManagementContextInternal managementContext = DslExternal.managementContext();
             return Maybe.<Object>of(managementContext.getExternalConfigProviderRegistry().getConfig(providerName, key));
         }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/8601d219/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
index cded853..720dfae 100644
--- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
+++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
@@ -37,6 +37,7 @@ import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.mgmt.internal.EntityManagerInternal;
 import org.apache.brooklyn.core.sensor.DependentConfiguration;
 import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.util.core.task.ImmediateSupplier;
 import org.apache.brooklyn.util.core.task.TaskBuilder;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
@@ -91,11 +92,7 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
 
     @Override
     public final Maybe<Entity> getImmediately() {
-        try {
-            return Maybe.of(new EntityInScopeFinder(scopeComponent, scope, componentId).call());
-        } catch (Exception e) {
-            throw Exceptions.propagate(e);
-        }
+        return new EntityInScopeFinder(scopeComponent, scope, componentId).getImmediately();
     }
 
     @Override
@@ -108,7 +105,7 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
                 .build();
     }
     
-    protected static class EntityInScopeFinder implements Callable<Entity> {
+    protected static class EntityInScopeFinder implements Callable<Entity>, ImmediateSupplier<Entity> {
         protected final DslComponent scopeComponent;
         protected final Scope scope;
         protected final String componentId;
@@ -119,33 +116,55 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
             this.componentId = componentId;
         }
 
-        protected EntityInternal getEntity() {
-            if (scopeComponent!=null) {
-                return (EntityInternal)scopeComponent.get();
-            } else {
-                return entity();
+        @Override 
+        public Maybe<Entity> getImmediately() {
+            try {
+                return callImpl(true);
+            } catch (Exception e) {
+                throw Exceptions.propagate(e);
             }
         }
-        
+
         @Override
         public Entity call() throws Exception {
+            return callImpl(false).get();
+        }
+
+        protected Maybe<Entity> getEntity(boolean immediate) {
+            if (scopeComponent != null) {
+                if (immediate) {
+                    return scopeComponent.getImmediately();
+                } else {
+                    return Maybe.of(scopeComponent.get());
+                }
+            } else {
+                return Maybe.<Entity>of(entity());
+            }
+        }
+        
+        protected Maybe<Entity> callImpl(boolean immediate) throws Exception {
+            Maybe<Entity> entityMaybe = getEntity(immediate);
+            if (immediate && entityMaybe.isAbsent()) {
+                return entityMaybe;
+            }
+            EntityInternal entity = (EntityInternal) entityMaybe.get();
+            
             Iterable<Entity> entitiesToSearch = null;
-            EntityInternal entity = getEntity();
             Predicate<Entity> notSelfPredicate = Predicates.not(Predicates.<Entity>equalTo(entity));
 
             switch (scope) {
                 case THIS:
-                    return entity;
+                    return Maybe.<Entity>of(entity);
                 case PARENT:
-                    return entity.getParent();
+                    return Maybe.<Entity>of(entity.getParent());
                 case GLOBAL:
                     entitiesToSearch = ((EntityManagerInternal)entity.getManagementContext().getEntityManager())
                         .getAllEntitiesInApplication( entity().getApplication() );
                     break;
                 case ROOT:
-                    return entity.getApplication();
+                    return Maybe.<Entity>of(entity.getApplication());
                 case SCOPE_ROOT:
-                    return Entities.catalogItemScopeRoot(entity);
+                    return Maybe.<Entity>of(Entities.catalogItemScopeRoot(entity));
                 case DESCENDANT:
                     entitiesToSearch = Entities.descendantsWithoutSelf(entity);
                     break;
@@ -165,8 +184,9 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
             
             Optional<Entity> result = Iterables.tryFind(entitiesToSearch, EntityPredicates.configEqualTo(BrooklynCampConstants.PLAN_ID, componentId));
             
-            if (result.isPresent())
-                return result.get();
+            if (result.isPresent()) {
+                return Maybe.of(result.get());
+            }
             
             // TODO may want to block and repeat on new entities joining?
             throw new NoSuchElementException("No entity matching id " + componentId+
@@ -340,9 +360,9 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
         public final Maybe<Object> getImmediately() {
             Maybe<Entity> targetEntityMaybe = component.getImmediately();
             if (targetEntityMaybe.isAbsent()) return Maybe.absent("Target entity not available");
-            Entity targetEntity = targetEntityMaybe.get();
+            EntityInternal targetEntity = (EntityInternal) targetEntityMaybe.get();
             
-            return Maybe.of(targetEntity.getConfig(ConfigKeys.newConfigKey(Object.class, keyName)));
+            return targetEntity.config().getNonBlocking(ConfigKeys.newConfigKey(Object.class, keyName));
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/8601d219/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java
index 8db3720..69a0aee 100644
--- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java
+++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java
@@ -32,6 +32,8 @@ import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.camp.brooklyn.spi.dsl.BrooklynDslDeferredSupplier;
 import org.apache.brooklyn.camp.brooklyn.spi.dsl.methods.BrooklynDslCommon;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.core.test.entity.TestApplication;
@@ -47,6 +49,8 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
 import com.google.common.base.Supplier;
 import com.google.common.util.concurrent.ListenableScheduledFuture;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
@@ -116,6 +120,74 @@ public class DslTest extends BrooklynAppUnitTestSupport {
     }
 
     @Test
+    public void testConfig() throws Exception {
+        ConfigKey<String> configKey = ConfigKeys.newStringConfigKey("testConfig");
+        BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.config(configKey.getName());
+        new ConfigTestWorker(app, configKey, dsl).run();
+    }
+
+    @Test
+    public void testConfigWithDsl() throws Exception {
+        ConfigKey<?> configKey = ConfigKeys.newConfigKey(Entity.class, "testConfig");
+        BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.config(configKey.getName());
+        Supplier<ConfigValuePair> valueSupplier = new Supplier<ConfigValuePair>() {
+            @Override public ConfigValuePair get() {
+                return new ConfigValuePair(BrooklynDslCommon.root(), app);
+            }
+        };
+        new ConfigTestWorker(app, configKey, valueSupplier, dsl).run();
+    }
+
+    @Test
+    public void testConfigWithDslNotReadyImmediately() throws Exception {
+        final ConfigKey<String> configKey = ConfigKeys.newStringConfigKey("testConfig");
+        BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.config(configKey.getName());
+        Function<Entity, ConfigValuePair> valueSupplier = new Function<Entity, ConfigValuePair>() {
+            private ListenableScheduledFuture<?> future;
+            @Override
+            public ConfigValuePair apply(final Entity entity) {
+                try {
+                    // If executed in a loop, then wait for previous call's future to complete.
+                    // If previous assertion used getImmediately, then it won't have waited for the future to complete.
+                    if (future != null) {
+                        future.get(Asserts.DEFAULT_LONG_TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+                        future = null;
+                    }
+    
+                    // Reset sensor - otherwise if run in a loop the old value will be picked up, before our execute sets the new value
+                    entity.sensors().set(TestApplication.MY_ATTRIBUTE, null);
+                    
+                    final String expectedValue = Identifiers.makeRandomId(10);
+                    Runnable job = new Runnable() {
+                        public void run() {
+                            entity.sensors().set(TestApplication.MY_ATTRIBUTE, expectedValue);
+                        }
+                    };
+                    future = executor.schedule(job, random.nextInt(20), TimeUnit.MILLISECONDS);
+    
+                    BrooklynDslDeferredSupplier<?> attributeDsl = BrooklynDslCommon.attributeWhenReady(TestApplication.MY_ATTRIBUTE.getName());
+                    return new ConfigValuePair(attributeDsl, expectedValue);
+
+                } catch (Exception e) {
+                    throw Exceptions.propagate(e);
+                }
+            }
+        };
+        new ConfigTestWorker(app, configKey, valueSupplier, dsl).satisfiedAsynchronously(true).resolverIterations(2).run();
+    }
+    
+    @Test
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public void testConfigImmediatelyDoesNotBlock() throws Exception {
+        ConfigKey<String> configKey = ConfigKeys.newStringConfigKey("testConfig");
+        BrooklynDslDeferredSupplier<?> attributeDsl = BrooklynDslCommon.attributeWhenReady(TestApplication.MY_ATTRIBUTE.getName());
+        app.config().set((ConfigKey)configKey, attributeDsl); // ugly cast because val is DSL, resolving to a string
+        BrooklynDslDeferredSupplier<?> configDsl = BrooklynDslCommon.config(configKey.getName());
+        Maybe<?> actualValue = execDslImmediately(configDsl, configKey.getType(), app, true);
+        assertTrue(actualValue.isAbsent());
+    }
+
+    @Test
     public void testSelf() throws Exception {
         BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.self();
         new SelfTestWorker(app, dsl).run();
@@ -263,7 +335,6 @@ public class DslTest extends BrooklynAppUnitTestSupport {
             // Reset sensor - otherwise if run in a loop the old value will be picked up, before our execute sets the new value
             entity.sensors().set(sensor, null);
         }
-
     }
 
     private static class SelfTestWorker extends DslTestWorker {
@@ -292,6 +363,67 @@ public class DslTest extends BrooklynAppUnitTestSupport {
             assertEquals(actualValue.get(), parent);
         }
     }
+    
+    private class ConfigTestWorker extends DslTestWorker {
+        private ConfigKey<?> config;
+        private Object expectedValue;
+        private Function<? super Entity, ConfigValuePair> valueFunction;
+        
+        public ConfigTestWorker(TestApplication parent, ConfigKey<?> config, BrooklynDslDeferredSupplier<?> dsl) {
+            this(parent, config, newRandomConfigValueSupplier(), dsl);
+        }
+
+        public ConfigTestWorker(TestApplication parent, ConfigKey<?> config, Supplier<ConfigValuePair> valueSupplier, BrooklynDslDeferredSupplier<?> dsl) {
+            this(parent, config, Functions.forSupplier(valueSupplier), dsl);
+        }
+        
+        public ConfigTestWorker(TestApplication parent, ConfigKey<?> config, Function<? super Entity, ConfigValuePair> valueFunction, BrooklynDslDeferredSupplier<?> dsl) {
+            super(parent, dsl, config.getType());
+            this.config = config;
+            this.valueFunction = valueFunction;
+        }
+
+        @Override
+        @SuppressWarnings({ "unchecked", "rawtypes" })
+        protected void preResolve(final TestEntity entity) {
+            ConfigValuePair pair = valueFunction.apply(entity);
+            expectedValue = pair.expectedResolvedVal;
+            entity.config().set((ConfigKey)config, pair.configVal); // nasty cast, because val might be a DSL
+        }
+
+        @Override
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        protected void postResolve(TestEntity entity, Maybe<?> actualValue, boolean isImmediate) throws Exception {
+            if (satisfiedAsynchronously && isImmediate) {
+                // We accept a maybe.absent if we called getImmediately when satisfiedAsynchronously
+                assertTrue(actualValue.isAbsent() || expectedValue.equals(actualValue.get()), "actual="+actualValue+"; expected="+expectedValue);
+            } else {
+                assertEquals(actualValue.get(), expectedValue);
+            }
+            
+            // Reset config - otherwise if run in a loop the old value will be picked up, before our execute sets the new value
+            entity.config().set((ConfigKey)config, (Object)null); // ugly cast from ConfigKey<?>
+        }
+    }
+
+    static class ConfigValuePair {
+        public final Object configVal;
+        public final Object expectedResolvedVal;
+        
+        public ConfigValuePair(Object configVal, Object expectedResolvedVal) {
+            this.configVal = configVal;
+            this.expectedResolvedVal = expectedResolvedVal;
+        }
+    }
+
+    private static Supplier<ConfigValuePair> newRandomConfigValueSupplier() {
+        return new Supplier<ConfigValuePair>() {
+            @Override public ConfigValuePair get() {
+                String val = Identifiers.makeRandomId(10);
+                return new ConfigValuePair(val, val);
+            }
+        };
+    }
 
     static Maybe<?> execDslImmediately(final BrooklynDslDeferredSupplier<?> dsl, final Class<?> type, final Entity context, boolean execInTask) throws Exception {
         // Exec'ing immediately will call DSL in current thread. It needs to find the context entity,

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/8601d219/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java
index 61dc513..3465ecf 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java
@@ -139,7 +139,7 @@ public abstract class AbstractConfigurationSupportInternal implements BrooklynOb
         Object resolved = Tasks.resolving(unresolved)
                 .as(Object.class)
                 .defaultValue(marker)
-                .timeout(ValueResolver.REAL_REAL_QUICK_WAIT)
+                .immediately(true)
                 .context(getContext())
                 .swallowExceptions()
                 .get();

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/8601d219/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
index b13d2b5..b7e9085 100644
--- a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
+++ b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
@@ -60,29 +60,31 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
     }
 
     public void testCompletedTaskReturnsResultImmediately() {
-        // Call ValueResolver.getMaybe() from this thread, which has no execution context.
-        // However, the task has already been submitted and we have waited for it to complete.
-        // Therefore the ValueResolver can simply check for task.isDone() and return its result immediately.
         Task<String> t = newSleepTask(Duration.ZERO, "foo");
         app.getExecutionContext().submit(t).getUnchecked();
+        
+        // Below, we call ValueResolver.getMaybe() from this thread, which has no execution context.
+        // However, the task has already been submitted and we have waited for it to complete.
+        // Therefore the ValueResolver can simply check for task.isDone() and return its result immediately.
         Maybe<String> result = Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe();
         Assert.assertEquals(result.get(), "foo");
     }
 
     public void testUnsubmittedTaskWhenNoExecutionContextFails() {
-        // ValueResolver.getMaybe() is called with no execution context. Therefore it will not execute the task.
         Task<String> t = newSleepTask(Duration.ZERO, "foo");
+        
+        // Below, we call ValueResolver.getMaybe() with no execution context. Therefore it will not execute the task.
         Maybe<String> result = Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe();
         
         Assert.assertTrue(result.isAbsent(), "result="+result);
-        Exception exception = ((Maybe.Absent<?>)result).getException();
+        Exception exception = Maybe.getException(result);
         Assert.assertTrue(exception.toString().contains("no execution context available"), "exception="+exception);
     }
 
     public void testUnsubmittedTaskWithExecutionContextExecutesAndReturns() {
-        // ValueResolver.getMaybe() is called in app's execution context. Therefore it will execute the task.
         final Task<String> t = newSleepTask(Duration.ZERO, "foo");
         
+        // Below, we call ValueResolver.getMaybe() in app's execution context. Therefore it will execute the task.
         Maybe<String>  result = app.getExecutionContext()
                 .submit(new Callable<Maybe<String> >() {
                     public Maybe<String>  call() throws Exception {
@@ -94,9 +96,10 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
     }
 
     public void testUnsubmittedTaskWithExecutionContextExecutesAndTimesOut() {
-        // ValueResolver.getMaybe() is called in app's execution context. Therefore it will execute the task.
         final Task<String> t = newSleepTask(Duration.ONE_MINUTE, "foo");
         
+        // Below, we call ValueResolver.getMaybe() in app's execution context. Therefore it will execute the task.
+        // However, it will quickly timeout as the task will not have completed.
         Maybe<String>  result = app.getExecutionContext()
                 .submit(new Callable<Maybe<String> >() {
                     public Maybe<String>  call() throws Exception {
@@ -105,7 +108,7 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
                 .getUnchecked();
         
         Assert.assertTrue(result.isAbsent(), "result="+result);
-        Exception exception = ((Maybe.Absent<?>)result).getException();
+        Exception exception = Maybe.getException(result);
         Assert.assertTrue(exception.toString().contains("not completed when immediate completion requested"), "exception="+exception);
     }
 
@@ -140,11 +143,11 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
         assertContainsCallingMethod(callInfo.stackTrace, "testGetImmediately");
     }
     
-    public void testGetImmediateSupplierWithTimeoutUsesBlocking() {
+    public void testImmediateSupplierWithTimeoutUsesBlocking() {
         MyImmediateAndDeferredSupplier supplier = new MyImmediateAndDeferredSupplier();
         CallInfo callInfo = Tasks.resolving(supplier).as(CallInfo.class).context(app).timeout(Asserts.DEFAULT_LONG_TIMEOUT).get();
         assertNotNull(callInfo.task);
-        assertNotContainsCallingMethod(callInfo.stackTrace, "testGetImmediately");
+        assertNotContainsCallingMethod(callInfo.stackTrace, "testImmediateSupplierWithTimeoutUsesBlocking");
     }
     
     public void testGetImmediatelyInTask() throws Exception {
@@ -167,7 +170,7 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
         CallInfo callInfo = Tasks.resolving(supplier).as(CallInfo.class).context(app).immediately(true).get();
         assertNotNull(callInfo.task);
         assertEquals(BrooklynTaskTags.getContextEntity(callInfo.task), app);
-        assertNotContainsCallingMethod(callInfo.stackTrace, "testGetImmediately");
+        assertNotContainsCallingMethod(callInfo.stackTrace, "testGetImmediatelyFallsBackToDeferredCallInTask");
     }
     
     private static class MyImmediateAndDeferredSupplier implements ImmediateSupplier<CallInfo>, DeferredSupplier<CallInfo> {


[3/6] brooklyn-server git commit: BROOKLYN-356: threading for DSL task

Posted by sv...@apache.org.
BROOKLYN-356: threading for DSL task

Adds ImmediateSupplier, so can fetch Maybe<?> of DSL value immediately,
in current thread (rather than invoking in task with a timeout).


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/1cd2a091
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/1cd2a091
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/1cd2a091

Branch: refs/heads/master
Commit: 1cd2a09198d3fa962f08676abc2fea1fe3d499da
Parents: 3f37e65
Author: Aled Sage <al...@gmail.com>
Authored: Tue Oct 18 19:08:18 2016 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Nov 1 11:06:02 2016 +0000

----------------------------------------------------------------------
 .../brooklyn/api/mgmt/ManagementContext.java    |   6 +-
 .../spi/dsl/BrooklynDslDeferredSupplier.java    |   4 +-
 .../spi/dsl/methods/BrooklynDslCommon.java      |  78 +++++++-
 .../brooklyn/spi/dsl/methods/DslComponent.java  |  62 +++++-
 .../brooklyn/camp/brooklyn/dsl/DslTest.java     | 192 +++++++++++++++----
 .../TransformerEnricherWithDslTest.java         |   9 +-
 .../core/sensor/DependentConfiguration.java     |  87 +++++++--
 .../brooklyn/enricher/stock/Transformer.java    |   3 +-
 .../util/core/task/ImmediateSupplier.java       |  50 +++++
 .../brooklyn/util/core/task/ValueResolver.java  |  27 ++-
 .../brooklyn/util/core/task/TasksTest.java      |   2 +-
 .../util/core/task/ValueResolverTest.java       | 187 ++++++++++++++----
 12 files changed, 600 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1cd2a091/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
index cabadee..4931be7 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
@@ -154,7 +154,11 @@ public interface ManagementContext {
     /**
      * Returns a {@link SubscriptionContext} instance representing subscriptions
      * (from the {@link SubscriptionManager}) associated with this entity, and capable 
-     * of conveniently subscribing on behalf of that entity  
+     * of conveniently subscribing on behalf of that entity.
+     * 
+     * For subscriptions made using this {@link SubscriptionContext}, the calls to 
+     * {@link org.apache.brooklyn.api.sensor.SensorEventListener#onEvent(org.apache.brooklyn.api.sensor.SensorEvent)}
+     * will be made in a task that has the {@code CONTEXT_ENTITY} tag set to this entity (see BrooklynTaskTag).
      */
     SubscriptionContext getSubscriptionContext(Entity entity);
 

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1cd2a091/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
index 0225bbb..86f1c82 100644
--- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
+++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
@@ -34,8 +34,10 @@ import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal;
 import org.apache.brooklyn.util.core.task.BasicExecutionContext;
 import org.apache.brooklyn.util.core.task.DeferredSupplier;
+import org.apache.brooklyn.util.core.task.ImmediateSupplier;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.guava.Maybe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,7 +66,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
  * use {@code synchronized} because that is not interruptible - if someone tries to get the value
  * and interrupts after a short wait, then we must release the lock immediately and return.
  **/
-public abstract class BrooklynDslDeferredSupplier<T> implements DeferredSupplier<T>, TaskFactory<Task<T>>, Serializable {
+public abstract class BrooklynDslDeferredSupplier<T> implements DeferredSupplier<T>, ImmediateSupplier<T>, TaskFactory<Task<T>>, Serializable {
 
     private static final long serialVersionUID = -8789624905412198233L;
 

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1cd2a091/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java
index a4bedf7..d92bf46 100644
--- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java
+++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/BrooklynDslCommon.java
@@ -39,6 +39,7 @@ import org.apache.brooklyn.camp.brooklyn.spi.creation.BrooklynYamlTypeInstantiat
 import org.apache.brooklyn.camp.brooklyn.spi.creation.EntitySpecConfiguration;
 import org.apache.brooklyn.camp.brooklyn.spi.dsl.BrooklynDslDeferredSupplier;
 import org.apache.brooklyn.camp.brooklyn.spi.dsl.methods.DslComponent.Scope;
+import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.config.external.ExternalConfigSupplier;
 import org.apache.brooklyn.core.entity.EntityDynamicType;
 import org.apache.brooklyn.core.entity.EntityInternal;
@@ -53,7 +54,9 @@ import org.apache.brooklyn.util.core.ClassLoaderUtils;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.core.flags.FlagUtils;
 import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.core.task.ValueResolver;
 import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.guava.Maybe;
 import org.apache.brooklyn.util.javalang.Reflections;
 import org.apache.brooklyn.util.text.StringEscapes.JavaStringEscapes;
 import org.apache.brooklyn.util.text.Strings;
@@ -252,6 +255,11 @@ public class BrooklynDslCommon {
         }
 
         @Override
+        public final Maybe<String> getImmediately() {
+            return DependentConfiguration.formatStringImmediately(pattern, args);
+        }
+
+        @Override
         public Task<String> newTask() {
             return DependentConfiguration.formatString(pattern, args);
         }
@@ -295,6 +303,11 @@ public class BrooklynDslCommon {
         }
 
         @Override
+        public Maybe<String> getImmediately() {
+            return DependentConfiguration.regexReplacementImmediately(source, pattern, replacement);
+        }
+        
+        @Override
         public Task<String> newTask() {
             return DependentConfiguration.regexReplacement(source, pattern, replacement);
         }
@@ -398,6 +411,58 @@ public class BrooklynDslCommon {
             this.config = MutableMap.copyOf(config);
         }
 
+
+        @Override
+        public Maybe<Object> getImmediately() {
+            Class<?> type = this.type;
+            if (type == null) {
+                EntityInternal entity = entity();
+                try {
+                    type = new ClassLoaderUtils(BrooklynDslCommon.class, entity).loadClass(typeName);
+                } catch (ClassNotFoundException e) {
+                    throw Exceptions.propagate(e);
+                }
+            }
+            final Class<?> clazz = type;
+
+            final ExecutionContext executionContext = ((EntityInternal)entity()).getExecutionContext();
+
+            // Marker exception that one of our component-parts cannot yet be resolved - 
+            // throwing and catching this allows us to abort fast.
+            // A bit messy to use exceptions in normal control flow, but this allows the Maps util methods to be used.
+            @SuppressWarnings("serial")
+            class UnavailableException extends RuntimeException {
+            }
+            
+            final Function<Object, Object> resolver = new Function<Object, Object>() {
+                @Override public Object apply(Object value) {
+                    Maybe<Object> result = Tasks.resolving(value, Object.class).context(executionContext).deep(true).immediately(true).getMaybe();
+                    if (result.isAbsent()) {
+                        throw new UnavailableException();
+                    } else {
+                        return result.get();
+                    }
+                }
+            };
+            
+            try {
+                Map<String, Object> resolvedFields = MutableMap.copyOf(Maps.transformValues(fields, resolver));
+                Map<String, Object> resolvedConfig = MutableMap.copyOf(Maps.transformValues(config, resolver));
+                List<Object> resolvedConstructorArgs = MutableList.copyOf(Lists.transform(constructorArgs, resolver));
+                List<Object> resolvedFactoryMethodArgs = MutableList.copyOf(Lists.transform(factoryMethodArgs, resolver));
+    
+                Object result;
+                if (factoryMethodName == null) {
+                    result = create(clazz, resolvedConstructorArgs, resolvedFields, resolvedConfig);
+                } else {
+                    result = create(clazz, factoryMethodName, resolvedFactoryMethodArgs, resolvedFields, resolvedConfig);
+                }
+                return Maybe.of(result);
+            } catch (UnavailableException e) {
+                return Maybe.absent();
+            }
+        }
+        
         @Override
         public Task<Object> newTask() {
             Class<?> type = this.type;
@@ -463,7 +528,7 @@ public class BrooklynDslCommon {
             }
         }
 
-        public static Object create(Class<?> type, String factoryMethodName, List<Object> factoryMethodArgs, Map<String,?> fields, Map<String,?> config) {
+        public static Object create(Class<?> type, String factoryMethodName, List<?> factoryMethodArgs, Map<String,?> fields, Map<String,?> config) {
             try {
                 Object bean = Reflections.invokeMethodFromArgs(type, factoryMethodName, factoryMethodArgs).get();
                 BeanUtils.populate(bean, fields);
@@ -525,6 +590,12 @@ public class BrooklynDslCommon {
         }
 
         @Override
+        public final Maybe<Object> getImmediately() {
+            ManagementContextInternal managementContext = DslExternal.managementContext();
+            return Maybe.<Object>of(managementContext.getExternalConfigProviderRegistry().getConfig(providerName, key));
+        }
+
+        @Override
         public Task<Object> newTask() {
             return Tasks.<Object>builder()
                 .displayName("resolving external configuration: '" + key + "' from provider '" + providerName + "'")
@@ -597,6 +668,11 @@ public class BrooklynDslCommon {
             }
 
             @Override
+            public Maybe<Function<String, String>> getImmediately() {
+                return DependentConfiguration.regexReplacementImmediately(pattern, replacement);
+            }
+            
+            @Override
             public Task<Function<String, String>> newTask() {
                 return DependentConfiguration.regexReplacement(pattern, replacement);
             }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1cd2a091/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
index 0d3321b..b134450 100644
--- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
+++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
@@ -39,6 +39,8 @@ import org.apache.brooklyn.core.sensor.DependentConfiguration;
 import org.apache.brooklyn.core.sensor.Sensors;
 import org.apache.brooklyn.util.core.task.TaskBuilder;
 import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.groovy.GroovyJavaMethods;
 import org.apache.brooklyn.util.guava.Maybe;
 import org.apache.brooklyn.util.text.StringEscapes.JavaStringEscapes;
 
@@ -88,6 +90,15 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
     // ---------------------------
     
     @Override
+    public final Maybe<Entity> getImmediately() {
+        try {
+            return Maybe.of(new EntityInScopeFinder(scopeComponent, scope, componentId).call());
+        } catch (Exception e) {
+            throw Exceptions.propagate(e);
+        }
+    }
+
+    @Override
     public Task<Entity> newTask() {
         return TaskBuilder.<Entity>builder()
                 .displayName(toString())
@@ -224,6 +235,11 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
         }
 
         @Override
+        public Maybe<Object> getImmediately() {
+            return Maybe.<Object>of(component.get().getId());
+        }
+        
+        @Override
         public Task<Object> newTask() {
             Entity targetEntity = component.get();
             return Tasks.create("identity", Callables.<Object>returning(targetEntity.getId()));
@@ -259,6 +275,17 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
             this.sensorName = sensorName;
         }
 
+        @Override
+        public final Maybe<Object> getImmediately() {
+            Entity targetEntity = component.getImmediately().get();
+            AttributeSensor<?> targetSensor = (AttributeSensor<?>) targetEntity.getEntityType().getSensor(sensorName);
+            if (targetSensor == null) {
+                targetSensor = Sensors.newSensor(Object.class, sensorName);
+            }
+            Object result = targetEntity.sensors().get(targetSensor);
+            return GroovyJavaMethods.truth(result) ? Maybe.of(result) : Maybe.absent();
+        }
+
         @SuppressWarnings("unchecked")
         @Override
         public Task<Object> newTask() {
@@ -303,6 +330,12 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
         }
 
         @Override
+        public final Maybe<Object> getImmediately() {
+            Entity targetEntity = component.get();
+            return Maybe.of(targetEntity.getConfig(ConfigKeys.newConfigKey(Object.class, keyName)));
+        }
+
+        @Override
         public Task<Object> newTask() {
             return Tasks.builder()
                     .displayName("retrieving config for "+keyName)
@@ -353,6 +386,33 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
         }
 
         @Override
+        public Maybe<Sensor<?>> getImmediately() {
+            return getImmediately(sensorName, false);
+        }
+        
+        protected Maybe<Sensor<?>> getImmediately(Object si, boolean resolved) {
+            if (si instanceof Sensor) {
+                return Maybe.<Sensor<?>>of((Sensor<?>)si);
+            } else if (si instanceof String) {
+                Entity targetEntity = component.get();
+                Sensor<?> result = null;
+                if (targetEntity!=null) {
+                    result = targetEntity.getEntityType().getSensor((String)si);
+                }
+                if (result!=null) return Maybe.<Sensor<?>>of(result);
+                return Maybe.<Sensor<?>>of(Sensors.newSensor(Object.class, (String)si));
+            }
+            if (!resolved) {
+                // attempt to resolve, and recurse
+                final ExecutionContext executionContext = ((EntityInternal)entity()).getExecutionContext();
+                Maybe<Object> resolvedSi = Tasks.resolving(si, Object.class).deep(true).immediately(true).context(executionContext).getMaybe();
+                if (resolvedSi.isAbsent()) return Maybe.absent();
+                return getImmediately(resolvedSi.get(), true);
+            }
+            throw new IllegalStateException("Cannot resolve '"+sensorName+"' as a sensor (got type "+(si == null ? "null" : si.getClass().getName()+")"));
+        }
+        
+        @Override
         public Task<Sensor<?>> newTask() {
             return Tasks.<Sensor<?>>builder()
                     .displayName("looking up sensor for "+sensorName)
@@ -380,7 +440,7 @@ public class DslComponent extends BrooklynDslDeferredSupplier<Entity> {
                                 final ExecutionContext executionContext = ((EntityInternal)entity()).getExecutionContext();
                                 return resolve(Tasks.resolveDeepValue(si, Object.class, executionContext), true);
                             }
-                            throw new IllegalStateException("Cannot resolve '"+sensorName+"' as a sensor");
+                            throw new IllegalStateException("Cannot resolve '"+sensorName+"' as a sensor (got type "+(si == null ? "null" : si.getClass().getName()+")"));
                         }})
                     .build();
         }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1cd2a091/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java
index 29cc7d2..d27e6c2 100644
--- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java
+++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/dsl/DslTest.java
@@ -15,11 +15,16 @@
  */
 package org.apache.brooklyn.camp.brooklyn.dsl;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntitySpec;
@@ -27,42 +32,79 @@ import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.camp.brooklyn.spi.dsl.BrooklynDslDeferredSupplier;
 import org.apache.brooklyn.camp.brooklyn.spi.dsl.methods.BrooklynDslCommon;
+import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.core.test.entity.TestApplication;
 import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.test.Asserts;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.core.task.ValueResolver;
+import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.guava.Maybe;
 import org.apache.brooklyn.util.text.Identifiers;
 import org.apache.brooklyn.util.time.Duration;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 
 public class DslTest extends BrooklynAppUnitTestSupport {
 
     private static final int MAX_PARALLEL_RESOLVERS = 50;
-    private static final int RESOLVER_ITERATIONS = 1000;
+    private static final int MANY_RESOLVER_ITERATIONS = 100;
+    
+    private ListeningScheduledExecutorService executor;
+    private Random random = new Random();
+    
+    @BeforeMethod(alwaysRun=true)
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor());
+    }
+    
+    @AfterMethod(alwaysRun=true)
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            if (executor != null) executor.shutdownNow();
+        } finally {
+            super.tearDown();
+        }
+    }
+    
+    @Test
+    public void testAttributeWhenReadyEmptyDoesNotBlock() throws Exception {
+        BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.attributeWhenReady(TestApplication.MY_ATTRIBUTE.getName());
+        Maybe<?> actualValue = execDslRealRealyQuick(dsl, TestApplication.MY_ATTRIBUTE.getType(), app);
+        assertTrue(actualValue.isAbsent());
+    }
 
     @Test
-    public void testAttributeWhenReadyEmptyDoesNotBlock() {
+    public void testAttributeWhenReadyEmptyImmediatelyDoesNotBlock() throws Exception {
         BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.attributeWhenReady(TestApplication.MY_ATTRIBUTE.getName());
-        Maybe<? super String> actualValue = Tasks.resolving(dsl).as(TestEntity.NAME.getType())
-                .context(app)
-                .description("Computing sensor "+TestEntity.NAME+" from "+dsl)
-                .timeout(ValueResolver.REAL_REAL_QUICK_WAIT)
-                .getMaybe();
+        Maybe<?> actualValue = execDslImmediately(dsl, TestApplication.MY_ATTRIBUTE.getType(), app, true);
         assertTrue(actualValue.isAbsent());
     }
 
     @Test
-    public void testAttributeWhenReady() {
+    public void testAttributeWhenReady() throws Exception {
         BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.attributeWhenReady(TestEntity.NAME.getName());
         new AttributeWhenReadyTestWorker(app, TestEntity.NAME, dsl).run();
     }
 
+    @Test
+    public void testAttributeWhenReadyBlocksUntilReady() throws Exception {
+        // Fewer iterations, because there is a sleep each time
+        BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.attributeWhenReady(TestEntity.NAME.getName());
+        new AttributeWhenReadyTestWorker(app, TestEntity.NAME, dsl).eventually(true).resolverIterations(2).run();
+    }
+
     @Test(groups="Integration")
-    public void testAttributeWhenReadyConcurrent() {
+    public void testAttributeWhenReadyConcurrent() throws Exception {
         final BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.attributeWhenReady(TestEntity.NAME.getName());
         runConcurrentWorker(new Supplier<Runnable>() {
             @Override
@@ -73,13 +115,13 @@ public class DslTest extends BrooklynAppUnitTestSupport {
     }
 
     @Test
-    public void testSelf() {
+    public void testSelf() throws Exception {
         BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.self();
         new SelfTestWorker(app, dsl).run();
     }
 
     @Test(groups="Integration")
-    public void testSelfConcurrent() {
+    public void testSelfConcurrent() throws Exception {
         final BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.self();
         runConcurrentWorker(new Supplier<Runnable>() {
             @Override
@@ -90,13 +132,13 @@ public class DslTest extends BrooklynAppUnitTestSupport {
     }
 
     @Test
-    public void testParent() {
+    public void testParent() throws Exception {
         BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.parent();
         new ParentTestWorker(app, dsl).run();
     }
 
     @Test(groups="Integration")
-    public void testParentConcurrent() {
+    public void testParentConcurrent() throws Exception {
         final BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.parent();
         runConcurrentWorker(new Supplier<Runnable>() {
             @Override
@@ -106,7 +148,7 @@ public class DslTest extends BrooklynAppUnitTestSupport {
         });
     }
 
-    public void runConcurrentWorker(Supplier<Runnable> taskSupplier) {
+    protected void runConcurrentWorker(Supplier<Runnable> taskSupplier) {
         Collection<Task<?>> results = new ArrayList<>();
         for (int i = 0; i < MAX_PARALLEL_RESOLVERS; i++) {
             Task<?> result = mgmt.getExecutionManager().submit(taskSupplier.get());
@@ -118,39 +160,64 @@ public class DslTest extends BrooklynAppUnitTestSupport {
     }
     
     private static class DslTestWorker implements Runnable {
-        protected TestApplication parent;
-        protected BrooklynDslDeferredSupplier<?> dsl;
+        protected final TestApplication parent;
+        protected final BrooklynDslDeferredSupplier<?> dsl;
+        protected final Class<?> type;
         protected EntitySpec<TestEntity> childSpec = EntitySpec.create(TestEntity.class);
-        protected Class<?> type;
-
+        protected int resolverIterations = MANY_RESOLVER_ITERATIONS;
+        protected boolean eventually = false;
+        private boolean wrapInTaskForImmediately = true;
+        
         public DslTestWorker(TestApplication parent, BrooklynDslDeferredSupplier<?> dsl, Class<?> type) {
-            this.parent = parent;
-            this.dsl = dsl;
-            this.type = type;
+            this.parent = checkNotNull(parent, "parent");
+            this.dsl = checkNotNull(dsl, "dsl");
+            this.type = checkNotNull(type, "type");
         }
 
+        public DslTestWorker resolverIterations(int val) {
+            resolverIterations = val;
+            return this;
+        }
+        
+        public DslTestWorker eventually(boolean val) {
+            eventually = val;
+            return this;
+        }
+        
+        public DslTestWorker wrapInTaskForImmediately(boolean val) {
+            wrapInTaskForImmediately = val;
+            return this;
+        }
+        
         @Override
         public void run() {
-            TestEntity entity = parent.createAndManageChild(childSpec);
-            for (int i = 0; i < RESOLVER_ITERATIONS; i++) {
+            TestEntity entity = parent.addChild(childSpec);
+            for (int i = 0; i < resolverIterations; i++) {
                 preResolve(entity);
-                Maybe<?> actualValue = Tasks.resolving(dsl).as(type)
-                        .context(entity)
-                        .description("Computing sensor "+type+" from "+dsl)
-                        .timeout(Duration.ONE_MINUTE)
-                        .getMaybe();
-                postResolve(actualValue);
+                Maybe<?> eventualValue = execDslEventually(dsl, type, entity, Duration.FIVE_SECONDS);//FIXME ONE_MINUTE);
+                postResolve(entity, eventualValue);
+
+                if (!eventually) {
+                    preResolve(entity);
+                    Maybe<?> immediateValue;
+                    try {
+                        immediateValue = execDslImmediately(dsl, type, entity, wrapInTaskForImmediately);
+                    } catch (Exception e) {
+                        throw Exceptions.propagate(e);
+                    }
+                    postResolve(entity, immediateValue);
+                }
             }
         }
 
         protected void preResolve(TestEntity entity) {
         }
 
-        protected void postResolve(Maybe<?> actualValue) {
+        protected void postResolve(TestEntity entity, Maybe<?> actualValue) {
         }
     }
 
-    private static class AttributeWhenReadyTestWorker extends DslTestWorker {
+    private class AttributeWhenReadyTestWorker extends DslTestWorker {
         private AttributeSensor<String> sensor;
         private String expectedValue;
 
@@ -160,33 +227,43 @@ public class DslTest extends BrooklynAppUnitTestSupport {
         }
 
         @Override
-        protected void preResolve(TestEntity entity) {
+        protected void preResolve(final TestEntity entity) {
             expectedValue = Identifiers.makeRandomId(10);
-            entity.sensors().set(sensor, expectedValue);
+            Runnable job = new Runnable() {
+                public void run() {
+                    entity.sensors().set(sensor, expectedValue);
+                }
+            };
+            if (eventually) {
+                executor.schedule(job, random.nextInt(20), TimeUnit.MILLISECONDS);
+            } else {
+                job.run();
+            }
         }
 
-
         @Override
-        protected void postResolve(Maybe<?> actualValue) {
+        protected void postResolve(TestEntity entity, Maybe<?> actualValue) {
             assertEquals(actualValue.get(), expectedValue);
+            
+            if (eventually) {
+                // Reset sensor - otherwise if run in a loop the old value will be picked up, before our execute sets the new value
+                entity.sensors().set(sensor, null);
+            }
         }
 
     }
 
     private static class SelfTestWorker extends DslTestWorker {
-        private TestEntity entity;
-
         public SelfTestWorker(TestApplication parent, BrooklynDslDeferredSupplier<?> dsl) {
             super(parent, dsl, Entity.class);
         }
 
         @Override
         protected void preResolve(TestEntity entity) {
-            this.entity = entity;
         }
 
         @Override
-        protected void postResolve(Maybe<?> actualValue) {
+        protected void postResolve(TestEntity entity, Maybe<?> actualValue) {
             assertEquals(actualValue.get(), entity);
         }
 
@@ -198,9 +275,44 @@ public class DslTest extends BrooklynAppUnitTestSupport {
         }
 
         @Override
-        protected void postResolve(Maybe<?> actualValue) {
+        protected void postResolve(TestEntity entity, Maybe<?> actualValue) {
             assertEquals(actualValue.get(), parent);
         }
     }
 
+    static Maybe<?> execDslImmediately(final BrooklynDslDeferredSupplier<?> dsl, final Class<?> type, final Entity context, boolean execInTask) throws Exception {
+        // Exec'ing immediately will call DSL in current thread. It needs to find the context entity,
+        // and does this using BrooklynTaskTags.getTargetOrContextEntity(Tasks.current()).
+        // If we are not in a task executed by the context entity, then this lookup will fail. 
+        Callable<Maybe<?>> job = new Callable<Maybe<?>>() {
+            public Maybe<?> call() throws Exception {
+                return Tasks.resolving(dsl).as(type)
+                        .context(context)
+                        .description("Computing "+dsl)
+                        .immediately(true)
+                        .getMaybe();
+            }
+        };
+        if (execInTask) {
+            Task<Maybe<?>> task = ((EntityInternal)context).getExecutionContext().submit(job);
+            task.get(Asserts.DEFAULT_LONG_TIMEOUT);
+            assertTrue(task.isDone());
+            return task.get();
+            
+        } else {
+            return job.call();
+        }
+    }
+    
+    static Maybe<?> execDslRealRealyQuick(BrooklynDslDeferredSupplier<?> dsl, Class<?> type, Entity context) {
+        return execDslEventually(dsl, type, context, ValueResolver.REAL_REAL_QUICK_WAIT);
+    }
+    
+    static Maybe<?> execDslEventually(BrooklynDslDeferredSupplier<?> dsl, Class<?> type, Entity context, Duration timeout) {
+        return Tasks.resolving(dsl).as(type)
+                .context(context)
+                .description("Computing "+dsl)
+                .timeout(timeout)
+                .getMaybe();
+    }
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1cd2a091/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/enricher/TransformerEnricherWithDslTest.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/enricher/TransformerEnricherWithDslTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/enricher/TransformerEnricherWithDslTest.java
index 2f4465a..3a62016 100644
--- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/enricher/TransformerEnricherWithDslTest.java
+++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/enricher/TransformerEnricherWithDslTest.java
@@ -33,16 +33,17 @@ public class TransformerEnricherWithDslTest extends BrooklynAppUnitTestSupport {
 
     int START_PORT = 10000;
 
-    @Test(groups="Broken")
+    @Test
     // See https://issues.apache.org/jira/browse/BROOKLYN-356
     public void testTransformerResolvesResolvableValues() {
-        testTransformerResolvesResolvableValues(START_PORT, 200);
+        LOG.info("Starting 100 iterations of testTransformerResolvesResolvableValues");
+        testTransformerResolvesResolvableValues(START_PORT, 100);
     }
 
-    @Test(groups={"Integration", "Broken"}, invocationCount=10)
+    @Test(groups={"Integration"}, invocationCount=10)
     // See https://issues.apache.org/jira/browse/BROOKLYN-356
     public void testTransformerResolvesResolvableValuesIntegration() {
-        LOG.info("Starting 1000 iterations");
+        LOG.info("Starting 1000 iterations of testTransformerResolvesResolvableValues");
         testTransformerResolvesResolvableValues(START_PORT, 1000);
     }
 

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1cd2a091/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java b/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
index 868bdc1..1263226 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 
 import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.EntityLocal;
 import org.apache.brooklyn.api.mgmt.ExecutionContext;
 import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
 import org.apache.brooklyn.api.mgmt.Task;
@@ -55,6 +56,7 @@ import org.apache.brooklyn.util.core.task.BasicExecutionContext;
 import org.apache.brooklyn.util.core.task.BasicTask;
 import org.apache.brooklyn.util.core.task.DeferredSupplier;
 import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.core.task.ImmediateSupplier;
 import org.apache.brooklyn.util.core.task.ParallelTask;
 import org.apache.brooklyn.util.core.task.TaskInternal;
 import org.apache.brooklyn.util.core.task.Tasks;
@@ -67,6 +69,7 @@ import org.apache.brooklyn.util.groovy.GroovyJavaMethods;
 import org.apache.brooklyn.util.guava.Functionals;
 import org.apache.brooklyn.util.guava.Maybe;
 import org.apache.brooklyn.util.text.StringFunctions;
+import org.apache.brooklyn.util.text.StringFunctions.RegexReplacer;
 import org.apache.brooklyn.util.text.Strings;
 import org.apache.brooklyn.util.time.CountdownTimer;
 import org.apache.brooklyn.util.time.Duration;
@@ -471,22 +474,67 @@ public class DependentConfiguration {
             
         return transformMultiple(
             MutableMap.<String,String>of("displayName", "formatting '"+spec+"' with "+taskArgs.size()+" task"+(taskArgs.size()!=1?"s":"")), 
-                new Function<List<Object>, String>() {
-            @Override public String apply(List<Object> input) {
-                Iterator<?> tri = input.iterator();
-                Object[] vv = new Object[args.length];
-                int i=0;
-                for (Object arg : args) {
-                    if (arg instanceof TaskAdaptable || arg instanceof TaskFactory) vv[i] = tri.next();
-                    else if (arg instanceof DeferredSupplier) vv[i] = ((DeferredSupplier<?>) arg).get();
-                    else vv[i] = arg;
-                    i++;
-                }
-                return String.format(spec, vv);
-            }},
+            new Function<List<Object>, String>() {
+                @Override public String apply(List<Object> input) {
+                    Iterator<?> tri = input.iterator();
+                    Object[] vv = new Object[args.length];
+                    int i=0;
+                    for (Object arg : args) {
+                        if (arg instanceof TaskAdaptable || arg instanceof TaskFactory) vv[i] = tri.next();
+                        else if (arg instanceof DeferredSupplier) vv[i] = ((DeferredSupplier<?>) arg).get();
+                        else vv[i] = arg;
+                        i++;
+                    }
+                    return String.format(spec, vv);
+                }},
             taskArgs);
     }
 
+    /**
+     * @throws ImmediateSupplier.ImmediateUnsupportedException if cannot evaluate this in a timely manner
+     */
+    public static Maybe<String> formatStringImmediately(final String spec, final Object ...args) {
+        List<Object> resolvedArgs = Lists.newArrayList();
+        for (Object arg : args) {
+            Maybe<?> argVal = resolveImmediately(arg);
+            if (argVal.isAbsent()) return Maybe.absent();
+            resolvedArgs.add(argVal.get());
+        }
+
+        return Maybe.of(String.format(spec, resolvedArgs.toArray()));
+    }
+
+    protected static <T> Maybe<?> resolveImmediately(Object val) {
+        if (val instanceof ImmediateSupplier<?>) {
+            return ((ImmediateSupplier<?>)val).getImmediately();
+        } else if (val instanceof TaskAdaptable) {
+            throw new ImmediateSupplier.ImmediateUnsupportedException("Cannot immediately resolve value "+val);
+        } else if (val instanceof TaskFactory) {
+            throw new ImmediateSupplier.ImmediateUnsupportedException("Cannot immediately resolve value "+val);
+        } else if (val instanceof DeferredSupplier<?>) {
+            throw new ImmediateSupplier.ImmediateUnsupportedException("Cannot immediately resolve value "+val);
+        } else {
+            return Maybe.of(val);
+        }
+    }
+    
+    public static Maybe<String> regexReplacementImmediately(Object source, Object pattern, Object replacement) {
+        Maybe<?> resolvedSource = resolveImmediately(source);
+        if (resolvedSource.isAbsent()) return Maybe.absent();
+        String resolvedSourceStr = String.valueOf(resolvedSource.get());
+        
+        Maybe<?> resolvedPattern = resolveImmediately(pattern);
+        if (resolvedPattern.isAbsent()) return Maybe.absent();
+        String resolvedPatternStr = String.valueOf(resolvedPattern.get());
+        
+        Maybe<?> resolvedReplacement = resolveImmediately(replacement);
+        if (resolvedReplacement.isAbsent()) return Maybe.absent();
+        String resolvedReplacementStr = String.valueOf(resolvedReplacement.get());
+
+        String result = new StringFunctions.RegexReplacer(resolvedPatternStr, resolvedReplacementStr).apply(resolvedSourceStr);
+        return Maybe.of(result);
+    }
+
     public static Task<String> regexReplacement(Object source, Object pattern, Object replacement) {
         List<TaskAdaptable<Object>> taskArgs = getTaskAdaptable(source, pattern, replacement);
         Function<List<Object>, String> transformer = new RegexTransformerString(source, pattern, replacement);
@@ -497,6 +545,19 @@ public class DependentConfiguration {
         );
     }
 
+    public static Maybe<Function<String, String>> regexReplacementImmediately(Object pattern, Object replacement) {
+        Maybe<?> resolvedPattern = resolveImmediately(pattern);
+        if (resolvedPattern.isAbsent()) return Maybe.absent();
+        String resolvedPatternStr = String.valueOf(resolvedPattern.get());
+        
+        Maybe<?> resolvedReplacement = resolveImmediately(replacement);
+        if (resolvedReplacement.isAbsent()) return Maybe.absent();
+        String resolvedReplacementStr = String.valueOf(resolvedReplacement.get());
+
+        RegexReplacer result = new StringFunctions.RegexReplacer(resolvedPatternStr, resolvedReplacementStr);
+        return Maybe.<Function<String, String>>of(result);
+    }
+
     public static Task<Function<String, String>> regexReplacement(Object pattern, Object replacement) {
         List<TaskAdaptable<Object>> taskArgs = getTaskAdaptable(pattern, replacement);
         Function<List<Object>, Function<String, String>> transformer = new RegexTransformerFunction(pattern, replacement);

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1cd2a091/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java
index 9c4e657..2cc4d75 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java
@@ -25,7 +25,6 @@ import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.core.task.Tasks;
-import org.apache.brooklyn.util.core.task.ValueResolver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -90,7 +89,7 @@ public class Transformer<T,U> extends AbstractTransformer<T,U> {
                 return (U) Tasks.resolving(targetValueRaw).as(targetSensor.getType())
                     .context(entity)
                     .description("Computing sensor "+targetSensor+" from "+targetValueRaw)
-                    .timeout(ValueResolver.NON_BLOCKING_WAIT)
+                    .immediately(true)
                     .getMaybe().orNull();
             }
             public String toString() {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1cd2a091/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java
new file mode 100644
index 0000000..03a1da6
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.util.core.task;
+
+import org.apache.brooklyn.util.guava.Maybe;
+
+/**
+ * A class that supplies objects of a single type, without blocking for any significant length
+ * of time.
+ */
+public interface ImmediateSupplier<T> {
+    
+    /**
+     * Indicates that we are unable to get the value immediately, because that is not supported
+     * (e.g. because the supplier is composed of sub-tasks that do not support {@link ImmediateSupplier}.  
+     */
+    public static class ImmediateUnsupportedException extends RuntimeException {
+        private static final long serialVersionUID = -7942339715007942797L;
+        
+        public ImmediateUnsupportedException(String message) {
+            super(message);
+        }
+        public ImmediateUnsupportedException(String message, Throwable cause) {
+            super(message, cause);
+        }
+    }
+    
+    /**
+     * Gets the value promptly, or returns {@link Maybe#absent()} if the value is not yet available.
+     * 
+     * @throws ImmediateUnsupportedException if cannot determinte the value immediately
+     */
+    Maybe<T> getImmediately();
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1cd2a091/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
index 2942b23..10fd665 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
@@ -109,6 +109,7 @@ public class ValueResolver<T> implements DeferredSupplier<T> {
     Boolean embedResolutionInTask;
     /** timeout on execution, if possible, or if embedResolutionInTask is true */
     Duration timeout;
+    boolean immediately;
     boolean isTransientTask = true;
     
     T defaultValue = null;
@@ -142,6 +143,7 @@ public class ValueResolver<T> implements DeferredSupplier<T> {
         parentOriginalValue = parent.getOriginalValue();
 
         timeout = parent.timeout;
+        immediately = parent.immediately;
         parentTimer = parent.parentTimer;
         if (parentTimer!=null && parentTimer.isExpired())
             expired = true;
@@ -250,7 +252,18 @@ public class ValueResolver<T> implements DeferredSupplier<T> {
         this.timeout = timeout;
         return this;
     }
-    
+
+    /**
+     * Whether the value should be resolved immediately (and if not available immediately,
+     * return absent).
+     */
+    @Beta
+    public ValueResolver<T> immediately(boolean val) {
+        this.immediately = val;
+        if (timeout == null) timeout = ValueResolver.NON_BLOCKING_WAIT;
+        return this;
+    }
+
     protected void checkTypeNotNull() {
         if (type==null) 
             throw new NullPointerException("type must be set to resolve, for '"+value+"'"+(description!=null ? ", "+description : ""));
@@ -300,6 +313,18 @@ public class ValueResolver<T> implements DeferredSupplier<T> {
             return Maybe.of((T) v);
         
         try {
+            if (immediately && v instanceof ImmediateSupplier) {
+                final ImmediateSupplier<?> supplier = (ImmediateSupplier<?>) v;
+                try {
+                    Maybe<?> result = supplier.getImmediately();
+                    
+                    // Recurse: need to ensure returned value is cast, etc
+                    return (result.isPresent()) ? new ValueResolver(result.get(), type, this).getMaybe() : Maybe.<T>absent();
+                } catch (ImmediateSupplier.ImmediateUnsupportedException e) {
+                    log.debug("Unable to resolve-immediately for "+description+" ("+v+"); falling back to executing with timeout", e);
+                }
+            }
+            
             //if it's a task or a future, we wait for the task to complete
             if (v instanceof TaskAdaptable<?>) {
                 //if it's a task, we make sure it is submitted

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1cd2a091/core/src/test/java/org/apache/brooklyn/util/core/task/TasksTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/TasksTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/TasksTest.java
index 074c14a..e1952c2 100644
--- a/core/src/test/java/org/apache/brooklyn/util/core/task/TasksTest.java
+++ b/core/src/test/java/org/apache/brooklyn/util/core/task/TasksTest.java
@@ -121,7 +121,7 @@ public class TasksTest extends BrooklynAppUnitTestSupport {
         Task<Object> t = Tasks.builder().body(Functionals.callable(EntityFunctions.config(TestEntity.CONF_OBJECT), app)).build();
         ValueResolver<Object> v = Tasks.resolving(t).as(Object.class).context(app);
         
-        ValueResolverTest.assertThrowsOnMaybe(v);
+        ValueResolverTest.assertThrowsOnGetMaybe(v);
         ValueResolverTest.assertThrowsOnGet(v);
         
         v.swallowExceptions();

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1cd2a091/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
index 6c5e990..43cd0ba 100644
--- a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
+++ b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
@@ -18,10 +18,18 @@
  */
 package org.apache.brooklyn.util.core.task;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.fail;
+
+import java.util.Arrays;
 import java.util.concurrent.Callable;
 
 import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.test.Asserts;
 import org.apache.brooklyn.util.guava.Maybe;
 import org.apache.brooklyn.util.time.Duration;
 import org.apache.brooklyn.util.time.Time;
@@ -41,6 +49,125 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
         super.setUp();
     }
     
+    public void testTimeoutZero() {
+        Maybe<String> result = Tasks.resolving(newSleepTask(Duration.TEN_SECONDS, "foo")).as(String.class).context(app).timeout(Duration.ZERO).getMaybe();
+        Assert.assertFalse(result.isPresent());
+    }
+    
+    public void testTimeoutBig() {
+        Maybe<String> result = Tasks.resolving(newSleepTask(Duration.ZERO, "foo")).as(String.class).context(app).timeout(Duration.TEN_SECONDS).getMaybe();
+        Assert.assertEquals(result.get(), "foo");
+    }
+
+    public void testNoExecutionContextOnCompleted() {
+        Task<String> t = newSleepTask(Duration.ZERO, "foo");
+        app.getExecutionContext().submit(t).getUnchecked();
+        Maybe<String> result = Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe();
+        Assert.assertEquals(result.get(), "foo");
+    }
+
+    public void testSwallowError() {
+        ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(app).swallowExceptions();
+        assertMaybeIsAbsent(result);
+        assertThrowsOnGet(result);
+    }
+
+    public void testDontSwallowError() {
+        ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(app);
+        assertThrowsOnGetMaybe(result);
+        assertThrowsOnGet(result);
+    }
+
+    public void testDefaultWhenSwallowError() {
+        ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(app).swallowExceptions().defaultValue("foo");
+        assertMaybeIsAbsent(result);
+        Assert.assertEquals(result.get(), "foo");
+    }
+
+    public void testDefaultBeforeDelayAndError() {
+        ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.TEN_SECONDS)).as(String.class).context(app).timeout(Duration.ZERO).defaultValue("foo");
+        assertMaybeIsAbsent(result);
+        Assert.assertEquals(result.get(), "foo");
+    }
+
+    public void testGetImmediately() {
+        MyImmediateAndDeferredSupplier supplier = new MyImmediateAndDeferredSupplier();
+        CallInfo callInfo = Tasks.resolving(supplier).as(CallInfo.class).context(app).immediately(true).get();
+        assertNull(callInfo.task);
+        assertContainsCallingMethod(callInfo.stackTrace, "testGetImmediately");
+    }
+    
+    public void testGetImmediateSupplierWithTimeoutUsesBlocking() {
+        MyImmediateAndDeferredSupplier supplier = new MyImmediateAndDeferredSupplier();
+        CallInfo callInfo = Tasks.resolving(supplier).as(CallInfo.class).context(app).timeout(Asserts.DEFAULT_LONG_TIMEOUT).get();
+        assertNotNull(callInfo.task);
+        assertNotContainsCallingMethod(callInfo.stackTrace, "testGetImmediately");
+    }
+    
+    public void testGetImmediatelyInTask() throws Exception {
+        final MyImmediateAndDeferredSupplier supplier = new MyImmediateAndDeferredSupplier();
+        Task<CallInfo> task = app.getExecutionContext().submit(new Callable<CallInfo>() {
+            public CallInfo call() {
+                return myUniquelyNamedMethod();
+            }
+            private CallInfo myUniquelyNamedMethod() {
+                return Tasks.resolving(supplier).as(CallInfo.class).immediately(true).get();
+            }
+        });
+        CallInfo callInfo = task.get();
+        assertEquals(callInfo.task, task);
+        assertContainsCallingMethod(callInfo.stackTrace, "myUniquelyNamedMethod");
+    }
+    
+    public void testGetImmediatelyFallsBackToDeferredCallInTask() throws Exception {
+        final MyImmediateAndDeferredSupplier supplier = new MyImmediateAndDeferredSupplier(true);
+        CallInfo callInfo = Tasks.resolving(supplier).as(CallInfo.class).context(app).immediately(true).get();
+        assertNotNull(callInfo.task);
+        assertEquals(BrooklynTaskTags.getContextEntity(callInfo.task), app);
+        assertNotContainsCallingMethod(callInfo.stackTrace, "testGetImmediately");
+    }
+    
+    private static class MyImmediateAndDeferredSupplier implements ImmediateSupplier<CallInfo>, DeferredSupplier<CallInfo> {
+        private final boolean failImmediately;
+        
+        public MyImmediateAndDeferredSupplier() {
+            this(false);
+        }
+        
+        public MyImmediateAndDeferredSupplier(boolean simulateImmediateUnsupported) {
+            this.failImmediately = simulateImmediateUnsupported;
+        }
+        
+        @Override
+        public Maybe<CallInfo> getImmediately() {
+            if (failImmediately) {
+                throw new ImmediateSupplier.ImmediateUnsupportedException("Simulate immediate unsupported");
+            } else {
+                return Maybe.of(CallInfo.newInstance());
+            }
+        }
+        @Override
+        public CallInfo get() {
+            return CallInfo.newInstance();
+        }
+    }
+    
+    private static class CallInfo {
+        final StackTraceElement[] stackTrace;
+        final Task<?> task;
+
+        public static CallInfo newInstance() {
+            Exception e = new Exception("for stacktrace");
+            e.fillInStackTrace();
+            return new CallInfo(e.getStackTrace(), (Task<?>) Tasks.current());
+        }
+        
+        CallInfo(StackTraceElement[] stackTrace, Task<?> task) {
+            this.stackTrace = stackTrace;
+            this.task = task;
+        }
+    }
+    
     public static final Task<String> newSleepTask(final Duration timeout, final String result) {
         return Tasks.<String>builder().body(new Callable<String>() { 
             public String call() { 
@@ -59,24 +186,7 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
         ).build();
     }
     
-    public void testTimeoutZero() {
-        Maybe<String> result = Tasks.resolving(newSleepTask(Duration.TEN_SECONDS, "foo")).as(String.class).context(app).timeout(Duration.ZERO).getMaybe();
-        Assert.assertFalse(result.isPresent());
-    }
-    
-    public void testTimeoutBig() {
-        Maybe<String> result = Tasks.resolving(newSleepTask(Duration.ZERO, "foo")).as(String.class).context(app).timeout(Duration.TEN_SECONDS).getMaybe();
-        Assert.assertEquals(result.get(), "foo");
-    }
-
-    public void testNoExecutionContextOnCompleted() {
-        Task<String> t = newSleepTask(Duration.ZERO, "foo");
-        app.getExecutionContext().submit(t).getUnchecked();
-        Maybe<String> result = Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe();
-        Assert.assertEquals(result.get(), "foo");
-    }
-
-    public static Throwable assertThrowsOnMaybe(ValueResolver<?> result) {
+    public static Exception assertThrowsOnGetMaybe(ValueResolver<?> result) {
         try {
             result = result.clone();
             result.getMaybe();
@@ -84,7 +194,8 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
             return null;
         } catch (Exception e) { return e; }
     }
-    public static Throwable assertThrowsOnGet(ValueResolver<?> result) {
+    
+    public static Exception assertThrowsOnGet(ValueResolver<?> result) {
         result = result.clone();
         try {
             result.get();
@@ -92,6 +203,7 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
             return null;
         } catch (Exception e) { return e; }
     }
+    
     public static <T> Maybe<T> assertMaybeIsAbsent(ValueResolver<T> result) {
         result = result.clone();
         Maybe<T> maybe = result.getMaybe();
@@ -99,29 +211,20 @@ public class ValueResolverTest extends BrooklynAppUnitTestSupport {
         return maybe;
     }
     
-    public void testSwallowError() {
-        ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(app).swallowExceptions();
-        assertMaybeIsAbsent(result);
-        assertThrowsOnGet(result);
-    }
-
-
-    public void testDontSwallowError() {
-        ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(app);
-        assertThrowsOnMaybe(result);
-        assertThrowsOnGet(result);
-    }
-
-    public void testDefaultWhenSwallowError() {
-        ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(app).swallowExceptions().defaultValue("foo");
-        assertMaybeIsAbsent(result);
-        Assert.assertEquals(result.get(), "foo");
+    private void assertContainsCallingMethod(StackTraceElement[] stackTrace, String expectedMethod) {
+        for (StackTraceElement element : stackTrace) {
+            if (expectedMethod.equals(element.getMethodName())) {
+                return;
+            }
+        }
+        fail("Method "+expectedMethod+" not found: "+Arrays.toString(stackTrace));
     }
-
-    public void testDefaultBeforeDelayAndError() {
-        ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.TEN_SECONDS)).as(String.class).context(app).timeout(Duration.ZERO).defaultValue("foo");
-        assertMaybeIsAbsent(result);
-        Assert.assertEquals(result.get(), "foo");
+    
+    private void assertNotContainsCallingMethod(StackTraceElement[] stackTrace, String notExpectedMethod) {
+        for (StackTraceElement element : stackTrace) {
+            if (notExpectedMethod.equals(element.getMethodName())) {
+                fail("Method "+notExpectedMethod+" not expected: "+Arrays.toString(stackTrace));
+            }
+        }
     }
-
 }


[6/6] brooklyn-server git commit: Closes #390

Posted by sv...@apache.org.
Closes #390

BROOKLYN-356: fix race in transformer by adding getImmediately


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/4fab5b1a
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/4fab5b1a
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/4fab5b1a

Branch: refs/heads/master
Commit: 4fab5b1a800ca2a1f5689ec04c5731808da2f200
Parents: c4cc0d1 0ded2cb
Author: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Authored: Wed Nov 2 12:06:05 2016 +0200
Committer: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Committed: Wed Nov 2 12:06:05 2016 +0200

----------------------------------------------------------------------
 .../brooklyn/api/mgmt/ManagementContext.java    |   6 +-
 .../spi/dsl/BrooklynDslDeferredSupplier.java    |   4 +-
 .../spi/dsl/methods/BrooklynDslCommon.java      |  96 +++++-
 .../brooklyn/spi/dsl/methods/DslComponent.java  | 125 ++++++-
 .../brooklyn/camp/brooklyn/dsl/DslTest.java     | 345 ++++++++++++++++---
 .../TransformerEnricherWithDslTest.java         |   9 +-
 .../internal/AbstractManagementContext.java     |   6 +-
 .../mgmt/internal/BasicSubscriptionContext.java |   3 +-
 .../mgmt/internal/LocalSubscriptionManager.java |  35 +-
 .../core/mgmt/internal/Subscription.java        |   1 +
 .../AbstractConfigurationSupportInternal.java   |   2 +-
 .../core/sensor/DependentConfiguration.java     |  87 ++++-
 .../brooklyn/enricher/stock/Propagator.java     |   6 +-
 .../brooklyn/enricher/stock/Transformer.java    |   3 +-
 .../enricher/stock/reducer/Reducer.java         |   6 +-
 .../util/core/task/ImmediateSupplier.java       |  50 +++
 .../brooklyn/util/core/task/ValueResolver.java  |  34 +-
 .../core/entity/EntitySubscriptionTest.java     |  85 ++++-
 .../entity/RecordingSensorEventListener.java    |  16 +-
 .../brooklyn/util/core/task/TasksTest.java      |   2 +-
 .../util/core/task/ValueResolverTest.java       | 231 ++++++++++---
 .../resources/AbstractBrooklynRestResource.java |   7 +-
 .../rest/resources/EntityConfigResource.java    |   2 +-
 .../brooklyn/rest/resources/SensorResource.java |   2 +-
 .../rest/transform/EffectorTransformer.java     |   7 +-
 25 files changed, 993 insertions(+), 177 deletions(-)
----------------------------------------------------------------------