You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2015/09/23 12:51:58 UTC

[06/10] incubator-brooklyn git commit: Move subscriptions() from Entity to BrooklynObject

Move subscriptions() from Entity to BrooklynObject

- Add support for subscriptions to Location (and test)
- For CatalogItem.subscriptions(), throw UnsupportedOperationException
- Deprecate methods on AbstractEntityAdjunct, in preference for
  AbstractEntityAdjunct.subscriptions().*


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/4ad6cc96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/4ad6cc96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/4ad6cc96

Branch: refs/heads/master
Commit: 4ad6cc967730001715ac3a97661106b5ed1b6c67
Parents: 2b29795
Author: Aled Sage <al...@gmail.com>
Authored: Mon Sep 21 10:05:11 2015 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Wed Sep 23 10:33:50 2015 +0100

----------------------------------------------------------------------
 .../brooklyn/api/catalog/CatalogItem.java       |  12 +
 .../org/apache/brooklyn/api/entity/Entity.java  |  51 ----
 .../brooklyn/api/mgmt/ManagementContext.java    |   9 +
 .../brooklyn/api/objs/BrooklynObject.java       |  63 ++++-
 .../core/catalog/internal/CatalogItemDo.java    |  11 +-
 .../internal/CatalogItemDtoAbstract.java        |  14 +-
 .../brooklyn/core/entity/EntityInternal.java    |   8 -
 .../core/location/AbstractLocation.java         |  79 +++++-
 .../access/PortForwardManagerClient.java        |   5 +
 .../internal/AbstractManagementContext.java     |  11 +-
 .../mgmt/internal/BasicSubscriptionContext.java |   3 +-
 .../NonDeploymentManagementContext.java         |   7 +
 .../core/objs/AbstractEntityAdjunct.java        | 164 ++++++++++---
 .../core/objs/BrooklynObjectInternal.java       |   9 +
 .../core/location/LocationSubscriptionTest.java | 241 +++++++++++++++++++
 15 files changed, 580 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/api/src/main/java/org/apache/brooklyn/api/catalog/CatalogItem.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/catalog/CatalogItem.java b/api/src/main/java/org/apache/brooklyn/api/catalog/CatalogItem.java
index 3758d08..bf806aa 100644
--- a/api/src/main/java/org/apache/brooklyn/api/catalog/CatalogItem.java
+++ b/api/src/main/java/org/apache/brooklyn/api/catalog/CatalogItem.java
@@ -48,6 +48,18 @@ public interface CatalogItem<T,SpecT> extends BrooklynObject, Rebindable {
         public boolean isNamed();
     }
 
