You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2015/09/21 18:05:43 UTC

[6/7] incubator-brooklyn git commit: subscriptions: support notifyOfInitialValue

subscriptions: support notifyOfInitialValue

If the flag notifyOfInitialValue=true is passed in when subscribing to a
single entity:attribute, then the listener will be called with the current
value (rather than waiting for the first change).


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/7ee7d410
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/7ee7d410
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/7ee7d410

Branch: refs/heads/master
Commit: 7ee7d41026d180aeb4cf9206f86b7b0a7eb263d4
Parents: 0d9b3a8
Author: Aled Sage <al...@gmail.com>
Authored: Fri Sep 18 19:55:29 2015 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Mon Sep 21 14:33:20 2015 +0100

----------------------------------------------------------------------
 .../apache/brooklyn/api/entity/EntityLocal.java | 13 +++++-
 .../brooklyn/api/mgmt/SubscriptionContext.java  |  2 +-
 .../brooklyn/core/entity/AbstractEntity.java    |  7 ++++
 .../mgmt/internal/BasicSubscriptionContext.java |  4 +-
 .../mgmt/internal/LocalSubscriptionManager.java | 41 ++++++++++++++++++-
 .../core/mgmt/internal/SubscriptionTracker.java | 10 ++++-
 .../core/objs/AbstractEntityAdjunct.java        |  8 ++++
 .../brooklyn/enricher/stock/UpdatingMap.java    |  4 +-
 .../core/entity/EntitySubscriptionTest.java     | 43 ++++++++++++++++++++
 .../policy/basic/PolicySubscriptionTest.java    | 30 ++++++++++++++
 10 files changed, 153 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/api/src/main/java/org/apache/brooklyn/api/entity/EntityLocal.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/entity/EntityLocal.java b/api/src/main/java/org/apache/brooklyn/api/entity/EntityLocal.java
index 5533949..7e5e963 100644
--- a/api/src/main/java/org/apache/brooklyn/api/entity/EntityLocal.java
+++ b/api/src/main/java/org/apache/brooklyn/api/entity/EntityLocal.java
@@ -115,7 +115,18 @@ public interface EntityLocal extends Entity {
     // FIXME remove from interface?
     @Beta
     <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener);
- 
+
+    /**
+     * Allow us to subscribe to data from a {@link Sensor} on another entity.
+     * 
+     * @return a subscription id which can be used to unsubscribe
+     *
+     * @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener)
+     */
+    // FIXME remove from interface?
+    @Beta
+    <T> SubscriptionHandle subscribe(Map<String, ?> flags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener);
+
     /** @see SubscriptionManager#subscribeToChildren(Map, Entity, Sensor, SensorEventListener) */
     // FIXME remove from interface?
     @Beta

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionContext.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionContext.java
index 7b4e6e7..3328b1a 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionContext.java
@@ -34,7 +34,7 @@ public interface SubscriptionContext {
     /**
      * As {@link SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener)} with default subscription parameters for this context
      */
-    <T> SubscriptionHandle subscribe(Map<String, Object> flags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener);
+    <T> SubscriptionHandle subscribe(Map<String, ?> flags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener);
  
     /** @see #subscribe(Map, Entity, Sensor, SensorEventListener) */
     <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener);

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
index 5dc110d..03254dd 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
@@ -1337,6 +1337,13 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E
         return getSubscriptionTracker().subscribe(producer, sensor, listener);
     }
 
+    /** @see EntityLocal#subscribe */
+    @Override
+    @Beta
+    public <T> SubscriptionHandle subscribe(Map<String, ?> flags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        return getSubscriptionTracker().subscribe(flags, producer, sensor, listener);
+    }
+
     /** @see EntityLocal#subscribeToChildren */
     @Override
     public <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
index 5c38b81..d821c4e 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
@@ -70,7 +70,7 @@ public class BasicSubscriptionContext implements SubscriptionContext {
     }
     
     @SuppressWarnings("rawtypes")
