You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2015/09/21 18:05:43 UTC
[6/7] incubator-brooklyn git commit: subscriptions: support notifyOfInitialValue
subscriptions: support notifyOfInitialValue
If you pass in notifyOfInitialValue=true when subscribing to a single
entity:attribute, then the listener will be called with the current
value (rather than waiting for the first change).
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/7ee7d410
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/7ee7d410
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/7ee7d410
Branch: refs/heads/master
Commit: 7ee7d41026d180aeb4cf9206f86b7b0a7eb263d4
Parents: 0d9b3a8
Author: Aled Sage <al...@gmail.com>
Authored: Fri Sep 18 19:55:29 2015 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Mon Sep 21 14:33:20 2015 +0100
----------------------------------------------------------------------
.../apache/brooklyn/api/entity/EntityLocal.java | 13 +++++-
.../brooklyn/api/mgmt/SubscriptionContext.java | 2 +-
.../brooklyn/core/entity/AbstractEntity.java | 7 ++++
.../mgmt/internal/BasicSubscriptionContext.java | 4 +-
.../mgmt/internal/LocalSubscriptionManager.java | 41 ++++++++++++++++++-
.../core/mgmt/internal/SubscriptionTracker.java | 10 ++++-
.../core/objs/AbstractEntityAdjunct.java | 8 ++++
.../brooklyn/enricher/stock/UpdatingMap.java | 4 +-
.../core/entity/EntitySubscriptionTest.java | 43 ++++++++++++++++++++
.../policy/basic/PolicySubscriptionTest.java | 30 ++++++++++++++
10 files changed, 153 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/api/src/main/java/org/apache/brooklyn/api/entity/EntityLocal.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/entity/EntityLocal.java b/api/src/main/java/org/apache/brooklyn/api/entity/EntityLocal.java
index 5533949..7e5e963 100644
--- a/api/src/main/java/org/apache/brooklyn/api/entity/EntityLocal.java
+++ b/api/src/main/java/org/apache/brooklyn/api/entity/EntityLocal.java
@@ -115,7 +115,18 @@ public interface EntityLocal extends Entity {
// FIXME remove from interface?
@Beta
<T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener);
-
+
+ /**
+ * Allow us to subscribe to data from a {@link Sensor} on another entity.
+ *
+ * @return a subscription id which can be used to unsubscribe
+ *
+ * @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener)
+ */
+ // FIXME remove from interface?
+ @Beta
+ <T> SubscriptionHandle subscribe(Map<String, ?> flags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener);
+
/** @see SubscriptionManager#subscribeToChildren(Map, Entity, Sensor, SensorEventListener) */
// FIXME remove from interface?
@Beta
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionContext.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionContext.java
index 7b4e6e7..3328b1a 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionContext.java
@@ -34,7 +34,7 @@ public interface SubscriptionContext {
/**
* As {@link SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener)} with default subscription parameters for this context
*/
- <T> SubscriptionHandle subscribe(Map<String, Object> flags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener);
+ <T> SubscriptionHandle subscribe(Map<String, ?> flags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener);
/** @see #subscribe(Map, Entity, Sensor, SensorEventListener) */
<T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener);
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
index 5dc110d..03254dd 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
@@ -1337,6 +1337,13 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E
return getSubscriptionTracker().subscribe(producer, sensor, listener);
}
+ /** @see EntityLocal#subscribe */
+ @Override
+ @Beta
+ public <T> SubscriptionHandle subscribe(Map<String, ?> flags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+ return getSubscriptionTracker().subscribe(flags, producer, sensor, listener);
+ }
+
/** @see EntityLocal#subscribeToChildren */
@Override
public <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
index 5c38b81..d821c4e 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
@@ -70,7 +70,7 @@ public class BasicSubscriptionContext implements SubscriptionContext {
}
@SuppressWarnings("rawtypes")
- public <T> SubscriptionHandle subscribe(Map<String, Object> newFlags, Entity producer, Sensor<T> sensor, Closure c) {
+ public <T> SubscriptionHandle subscribe(Map<String, ?> newFlags, Entity producer, Sensor<T> sensor, Closure c) {
return subscribe(newFlags, producer, sensor, toSensorEventListener(c));
}
@@ -80,7 +80,7 @@ public class BasicSubscriptionContext implements SubscriptionContext {
}
@Override
- public <T> SubscriptionHandle subscribe(Map<String, Object> newFlags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+ public <T> SubscriptionHandle subscribe(Map<String, ?> newFlags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
Map<String,Object> subscriptionFlags = Maps.newLinkedHashMap(flags);
if (newFlags != null) subscriptionFlags.putAll(newFlags);
return manager.subscribe(subscriptionFlags, producer, sensor, listener);
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index 6ea94a1..7743995 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -37,10 +37,13 @@ import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.mgmt.ExecutionManager;
import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
import org.apache.brooklyn.api.mgmt.SubscriptionManager;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.api.sensor.Sensor;
import org.apache.brooklyn.api.sensor.SensorEvent;
import org.apache.brooklyn.api.sensor.SensorEventListener;
import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.sensor.BasicSensorEvent;
+import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.task.BasicExecutionManager;
import org.apache.brooklyn.util.core.task.SingleThreadedScheduler;
import org.apache.brooklyn.util.text.Identifiers;
@@ -90,7 +93,7 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
}
@SuppressWarnings("unchecked")
- protected synchronized <T> SubscriptionHandle subscribe(Map<String, Object> flags, Subscription<T> s) {
+ protected synchronized <T> SubscriptionHandle subscribe(Map<String, Object> flags, final Subscription<T> s) {
Entity producer = s.producer;
Sensor<T> sensor= s.sensor;
s.subscriber = getSubscriber(flags, s);
@@ -105,6 +108,7 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
s.subscriberExecutionManagerTagSupplied = false;
}
s.eventFilter = (Predicate<SensorEvent<T>>) flags.remove("eventFilter");
+ boolean notifyOfInitialValue = Boolean.TRUE.equals(flags.remove("notifyOfInitialValue"));
s.flags = flags;
if (LOG.isDebugEnabled()) LOG.debug("Creating subscription {} for {} on {} {} in {}", new Object[] {s.id, s.subscriber, producer, sensor, this});
@@ -116,6 +120,41 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
if (!s.subscriberExecutionManagerTagSupplied && s.subscriberExecutionManagerTag!=null) {
((BasicExecutionManager) em).setTaskSchedulerForTag(s.subscriberExecutionManagerTag, SingleThreadedScheduler.class);
}
+
+ if (notifyOfInitialValue) {
+ if (producer == null) {
+ LOG.warn("Cannot notifyOfInitialValue for subscription with wildcard producer: "+s);
+ } else if (sensor == null) {
+ LOG.warn("Cannot notifyOfInitialValue for subscription with wilcard sensor: "+s);
+ } else if (!(sensor instanceof AttributeSensor)) {
+ LOG.warn("Cannot notifyOfInitialValue for subscription with non-attribute sensor: "+s);
+ } else {
+ if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {} to {}", new Object[] {s.producer, s.sensor, s});
+ Map<String, Object> tagsMap = MutableMap.of("tag", s.subscriberExecutionManagerTag);
+ em.submit(tagsMap, new Runnable() {
+ @Override
+ public String toString() {
+ return "LSM.publishInitialValue("+s.producer+", "+s.sensor+")";
+ }
+ public void run() {
+ Object val = s.producer.getAttribute((AttributeSensor<?>) s.sensor);
+ @SuppressWarnings("rawtypes") // TODO s.listener.onEvent gives compilation error if try to use <T>
+ SensorEvent event = new BasicSensorEvent(s.sensor, s.producer, val);
+ if (s.eventFilter!=null && !s.eventFilter.apply(event))
+ return;
+ try {
+ s.listener.onEvent(event);
+ } catch (Throwable t) {
+ if (event!=null && event.getSource()!=null && Entities.isNoLongerManaged(event.getSource())) {
+ LOG.debug("Error processing initial-value subscription to "+LocalSubscriptionManager.this+", after entity unmanaged: "+t, t);
+ } else {
+ LOG.warn("Error processing initial-value subscription to "+LocalSubscriptionManager.this+": "+t, t);
+ }
+ }
+ }});
+ }
+ }
+
return s;
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/SubscriptionTracker.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/SubscriptionTracker.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/SubscriptionTracker.java
index 2faad3a..3d5793c 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/SubscriptionTracker.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/SubscriptionTracker.java
@@ -19,6 +19,7 @@
package org.apache.brooklyn.core.mgmt.internal;
import java.util.Collection;
+import java.util.Map;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.Group;
@@ -29,6 +30,7 @@ import org.apache.brooklyn.api.sensor.SensorEventListener;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.SetMultimap;
/**
@@ -57,13 +59,17 @@ public class SubscriptionTracker {
/** @see SubscriptionContext#subscribe(Entity, Sensor, SensorEventListener) */
public <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
- SubscriptionHandle handle = context.subscribe(producer, sensor, listener);
+ return subscribe(ImmutableMap.<String, Object>of(), producer, sensor, listener);
+ }
+
+ public <T> SubscriptionHandle subscribe(Map<String, ?> flags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+ SubscriptionHandle handle = context.subscribe(flags, producer, sensor, listener);
synchronized (subscriptions) {
subscriptions.put(producer, handle);
}
return handle;
}
-
+
/** @see SubscriptionContext#subscribeToChildren(Entity, Sensor, SensorEventListener) */
public <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
SubscriptionHandle handle = context.subscribeToChildren(parent, sensor, listener);
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
index e85cc73..fb71901 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
@@ -392,6 +392,14 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
}
@VisibleForTesting //intended as protected, meant for subclasses
+ @Beta
+ /** @see SubscriptionContext#subscribe(Map, Entity, Sensor, SensorEventListener) */
+ public <T> SubscriptionHandle subscribe(Map<String, ?> flags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+ if (!checkCanSubscribe()) return null;
+ return getSubscriptionTracker().subscribe(flags, producer, sensor, listener);
+ }
+
+ @VisibleForTesting //intended as protected, meant for subclasses
/** @see SubscriptionContext#subscribe(Entity, Sensor, SensorEventListener) */
public <T> SubscriptionHandle subscribeToMembers(Group producerGroup, Sensor<T> sensor, SensorEventListener<? super T> listener) {
if (!checkCanSubscribe(producerGroup)) return null;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/core/src/main/java/org/apache/brooklyn/enricher/stock/UpdatingMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/UpdatingMap.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/UpdatingMap.java
index b09b6d6..43aec92 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/UpdatingMap.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/UpdatingMap.java
@@ -36,6 +36,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.reflect.TypeToken;
@@ -102,8 +103,7 @@ public class UpdatingMap<S,TKey,TVal> extends AbstractEnricher implements Sensor
this.computing = (Function) getRequiredConfig(COMPUTING);
this.removingIfResultIsNull = getConfig(REMOVING_IF_RESULT_IS_NULL);
- subscribe(entity, sourceSensor, this);
- onUpdated();
+ subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, sourceSensor, this);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
index 8b8d244..620d8e0 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
@@ -34,12 +34,15 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
public class EntitySubscriptionTest {
// TODO Duplication between this and PolicySubscriptionTest
+ private static final long SHORT_WAIT_MS = 100;
+
private SimulatedLocation loc;
private TestApplication app;
private TestEntity entity;
@@ -237,4 +240,44 @@ public class EntitySubscriptionTest {
}
}});
}
+
+ @Test
+ public void testSubscriptionReceivesInitialValueEvents() {
+ observedEntity.sensors().set(TestEntity.SEQUENCE, 123);
+ observedEntity.sensors().set(TestEntity.NAME, "myname");
+
+ entity.subscribe(ImmutableMap.of("notifyOfInitialValue", true), observedEntity, TestEntity.SEQUENCE, listener);
+ entity.subscribe(ImmutableMap.of("notifyOfInitialValue", true), observedEntity, TestEntity.NAME, listener);
+
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ assertEquals(listener.getEvents(), ImmutableList.of(
+ new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, observedEntity, 123),
+ new BasicSensorEvent<String>(TestEntity.NAME, observedEntity, "myname")));
+ }});
+ }
+
+
+ @Test
+ public void testSubscriptionNotReceivesInitialValueEventsByDefault() {
+ observedEntity.sensors().set(TestEntity.SEQUENCE, 123);
+ observedEntity.sensors().set(TestEntity.NAME, "myname");
+
+ entity.subscribe(observedEntity, TestEntity.SEQUENCE, listener);
+ entity.subscribe(observedEntity, TestEntity.NAME, listener);
+
+ Asserts.succeedsContinually(ImmutableMap.of("timeout", SHORT_WAIT_MS), new Runnable() {
+ @Override public void run() {
+ assertEquals(listener.getEvents(), ImmutableList.of());
+ }});
+ }
+
+ // TODO A visual inspection test that we get a log.warn telling us we can't get the initial-value
+ @Test
+ public void testSubscriptionForInitialValueWhenNotValid() {
+ entity.subscribe(ImmutableMap.of("notifyOfInitialValue", true), observedEntity, TestEntity.MY_NOTIF, listener);
+ entity.subscribe(ImmutableMap.of("notifyOfInitialValue", true), observedEntity, null, listener);
+ entity.subscribe(ImmutableMap.of("notifyOfInitialValue", true), null, TestEntity.NAME, listener);
+ entity.subscribe(ImmutableMap.of("notifyOfInitialValue", true), null, null, listener);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java b/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
index fa7333f..ab5bc4a 100644
--- a/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
@@ -33,6 +33,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
public class PolicySubscriptionTest extends BrooklynAppUnitTestSupport {
@@ -119,5 +120,34 @@ public class PolicySubscriptionTest extends BrooklynAppUnitTestSupport {
new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, otherEntity, 456)));
}});
}
+
+ @Test
+ public void testSubscriptionReceivesInitialValueEvents() {
+ entity.sensors().set(TestEntity.SEQUENCE, 123);
+ entity.sensors().set(TestEntity.NAME, "myname");
+
+ policy.subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SEQUENCE, listener);
+ policy.subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.NAME, listener);
+
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ assertEquals(listener.getEvents(), ImmutableList.of(
+ new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity, 123),
+ new BasicSensorEvent<String>(TestEntity.NAME, entity, "myname")));
+ }});
+ }
+ @Test
+ public void testSubscriptionNotReceivesInitialValueEventsByDefault() {
+ entity.sensors().set(TestEntity.SEQUENCE, 123);
+ entity.sensors().set(TestEntity.NAME, "myname");
+
+ policy.subscribe(entity, TestEntity.SEQUENCE, listener);
+ policy.subscribe(entity, TestEntity.NAME, listener);
+
+ Asserts.succeedsContinually(ImmutableMap.of("timeout", SHORT_WAIT_MS), new Runnable() {
+ @Override public void run() {
+ assertEquals(listener.getEvents(), ImmutableList.of());
+ }});
+ }
}