+    /**
+     * @throws UnsupportedOperationException; config not supported for catalog items
+     */
+    @Override
+    ConfigurationSupport config();
+
+    /**
+     * @throws UnsupportedOperationException; subscriptions are not supported for catalog items
+     */
+    @Override
+    SubscriptionSupport subscriptions();
+    
     @Deprecated
     public static interface CatalogItemLibraries {
         Collection<String> getBundles();

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/api/src/main/java/org/apache/brooklyn/api/entity/Entity.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/entity/Entity.java b/api/src/main/java/org/apache/brooklyn/api/entity/Entity.java
index 795218c..dd141f0 100644
--- a/api/src/main/java/org/apache/brooklyn/api/entity/Entity.java
+++ b/api/src/main/java/org/apache/brooklyn/api/entity/Entity.java
@@ -301,8 +301,6 @@ public interface Entity extends BrooklynObject {
     
     SensorSupport sensors();
 
-    SubscriptionSupport subscriptions();
-
     PolicySupport policies();
 
     EnricherSupport enrichers();
@@ -354,55 +352,6 @@ public interface Entity extends BrooklynObject {
     }
     
     @Beta
-    public interface SubscriptionSupport {
-        /**
-         * Allow us to subscribe to data from a {@link Sensor} on another entity.
-         * 
-         * @return a subscription id which can be used to unsubscribe
-         *
-         * @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener)
-         */
-        // FIXME remove from interface?
-        @Beta
-        <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener);
-     
-        /** @see SubscriptionManager#subscribeToChildren(Map, Entity, Sensor, SensorEventListener) */
-        // FIXME remove from interface?
-        @Beta
-        <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener);
-     
-        /** @see SubscriptionManager#subscribeToMembers(Group, Sensor, SensorEventListener) */
-        // FIXME remove from interface?
-        @Beta
-        <T> SubscriptionHandle subscribeToMembers(Group group, Sensor<T> sensor, SensorEventListener<? super T> listener);
-
-        /**
-         * Unsubscribes from the given producer.
-         *
-         * @see SubscriptionContext#unsubscribe(SubscriptionHandle)
-         */
-        @Beta
-        boolean unsubscribe(Entity producer);
-
-        /**
-         * Unsubscribes the given handle.
-         *
-         * @see SubscriptionContext#unsubscribe(SubscriptionHandle)
-         */
-        @Beta
-        boolean unsubscribe(Entity producer, SubscriptionHandle handle);
-        
-        /**
-         * Unsubscribes the given handle.
-         * 
-         * It is (currently) more efficient to also pass in the producer -
-         * see {@link BasicSubscriptionSupport#unsubscribe(Entity, SubscriptionHandle)} 
-         */
-        @Beta
-        boolean unsubscribe(SubscriptionHandle handle);
-    }
-
-    @Beta
     public interface PolicySupport {
         /**
          * @return an immutable thread-safe view of the policies.

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
index ac4924e..f809fb2 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java
@@ -28,6 +28,7 @@ import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.drivers.DriverDependentEntity;
 import org.apache.brooklyn.api.entity.drivers.EntityDriverManager;
 import org.apache.brooklyn.api.entity.drivers.downloads.DownloadResolverManager;
+import org.apache.brooklyn.api.location.Location;
 import org.apache.brooklyn.api.location.LocationRegistry;
 import org.apache.brooklyn.api.mgmt.entitlement.EntitlementManager;
 import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityManager;
@@ -156,6 +157,14 @@ public interface ManagementContext {
      */
     SubscriptionContext getSubscriptionContext(Entity entity);
 
+    /**
+     * Returns a {@link SubscriptionContext} instance representing subscriptions
+     * (from the {@link SubscriptionManager}) associated with this location, and capable 
+     * of conveniently subscribing on behalf of that location  
+     */
+    @Beta
+    SubscriptionContext getSubscriptionContext(Location location);
+
     @Beta // method may move to an internal interface; brooklyn users should not need to call this directly
     RebindManager getRebindManager();
 

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/api/src/main/java/org/apache/brooklyn/api/objs/BrooklynObject.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/objs/BrooklynObject.java b/api/src/main/java/org/apache/brooklyn/api/objs/BrooklynObject.java
index c932f02..094586f 100644
--- a/api/src/main/java/org/apache/brooklyn/api/objs/BrooklynObject.java
+++ b/api/src/main/java/org/apache/brooklyn/api/objs/BrooklynObject.java
@@ -18,10 +18,20 @@
  */
 package org.apache.brooklyn.api.objs;
 
+import java.util.Map;
 import java.util.Set;
 
 import javax.annotation.Nonnull;
 
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.mgmt.SubscriptionContext;
+import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
+import org.apache.brooklyn.api.mgmt.SubscriptionManager;
+import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
+
+import com.google.common.annotations.Beta;
 import com.google.common.collect.ImmutableMap;
 
 /**
@@ -53,7 +63,13 @@ public interface BrooklynObject extends Identifiable, Configurable {
      * and they should be amenable to our persistence (on-disk serialization) and our JSON serialization in the REST API.
      */
     TagSupport tags();
-    
+
+    /**
+     * Subscriptions are the mechanism for receiving notifications of sensor-events (e.g. attribute-changed) from 
+     * other entities.
+     */
+    SubscriptionSupport subscriptions();
+
     public interface TagSupport {
         /**
          * @return An immutable copy of the set of tags on this entity. 
@@ -70,4 +86,49 @@ public interface BrooklynObject extends Identifiable, Configurable {
         
         boolean removeTag(@Nonnull Object tag);
     }
+    
+    @Beta
+    public interface SubscriptionSupport {
+        /**
+         * Allow us to subscribe to data from a {@link Sensor} on another entity.
+         * 
+         * @return a subscription id which can be used to unsubscribe
+         *
+         * @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener)
+         */
+        @Beta
+        <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener);
+     
+        /** @see SubscriptionManager#subscribeToChildren(Map, Entity, Sensor, SensorEventListener) */
+        @Beta
+        <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener);
+     
+        /** @see SubscriptionManager#subscribeToMembers(Group, Sensor, SensorEventListener) */
+        @Beta
+        <T> SubscriptionHandle subscribeToMembers(Group group, Sensor<T> sensor, SensorEventListener<? super T> listener);
+
+        /**
+         * Unsubscribes from the given producer.
+         *
+         * @see SubscriptionContext#unsubscribe(SubscriptionHandle)
+         */
+        @Beta
+        boolean unsubscribe(Entity producer);
+
+        /**
+         * Unsubscribes the given handle.
+         *
+         * @see SubscriptionContext#unsubscribe(SubscriptionHandle)
+         */
+        @Beta
+        boolean unsubscribe(Entity producer, SubscriptionHandle handle);
+        
+        /**
+         * Unsubscribes the given handle.
+         * 
+         * It is (currently) more efficient to also pass in the producer -
+         * see {@link SubscriptionSupport#unsubscribe(Entity, SubscriptionHandle)} 
+         */
+        boolean unsubscribe(SubscriptionHandle handle);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDo.java b/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDo.java
index 5029d8d..0545a06 100644
--- a/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDo.java
+++ b/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDo.java
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
 
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.objs.BrooklynObjectInternal;
+import org.apache.brooklyn.core.objs.BrooklynObjectInternal.SubscriptionSupportInternal;
 import org.apache.brooklyn.api.catalog.CatalogItem;
 import org.apache.brooklyn.api.mgmt.ManagementContext;
 import org.apache.brooklyn.api.mgmt.rebind.RebindSupport;
@@ -49,13 +50,21 @@ public class CatalogItemDo<T,SpecT> implements CatalogItem<T,SpecT>, BrooklynObj
     }
     
     /**
-     * Config not supported for catalog item. See {@link #getPlanYaml()}.
+     * @throws UnsupportedOperationException; Config not supported for catalog item. See {@link #getPlanYaml()}.
      */
     @Override
     public ConfigurationSupportInternal config() {
         throw new UnsupportedOperationException();
     }
 