-    public <T> SubscriptionHandle subscribe(Map<String, Object> newFlags, Entity producer, Sensor<T> sensor, Closure c) {
+    public <T> SubscriptionHandle subscribe(Map<String, ?> newFlags, Entity producer, Sensor<T> sensor, Closure c) {
         return subscribe(newFlags, producer, sensor, toSensorEventListener(c));        
     }
 
@@ -80,7 +80,7 @@ public class BasicSubscriptionContext implements SubscriptionContext {
     }
     
     @Override
-    public <T> SubscriptionHandle subscribe(Map<String, Object> newFlags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+    public <T> SubscriptionHandle subscribe(Map<String, ?> newFlags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
         Map<String,Object> subscriptionFlags = Maps.newLinkedHashMap(flags);
         if (newFlags != null) subscriptionFlags.putAll(newFlags);
         return manager.subscribe(subscriptionFlags, producer, sensor, listener);

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index 6ea94a1..7743995 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -37,10 +37,13 @@ import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.mgmt.ExecutionManager;
 import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
 import org.apache.brooklyn.api.mgmt.SubscriptionManager;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.api.sensor.Sensor;
 import org.apache.brooklyn.api.sensor.SensorEvent;
 import org.apache.brooklyn.api.sensor.SensorEventListener;
 import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.sensor.BasicSensorEvent;
+import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.task.BasicExecutionManager;
 import org.apache.brooklyn.util.core.task.SingleThreadedScheduler;
 import org.apache.brooklyn.util.text.Identifiers;
@@ -90,7 +93,7 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
     }
     
     @SuppressWarnings("unchecked")
-    protected synchronized <T> SubscriptionHandle subscribe(Map<String, Object> flags, Subscription<T> s) {
+    protected synchronized <T> SubscriptionHandle subscribe(Map<String, Object> flags, final Subscription<T> s) {
         Entity producer = s.producer;
         Sensor<T> sensor= s.sensor;
         s.subscriber = getSubscriber(flags, s);
@@ -105,6 +108,7 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
             s.subscriberExecutionManagerTagSupplied = false;
         }
         s.eventFilter = (Predicate<SensorEvent<T>>) flags.remove("eventFilter");
+        boolean notifyOfInitialValue = Boolean.TRUE.equals(flags.remove("notifyOfInitialValue"));
         s.flags = flags;
         
         if (LOG.isDebugEnabled()) LOG.debug("Creating subscription {} for {} on {} {} in {}", new Object[] {s.id, s.subscriber, producer, sensor, this});
@@ -116,6 +120,41 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
         if (!s.subscriberExecutionManagerTagSupplied && s.subscriberExecutionManagerTag!=null) {
             ((BasicExecutionManager) em).setTaskSchedulerForTag(s.subscriberExecutionManagerTag, SingleThreadedScheduler.class);
         }
+
+        if (notifyOfInitialValue) {
+            if (producer == null) {
+                LOG.warn("Cannot notifyOfInitialValue for subscription with wildcard producer: "+s);
+            } else if (sensor == null) {
+                LOG.warn("Cannot notifyOfInitialValue for subscription with wilcard sensor: "+s);
+            } else if (!(sensor instanceof AttributeSensor)) {
+                LOG.warn("Cannot notifyOfInitialValue for subscription with non-attribute sensor: "+s);
+            } else {
+                if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {} to {}", new Object[] {s.producer, s.sensor, s});
+                Map<String, Object> tagsMap = MutableMap.of("tag", s.subscriberExecutionManagerTag);
+                em.submit(tagsMap, new Runnable() {
+                    @Override
+                    public String toString() {
+                        return "LSM.publishInitialValue("+s.producer+", "+s.sensor+")";
+                    }
+                    public void run() {
+                        Object val = s.producer.getAttribute((AttributeSensor<?>) s.sensor);
+                        @SuppressWarnings("rawtypes") // TODO s.listener.onEvent gives compilation error if try to use <T>
+                        SensorEvent event = new BasicSensorEvent(s.sensor, s.producer, val);
+                        if (s.eventFilter!=null && !s.eventFilter.apply(event))
+                            return;
+                        try {
+                            s.listener.onEvent(event);
+                        } catch (Throwable t) {
+                            if (event!=null && event.getSource()!=null && Entities.isNoLongerManaged(event.getSource())) {
+                                LOG.debug("Error processing initial-value subscription to "+LocalSubscriptionManager.this+", after entity unmanaged: "+t, t);
+                            } else {
+                                LOG.warn("Error processing initial-value subscription to "+LocalSubscriptionManager.this+": "+t, t);
+                            }
+                        }
+                    }});
+            }
+        }
+        
         return s;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/SubscriptionTracker.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/SubscriptionTracker.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/SubscriptionTracker.java
index 2faad3a..3d5793c 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/SubscriptionTracker.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/SubscriptionTracker.java
@@ -19,6 +19,7 @@
 package org.apache.brooklyn.core.mgmt.internal;
 
 import java.util.Collection;
+import java.util.Map;
 
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.Group;
@@ -29,6 +30,7 @@ import org.apache.brooklyn.api.sensor.SensorEventListener;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.SetMultimap;
 
 /**
@@ -57,13 +59,17 @@ public class SubscriptionTracker {
     
     /** @see SubscriptionContext#subscribe(Entity, Sensor, SensorEventListener) */
     public <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
-        SubscriptionHandle handle = context.subscribe(producer, sensor, listener);
+        return subscribe(ImmutableMap.<String, Object>of(), producer, sensor, listener);
+    }
+
+    public <T> SubscriptionHandle subscribe(Map<String, ?> flags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        SubscriptionHandle handle = context.subscribe(flags, producer, sensor, listener);
         synchronized (subscriptions) {
             subscriptions.put(producer, handle);
         }
         return handle;
     }
-    
+
     /** @see SubscriptionContext#subscribeToChildren(Entity, Sensor, SensorEventListener) */
     public <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
         SubscriptionHandle handle = context.subscribeToChildren(parent, sensor, listener);

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
index e85cc73..fb71901 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
@@ -392,6 +392,14 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
     }
 
     @VisibleForTesting //intended as protected, meant for subclasses
+    @Beta
+    /** @see SubscriptionContext#subscribe(Map, Entity, Sensor, SensorEventListener) */
+    public <T> SubscriptionHandle subscribe(Map<String, ?> flags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        if (!checkCanSubscribe()) return null;
+        return getSubscriptionTracker().subscribe(flags, producer, sensor, listener);
+    }
+
+    @VisibleForTesting //intended as protected, meant for subclasses
     /** @see SubscriptionContext#subscribe(Entity, Sensor, SensorEventListener) */
     public <T> SubscriptionHandle subscribeToMembers(Group producerGroup, Sensor<T> sensor, SensorEventListener<? super T> listener) {
         if (!checkCanSubscribe(producerGroup)) return null;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/core/src/main/java/org/apache/brooklyn/enricher/stock/UpdatingMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/UpdatingMap.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/UpdatingMap.java
index b09b6d6..43aec92 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/UpdatingMap.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/UpdatingMap.java
@@ -36,6 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.reflect.TypeToken;
 
@@ -102,8 +103,7 @@ public class UpdatingMap<S,TKey,TVal> extends AbstractEnricher implements Sensor
         this.computing = (Function) getRequiredConfig(COMPUTING);
         this.removingIfResultIsNull = getConfig(REMOVING_IF_RESULT_IS_NULL);
 
-        subscribe(entity, sourceSensor, this);
-        onUpdated();
+        subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, sourceSensor, this);
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
index 8b8d244..620d8e0 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
@@ -34,12 +34,15 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 
 public class EntitySubscriptionTest {
 
     // TODO Duplication between this and PolicySubscriptionTest
     
+    private static final long SHORT_WAIT_MS = 100;
+
     private SimulatedLocation loc;
     private TestApplication app;
     private TestEntity entity;
@@ -237,4 +240,44 @@ public class EntitySubscriptionTest {
                 }
             }});
     }
