You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by sv...@apache.org on 2016/11/02 10:06:23 UTC

[2/6] brooklyn-server git commit: Subscription callbacks: task has contextEntity tag

Subscription callbacks: task has contextEntity tag

Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/3f37e657
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/3f37e657
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/3f37e657

Branch: refs/heads/master
Commit: 3f37e657a3efd0b1b025008ab1070713b9b8badc
Parents: c4cc0d1
Author: Aled Sage <al...@gmail.com>
Authored: Thu Oct 13 22:46:57 2016 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Nov 1 11:06:02 2016 +0000

----------------------------------------------------------------------
 .../internal/AbstractManagementContext.java     |  6 +-
 .../mgmt/internal/BasicSubscriptionContext.java |  3 +-
 .../mgmt/internal/LocalSubscriptionManager.java | 35 +++++----
 .../core/mgmt/internal/Subscription.java        |  1 +
 .../core/entity/EntitySubscriptionTest.java     | 76 ++++++++++++++++----
 .../entity/RecordingSensorEventListener.java    | 16 ++++-
 6 files changed, 109 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/3f37e657/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
index ea84f3c..b57e6eb 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
@@ -66,6 +66,7 @@ import org.apache.brooklyn.core.internal.storage.impl.BrooklynStorageImpl;
 import org.apache.brooklyn.core.internal.storage.impl.inmemory.InMemoryDataGridFactory;
 import org.apache.brooklyn.core.location.BasicLocationRegistry;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity;
 import org.apache.brooklyn.core.mgmt.classloading.BrooklynClassLoadingContextSequential;
 import org.apache.brooklyn.core.mgmt.classloading.JavaBrooklynClassLoadingContext;
 import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
@@ -87,6 +88,8 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Function;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
 public abstract class AbstractManagementContext implements ManagementContextInternal {
@@ -277,7 +280,8 @@ public abstract class AbstractManagementContext implements ManagementContextInte
     @Override
     public SubscriptionContext getSubscriptionContext(Entity e) {
         // BSC is a thin wrapper around SM so fine to create a new one here
-        return new BasicSubscriptionContext(getSubscriptionManager(), e);
+        Map<String, ?> flags = ImmutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(e)));
+        return new BasicSubscriptionContext(flags, getSubscriptionManager(), e);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/3f37e657/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
index 57d4712..2c6ab65 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 
 import groovy.lang.Closure;