+    /**
+     * @throws UnsupportedOperationException; subscriptions are not supported for catalog items
+     */
+    @Override
+    public SubscriptionSupportInternal subscriptions() {
+        throw new UnsupportedOperationException();
+    }
+    
     @Override
     public <U> U setConfig(ConfigKey<U> key, U val) {
         return config().set(key, val);

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDtoAbstract.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDtoAbstract.java b/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDtoAbstract.java
index b281941..c950b7b 100644
--- a/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDtoAbstract.java
+++ b/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogItemDtoAbstract.java
@@ -26,8 +26,6 @@ import java.util.Set;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.brooklyn.api.catalog.CatalogItem;
 import org.apache.brooklyn.api.mgmt.rebind.RebindSupport;
 import org.apache.brooklyn.api.mgmt.rebind.mementos.CatalogItemMemento;
@@ -37,6 +35,8 @@ import org.apache.brooklyn.core.objs.AbstractBrooklynObject;
 import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.core.flags.FlagUtils;
 import org.apache.brooklyn.util.core.flags.SetFromFlag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
@@ -66,13 +66,21 @@ public abstract class CatalogItemDtoAbstract<T, SpecT> extends AbstractBrooklynO
     private @SetFromFlag boolean disabled;
 
     /**
-     * Config not supported for catalog item. See {@link #getPlanYaml()}.
+     * @throws UnsupportedOperationException; Config not supported for catalog item. See {@link #getPlanYaml()}.
      */
     @Override
     public ConfigurationSupportInternal config() {
         throw new UnsupportedOperationException();
     }
     
+    /**
+     * @throws UnsupportedOperationException; subscriptions are not supported for catalog items
+     */
+    @Override
+    public SubscriptionSupportInternal subscriptions() {
+        throw new UnsupportedOperationException();
+    }
+    
     @Override
     public <U> U setConfig(ConfigKey<U> key, U val) {
         return config().set(key, val);

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/entity/EntityInternal.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/EntityInternal.java b/core/src/main/java/org/apache/brooklyn/core/entity/EntityInternal.java
index 4fa9c67..49dfa00 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/EntityInternal.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/EntityInternal.java
@@ -184,9 +184,6 @@ public interface EntityInternal extends BrooklynObjectInternal, EntityLocal, Reb
     SensorSupportInternal sensors();
 
     @Override
-    SubscriptionSupportInternal subscriptions();
-
-    @Override
     PolicySupportInternal policies();
 
     @Override
@@ -230,11 +227,6 @@ public interface EntityInternal extends BrooklynObjectInternal, EntityLocal, Reb
     }
     
     @Beta
-    public interface SubscriptionSupportInternal extends Entity.SubscriptionSupport {
-        public void unsubscribeAll();
-    }
-    
-    @Beta
     public interface PolicySupportInternal extends Entity.PolicySupport {
         /**
          * Removes all policy from this entity. 

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/location/AbstractLocation.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/location/AbstractLocation.java b/core/src/main/java/org/apache/brooklyn/core/location/AbstractLocation.java
index 507e7f5..b8859d6 100644
--- a/core/src/main/java/org/apache/brooklyn/core/location/AbstractLocation.java
+++ b/core/src/main/java/org/apache/brooklyn/core/location/AbstractLocation.java
@@ -29,12 +29,18 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.Group;
 import org.apache.brooklyn.api.location.Location;
 import org.apache.brooklyn.api.location.LocationSpec;
+import org.apache.brooklyn.api.mgmt.SubscriptionContext;
+import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.mgmt.rebind.RebindSupport;
 import org.apache.brooklyn.api.mgmt.rebind.mementos.LocationMemento;
 import org.apache.brooklyn.api.objs.Configurable;
+import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
 import org.apache.brooklyn.config.ConfigInheritance;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.config.ConfigKey.HasConfigKey;
@@ -50,17 +56,19 @@ import org.apache.brooklyn.core.location.internal.LocationDynamicType;
 import org.apache.brooklyn.core.location.internal.LocationInternal;
 import org.apache.brooklyn.core.mgmt.internal.LocalLocationManager;
 import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal;
+import org.apache.brooklyn.core.mgmt.internal.SubscriptionTracker;
 import org.apache.brooklyn.core.mgmt.rebind.BasicLocationRebindSupport;
 import org.apache.brooklyn.core.objs.AbstractBrooklynObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.brooklyn.util.collections.SetFromLiveMap;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.core.flags.FlagUtils;
 import org.apache.brooklyn.util.core.flags.TypeCoercions;
 import org.apache.brooklyn.util.guava.Maybe;
 import org.apache.brooklyn.util.stream.Streams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.Beta;
 import com.google.common.base.Objects;
 import com.google.common.base.Objects.ToStringHelper;
 import com.google.common.base.Splitter;
@@ -110,8 +118,13 @@ public abstract class AbstractLocation extends AbstractBrooklynObject implements
 
     private BasicConfigurationSupport config = new BasicConfigurationSupport();
     
+    private BasicSubscriptionSupport subscriptions = new BasicSubscriptionSupport();
+    
     private ConfigBag configBag = new ConfigBag();
 
+    /** not for direct access; refer to as 'subscriptionTracker' via getter so that it is initialized */
+    protected transient SubscriptionTracker _subscriptionTracker;
+
     private volatile boolean managed;
 
     private boolean inConstruction;
@@ -354,7 +367,16 @@ public abstract class AbstractLocation extends AbstractBrooklynObject implements
 
     @Override
     public ConfigurationSupportInternal config() {
-        return config ;
+        return config;
+    }
+
+    // the concrete type rather than an interface is returned because Groovy subclasses
+    // complain (incorrectly) if we return SubscriptionSupportInternal
+    // TODO revert to SubscriptionSupportInternal when groovy subclasses work without this (eg new groovy version)
+    @Override
+    @Beta
+    public BasicSubscriptionSupport subscriptions() {
+        return subscriptions;
     }
 
     private class BasicConfigurationSupport implements ConfigurationSupportInternal {
@@ -478,6 +500,57 @@ public abstract class AbstractLocation extends AbstractBrooklynObject implements
         }
     }
     
+    public class BasicSubscriptionSupport implements SubscriptionSupportInternal {
+        @Override
+        public <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+            return getSubscriptionTracker().subscribe(producer, sensor, listener);
+        }
+
+        @Override
+        public <T> SubscriptionHandle subscribeToMembers(Group producerGroup, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+            return getSubscriptionTracker().subscribeToMembers(producerGroup, sensor, listener);
+        }
+
+        @Override
+        public <T> SubscriptionHandle subscribeToChildren(Entity producerParent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+            return getSubscriptionTracker().subscribeToChildren(producerParent, sensor, listener);
+        }
+        
+        @Override
+        public boolean unsubscribe(Entity producer) {
+            return getSubscriptionTracker().unsubscribe(producer);
+        }
+
+        @Override
+        public boolean unsubscribe(Entity producer, SubscriptionHandle handle) {
+            return getSubscriptionTracker().unsubscribe(producer, handle);
+        }
+
+        @Override
+        public boolean unsubscribe(SubscriptionHandle handle) {
+            return getSubscriptionTracker().unsubscribe(handle);
+        }
+
+        @Override
+        public void unsubscribeAll() {
+            getSubscriptionTracker().unsubscribeAll();
+        }
+
+        protected SubscriptionTracker getSubscriptionTracker() {
+            synchronized (AbstractLocation.this) {
+                if (_subscriptionTracker!=null) return _subscriptionTracker;
+                _subscriptionTracker = new SubscriptionTracker(newSubscriptionContext());
+                return _subscriptionTracker;
+            }
+        }
+        
+        private SubscriptionContext newSubscriptionContext() {
+            synchronized (AbstractLocation.this) {
+                return getManagementContext().getSubscriptionContext(AbstractLocation.this);
+            }
+        }
+    }
+    
     @Override
     public <T> T getConfig(HasConfigKey<T> key) {
         return config().get(key);

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/location/access/PortForwardManagerClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/location/access/PortForwardManagerClient.java b/core/src/main/java/org/apache/brooklyn/core/location/access/PortForwardManagerClient.java
index 12e15aa..d0199a0 100644
--- a/core/src/main/java/org/apache/brooklyn/core/location/access/PortForwardManagerClient.java
+++ b/core/src/main/java/org/apache/brooklyn/core/location/access/PortForwardManagerClient.java
@@ -402,4 +402,9 @@ public class PortForwardManagerClient implements PortForwardManager {
     public ConfigurationSupport config() {
         return getDelegate().config();
     }
+    
+    @Override
+    public SubscriptionSupport subscriptions() {
+        return getDelegate().subscriptions();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
index 76871cd..343528d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java
@@ -32,14 +32,13 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.Nullable;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.brooklyn.api.catalog.BrooklynCatalog;
 import org.apache.brooklyn.api.catalog.CatalogItem;
 import org.apache.brooklyn.api.effector.Effector;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.drivers.EntityDriverManager;
 import org.apache.brooklyn.api.entity.drivers.downloads.DownloadResolverManager;
+import org.apache.brooklyn.api.location.Location;
 import org.apache.brooklyn.api.location.LocationRegistry;
 import org.apache.brooklyn.api.mgmt.ExecutionContext;
 import org.apache.brooklyn.api.mgmt.ManagementContext;
@@ -78,6 +77,8 @@ import org.apache.brooklyn.util.core.task.BasicExecutionContext;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.groovy.GroovyJavaMethods;
 import org.apache.brooklyn.util.guava.Maybe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Function;
 import com.google.common.base.Objects;
@@ -260,6 +261,12 @@ public abstract class AbstractManagementContext implements ManagementContextInte
     }
 
     @Override
+    public SubscriptionContext getSubscriptionContext(Location loc) {
+        // BSC is a thin wrapper around SM so fine to create a new one here
+        return new BasicSubscriptionContext(getSubscriptionManager(), loc);
+    }
+
+    @Override
     public EntityDriverManager getEntityDriverManager() {
         return entityDriverManager;
     }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
index d821c4e..57d4712 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
@@ -19,7 +19,6 @@
 package org.apache.brooklyn.core.mgmt.internal;
 
 import static org.apache.brooklyn.util.JavaGroovyEquivalents.mapOf;
-import groovy.lang.Closure;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -42,6 +41,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 
+import groovy.lang.Closure;
+
 /**
  * A {@link SubscriptionContext} for an entity or other user of a {@link SubscriptionManager}.
  */

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
index 87e8f84..1f62add 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
@@ -241,6 +241,13 @@ public class NonDeploymentManagementContext implements ManagementContextInternal
     }
 
     @Override
+    public synchronized SubscriptionContext getSubscriptionContext(Location loc) {
+        // Should never be called; the NonDeploymentManagementContext is associated with a particular entity, whereas
+        // the #getSubscriptionContext(loc) should only be called in the context of a location.
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public ExecutionContext getExecutionContext(Entity entity) {
         if (!this.entity.equals(entity)) throw new IllegalStateException("Non-deployment context "+this+" can only use a single Entity: has "+this.entity+", but passed "+entity);
         if (mode==NonDeploymentManagementContextMode.MANAGEMENT_STOPPED)

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
index fb71901..814923a 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
@@ -32,17 +32,17 @@ import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntityLocal;
 import org.apache.brooklyn.api.entity.Group;
 import org.apache.brooklyn.api.mgmt.ExecutionContext;
-import org.apache.brooklyn.api.mgmt.SubscriptionContext;
 import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
 import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.objs.BrooklynObject;
 import org.apache.brooklyn.api.objs.Configurable;
 import org.apache.brooklyn.api.objs.EntityAdjunct;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.api.sensor.Sensor;
 import org.apache.brooklyn.api.sensor.SensorEventListener;
 import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.config.ConfigMap;
 import org.apache.brooklyn.config.ConfigKey.HasConfigKey;
+import org.apache.brooklyn.config.ConfigMap;
 import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.enricher.AbstractEnricher;
 import org.apache.brooklyn.core.entity.Entities;
@@ -58,7 +58,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.Beta;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
@@ -82,7 +81,9 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
     protected transient ExecutionContext execution;
 
     private final BasicConfigurationSupport config = new BasicConfigurationSupport();
-    
+
+    private final BasicSubscriptionSupport subscriptions = new BasicSubscriptionSupport();
+
     /**
      * The config values of this entity. Updating this map should be done
      * via {@link #config()}.
@@ -199,6 +200,89 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
         return config;
     }
 
+    @Override
+    public BasicSubscriptionSupport subscriptions() {
+        return subscriptions;
+    }
+
+    public class BasicSubscriptionSupport implements SubscriptionSupportInternal {
+        @Override
+        public <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+            if (!checkCanSubscribe()) return null;
+            return getSubscriptionTracker().subscribe(producer, sensor, listener);
+        }
+
+        @Override
+        public <T> SubscriptionHandle subscribeToMembers(Group producerGroup, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+            if (!checkCanSubscribe(producerGroup)) return null;
+            return getSubscriptionTracker().subscribeToMembers(producerGroup, sensor, listener);
+        }
+
+        @Override
+        public <T> SubscriptionHandle subscribeToChildren(Entity producerParent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+            if (!checkCanSubscribe(producerParent)) return null;
+            return getSubscriptionTracker().subscribeToChildren(producerParent, sensor, listener);
+        }
+        
+        @Override
+        public boolean unsubscribe(Entity producer) {
+            if (destroyed.get()) return false;
+            return getSubscriptionTracker().unsubscribe(producer);
+        }
+
+        @Override
+        public boolean unsubscribe(Entity producer, SubscriptionHandle handle) {
+            if (destroyed.get()) return false;
+            return getSubscriptionTracker().unsubscribe(producer, handle);
+        }
+
+        @Override
+        public boolean unsubscribe(SubscriptionHandle handle) {
+            if (destroyed.get()) return false;
+            return getSubscriptionTracker().unsubscribe(handle);
+        }
+
+        @Override
+        public void unsubscribeAll() {
+            if (destroyed.get()) return;
+            getSubscriptionTracker().unsubscribeAll();
+        }
+
+        protected SubscriptionTracker getSubscriptionTracker() {
+            synchronized (AbstractEntityAdjunct.this) {
+                if (_subscriptionTracker!=null) return _subscriptionTracker;
+                if (entity==null) return null;
+                _subscriptionTracker = new SubscriptionTracker(((EntityInternal)entity).getManagementSupport().getSubscriptionContext());
+                return _subscriptionTracker;
+            }
+        }
+        
+        /** returns false if deleted, throws exception if invalid state, otherwise true.
+         * okay if entity is not yet managed (but not if entity is no longer managed). */
+        protected boolean checkCanSubscribe(Entity producer) {
+            if (destroyed.get()) return false;
+            if (producer==null) throw new IllegalStateException(this+" given a null target for subscription");
+            if (entity==null) throw new IllegalStateException(this+" cannot subscribe to "+producer+" because it is not associated to an entity");
+            if (((EntityInternal)entity).getManagementSupport().isNoLongerManaged()) throw new IllegalStateException(this+" cannot subscribe to "+producer+" because the associated entity "+entity+" is no longer managed");
+            return true;
+        }
+        
+        protected boolean checkCanSubscribe() {
+            if (destroyed.get()) return false;
+            if (entity==null) throw new IllegalStateException(this+" cannot subscribe because it is not associated to an entity");
+            if (((EntityInternal)entity).getManagementSupport().isNoLongerManaged()) throw new IllegalStateException(this+" cannot subscribe because the associated entity "+entity+" is no longer managed");
+            return true;
+        }
+
+        /**
+         * @return a list of all subscription handles
+         */
+        protected Collection<SubscriptionHandle> getAllSubscriptions() {
+            SubscriptionTracker tracker = getSubscriptionTracker();
+            return (tracker != null) ? tracker.getAllSubscriptions() : Collections.<SubscriptionHandle>emptyList();
+        }
+    }
+    
     private class BasicConfigurationSupport implements ConfigurationSupportInternal {
 
         @Override
@@ -377,6 +461,10 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
         }
     }
 
+    /**
+     * @deprecated since 0.9.0; for internal use only
+     */
+    @Deprecated
     protected synchronized SubscriptionTracker getSubscriptionTracker() {
         if (_subscriptionTracker!=null) return _subscriptionTracker;
         if (entity==null) return null;
@@ -384,14 +472,15 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
         return _subscriptionTracker;
     }
     
-    @VisibleForTesting //intended as protected, meant for subclasses
-    /** @see SubscriptionContext#subscribe(Entity, Sensor, SensorEventListener) */
+    /**
+     * @deprecated since 0.9.0; see {@link SubscriptionSupport#subscribe(Entity, Sensor, SensorEventListener)} and {@link BrooklynObject#subscriptions()}
+     */
+    @Deprecated
     public <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
         if (!checkCanSubscribe()) return null;
         return getSubscriptionTracker().subscribe(producer, sensor, listener);
     }
 