+
+    @Test
+    public void testSubscriptionReceivesInitialValueEvents() {
+        observedEntity.sensors().set(TestEntity.SEQUENCE, 123);
+        observedEntity.sensors().set(TestEntity.NAME, "myname");
+        
+        entity.subscribe(ImmutableMap.of("notifyOfInitialValue", true), observedEntity, TestEntity.SEQUENCE, listener);
+        entity.subscribe(ImmutableMap.of("notifyOfInitialValue", true), observedEntity, TestEntity.NAME, listener);
+        
+        Asserts.succeedsEventually(new Runnable() {
+            @Override public void run() {
+                assertEquals(listener.getEvents(), ImmutableList.of(
+                        new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, observedEntity, 123),
+                        new BasicSensorEvent<String>(TestEntity.NAME, observedEntity, "myname")));
+            }});
+    }
+
+    
+    @Test
+    public void testSubscriptionNotReceivesInitialValueEventsByDefault() {
+        observedEntity.sensors().set(TestEntity.SEQUENCE, 123);
+        observedEntity.sensors().set(TestEntity.NAME, "myname");
+        
+        entity.subscribe(observedEntity, TestEntity.SEQUENCE, listener);
+        entity.subscribe(observedEntity, TestEntity.NAME, listener);
+        
+        Asserts.succeedsContinually(ImmutableMap.of("timeout", SHORT_WAIT_MS), new Runnable() {
+            @Override public void run() {
+                assertEquals(listener.getEvents(), ImmutableList.of());
+            }});
+    }
+
+    // TODO A visual inspection test that we get a log.warn telling us we can't get the initial-value
+    @Test
+    public void testSubscriptionForInitialValueWhenNotValid() {
+        entity.subscribe(ImmutableMap.of("notifyOfInitialValue", true), observedEntity, TestEntity.MY_NOTIF, listener);
+        entity.subscribe(ImmutableMap.of("notifyOfInitialValue", true), observedEntity, null, listener);
+        entity.subscribe(ImmutableMap.of("notifyOfInitialValue", true), null, TestEntity.NAME, listener);
+        entity.subscribe(ImmutableMap.of("notifyOfInitialValue", true), null, null, listener);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java b/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
index fa7333f..ab5bc4a 100644
--- a/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
@@ -33,6 +33,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 
 public class PolicySubscriptionTest extends BrooklynAppUnitTestSupport {
 
@@ -119,5 +120,34 @@ public class PolicySubscriptionTest extends BrooklynAppUnitTestSupport {
                         new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, otherEntity, 456)));
             }});
     }
+
+    @Test
+    public void testSubscriptionReceivesInitialValueEvents() {
+        entity.sensors().set(TestEntity.SEQUENCE, 123);
+        entity.sensors().set(TestEntity.NAME, "myname");
+        
+        policy.subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SEQUENCE, listener);
+        policy.subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.NAME, listener);
+        
+        Asserts.succeedsEventually(new Runnable() {
+            @Override public void run() {
+                assertEquals(listener.getEvents(), ImmutableList.of(
+                        new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity, 123),
+                        new BasicSensorEvent<String>(TestEntity.NAME, entity, "myname")));
+            }});
+    }
     
+    @Test
+    public void testSubscriptionNotReceivesInitialValueEventsByDefault() {
+        entity.sensors().set(TestEntity.SEQUENCE, 123);
+        entity.sensors().set(TestEntity.NAME, "myname");
+        
+        policy.subscribe(entity, TestEntity.SEQUENCE, listener);
+        policy.subscribe(entity, TestEntity.NAME, listener);
+        
+        Asserts.succeedsContinually(ImmutableMap.of("timeout", SHORT_WAIT_MS), new Runnable() {
+            @Override public void run() {
+                assertEquals(listener.getEvents(), ImmutableList.of());
+            }});
+    }
 }