You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2015/09/23 12:51:58 UTC
[06/10] incubator-brooklyn git commit: Move subscriptions() from
Entity to BrooklynObject
Move subscriptions() from Entity to BrooklynObject
- Add support for subscriptions to Location (and test)
- For CatalogItem.subscriptions(), throw UnsupportedOperationException
- Deprecate methods on AbstractEntityAdjunct, in preference for
AbstractEntityAdjunct.subscriptions().*
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/4ad6cc96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/4ad6cc96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/4ad6cc96
Branch: refs/heads/master
Commit: 4ad6cc967730001715ac3a97661106b5ed1b6c67
Parents: 2b29795
Author: Aled Sage <al...@gmail.com>
Authored: Mon Sep 21 10:05:11 2015 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Wed Sep 23 10:33:50 2015 +0100
----------------------------------------------------------------------
.../brooklyn/api/catalog/CatalogItem.java | 12 +
.../org/apache/brooklyn/api/entity/Entity.java | 51 ----
.../brooklyn/api/mgmt/ManagementContext.java | 9 +
.../brooklyn/api/objs/BrooklynObject.java | 63 ++++-
.../core/catalog/internal/CatalogItemDo.java | 11 +-
.../internal/CatalogItemDtoAbstract.java | 14 +-
.../brooklyn/core/entity/EntityInternal.java | 8 -
.../core/location/AbstractLocation.java | 79 +++++-
.../access/PortForwardManagerClient.java | 5 +
.../internal/AbstractManagementContext.java | 11 +-
.../mgmt/internal/BasicSubscriptionContext.java | 3 +-
.../NonDeploymentManagementContext.java | 7 +
.../core/objs/AbstractEntityAdjunct.java | 164 ++++++++++---
.../core/objs/BrooklynObjectInternal.java | 9 +
.../core/location/LocationSubscriptionTest.java | 241 +++++++++++++++++++
15 files changed, 580 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/api/src/main/java/org/apache/brooklyn/api/catalog/CatalogItem.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/catalog/CatalogItem.java b/api/src/main/java/org/apache/brooklyn/api/catalog/CatalogItem.java
index 3758d08..bf806aa 100644
--- a/api/src/main/java/org/apache/brooklyn/api/catalog/CatalogItem.java
+++ b/api/src/main/java/org/apache/brooklyn/api/catalog/CatalogItem.java
@@ -48,6 +48,18 @@ public interface CatalogItem<T,SpecT> extends BrooklynObject, Rebindable {
public boolean isNamed();
}
+ /**
+ * @throws UnsupportedOperationException always, because config is not supported for catalog items
+ */
+ @Override
+ ConfigurationSupport config();
+
+ /**
+ * @throws UnsupportedOperationException always, because subscriptions are not supported for catalog items
+ */
+ @Override
+ SubscriptionSupport subscriptions();
+
@Deprecated
public static interface CatalogItemLibraries {
Collection<String> getBundles();
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/api/src/main/java/org/apache/brooklyn/api/entity/Entity.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/entity/Entity.java b/api/src/main/java/org/apache/brooklyn/api/entity/Entity.java
index 795218c..dd141f0 100644
--- a/api/src/main/java/org/apache/brooklyn/api/entity/Entity.java
+++ b/api/src/main/java/org/apache/brooklyn/api/entity/Entity.java
@@ -301,8 +301,6 @@ public interface Entity extends BrooklynObject {
SensorSupport sensors();
- SubscriptionSupport subscriptions();
-
PolicySupport policies();
EnricherSupport enrichers();
@@ -354,55 +352,6 @@ public interface Entity extends BrooklynObject {
}
@Beta
- public interface SubscriptionSupport {
- /**
- * Allow us to subscribe to data from a {@link Sensor} on another entity.
- *
- * @return a subscription id which can be used to unsubscribe
- *
- * @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener)
- */
- // FIXME remove from interface?
- @Beta
- <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener);
-
- /** @see SubscriptionManager#subscribeToChildren(Map, Entity, Sensor, SensorEventListener) */
- // FIXME remove from interface?
- @Beta
- <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener);
-
- /** @see SubscriptionManager#subscribeToMembers(Group, Sensor, SensorEventListener) */
- // FIXME remove from interface?
- @Beta
- <T> SubscriptionHandle subscribeToMembers(Group group, Sensor<T> sensor, SensorEventListener<? super T> listener);
-
- /**
- * Unsubscribes from the given producer.
- *
- * @see SubscriptionContext#unsubscribe(SubscriptionHandle)
- */
- @Beta
- boolean unsubscribe(Entity producer);
-
- /**
- * Unsubscribes the given handle.
- *
- * @see SubscriptionContext#unsubscribe(SubscriptionHandle)
- */
- @Beta
- boolean unsubscribe(Entity producer, SubscriptionHandle handle);
-
- /**
- * Unsubscribes the given handle.
- *
- * It is (currently) more efficient to also pass in the producer -
- * see {@link BasicSubscriptionSupport#unsubscribe(Entity, SubscriptionHandle)}
- */
- @Beta
- boolean unsubscribe(SubscriptionHandle handle);
- }
-
- @Beta
public interface PolicySupport {
/**
* @return an immutable thread-safe view of the policies.
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
index ac4924e..f809fb2 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
@@ -28,6 +28,7 @@ import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.drivers.DriverDependentEntity;
import org.apache.brooklyn.api.entity.drivers.EntityDriverManager;
import org.apache.brooklyn.api.entity.drivers.downloads.DownloadResolverManager;
+import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.api.location.LocationRegistry;
import org.apache.brooklyn.api.mgmt.entitlement.EntitlementManager;
import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityManager;
@@ -156,6 +157,14 @@ public interface ManagementContext {
*/
SubscriptionContext getSubscriptionContext(Entity entity);
+ /**
+ * Returns a {@link SubscriptionContext} instance representing subscriptions
+ * (from the {@link SubscriptionManager}) associated with the given location, and capable
+ * of conveniently subscribing on behalf of that location.
+ */
+ @Beta
+ SubscriptionContext getSubscriptionContext(Location location);
+
@Beta // method may move to an internal interface; brooklyn users should not need to call this directly
RebindManager getRebindManager();
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/api/src/main/java/org/apache/brooklyn/api/objs/BrooklynObject.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/objs/BrooklynObject.java b/api/src/main/java/org/apache/brooklyn/api/objs/BrooklynObject.java
index c932f02..094586f 100644
--- a/api/src/main/java/org/apache/brooklyn/api/objs/BrooklynObject.java
+++ b/api/src/main/java/org/apache/brooklyn/api/objs/BrooklynObject.java
@@ -18,10 +18,20 @@
*/
package org.apache.brooklyn.api.objs;
+import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.mgmt.SubscriptionContext;
+import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
+import org.apache.brooklyn.api.mgmt.SubscriptionManager;
+import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
+
+import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableMap;
/**
@@ -53,7 +63,13 @@ public interface BrooklynObject extends Identifiable, Configurable {
* and they should be amenable to our persistence (on-disk serialization) and our JSON serialization in the REST API.
*/
TagSupport tags();
-
+
+ /**
+ * Subscriptions are the mechanism for receiving notifications of sensor-events (e.g. attribute-changed) from
+ * other entities.
+ */
+ SubscriptionSupport subscriptions();
+
public interface TagSupport {
/**
* @return An immutable copy of the set of tags on this entity.
@@ -70,4 +86,49 @@ public interface BrooklynObject extends Identifiable, Configurable {
boolean removeTag(@Nonnull Object tag);
}
+
+ @Beta
+ public interface SubscriptionSupport {
+ /**
+ * Allow us to subscribe to data from a {@link Sensor} on another entity.
+ *
+ * @return a subscription handle which can be used to unsubscribe
+ *
+ * @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener)
+ */
+ @Beta
+ <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener);
+
+ /** @see SubscriptionManager#subscribeToChildren(Map, Entity, Sensor, SensorEventListener) */
+ @Beta
+ <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener);
+
+ /** @see SubscriptionManager#subscribeToMembers(Group, Sensor, SensorEventListener) */
+ @Beta
+ <T> SubscriptionHandle subscribeToMembers(Group group, Sensor<T> sensor, SensorEventListener<? super T> listener);
+
+ /**
+ * Unsubscribes from the given producer.
+ *
+ * @see SubscriptionContext#unsubscribe(SubscriptionHandle)
+ */
+ @Beta
+ boolean unsubscribe(Entity producer);
+
+ /**
+ * Unsubscribes the given handle.
+ *
+ * @see SubscriptionContext#unsubscribe(SubscriptionHandle)
+ */
+ @Beta
+ boolean unsubscribe(Entity producer, SubscriptionHandle handle);
+
+ /**
+ * Unsubscribes the given handle.
+ *
+ * It is (currently) more efficient to also pass in the producer -
+ * see {@link SubscriptionSupport#unsubscribe(Entity, SubscriptionHandle)}
+ */
+ boolean unsubscribe(SubscriptionHandle handle);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDo.java b/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDo.java
index 5029d8d..0545a06 100644
--- a/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDo.java
+++ b/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDo.java
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.objs.BrooklynObjectInternal;
+import org.apache.brooklyn.core.objs.BrooklynObjectInternal.SubscriptionSupportInternal;
import org.apache.brooklyn.api.catalog.CatalogItem;
import org.apache.brooklyn.api.mgmt.ManagementContext;
import org.apache.brooklyn.api.mgmt.rebind.RebindSupport;
@@ -49,13 +50,21 @@ public class CatalogItemDo<T,SpecT> implements CatalogItem<T,SpecT>, BrooklynObj
}
/**
- * Config not supported for catalog item. See {@link #getPlanYaml()}.
+ * @throws UnsupportedOperationException always, because config is not supported for catalog items. See {@link #getPlanYaml()}.
*/
@Override
public ConfigurationSupportInternal config() {
throw new UnsupportedOperationException();
}
+ /**
+ * @throws UnsupportedOperationException always, because subscriptions are not supported for catalog items
+ */
+ @Override
+ public SubscriptionSupportInternal subscriptions() {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public <U> U setConfig(ConfigKey<U> key, U val) {
return config().set(key, val);
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDtoAbstract.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDtoAbstract.java b/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDtoAbstract.java
index b281941..c950b7b 100644
--- a/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDtoAbstract.java
+++ b/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDtoAbstract.java
@@ -26,8 +26,6 @@ import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.brooklyn.api.catalog.CatalogItem;
import org.apache.brooklyn.api.mgmt.rebind.RebindSupport;
import org.apache.brooklyn.api.mgmt.rebind.mementos.CatalogItemMemento;
@@ -37,6 +35,8 @@ import org.apache.brooklyn.core.objs.AbstractBrooklynObject;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.core.flags.FlagUtils;
import org.apache.brooklyn.util.core.flags.SetFromFlag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
@@ -66,13 +66,21 @@ public abstract class CatalogItemDtoAbstract<T, SpecT> extends AbstractBrooklynO
private @SetFromFlag boolean disabled;
/**
- * Config not supported for catalog item. See {@link #getPlanYaml()}.
+ * @throws UnsupportedOperationException always, because config is not supported for catalog items. See {@link #getPlanYaml()}.
*/
@Override
public ConfigurationSupportInternal config() {
throw new UnsupportedOperationException();
}
+ /**
+ * @throws UnsupportedOperationException always, because subscriptions are not supported for catalog items
+ */
+ @Override
+ public SubscriptionSupportInternal subscriptions() {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public <U> U setConfig(ConfigKey<U> key, U val) {
return config().set(key, val);
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/entity/EntityInternal.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/EntityInternal.java b/core/src/main/java/org/apache/brooklyn/core/entity/EntityInternal.java
index 4fa9c67..49dfa00 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/EntityInternal.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/EntityInternal.java
@@ -184,9 +184,6 @@ public interface EntityInternal extends BrooklynObjectInternal, EntityLocal, Reb
SensorSupportInternal sensors();
@Override
- SubscriptionSupportInternal subscriptions();
-
- @Override
PolicySupportInternal policies();
@Override
@@ -230,11 +227,6 @@ public interface EntityInternal extends BrooklynObjectInternal, EntityLocal, Reb
}
@Beta
- public interface SubscriptionSupportInternal extends Entity.SubscriptionSupport {
- public void unsubscribeAll();
- }
-
- @Beta
public interface PolicySupportInternal extends Entity.PolicySupport {
/**
* Removes all policy from this entity.
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/location/AbstractLocation.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/location/AbstractLocation.java b/core/src/main/java/org/apache/brooklyn/core/location/AbstractLocation.java
index 507e7f5..b8859d6 100644
--- a/core/src/main/java/org/apache/brooklyn/core/location/AbstractLocation.java
+++ b/core/src/main/java/org/apache/brooklyn/core/location/AbstractLocation.java
@@ -29,12 +29,18 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.Group;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.api.location.LocationSpec;
+import org.apache.brooklyn.api.mgmt.SubscriptionContext;
+import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.mgmt.rebind.RebindSupport;
import org.apache.brooklyn.api.mgmt.rebind.mementos.LocationMemento;
import org.apache.brooklyn.api.objs.Configurable;
+import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
import org.apache.brooklyn.config.ConfigInheritance;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.config.ConfigKey.HasConfigKey;
@@ -50,17 +56,19 @@ import org.apache.brooklyn.core.location.internal.LocationDynamicType;
import org.apache.brooklyn.core.location.internal.LocationInternal;
import org.apache.brooklyn.core.mgmt.internal.LocalLocationManager;
import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal;
+import org.apache.brooklyn.core.mgmt.internal.SubscriptionTracker;
import org.apache.brooklyn.core.mgmt.rebind.BasicLocationRebindSupport;
import org.apache.brooklyn.core.objs.AbstractBrooklynObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.brooklyn.util.collections.SetFromLiveMap;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.flags.FlagUtils;
import org.apache.brooklyn.util.core.flags.TypeCoercions;
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.stream.Streams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.annotations.Beta;
import com.google.common.base.Objects;
import com.google.common.base.Objects.ToStringHelper;
import com.google.common.base.Splitter;
@@ -110,8 +118,13 @@ public abstract class AbstractLocation extends AbstractBrooklynObject implements
private BasicConfigurationSupport config = new BasicConfigurationSupport();
+ private BasicSubscriptionSupport subscriptions = new BasicSubscriptionSupport();
+
private ConfigBag configBag = new ConfigBag();
+ /** not for direct access; refer to as 'subscriptionTracker' via getter so that it is initialized */
+ protected transient SubscriptionTracker _subscriptionTracker;
+
private volatile boolean managed;
private boolean inConstruction;
@@ -354,7 +367,16 @@ public abstract class AbstractLocation extends AbstractBrooklynObject implements
@Override
public ConfigurationSupportInternal config() {
- return config ;
+ return config;
+ }
+
+ // the concrete type rather than an interface is returned because Groovy subclasses
+ // complain (incorrectly) if we return SubscriptionSupportInternal
+ // TODO revert to SubscriptionSupportInternal when groovy subclasses work without this (eg new groovy version)
+ @Override
+ @Beta
+ public BasicSubscriptionSupport subscriptions() {
+ return subscriptions;
}
private class BasicConfigurationSupport implements ConfigurationSupportInternal {
@@ -478,6 +500,57 @@ public abstract class AbstractLocation extends AbstractBrooklynObject implements
}
}
+ public class BasicSubscriptionSupport implements SubscriptionSupportInternal {
+ @Override
+ public <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+ return getSubscriptionTracker().subscribe(producer, sensor, listener);
+ }
+
+ @Override
+ public <T> SubscriptionHandle subscribeToMembers(Group producerGroup, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+ return getSubscriptionTracker().subscribeToMembers(producerGroup, sensor, listener);
+ }
+
+ @Override
+ public <T> SubscriptionHandle subscribeToChildren(Entity producerParent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+ return getSubscriptionTracker().subscribeToChildren(producerParent, sensor, listener);
+ }
+
+ @Override
+ public boolean unsubscribe(Entity producer) {
+ return getSubscriptionTracker().unsubscribe(producer);
+ }
+
+ @Override
+ public boolean unsubscribe(Entity producer, SubscriptionHandle handle) {
+ return getSubscriptionTracker().unsubscribe(producer, handle);
+ }
+
+ @Override
+ public boolean unsubscribe(SubscriptionHandle handle) {
+ return getSubscriptionTracker().unsubscribe(handle);
+ }
+
+ @Override
+ public void unsubscribeAll() {
+ getSubscriptionTracker().unsubscribeAll();
+ }
+
+ protected SubscriptionTracker getSubscriptionTracker() {
+ synchronized (AbstractLocation.this) {
+ if (_subscriptionTracker!=null) return _subscriptionTracker;
+ _subscriptionTracker = new SubscriptionTracker(newSubscriptionContext());
+ return _subscriptionTracker;
+ }
+ }
+
+ private SubscriptionContext newSubscriptionContext() {
+ synchronized (AbstractLocation.this) {
+ return getManagementContext().getSubscriptionContext(AbstractLocation.this);
+ }
+ }
+ }
+
@Override
public <T> T getConfig(HasConfigKey<T> key) {
return config().get(key);
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/location/access/PortForwardManagerClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/location/access/PortForwardManagerClient.java b/core/src/main/java/org/apache/brooklyn/core/location/access/PortForwardManagerClient.java
index 12e15aa..d0199a0 100644
--- a/core/src/main/java/org/apache/brooklyn/core/location/access/PortForwardManagerClient.java
+++ b/core/src/main/java/org/apache/brooklyn/core/location/access/PortForwardManagerClient.java
@@ -402,4 +402,9 @@ public class PortForwardManagerClient implements PortForwardManager {
public ConfigurationSupport config() {
return getDelegate().config();
}
+
+ @Override
+ public SubscriptionSupport subscriptions() {
+ return getDelegate().subscriptions();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
index 76871cd..343528d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
@@ -32,14 +32,13 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.brooklyn.api.catalog.BrooklynCatalog;
import org.apache.brooklyn.api.catalog.CatalogItem;
import org.apache.brooklyn.api.effector.Effector;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.drivers.EntityDriverManager;
import org.apache.brooklyn.api.entity.drivers.downloads.DownloadResolverManager;
+import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.api.location.LocationRegistry;
import org.apache.brooklyn.api.mgmt.ExecutionContext;
import org.apache.brooklyn.api.mgmt.ManagementContext;
@@ -78,6 +77,8 @@ import org.apache.brooklyn.util.core.task.BasicExecutionContext;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.groovy.GroovyJavaMethods;
import org.apache.brooklyn.util.guava.Maybe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.base.Objects;
@@ -260,6 +261,12 @@ public abstract class AbstractManagementContext implements ManagementContextInte
}
@Override
+ public SubscriptionContext getSubscriptionContext(Location loc) {
+ // BSC is a thin wrapper around SM so fine to create a new one here
+ return new BasicSubscriptionContext(getSubscriptionManager(), loc);
+ }
+
+ @Override
public EntityDriverManager getEntityDriverManager() {
return entityDriverManager;
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
index d821c4e..57d4712 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
@@ -19,7 +19,6 @@
package org.apache.brooklyn.core.mgmt.internal;
import static org.apache.brooklyn.util.JavaGroovyEquivalents.mapOf;
-import groovy.lang.Closure;
import java.util.Collection;
import java.util.Collections;
@@ -42,6 +41,8 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
+import groovy.lang.Closure;
+
/**
* A {@link SubscriptionContext} for an entity or other user of a {@link SubscriptionManager}.
*/
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
index 87e8f84..1f62add 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
@@ -241,6 +241,13 @@ public class NonDeploymentManagementContext implements ManagementContextInternal
}
@Override
+ public synchronized SubscriptionContext getSubscriptionContext(Location loc) {
+ // Should never be called; the NonDeploymentManagementContext is associated with a particular entity, whereas
+ // the #getSubscriptionContext(loc) should only be called in the context of a location.
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public ExecutionContext getExecutionContext(Entity entity) {
if (!this.entity.equals(entity)) throw new IllegalStateException("Non-deployment context "+this+" can only use a single Entity: has "+this.entity+", but passed "+entity);
if (mode==NonDeploymentManagementContextMode.MANAGEMENT_STOPPED)
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
index fb71901..814923a 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
@@ -32,17 +32,17 @@ import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.api.entity.Group;
import org.apache.brooklyn.api.mgmt.ExecutionContext;
-import org.apache.brooklyn.api.mgmt.SubscriptionContext;
import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.objs.BrooklynObject;
import org.apache.brooklyn.api.objs.Configurable;
import org.apache.brooklyn.api.objs.EntityAdjunct;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.api.sensor.Sensor;
import org.apache.brooklyn.api.sensor.SensorEventListener;
import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.config.ConfigMap;
import org.apache.brooklyn.config.ConfigKey.HasConfigKey;
+import org.apache.brooklyn.config.ConfigMap;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.enricher.AbstractEnricher;
import org.apache.brooklyn.core.entity.Entities;
@@ -58,7 +58,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.Beta;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
@@ -82,7 +81,9 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
protected transient ExecutionContext execution;
private final BasicConfigurationSupport config = new BasicConfigurationSupport();
-
+
+ private final BasicSubscriptionSupport subscriptions = new BasicSubscriptionSupport();
+
/**
* The config values of this entity. Updating this map should be done
* via {@link #config()}.
@@ -199,6 +200,89 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
return config;
}
+ @Override
+ public BasicSubscriptionSupport subscriptions() {
+ return subscriptions;
+ }
+
+ public class BasicSubscriptionSupport implements SubscriptionSupportInternal {
+ @Override
+ public <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+ if (!checkCanSubscribe()) return null;
+ return getSubscriptionTracker().subscribe(producer, sensor, listener);
+ }
+
+ @Override
+ public <T> SubscriptionHandle subscribeToMembers(Group producerGroup, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+ if (!checkCanSubscribe(producerGroup)) return null;
+ return getSubscriptionTracker().subscribeToMembers(producerGroup, sensor, listener);
+ }
+
+ @Override
+ public <T> SubscriptionHandle subscribeToChildren(Entity producerParent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+ if (!checkCanSubscribe(producerParent)) return null;
+ return getSubscriptionTracker().subscribeToChildren(producerParent, sensor, listener);
+ }
+
+ @Override
+ public boolean unsubscribe(Entity producer) {
+ if (destroyed.get()) return false;
+ return getSubscriptionTracker().unsubscribe(producer);
+ }
+
+ @Override
+ public boolean unsubscribe(Entity producer, SubscriptionHandle handle) {
+ if (destroyed.get()) return false;
+ return getSubscriptionTracker().unsubscribe(producer, handle);
+ }
+
+ @Override
+ public boolean unsubscribe(SubscriptionHandle handle) {
+ if (destroyed.get()) return false;
+ return getSubscriptionTracker().unsubscribe(handle);
+ }
+
+ @Override
+ public void unsubscribeAll() {
+ if (destroyed.get()) return;
+ getSubscriptionTracker().unsubscribeAll();
+ }
+
+ protected SubscriptionTracker getSubscriptionTracker() {
+ synchronized (AbstractEntityAdjunct.this) {
+ if (_subscriptionTracker!=null) return _subscriptionTracker;
+ if (entity==null) return null;
+ _subscriptionTracker = new SubscriptionTracker(((EntityInternal)entity).getManagementSupport().getSubscriptionContext());
+ return _subscriptionTracker;
+ }
+ }
+
+ /** returns false if deleted, throws exception if invalid state, otherwise true.
+ * okay if entity is not yet managed (but not if entity is no longer managed). */
+ protected boolean checkCanSubscribe(Entity producer) {
+ if (destroyed.get()) return false;
+ if (producer==null) throw new IllegalStateException(this+" given a null target for subscription");
+ if (entity==null) throw new IllegalStateException(this+" cannot subscribe to "+producer+" because it is not associated to an entity");
+ if (((EntityInternal)entity).getManagementSupport().isNoLongerManaged()) throw new IllegalStateException(this+" cannot subscribe to "+producer+" because the associated entity "+entity+" is no longer managed");
+ return true;
+ }
+
+ protected boolean checkCanSubscribe() {
+ if (destroyed.get()) return false;
+ if (entity==null) throw new IllegalStateException(this+" cannot subscribe because it is not associated to an entity");
+ if (((EntityInternal)entity).getManagementSupport().isNoLongerManaged()) throw new IllegalStateException(this+" cannot subscribe because the associated entity "+entity+" is no longer managed");
+ return true;
+ }
+
+ /**
+ * @return a list of all subscription handles
+ */
+ protected Collection<SubscriptionHandle> getAllSubscriptions() {
+ SubscriptionTracker tracker = getSubscriptionTracker();
+ return (tracker != null) ? tracker.getAllSubscriptions() : Collections.<SubscriptionHandle>emptyList();
+ }
+ }
+
private class BasicConfigurationSupport implements ConfigurationSupportInternal {
@Override
@@ -377,6 +461,10 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
}
}
+ /**
+ * @deprecated since 0.9.0; for internal use only
+ */
+ @Deprecated
protected synchronized SubscriptionTracker getSubscriptionTracker() {
if (_subscriptionTracker!=null) return _subscriptionTracker;
if (entity==null) return null;
@@ -384,14 +472,15 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
return _subscriptionTracker;
}
- @VisibleForTesting //intended as protected, meant for subclasses
- /** @see SubscriptionContext#subscribe(Entity, Sensor, SensorEventListener) */
+ /**
+ * @deprecated since 0.9.0; see {@link SubscriptionSupport#subscribe(Entity, Sensor, SensorEventListener)} and {@link BrooklynObject#subscriptions()}
+ */
+ @Deprecated
public <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
if (!checkCanSubscribe()) return null;
return getSubscriptionTracker().subscribe(producer, sensor, listener);
}
- @VisibleForTesting //intended as protected, meant for subclasses
@Beta
/** @see SubscriptionContext#subscribe(Map, Entity, Sensor, SensorEventListener) */
public <T> SubscriptionHandle subscribe(Map<String, ?> flags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
@@ -399,67 +488,68 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
return getSubscriptionTracker().subscribe(flags, producer, sensor, listener);
}
- @VisibleForTesting //intended as protected, meant for subclasses
- /** @see SubscriptionContext#subscribe(Entity, Sensor, SensorEventListener) */
+ /**
+ * @deprecated since 0.9.0; see {@link SubscriptionSupport#subscribeToMembers(Entity, Sensor, SensorEventListener)} and {@link BrooklynObject#subscriptions()}
+ */
+ @Deprecated
public <T> SubscriptionHandle subscribeToMembers(Group producerGroup, Sensor<T> sensor, SensorEventListener<? super T> listener) {
if (!checkCanSubscribe(producerGroup)) return null;
return getSubscriptionTracker().subscribeToMembers(producerGroup, sensor, listener);
}
- @VisibleForTesting //intended as protected, meant for subclasses
- /** @see SubscriptionContext#subscribe(Entity, Sensor, SensorEventListener) */
+ /**
+ * @deprecated since 0.9.0; see {@link SubscriptionSupport#subscribeToChildren(Entity, Sensor, SensorEventListener)} and {@link BrooklynObject#subscriptions()}
+ */
+ @Deprecated
public <T> SubscriptionHandle subscribeToChildren(Entity producerParent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
if (!checkCanSubscribe(producerParent)) return null;
return getSubscriptionTracker().subscribeToChildren(producerParent, sensor, listener);
}
- /** @deprecated since 0.7.0 use {@link #checkCanSubscribe(Entity)} */
+ /**
+ * @deprecated since 0.7.0 use {@link BasicSubscriptionSupport#checkCanSubscribe(Entity)}
+ */
@Deprecated
protected boolean check(Entity requiredEntity) {
return checkCanSubscribe(requiredEntity);
}
- /** returns false if deleted, throws exception if invalid state, otherwise true.
- * okay if entity is not yet managed (but not if entity is no longer managed). */
+
+ /**
+ * @deprecated since 0.9.0; for internal use only
+ */
+ @Deprecated
protected boolean checkCanSubscribe(Entity producer) {
- if (destroyed.get()) return false;
- if (producer==null) throw new IllegalStateException(this+" given a null target for subscription");
- if (entity==null) throw new IllegalStateException(this+" cannot subscribe to "+producer+" because it is not associated to an entity");
- if (((EntityInternal)entity).getManagementSupport().isNoLongerManaged()) throw new IllegalStateException(this+" cannot subscribe to "+producer+" because the associated entity "+entity+" is no longer managed");
- return true;
+ return subscriptions().checkCanSubscribe(producer);
}
+
+ /**
+ * @deprecated since 0.9.0; for internal use only
+ */
+ @Deprecated
protected boolean checkCanSubscribe() {
- if (destroyed.get()) return false;
- if (entity==null) throw new IllegalStateException(this+" cannot subscribe because it is not associated to an entity");
- if (((EntityInternal)entity).getManagementSupport().isNoLongerManaged()) throw new IllegalStateException(this+" cannot subscribe because the associated entity "+entity+" is no longer managed");
- return true;
+ return subscriptions().checkCanSubscribe();
}
/**
- * Unsubscribes the given producer.
- *
- * @see SubscriptionContext#unsubscribe(SubscriptionHandle)
+ * @deprecated since 0.9.0; see {@link SubscriptionSupport#unsubscribe(Entity)} and {@link BrooklynObject#subscriptions()}
*/
- @VisibleForTesting //intended as protected, meant for subclasses
+ @Deprecated
public boolean unsubscribe(Entity producer) {
- if (destroyed.get()) return false;
- return getSubscriptionTracker().unsubscribe(producer);
+ return subscriptions().unsubscribe(producer);
}
/**
- * Unsubscribes the given producer.
- *
- * @see SubscriptionContext#unsubscribe(SubscriptionHandle)
+ * @deprecated since 0.9.0; see {@link SubscriptionSupport#unsubscribe(Entity, SubscriptionHandle)} and {@link BrooklynObject#subscriptions()}
*/
- @VisibleForTesting //intended as protected, meant for subclasses
+ @Deprecated
public boolean unsubscribe(Entity producer, SubscriptionHandle handle) {
- if (destroyed.get()) return false;
- return getSubscriptionTracker().unsubscribe(producer, handle);
+ return subscriptions().unsubscribe(producer, handle);
}
/**
- * @return a list of all subscription handles
+ * @deprecated since 0.9.0; for internal use only
*/
- @VisibleForTesting //intended as protected, meant for subclasses
+ @Deprecated
protected Collection<SubscriptionHandle> getAllSubscriptions() {
SubscriptionTracker tracker = getSubscriptionTracker();
return (tracker != null) ? tracker.getAllSubscriptions() : Collections.<SubscriptionHandle>emptyList();
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java b/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java
index ad2cca1..d076f4e 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java
@@ -38,8 +38,12 @@ public interface BrooklynObjectInternal extends BrooklynObject, Rebindable {
@SuppressWarnings("rawtypes") // subclasses typically apply stronger typing
RebindSupport getRebindSupport();
+ @Override
ConfigurationSupportInternal config();
+ @Override
+ SubscriptionSupportInternal subscriptions();
+
@Beta
public interface ConfigurationSupportInternal extends Configurable.ConfigurationSupport {
@@ -100,4 +104,9 @@ public interface BrooklynObjectInternal extends BrooklynObject, Rebindable {
@Beta
void refreshInheritedConfigOfChildren();
}
+
+ /**
+ * Internal extension of {@link BrooklynObject.SubscriptionSupport}, adding operations that are
+ * not exposed on the public subscription API.
+ */
+ @Beta
+ public interface SubscriptionSupportInternal extends BrooklynObject.SubscriptionSupport {
+ /** Unsubscribes from all of this object's subscriptions. */
+ void unsubscribeAll();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/test/java/org/apache/brooklyn/core/location/LocationSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/location/LocationSubscriptionTest.java b/core/src/test/java/org/apache/brooklyn/core/location/LocationSubscriptionTest.java
new file mode 100644
index 0000000..30352c7
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/location/LocationSubscriptionTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.location;
+
+import static org.testng.Assert.assertEquals;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.entity.RecordingSensorEventListener;
+import org.apache.brooklyn.core.location.SimulatedLocation;
+import org.apache.brooklyn.core.sensor.BasicSensorEvent;
+import org.apache.brooklyn.core.test.entity.TestApplication;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.entity.group.BasicGroup;
+import org.apache.brooklyn.test.Asserts;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+public class LocationSubscriptionTest {
+
+ // TODO Duplication between this and PolicySubscriptionTest
+
+ private SimulatedLocation loc;
+ private TestApplication app;
+ private TestEntity observedEntity;
+ private BasicGroup observedGroup;
+ private TestEntity observedChildEntity;
+ private TestEntity observedMemberEntity;
+ private TestEntity otherEntity;
+ private RecordingSensorEventListener<Object> listener;
+
+ /**
+ * Builds the fixture for each test: a managed test application, a simulated location,
+ * an observed entity with one child, an observed group with one member, an unrelated
+ * "other" entity, and a fresh recording listener; then starts the application in the location.
+ */
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() {
+ app = TestApplication.Factory.newManagedInstanceForTests();
+ loc = app.newSimulatedLocation();
+ observedEntity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+ observedChildEntity = observedEntity.createAndManageChild(EntitySpec.create(TestEntity.class));
+
+ observedGroup = app.createAndManageChild(EntitySpec.create(BasicGroup.class));
+ observedMemberEntity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+ observedGroup.addMember(observedMemberEntity);
+
+ // An entity that is neither a child of observedEntity nor a member of observedGroup.
+ otherEntity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+
+ listener = new RecordingSensorEventListener<>();
+
+ app.start(ImmutableList.of(loc));
+ }
+
+ /**
+ * Destroys everything in the application's management context after each test.
+ * The null check covers the case where setUp failed before the application was created.
+ */
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() {
+ if (app != null) Entities.destroyAll(app.getManagementContext());
+ }
+
+ /**
+ * A location subscribed to three sensors of one entity receives exactly the events published
+ * on that entity, in publication order, and nothing from an entity it did not subscribe to.
+ */
+ @Test
+ public void testSubscriptionReceivesEvents() {
+ loc.subscriptions().subscribe(observedEntity, TestEntity.SEQUENCE, listener);
+ loc.subscriptions().subscribe(observedEntity, TestEntity.NAME, listener);
+ loc.subscriptions().subscribe(observedEntity, TestEntity.MY_NOTIF, listener);
+
+ // Published on an entity with no subscription: must not appear in the expected list below.
+ otherEntity.sensors().set(TestEntity.SEQUENCE, 123);
+ observedEntity.sensors().set(TestEntity.SEQUENCE, 123);
+ observedEntity.sensors().set(TestEntity.NAME, "myname");
+ observedEntity.sensors().emit(TestEntity.MY_NOTIF, 456);
+
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ assertEquals(listener.getEvents(), ImmutableList.of(
+ new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, observedEntity, 123),
+ new BasicSensorEvent<String>(TestEntity.NAME, observedEntity, "myname"),
+ new BasicSensorEvent<Integer>(TestEntity.MY_NOTIF, observedEntity, 456)));
+ }});
+ }
+
+ /**
+ * Subscribing with a null producer delivers the sensor's events from every entity:
+ * both the observed entity's and the other entity's events are expected.
+ */
+ @Test
+ public void testSubscriptionToAllReceivesEvents() {
+ loc.subscriptions().subscribe(null, TestEntity.SEQUENCE, listener);
+
+ observedEntity.sensors().set(TestEntity.SEQUENCE, 123);
+ otherEntity.sensors().set(TestEntity.SEQUENCE, 456);
+
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ assertEquals(listener.getEvents(), ImmutableList.of(
+ new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, observedEntity, 123),
+ new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, otherEntity, 456)));
+ }});
+ }
+
+ /**
+ * subscribeToChildren delivers events published on the parent's child,
+ * but not events published on the parent itself.
+ */
+ @Test
+ public void testSubscribeToChildrenReceivesEvents() {
+ loc.subscriptions().subscribeToChildren(observedEntity, TestEntity.SEQUENCE, listener);
+
+ observedChildEntity.sensors().set(TestEntity.SEQUENCE, 123);
+ // Published on the parent: absent from the expected list below.
+ observedEntity.sensors().set(TestEntity.SEQUENCE, 456);
+
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ assertEquals(listener.getEvents(), ImmutableList.of(
+ new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, observedChildEntity, 123)));
+ }});
+ }
+
+ /**
+ * A child created after subscribeToChildren was called is still covered by the subscription.
+ */
+ @Test
+ public void testSubscribeToChildrenReceivesEventsForDynamicallyAddedChildren() {
+ loc.subscriptions().subscribeToChildren(observedEntity, TestEntity.SEQUENCE, listener);
+
+ // Child added only after the subscription exists.
+ final TestEntity observedChildEntity2 = observedEntity.createAndManageChild(EntitySpec.create(TestEntity.class));
+ observedChildEntity2.sensors().set(TestEntity.SEQUENCE, 123);
+
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ assertEquals(listener.getEvents(), ImmutableList.of(
+ new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, observedChildEntity2, 123)));
+ }});
+ }
+
+ /**
+ * subscribeToMembers delivers events published on the group's member,
+ * but not events published on the group itself.
+ */
+ @Test
+ public void testSubscribeToMembersReceivesEvents() {
+ loc.subscriptions().subscribeToMembers(observedGroup, TestEntity.SEQUENCE, listener);
+
+ observedMemberEntity.sensors().set(TestEntity.SEQUENCE, 123);
+ // Published on the group: absent from the expected list below.
+ observedGroup.sensors().set(TestEntity.SEQUENCE, 456);
+
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ assertEquals(listener.getEvents(), ImmutableList.of(
+ new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, observedMemberEntity, 123)));
+ }});
+ }
+
+ /**
+ * A member added to the group after subscribeToMembers was called is still covered
+ * by the subscription.
+ */
+ @Test
+ public void testSubscribeToMembersReceivesEventsForDynamicallyAddedMembers() {
+ loc.subscriptions().subscribeToMembers(observedGroup, TestEntity.SEQUENCE, listener);
+
+ // Entity created and added to the group only after the subscription exists.
+ final TestEntity observedMemberEntity2 = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+ observedGroup.addMember(observedMemberEntity2);
+ observedMemberEntity2.sensors().set(TestEntity.SEQUENCE, 123);
+
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ assertEquals(listener.getEvents(), ImmutableList.of(
+ new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, observedMemberEntity2, 123)));
+ }});
+ }
+
+ /**
+ * After a member is removed from the group, events published on it must not reach the listener.
+ * The check is a negative one, so it must hold continually: an "eventually empty" assertion
+ * would pass on its first attempt, before any stray event had a chance to arrive.
+ */
+ @Test(groups="Integration")
+ public void testSubscribeToMembersIgnoresEventsForDynamicallyRemovedMembers() {
+ loc.subscriptions().subscribeToMembers(observedGroup, TestEntity.SEQUENCE, listener);
+
+ observedGroup.removeMember(observedMemberEntity);
+
+ observedMemberEntity.sensors().set(TestEntity.SEQUENCE, 123);
+
+ Asserts.succeedsContinually(new Runnable() {
+ @Override public void run() {
+ assertEquals(listener.getEvents(), ImmutableList.of());
+ }});
+ }
+
+ /**
+ * unsubscribe(producer) cancels every subscription the location holds on that producer,
+ * while its subscription on a different entity keeps delivering events.
+ */
+ @Test
+ public void testUnsubscribeRemovesAllSubscriptionsForThatEntity() {
+ loc.subscriptions().subscribe(observedEntity, TestEntity.SEQUENCE, listener);
+ loc.subscriptions().subscribe(observedEntity, TestEntity.NAME, listener);
+ loc.subscriptions().subscribe(observedEntity, TestEntity.MY_NOTIF, listener);
+ loc.subscriptions().subscribe(otherEntity, TestEntity.SEQUENCE, listener);
+ loc.subscriptions().unsubscribe(observedEntity);
+
+ // None of the observedEntity events below appear in the expected list.
+ observedEntity.sensors().set(TestEntity.SEQUENCE, 123);
+ observedEntity.sensors().set(TestEntity.NAME, "myname");
+ observedEntity.sensors().emit(TestEntity.MY_NOTIF, 123);
+ otherEntity.sensors().set(TestEntity.SEQUENCE, 456);
+
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ assertEquals(listener.getEvents(), ImmutableList.of(
+ new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, otherEntity, 456)));
+ }});
+ }
+
+ /**
+ * unsubscribe(producer, handle) cancels only the subscription identified by that handle:
+ * the NAME subscription stops delivering, while the SEQUENCE subscriptions on both
+ * entities continue.
+ */
+ @Test
+ public void testUnsubscribeUsingHandleStopsEvents() {
+ // Only the NAME handle is needed later; the other subscriptions stay active for the whole test.
+ loc.subscriptions().subscribe(observedEntity, TestEntity.SEQUENCE, listener);
+ SubscriptionHandle handle2 = loc.subscriptions().subscribe(observedEntity, TestEntity.NAME, listener);
+ loc.subscriptions().subscribe(otherEntity, TestEntity.SEQUENCE, listener);
+
+ loc.subscriptions().unsubscribe(observedEntity, handle2);
+
+ observedEntity.sensors().set(TestEntity.SEQUENCE, 123);
+ observedEntity.sensors().set(TestEntity.NAME, "myname");
+ otherEntity.sensors().set(TestEntity.SEQUENCE, 456);
+
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ assertEquals(listener.getEvents(), ImmutableList.of(
+ new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, observedEntity, 123),
+ new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, otherEntity, 456)));
+ }});
+ }
+
+ /**
+ * Emits a run of numbered notifications and checks that the listener records all of them,
+ * with the values in the same order in which they were emitted.
+ */
+ @Test
+ public void testSubscriptionReceivesEventsInOrder() {
+ final int numEvents = 100;
+ loc.subscriptions().subscribe(observedEntity, TestEntity.MY_NOTIF, listener);
+
+ for (int seq = 0; seq < numEvents; seq++) {
+ observedEntity.sensors().emit(TestEntity.MY_NOTIF, seq);
+ }
+
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ assertEquals(Iterables.size(listener.getEvents()), numEvents);
+ for (int idx = 0; idx < numEvents; idx++) {
+ assertEquals(Iterables.get(listener.getEvents(), idx).getValue(), idx);
+ }
+ }});
+ }
+
+}