-    @VisibleForTesting //intended as protected, meant for subclasses
     @Beta
     /** @see SubscriptionContext#subscribe(Map, Entity, Sensor, SensorEventListener) */
     public <T> SubscriptionHandle subscribe(Map<String, ?> flags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
@@ -399,67 +488,68 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
         return getSubscriptionTracker().subscribe(flags, producer, sensor, listener);
     }
 
-    @VisibleForTesting //intended as protected, meant for subclasses
-    /** @see SubscriptionContext#subscribe(Entity, Sensor, SensorEventListener) */
+    /**
+     * @deprecated since 0.9.0; see {@link SubscriptionSupport#subscribeToMembers(Entity, Sensor, SensorEventListener)} and {@link BrooklynObject#subscriptions()}
+     */
+    @Deprecated
     public <T> SubscriptionHandle subscribeToMembers(Group producerGroup, Sensor<T> sensor, SensorEventListener<? super T> listener) {
         if (!checkCanSubscribe(producerGroup)) return null;
         return getSubscriptionTracker().subscribeToMembers(producerGroup, sensor, listener);
     }
 
-    @VisibleForTesting //intended as protected, meant for subclasses 
-    /** @see SubscriptionContext#subscribe(Entity, Sensor, SensorEventListener) */
+    /**
+     * @deprecated since 0.9.0; see {@link SubscriptionSupport#subscribeToChildren(Entity, Sensor, SensorEventListener)} and {@link BrooklynObject#subscriptions()}
+     */
+    @Deprecated
     public <T> SubscriptionHandle subscribeToChildren(Entity producerParent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
         if (!checkCanSubscribe(producerParent)) return null;
         return getSubscriptionTracker().subscribeToChildren(producerParent, sensor, listener);
     }
 