@@ -55,7 +56,7 @@ public class BasicSubscriptionContext implements SubscriptionContext {
     private final Map<String,Object> flags;
 
     public BasicSubscriptionContext(SubscriptionManager manager, Object subscriber) {
-        this(Collections.<String,Object>emptyMap(), manager, subscriber);
+        this(ImmutableMap.<String, Object>of(), manager, subscriber);
     }
     
     public BasicSubscriptionContext(Map<String, ?> flags, SubscriptionManager manager, Object subscriber) {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/3f37e657/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index f9606f8..fd15cf3 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -21,12 +21,12 @@ package org.apache.brooklyn.core.mgmt.internal;
 import static org.apache.brooklyn.util.JavaGroovyEquivalents.elvis;
 import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth;
 import static org.apache.brooklyn.util.JavaGroovyEquivalents.join;
-import static org.apache.brooklyn.util.JavaGroovyEquivalents.mapOf;
 
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -43,6 +43,7 @@ import org.apache.brooklyn.api.sensor.SensorEvent;
 import org.apache.brooklyn.api.sensor.SensorEventListener;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.sensor.BasicSensorEvent;
+import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.task.BasicExecutionManager;
 import org.apache.brooklyn.util.core.task.SingleThreadedScheduler;
@@ -53,6 +54,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Multimaps;
 
 /**
@@ -97,6 +99,12 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
         Entity producer = s.producer;
         Sensor<T> sensor= s.sensor;
         s.subscriber = getSubscriber(flags, s);
+        if (flags.containsKey("tags") || flags.containsKey("tag")) {
+            Iterable<?> tags = (Iterable<?>) flags.get("tags");
+            Object tag = flags.get("tag");
+            s.subscriberExtraExecTags = (tag == null) ? tags : (tags == null ? ImmutableList.of(tag) : MutableList.builder().addAll(tags).add(tag).build());
+        }
+
         if (flags.containsKey("subscriberExecutionManagerTag")) {
             s.subscriberExecutionManagerTag = flags.remove("subscriberExecutionManagerTag");
             s.subscriberExecutionManagerTagSupplied = true;
@@ -130,8 +138,13 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
                 LOG.warn("Cannot notifyOfInitialValue for subscription with non-attribute sensor: "+s);
             } else {
                 if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {} to {}", new Object[] {s.producer, s.sensor, s});
-                Map<String, Object> tagsMap = MutableMap.of("tag", s.subscriberExecutionManagerTag);
-                em.submit(tagsMap, new Runnable() {
+                List<Object> tags = MutableList.builder()
+                        .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags)
+                        .add(s.subscriberExecutionManagerTag)
+                        .build()
+                        .asUnmodifiable();
+                Map<String, ?> execFlags = MutableMap.of("tags", tags);
+                em.submit(execFlags, new Runnable() {
                     @Override
                     public String toString() {
                         return "LSM.publishInitialValue("+s.producer+", "+s.sensor+")";
@@ -222,16 +235,14 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
                     continue;
                 final Subscription sAtClosureCreation = s;
                 
-//                Set<Object> tags = MutableSet.of();
-//                if (s.subscriberExecutionManagerTag!=null) tags.add(s.subscriberExecutionManagerTag);
-//                if (event.getSource()!=null) tags.add(BrooklynTaskTags.tagForContextEntity(event.getSource()));
-//                Map<String, Object> tagsMap = mapOf("tags", (Object)tags);
-                // use code above, instead of line below, if we want subscription deliveries associated with the entity;
-                // that will cause them to be cancelled when the entity is unmanaged
-                // (not sure that is useful, and likely NOT worth the expense, but it might be...) -Alex Oct 2014
-                Map<String, Object> tagsMap = mapOf("tag", s.subscriberExecutionManagerTag);
+                List<Object> tags = MutableList.builder()
+                        .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags)
+                        .add(s.subscriberExecutionManagerTag)
+                        .build()
+                        .asUnmodifiable();
+                Map<String, ?> execFlags = MutableMap.of("tags", tags);
                 
-                em.submit(tagsMap, new Runnable() {
+                em.submit(execFlags, new Runnable() {
                     @Override
                     public String toString() {
                         return "LSM.publish("+event+")";

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/3f37e657/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java
index 66706a1..5e71701 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java
@@ -37,6 +37,7 @@ class Subscription<T> implements SubscriptionHandle {
     public Object subscriberExecutionManagerTag;
     /** whether the tag was supplied by user, in which case we should not clear execution semantics */
     public boolean subscriberExecutionManagerTagSupplied;
+    public Iterable<?> subscriberExtraExecTags;
     public final Entity producer;
     public final Sensor<T> sensor;
     public final SensorEventListener<? super T> listener;

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/3f37e657/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
index 3d61a99..a16576c 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
@@ -20,16 +20,20 @@ package org.apache.brooklyn.core.entity;
 
 import static org.testng.Assert.assertEquals;
 
-import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntitySpec;
 import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.api.sensor.EnricherSpec;
 import org.apache.brooklyn.core.location.SimulatedLocation;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.sensor.BasicSensorEvent;
-import org.apache.brooklyn.core.test.entity.TestApplication;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.core.test.policy.TestEnricher;
+import org.apache.brooklyn.core.test.policy.TestPolicy;
 import org.apache.brooklyn.entity.group.BasicGroup;
 import org.apache.brooklyn.test.Asserts;
-import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -37,14 +41,13 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 
-public class EntitySubscriptionTest {
+public class EntitySubscriptionTest extends BrooklynAppUnitTestSupport {
 
     // TODO Duplication between this and PolicySubscriptionTest
     
     private static final long SHORT_WAIT_MS = 100;
 
     private SimulatedLocation loc;
-    private TestApplication app;
     private TestEntity entity;
     private TestEntity observedEntity;
     private BasicGroup observedGroup;
@@ -52,10 +55,11 @@ public class EntitySubscriptionTest {
     private TestEntity observedMemberEntity;
     private TestEntity otherEntity;
     private RecordingSensorEventListener<Object> listener;
-    
+
     @BeforeMethod(alwaysRun=true)
-    public void setUp() {
-        app = TestApplication.Factory.newManagedInstanceForTests();
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
         loc = app.newSimulatedLocation();
         entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
         observedEntity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
@@ -72,11 +76,6 @@ public class EntitySubscriptionTest {
         app.start(ImmutableList.of(loc));
     }
     
-    @AfterMethod(alwaysRun=true)
-    public void tearDown() {
-        if (app != null) Entities.destroyAll(app.getManagementContext());
-    }
-    
     @Test
     public void testSubscriptionReceivesEvents() {
         entity.subscriptions().subscribe(observedEntity, TestEntity.SEQUENCE, listener);
@@ -280,4 +279,55 @@ public class EntitySubscriptionTest {
         entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), null, TestEntity.NAME, listener);
         entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), null, null, listener);
     }
+    
+    @Test
+    public void testContextEntityOnSubscriptionCallbackTask() {
+        observedEntity.sensors().set(TestEntity.NAME, "myval");
+        entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), observedEntity, TestEntity.NAME, listener);
+        
+        // notify-of-initial-value should give us our entity
+        assertListenerCalledOnceWithContextEntityEventually(listener, entity);
+        listener.clearEvents();
+        
+        // as should subsequent events
+        observedEntity.sensors().set(TestEntity.NAME, "myval2");
+        assertListenerCalledOnceWithContextEntityEventually(listener, entity);
+        listener.clearEvents();
+
+        // same for subscribing to children: context should be the subscriber
+        entity.subscriptions().subscribeToChildren(observedEntity, TestEntity.SEQUENCE, listener);
+        observedChildEntity.sensors().set(TestEntity.SEQUENCE, 123);
+        assertListenerCalledOnceWithContextEntityEventually(listener, entity);
+        listener.clearEvents();
+    }
+    
+    @Test
+    public void testContextEntityOnPolicySubscriptionCallbackTask() {
+        TestPolicy policy = entity.policies().add(PolicySpec.create(TestPolicy.class));
+        policy.subscriptions().subscribe(observedEntity, TestEntity.NAME, listener);
+
+        observedEntity.sensors().set(TestEntity.NAME, "myval");
+        assertListenerCalledOnceWithContextEntityEventually(listener, entity);
+    }
+    
+    @Test
+    public void testContextEntityOnEnricherSubscriptionCallbackTask() {
+        TestEnricher enricher = entity.enrichers().add(EnricherSpec.create(TestEnricher.class));
+        enricher.subscriptions().subscribe(observedEntity, TestEntity.NAME, listener);
+
+        observedEntity.sensors().set(TestEntity.NAME, "myval");
+        assertListenerCalledOnceWithContextEntityEventually(listener, entity);
+    }
+    
+    protected void assertListenerCalledEventually(final RecordingSensorEventListener<?> listener, final int expectedEventCount) {
+        Asserts.succeedsEventually(new Runnable() {
+            @Override public void run() {
+                assertEquals(listener.getEvents().size(), expectedEventCount);
+            }});
+    }
+    
+    protected void assertListenerCalledOnceWithContextEntityEventually(final RecordingSensorEventListener<?> listener, final Entity expectedContext) {
+        assertListenerCalledEventually(listener, 1);
+        assertEquals(BrooklynTaskTags.getContextEntity(Iterables.getOnlyElement(listener.getTasks())), entity);
+    }
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/3f37e657/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java b/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java
index 44920ed..f5384b2 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java
@@ -25,8 +25,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 
+import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.sensor.SensorEvent;
 import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.core.task.Tasks;
 
 import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
@@ -41,6 +44,8 @@ import com.google.common.primitives.Longs;
 public class RecordingSensorEventListener<T> implements SensorEventListener<T>, Iterable<SensorEvent<T>> {
 
     private final List<SensorEvent<T>> events = Lists.newCopyOnWriteArrayList();
+    private final List<Task<?>> tasks = Lists.newCopyOnWriteArrayList();
+
     private final boolean suppressDuplicates;
     private T lastValue;
 
@@ -56,6 +61,7 @@ public class RecordingSensorEventListener<T> implements SensorEventListener<T>,
     public void onEvent(SensorEvent<T> event) {
         if (!suppressDuplicates || events.isEmpty() || !Objects.equals(lastValue, event.getValue())) {
             events.add(event);
+            tasks.add(Tasks.current());
             lastValue = event.getValue();
         }
     }
@@ -68,6 +74,13 @@ public class RecordingSensorEventListener<T> implements SensorEventListener<T>,
     }
 
     /**
+     * The {@link {@link Tasks#current()} for each call to {@link #onEvent(SensorEvent)}
+     */
+    public List<Task<?>> getTasks() {
+        return MutableList.copyOf(tasks).asUnmodifiable();
+    }
+
+    /**
      * @return A live read-only view of recorded events.
      */
     public Iterable<T> getEventValues() {
@@ -89,7 +102,8 @@ public class RecordingSensorEventListener<T> implements SensorEventListener<T>,
      * Clears all events recorded by the listener.
      */
     public void clearEvents() {
-        this.events.clear();
+        events.clear();
+        tasks.clear();
         lastValue = null;
     }