You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by sv...@apache.org on 2016/11/02 10:06:23 UTC
[2/6] brooklyn-server git commit: Subscription callbacks: task has
contextEntity tag
Subscription callbacks: task has contextEntity tag
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/3f37e657
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/3f37e657
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/3f37e657
Branch: refs/heads/master
Commit: 3f37e657a3efd0b1b025008ab1070713b9b8badc
Parents: c4cc0d1
Author: Aled Sage <al...@gmail.com>
Authored: Thu Oct 13 22:46:57 2016 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Nov 1 11:06:02 2016 +0000
----------------------------------------------------------------------
.../internal/AbstractManagementContext.java | 6 +-
.../mgmt/internal/BasicSubscriptionContext.java | 3 +-
.../mgmt/internal/LocalSubscriptionManager.java | 35 +++++----
.../core/mgmt/internal/Subscription.java | 1 +
.../core/entity/EntitySubscriptionTest.java | 76 ++++++++++++++++----
.../entity/RecordingSensorEventListener.java | 16 ++++-
6 files changed, 109 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/3f37e657/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
index ea84f3c..b57e6eb 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
@@ -66,6 +66,7 @@ import org.apache.brooklyn.core.internal.storage.impl.BrooklynStorageImpl;
import org.apache.brooklyn.core.internal.storage.impl.inmemory.InMemoryDataGridFactory;
import org.apache.brooklyn.core.location.BasicLocationRegistry;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity;
import org.apache.brooklyn.core.mgmt.classloading.BrooklynClassLoadingContextSequential;
import org.apache.brooklyn.core.mgmt.classloading.JavaBrooklynClassLoadingContext;
import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
@@ -87,6 +88,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
public abstract class AbstractManagementContext implements ManagementContextInternal {
@@ -277,7 +280,8 @@ public abstract class AbstractManagementContext implements ManagementContextInte
@Override
public SubscriptionContext getSubscriptionContext(Entity e) {
// BSC is a thin wrapper around SM so fine to create a new one here
- return new BasicSubscriptionContext(getSubscriptionManager(), e);
+ Map<String, ?> flags = ImmutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(e)));
+ return new BasicSubscriptionContext(flags, getSubscriptionManager(), e);
}
@Override
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/3f37e657/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
index 57d4712..2c6ab65 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import groovy.lang.Closure;
@@ -55,7 +56,7 @@ public class BasicSubscriptionContext implements SubscriptionContext {
private final Map<String,Object> flags;
public BasicSubscriptionContext(SubscriptionManager manager, Object subscriber) {
- this(Collections.<String,Object>emptyMap(), manager, subscriber);
+ this(ImmutableMap.<String, Object>of(), manager, subscriber);
}
public BasicSubscriptionContext(Map<String, ?> flags, SubscriptionManager manager, Object subscriber) {
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/3f37e657/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index f9606f8..fd15cf3 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -21,12 +21,12 @@ package org.apache.brooklyn.core.mgmt.internal;
import static org.apache.brooklyn.util.JavaGroovyEquivalents.elvis;
import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth;
import static org.apache.brooklyn.util.JavaGroovyEquivalents.join;
-import static org.apache.brooklyn.util.JavaGroovyEquivalents.mapOf;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -43,6 +43,7 @@ import org.apache.brooklyn.api.sensor.SensorEvent;
import org.apache.brooklyn.api.sensor.SensorEventListener;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.sensor.BasicSensorEvent;
+import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.task.BasicExecutionManager;
import org.apache.brooklyn.util.core.task.SingleThreadedScheduler;
@@ -53,6 +54,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Predicate;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimaps;
/**
@@ -97,6 +99,12 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
Entity producer = s.producer;
Sensor<T> sensor= s.sensor;
s.subscriber = getSubscriber(flags, s);
+ if (flags.containsKey("tags") || flags.containsKey("tag")) {
+ Iterable<?> tags = (Iterable<?>) flags.get("tags");
+ Object tag = flags.get("tag");
+ s.subscriberExtraExecTags = (tag == null) ? tags : (tags == null ? ImmutableList.of(tag) : MutableList.builder().addAll(tags).add(tag).build());
+ }
+
if (flags.containsKey("subscriberExecutionManagerTag")) {
s.subscriberExecutionManagerTag = flags.remove("subscriberExecutionManagerTag");
s.subscriberExecutionManagerTagSupplied = true;
@@ -130,8 +138,13 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
LOG.warn("Cannot notifyOfInitialValue for subscription with non-attribute sensor: "+s);
} else {
if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {} to {}", new Object[] {s.producer, s.sensor, s});
- Map<String, Object> tagsMap = MutableMap.of("tag", s.subscriberExecutionManagerTag);
- em.submit(tagsMap, new Runnable() {
+ List<Object> tags = MutableList.builder()
+ .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags)
+ .add(s.subscriberExecutionManagerTag)
+ .build()
+ .asUnmodifiable();
+ Map<String, ?> execFlags = MutableMap.of("tags", tags);
+ em.submit(execFlags, new Runnable() {
@Override
public String toString() {
return "LSM.publishInitialValue("+s.producer+", "+s.sensor+")";
@@ -222,16 +235,14 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
continue;
final Subscription sAtClosureCreation = s;
-// Set<Object> tags = MutableSet.of();
-// if (s.subscriberExecutionManagerTag!=null) tags.add(s.subscriberExecutionManagerTag);
-// if (event.getSource()!=null) tags.add(BrooklynTaskTags.tagForContextEntity(event.getSource()));
-// Map<String, Object> tagsMap = mapOf("tags", (Object)tags);
- // use code above, instead of line below, if we want subscription deliveries associated with the entity;
- // that will cause them to be cancelled when the entity is unmanaged
- // (not sure that is useful, and likely NOT worth the expense, but it might be...) -Alex Oct 2014
- Map<String, Object> tagsMap = mapOf("tag", s.subscriberExecutionManagerTag);
+ List<Object> tags = MutableList.builder()
+ .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags)
+ .add(s.subscriberExecutionManagerTag)
+ .build()
+ .asUnmodifiable();
+ Map<String, ?> execFlags = MutableMap.of("tags", tags);
- em.submit(tagsMap, new Runnable() {
+ em.submit(execFlags, new Runnable() {
@Override
public String toString() {
return "LSM.publish("+event+")";
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/3f37e657/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java
index 66706a1..5e71701 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java
@@ -37,6 +37,7 @@ class Subscription<T> implements SubscriptionHandle {
public Object subscriberExecutionManagerTag;
/** whether the tag was supplied by user, in which case we should not clear execution semantics */
public boolean subscriberExecutionManagerTagSupplied;
+ public Iterable<?> subscriberExtraExecTags;
public final Entity producer;
public final Sensor<T> sensor;
public final SensorEventListener<? super T> listener;
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/3f37e657/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
index 3d61a99..a16576c 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
@@ -20,16 +20,20 @@ package org.apache.brooklyn.core.entity;
import static org.testng.Assert.assertEquals;
-import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.api.sensor.EnricherSpec;
import org.apache.brooklyn.core.location.SimulatedLocation;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.sensor.BasicSensorEvent;
-import org.apache.brooklyn.core.test.entity.TestApplication;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.core.test.policy.TestEnricher;
+import org.apache.brooklyn.core.test.policy.TestPolicy;
import org.apache.brooklyn.entity.group.BasicGroup;
import org.apache.brooklyn.test.Asserts;
-import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -37,14 +41,13 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
-public class EntitySubscriptionTest {
+public class EntitySubscriptionTest extends BrooklynAppUnitTestSupport {
// TODO Duplication between this and PolicySubscriptionTest
private static final long SHORT_WAIT_MS = 100;
private SimulatedLocation loc;
- private TestApplication app;
private TestEntity entity;
private TestEntity observedEntity;
private BasicGroup observedGroup;
@@ -52,10 +55,11 @@ public class EntitySubscriptionTest {
private TestEntity observedMemberEntity;
private TestEntity otherEntity;
private RecordingSensorEventListener<Object> listener;
-
+
@BeforeMethod(alwaysRun=true)
- public void setUp() {
- app = TestApplication.Factory.newManagedInstanceForTests();
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
loc = app.newSimulatedLocation();
entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
observedEntity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
@@ -72,11 +76,6 @@ public class EntitySubscriptionTest {
app.start(ImmutableList.of(loc));
}
- @AfterMethod(alwaysRun=true)
- public void tearDown() {
- if (app != null) Entities.destroyAll(app.getManagementContext());
- }
-
@Test
public void testSubscriptionReceivesEvents() {
entity.subscriptions().subscribe(observedEntity, TestEntity.SEQUENCE, listener);
@@ -280,4 +279,55 @@ public class EntitySubscriptionTest {
entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), null, TestEntity.NAME, listener);
entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), null, null, listener);
}
+
+ @Test
+ public void testContextEntityOnSubscriptionCallbackTask() {
+ observedEntity.sensors().set(TestEntity.NAME, "myval");
+ entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), observedEntity, TestEntity.NAME, listener);
+
+ // notify-of-initial-value should give us our entity
+ assertListenerCalledOnceWithContextEntityEventually(listener, entity);
+ listener.clearEvents();
+
+ // as should subsequent events
+ observedEntity.sensors().set(TestEntity.NAME, "myval2");
+ assertListenerCalledOnceWithContextEntityEventually(listener, entity);
+ listener.clearEvents();
+
+ // same for subscribing to children: context should be the subscriber
+ entity.subscriptions().subscribeToChildren(observedEntity, TestEntity.SEQUENCE, listener);
+ observedChildEntity.sensors().set(TestEntity.SEQUENCE, 123);
+ assertListenerCalledOnceWithContextEntityEventually(listener, entity);
+ listener.clearEvents();
+ }
+
+ @Test
+ public void testContextEntityOnPolicySubscriptionCallbackTask() {
+ TestPolicy policy = entity.policies().add(PolicySpec.create(TestPolicy.class));
+ policy.subscriptions().subscribe(observedEntity, TestEntity.NAME, listener);
+
+ observedEntity.sensors().set(TestEntity.NAME, "myval");
+ assertListenerCalledOnceWithContextEntityEventually(listener, entity);
+ }
+
+ @Test
+ public void testContextEntityOnEnricherSubscriptionCallbackTask() {
+ TestEnricher enricher = entity.enrichers().add(EnricherSpec.create(TestEnricher.class));
+ enricher.subscriptions().subscribe(observedEntity, TestEntity.NAME, listener);
+
+ observedEntity.sensors().set(TestEntity.NAME, "myval");
+ assertListenerCalledOnceWithContextEntityEventually(listener, entity);
+ }
+
+ protected void assertListenerCalledEventually(final RecordingSensorEventListener<?> listener, final int expectedEventCount) {
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ assertEquals(listener.getEvents().size(), expectedEventCount);
+ }});
+ }
+
+ protected void assertListenerCalledOnceWithContextEntityEventually(final RecordingSensorEventListener<?> listener, final Entity expectedContext) {
+ assertListenerCalledEventually(listener, 1);
+ assertEquals(BrooklynTaskTags.getContextEntity(Iterables.getOnlyElement(listener.getTasks())), entity);
+ }
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/3f37e657/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java b/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java
index 44920ed..f5384b2 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java
@@ -25,8 +25,11 @@ import java.util.Iterator;
import java.util.List;
import java.util.Objects;
+import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.sensor.SensorEvent;
import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.core.task.Tasks;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
@@ -41,6 +44,8 @@ import com.google.common.primitives.Longs;
public class RecordingSensorEventListener<T> implements SensorEventListener<T>, Iterable<SensorEvent<T>> {
private final List<SensorEvent<T>> events = Lists.newCopyOnWriteArrayList();
+ private final List<Task<?>> tasks = Lists.newCopyOnWriteArrayList();
+
private final boolean suppressDuplicates;
private T lastValue;
@@ -56,6 +61,7 @@ public class RecordingSensorEventListener<T> implements SensorEventListener<T>,
public void onEvent(SensorEvent<T> event) {
if (!suppressDuplicates || events.isEmpty() || !Objects.equals(lastValue, event.getValue())) {
events.add(event);
+ tasks.add(Tasks.current());
lastValue = event.getValue();
}
}
@@ -68,6 +74,13 @@ public class RecordingSensorEventListener<T> implements SensorEventListener<T>,
}
/**
+ * The {@link {@link Tasks#current()} for each call to {@link #onEvent(SensorEvent)}
+ */
+ public List<Task<?>> getTasks() {
+ return MutableList.copyOf(tasks).asUnmodifiable();
+ }
+
+ /**
* @return A live read-only view of recorded events.
*/
public Iterable<T> getEventValues() {
@@ -89,7 +102,8 @@ public class RecordingSensorEventListener<T> implements SensorEventListener<T>,
* Clears all events recorded by the listener.
*/
public void clearEvents() {
- this.events.clear();
+ events.clear();
+ tasks.clear();
lastValue = null;
}