-    /** @deprecated since 0.7.0 use {@link #checkCanSubscribe(Entity)} */
+    /**
+     * @deprecated since 0.7.0 use {@link BasicSubscriptionSupport#checkCanSubscribe(Entity)
+     */
     @Deprecated
     protected boolean check(Entity requiredEntity) {
         return checkCanSubscribe(requiredEntity);
     }
-    /** returns false if deleted, throws exception if invalid state, otherwise true.
-     * okay if entity is not yet managed (but not if entity is no longer managed). */
+    
+    /**
+     * @deprecated since 0.9.0; for internal use only
+     */
+    @Deprecated
     protected boolean checkCanSubscribe(Entity producer) {
-        if (destroyed.get()) return false;
-        if (producer==null) throw new IllegalStateException(this+" given a null target for subscription");
-        if (entity==null) throw new IllegalStateException(this+" cannot subscribe to "+producer+" because it is not associated to an entity");
-        if (((EntityInternal)entity).getManagementSupport().isNoLongerManaged()) throw new IllegalStateException(this+" cannot subscribe to "+producer+" because the associated entity "+entity+" is no longer managed");
-        return true;
+        return subscriptions().checkCanSubscribe(producer);
     }
+    
+    /**
+     * @deprecated since 0.9.0; for internal use only
+     */
+    @Deprecated
     protected boolean checkCanSubscribe() {
-        if (destroyed.get()) return false;
-        if (entity==null) throw new IllegalStateException(this+" cannot subscribe because it is not associated to an entity");
-        if (((EntityInternal)entity).getManagementSupport().isNoLongerManaged()) throw new IllegalStateException(this+" cannot subscribe because the associated entity "+entity+" is no longer managed");
-        return true;
+        return subscriptions().checkCanSubscribe();
     }
         
     /**
-     * Unsubscribes the given producer.
-     *
-     * @see SubscriptionContext#unsubscribe(SubscriptionHandle)
+     * @deprecated since 0.9.0; see {@link SubscriptionSupport#unsubscribe(Entity)} and {@link BrooklynObject#subscriptions()}
      */
-    @VisibleForTesting //intended as protected, meant for subclasses
+    @Deprecated
     public boolean unsubscribe(Entity producer) {
-        if (destroyed.get()) return false;
-        return getSubscriptionTracker().unsubscribe(producer);
+        return subscriptions().unsubscribe(producer);
     }
 
     /**
-     * Unsubscribes the given producer.
-     *
-     * @see SubscriptionContext#unsubscribe(SubscriptionHandle)
+     * @deprecated since 0.9.0; see {@link SubscriptionSupport#unsubscribe(Entity, SubscriptionHandle)} and {@link BrooklynObject#subscriptions()}
      */
-    @VisibleForTesting //intended as protected, meant for subclasses
+    @Deprecated
     public boolean unsubscribe(Entity producer, SubscriptionHandle handle) {
-        if (destroyed.get()) return false;
-        return getSubscriptionTracker().unsubscribe(producer, handle);
+        return subscriptions().unsubscribe(producer, handle);
     }
 
     /**
-     * @return a list of all subscription handles
+     * @deprecated since 0.9.0; for internal use only
      */
-    @VisibleForTesting //intended as protected, meant for subclasses
+    @Deprecated
     protected Collection<SubscriptionHandle> getAllSubscriptions() {
         SubscriptionTracker tracker = getSubscriptionTracker();
         return (tracker != null) ? tracker.getAllSubscriptions() : Collections.<SubscriptionHandle>emptyList();

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java b/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java
index ad2cca1..d076f4e 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java
@@ -38,8 +38,12 @@ public interface BrooklynObjectInternal extends BrooklynObject, Rebindable {
     @SuppressWarnings("rawtypes")  // subclasses typically apply stronger typing
     RebindSupport getRebindSupport();
     
+    @Override
     ConfigurationSupportInternal config();
 
+    @Override
+    SubscriptionSupportInternal subscriptions();
+
     @Beta
     public interface ConfigurationSupportInternal extends Configurable.ConfigurationSupport {
         
@@ -100,4 +104,9 @@ public interface BrooklynObjectInternal extends BrooklynObject, Rebindable {
         @Beta
         void refreshInheritedConfigOfChildren();
     }
+    
+    @Beta
+    public interface SubscriptionSupportInternal extends BrooklynObject.SubscriptionSupport {
+        public void unsubscribeAll();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4ad6cc96/core/src/test/java/org/apache/brooklyn/core/location/LocationSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/location/LocationSubscriptionTest.java b/core/src/test/java/org/apache/brooklyn/core/location/LocationSubscriptionTest.java
new file mode 100644
index 0000000..30352c7
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/location/LocationSubscriptionTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.location;
+
+import static org.testng.Assert.assertEquals;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.entity.RecordingSensorEventListener;
+import org.apache.brooklyn.core.location.SimulatedLocation;
+import org.apache.brooklyn.core.sensor.BasicSensorEvent;
+import org.apache.brooklyn.core.test.entity.TestApplication;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.entity.group.BasicGroup;
+import org.apache.brooklyn.test.Asserts;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+public class LocationSubscriptionTest {
+
+    // TODO Duplication between this and PolicySubscriptionTest
+    
+    private SimulatedLocation loc;
+    private TestApplication app;
+    private TestEntity observedEntity;
+    private BasicGroup observedGroup;
+    private TestEntity observedChildEntity;
+    private TestEntity observedMemberEntity;
+    private TestEntity otherEntity;
+    private RecordingSensorEventListener<Object> listener;
+    
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() {
+        app = TestApplication.Factory.newManagedInstanceForTests();
+        loc = app.newSimulatedLocation();
+        observedEntity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+        observedChildEntity = observedEntity.createAndManageChild(EntitySpec.create(TestEntity.class));
+
+        observedGroup = app.createAndManageChild(EntitySpec.create(BasicGroup.class));
+        observedMemberEntity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+        observedGroup.addMember(observedMemberEntity);
+        
+        otherEntity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+        
+        listener = new RecordingSensorEventListener<>();
+        
+        app.start(ImmutableList.of(loc));
+    }
+    
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() {
+        if (app != null) Entities.destroyAll(app.getManagementContext());
+    }
+    
+    @Test
+    public void testSubscriptionReceivesEvents() {
+        loc.subscriptions().subscribe(observedEntity, TestEntity.SEQUENCE, listener);
+        loc.subscriptions().subscribe(observedEntity, TestEntity.NAME, listener);
+        loc.subscriptions().subscribe(observedEntity, TestEntity.MY_NOTIF, listener);
+        
+        otherEntity.sensors().set(TestEntity.SEQUENCE, 123);
+        observedEntity.sensors().set(TestEntity.SEQUENCE, 123);
+        observedEntity.sensors().set(TestEntity.NAME, "myname");
+        observedEntity.sensors().emit(TestEntity.MY_NOTIF, 456);
+        
+        Asserts.succeedsEventually(new Runnable() {
+            @Override public void run() {
+                assertEquals(listener.getEvents(), ImmutableList.of(
+                        new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, observedEntity, 123),
+                        new BasicSensorEvent<String>(TestEntity.NAME, observedEntity, "myname"),
+                        new BasicSensorEvent<Integer>(TestEntity.MY_NOTIF, observedEntity, 456)));
+            }});
+    }
+    
+    @Test
+    public void testSubscriptionToAllReceivesEvents() {
+        loc.subscriptions().subscribe(null, TestEntity.SEQUENCE, listener);
+        
+        observedEntity.sensors().set(TestEntity.SEQUENCE, 123);
+        otherEntity.sensors().set(TestEntity.SEQUENCE, 456);
+        
+        Asserts.succeedsEventually(new Runnable() {
+            @Override public void run() {
+                assertEquals(listener.getEvents(), ImmutableList.of(
+                        new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, observedEntity, 123),
+                        new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, otherEntity, 456)));
+            }});
+    }
+    
+    @Test
+    public void testSubscribeToChildrenReceivesEvents() {
+        loc.subscriptions().subscribeToChildren(observedEntity, TestEntity.SEQUENCE, listener);
+        
+        observedChildEntity.sensors().set(TestEntity.SEQUENCE, 123);
+        observedEntity.sensors().set(TestEntity.SEQUENCE, 456);
+        
+        Asserts.succeedsEventually(new Runnable() {
+            @Override public void run() {
+                assertEquals(listener.getEvents(), ImmutableList.of(
+                        new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, observedChildEntity, 123)));
+            }});
+    }
+    
+    @Test
+    public void testSubscribeToChildrenReceivesEventsForDynamicallyAddedChildren() {
+        loc.subscriptions().subscribeToChildren(observedEntity, TestEntity.SEQUENCE, listener);
+        
+        final TestEntity observedChildEntity2 = observedEntity.createAndManageChild(EntitySpec.create(TestEntity.class));
+        observedChildEntity2.sensors().set(TestEntity.SEQUENCE, 123);
+        
+        Asserts.succeedsEventually(new Runnable() {
+            @Override public void run() {
+                assertEquals(listener.getEvents(), ImmutableList.of(
+                        new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, observedChildEntity2, 123)));
+            }});
+    }
+    
+    @Test
+    public void testSubscribeToMembersReceivesEvents() {
+        loc.subscriptions().subscribeToMembers(observedGroup, TestEntity.SEQUENCE, listener);
+        
+        observedMemberEntity.sensors().set(TestEntity.SEQUENCE, 123);
+        observedGroup.sensors().set(TestEntity.SEQUENCE, 456);
+        
+        Asserts.succeedsEventually(new Runnable() {
+            @Override public void run() {
+                assertEquals(listener.getEvents(), ImmutableList.of(
+                        new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, observedMemberEntity, 123)));
+            }});
+    }
+    
+    @Test
+    public void testSubscribeToMembersReceivesEventsForDynamicallyAddedMembers() {
+        loc.subscriptions().subscribeToMembers(observedGroup, TestEntity.SEQUENCE, listener);
+        
+        final TestEntity observedMemberEntity2 = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+        observedGroup.addMember(observedMemberEntity2);
+        observedMemberEntity2.sensors().set(TestEntity.SEQUENCE, 123);
+        
+        Asserts.succeedsEventually(new Runnable() {
+            @Override public void run() {
+                assertEquals(listener.getEvents(), ImmutableList.of(
+                        new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, observedMemberEntity2, 123)));
+            }});
+    }
+    
+    @Test(groups="Integration")
+    public void testSubscribeToMembersIgnoresEventsForDynamicallyRemovedMembers() {
+        loc.subscriptions().subscribeToMembers(observedGroup, TestEntity.SEQUENCE, listener);
+        
+        observedGroup.removeMember(observedMemberEntity);
+        
+        observedMemberEntity.sensors().set(TestEntity.SEQUENCE, 123);
+        
+        Asserts.succeedsEventually(new Runnable() {
+            @Override public void run() {
+                assertEquals(listener.getEvents(), ImmutableList.of());
+            }});
+    }
+    
+    @Test
+    public void testUnsubscribeRemovesAllSubscriptionsForThatEntity() {
+        loc.subscriptions().subscribe(observedEntity, TestEntity.SEQUENCE, listener);
+        loc.subscriptions().subscribe(observedEntity, TestEntity.NAME, listener);
+        loc.subscriptions().subscribe(observedEntity, TestEntity.MY_NOTIF, listener);
+        loc.subscriptions().subscribe(otherEntity, TestEntity.SEQUENCE, listener);
+        loc.subscriptions().unsubscribe(observedEntity);
+        
+        observedEntity.sensors().set(TestEntity.SEQUENCE, 123);
+        observedEntity.sensors().set(TestEntity.NAME, "myname");
+        observedEntity.sensors().emit(TestEntity.MY_NOTIF, 123);
+        otherEntity.sensors().set(TestEntity.SEQUENCE, 456);
+        
+        Asserts.succeedsEventually(new Runnable() {
+            @Override public void run() {
+                assertEquals(listener.getEvents(), ImmutableList.of(
+                        new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, otherEntity, 456)));
+            }});
+    }
+    
+    @Test
+    public void testUnsubscribeUsingHandleStopsEvents() {
+        SubscriptionHandle handle1 = loc.subscriptions().subscribe(observedEntity, TestEntity.SEQUENCE, listener);
+        SubscriptionHandle handle2 = loc.subscriptions().subscribe(observedEntity, TestEntity.NAME, listener);
+        SubscriptionHandle handle3 = loc.subscriptions().subscribe(otherEntity, TestEntity.SEQUENCE, listener);
+        
+        loc.subscriptions().unsubscribe(observedEntity, handle2);
+        
+        observedEntity.sensors().set(TestEntity.SEQUENCE, 123);
+        observedEntity.sensors().set(TestEntity.NAME, "myname");
+        otherEntity.sensors().set(TestEntity.SEQUENCE, 456);
+        
+        Asserts.succeedsEventually(new Runnable() {
+            @Override public void run() {
+                assertEquals(listener.getEvents(), ImmutableList.of(
+                        new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, observedEntity, 123),
+                        new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, otherEntity, 456)));
+            }});
+    }
+    
+    @Test
+    public void testSubscriptionReceivesEventsInOrder() {
+        final int NUM_EVENTS = 100;
+        loc.subscriptions().subscribe(observedEntity, TestEntity.MY_NOTIF, listener);
+
+        for (int i = 0; i < NUM_EVENTS; i++) {
+            observedEntity.sensors().emit(TestEntity.MY_NOTIF, i);
+        }
+        
+        Asserts.succeedsEventually(new Runnable() {
+            @Override public void run() {
+                assertEquals(Iterables.size(listener.getEvents()), NUM_EVENTS);
+                for (int i = 0; i < NUM_EVENTS; i++) {
+                    assertEquals(Iterables.get(listener.getEvents(), i).getValue(), i);
+                }
+            }});
+    }
+
+}