You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2017/10/06 08:06:45 UTC
[21/23] brooklyn-server git commit: fix message publish synching to
guarantee in-order delivery
fix message publish synching to guarantee in-order delivery
both for initial subscriptions and in-life - two changes, synching in AttributesInternal for the in-life delivery, and in subscribe/publish for the initial
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/9213f0e2
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/9213f0e2
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/9213f0e2
Branch: refs/heads/master
Commit: 9213f0e25288114718f2a06a50ee206bf0120561
Parents: 508183b
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Wed Oct 4 13:49:42 2017 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Oct 4 14:45:39 2017 +0100
----------------------------------------------------------------------
.../brooklyn/api/mgmt/SubscriptionManager.java | 6 +-
.../brooklyn/core/entity/AbstractEntity.java | 1 +
.../mgmt/internal/LocalSubscriptionManager.java | 108 +++++++++++++------
.../brooklyn/core/sensor/AttributeMap.java | 15 ++-
.../core/effector/EffectorSayHiTest.java | 2 +-
.../core/entity/EntitySubscriptionTest.java | 9 +-
.../internal/LocalSubscriptionManagerTest.java | 91 ++++++++++++++++
.../policy/basic/PolicySubscriptionTest.java | 35 ++++--
8 files changed, 224 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionManager.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionManager.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionManager.java
index 1fa327e..8302ba8 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionManager.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionManager.java
@@ -42,8 +42,10 @@ public interface SubscriptionManager {
*
* The method returns an id which can be used to {@link #unsubscribe(SubscriptionHandle)} later.
* <p>
- * The listener callback is in-order single-threaded and synchronized on this object. The flags
- * parameters can include the following:
+ * The listener callback is in-order single-threaded and synchronized on this object.
+ * In other words message delivery from a producer to a given subscriber is in publish order
+ * (or in the case of a late subscriber getting initial values, in subscribe order).
+ * The flags parameters can include the following:
* <ul>
* <li>subscriber - object to identify the subscriber (e.g. entity, or console session uid)
* <li><i>in future</i> - control parameters for the subscription (period, minimum delta for updates, etc)
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
index bd7df06..dfbbc8f 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
@@ -529,6 +529,7 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E
* through this method. Internally, all attribute updates synch on this object. Code wishing to
* update attributes or publish while holding some other lock should acquire the monitor on this
* object first to prevent deadlock. */
+ @Beta
protected Object getAttributesSynchObjectInternal() {
return attributesInternal.getSynchObjectInternal();
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index a927a89..a726059 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -54,6 +54,7 @@ import org.apache.brooklyn.util.text.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Objects;
import com.google.common.base.Predicate;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
@@ -129,15 +130,9 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
if (LOG.isDebugEnabled()) LOG.debug("Creating subscription {} for {} on {} {} in {}", new Object[] {s.id, s.subscriber, producer, sensor, this});
allSubscriptions.put(s.id, s);
- addToMapOfSets(subscriptionsByToken, makeEntitySensorToken(s.producer, s.sensor), s);
- if (s.subscriber!=null) {
- addToMapOfSets(subscriptionsBySubscriber, s.subscriber, s);
- }
- if (!s.subscriberExecutionManagerTagSupplied && s.subscriberExecutionManagerTag!=null) {
- ((BasicExecutionManager) em).setTaskSchedulerForTag(s.subscriberExecutionManagerTag, SingleThreadedScheduler.class);
- }
-
+ T lastVal;
if (notifyOfInitialValue) {
+ notifyOfInitialValue = false;
if (producer == null) {
LOG.warn("Cannot notifyOfInitialValue for subscription with wildcard producer: "+s);
} else if (sensor == null) {
@@ -145,16 +140,58 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
} else if (!(sensor instanceof AttributeSensor)) {
LOG.warn("Cannot notifyOfInitialValue for subscription with non-attribute sensor: "+s);
} else {
- if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {} to {}", new Object[] {s.producer, s.sensor, s});
- em.submit(
- MutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(s.producer), BrooklynTaskTags.SENSOR_TAG),
- "displayName", "Initial publication of "+s.sensor.getName()),
- () -> {
- T val = (T) s.producer.getAttribute((AttributeSensor<?>) s.sensor);
- submitPublishEvent(s, new BasicSensorEvent<T>(s.sensor, s.producer, val), true);
- });
+ notifyOfInitialValue = true;
}
}
+ if (notifyOfInitialValue) {
+ lastVal = (T) s.producer.sensors().get((AttributeSensor<?>) s.sensor);
+ } else {
+ lastVal = null; // won't be used
+ }
+ addToMapOfSets(subscriptionsByToken, makeEntitySensorToken(s.producer, s.sensor), s);
+ if (s.subscriber!=null) {
+ addToMapOfSets(subscriptionsBySubscriber, s.subscriber, s);
+ }
+ if (!s.subscriberExecutionManagerTagSupplied && s.subscriberExecutionManagerTag!=null) {
+ ((BasicExecutionManager) em).setTaskSchedulerForTag(s.subscriberExecutionManagerTag, SingleThreadedScheduler.class);
+ }
+
+ if (notifyOfInitialValue) {
+ if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {} to {}", new Object[] {s.producer, s.sensor, s});
+ // this is run asynchronously to prevent deadlock when trying to get attribute and publish;
+ // however we want it:
+ // (a) to run in the same order as subscriptions are made, so use the manager tag scheduler
+ // (b) ideally to use the last value that was not published to this target, and
+ // (c) to deliver before any subsequent value notification
+ // but we can't guarantee either (b) or (c) without taking a lock from before we added
+ // the subscriber above, mutexing other sets and publications, which feels heavy and dangerous.
+ // so the compromise is to skip this delivery in cases where the last value has obviously changed -
+ // because a more recent notification is guaranteed to be sent.
+ // we may occasionally still send a duplicate, if delivery got sent in the tiny
+ // window between adding the subscription and taking the last value,
+ // we will think the last value hasn't changed. but we will never send a
+ // wrong value as this backs out if there is any confusion over the last value.
+ em.submit(
+ MutableMap.of("tags", getPublishTags(s, s.producer),
+ "displayName", "Initial value publication on subscription to "+s.sensor.getName()),
+ () -> {
+ T val = (T) s.producer.sensors().get((AttributeSensor<?>) s.sensor);
+ if (!Objects.equal(lastVal, val)) {
+ // bail out - value has been changed;
+ // this might be a duplicate if value changed in small window earlier,
+ // but it won't be delivering an old value later than a newer value
+ if (LOG.isDebugEnabled()) LOG.debug("skipping initial value delivery of {} -> {} to {} as value changed from {} to {}", new Object[] {s.producer, s.sensor, s, lastVal, val});
+ return;
+ }
+ // guard against case where other thread changes the val and publish
+ // while we are publishing, and our older val is then delivered after theirs.
+ // 2017-10 previously we did not do this, then looked at doing it with the attribute lock object
+ // synchronized (((AbstractEntity)s.producer).getAttributesSynchObjectInternal()) {
+ // but realized a better thing is to have initial delivery _done_, not just submitted,
+ // by ourselves, as we are already in the right thread now and can prevent interleaving this way
+ submitPublishEvent(s, new BasicSensorEvent<T>(s.sensor, s.producer, val), true);
+ });
+ }
return s;
}
@@ -210,7 +247,6 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
// (recommend exactly one per subscription to prevent deadlock)
// this is done with:
// em.setTaskSchedulerForTag(subscriberId, SingleThreadedScheduler.class);
-
//note, generating the notifications must be done in the calling thread to preserve order
//e.g. emit(A); emit(B); should cause onEvent(A); onEvent(B) in that order
if (LOG.isTraceEnabled()) LOG.trace("{} got event {}", this, event);
@@ -228,18 +264,11 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
}
@SuppressWarnings({ "unchecked", "rawtypes" })
- private void submitPublishEvent(final Subscription s, final SensorEvent<?> event, final boolean isInitial) {
+ private void submitPublishEvent(final Subscription s, final SensorEvent<?> event, final boolean isInitialPublicationOfOldValueInCorrectScheduledThread) {
if (s.eventFilter!=null && !s.eventFilter.apply(event))
return;
- List<Object> tags = MutableList.builder()
- .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags)
- .add(s.subscriberExecutionManagerTag)
- .add(BrooklynTaskTags.SENSOR_TAG)
- // associate the publish event with the publisher (though on init it might be triggered by subscriber)
- .addIfNotNull(event.getSource()!=null ? BrooklynTaskTags.tagForTargetEntity(event.getSource()) : null)
- .build()
- .asUnmodifiable();
+ List<Object> tags = getPublishTags(s, event.getSource()).asUnmodifiable();
StringBuilder name = new StringBuilder("sensor ");
StringBuilder description = new StringBuilder("Sensor ");
@@ -271,12 +300,12 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
"displayName", name.toString(),
"description", description.toString());
- boolean isEntityStarting = s.subscriber instanceof Entity && isInitial;
+ boolean isEntityStarting = s.subscriber instanceof Entity && isInitialPublicationOfOldValueInCorrectScheduledThread;
// will have entity (and adjunct) execution context from tags, so can skip getting exec context
- em.submit(execFlags, new Runnable() {
+ Runnable deliverer = new Runnable() {
@Override
public String toString() {
- if (isInitial) {
+ if (isInitialPublicationOfOldValueInCorrectScheduledThread) {
return "LSM.publishInitial("+event+")";
} else {
return "LSM.publish("+event+")";
@@ -312,7 +341,26 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager {
LOG.warn("Error processing subscriptions to "+this+": "+t, t);
}
}
- }});
+ }};
+ if (!isInitialPublicationOfOldValueInCorrectScheduledThread) {
+ em.submit(execFlags, deliverer);
+ } else {
+ // for initial, caller guarantees he is running in the right thread/context
+ // where the above submission would take place, typically the
+ // subscriber single threaded executor with the entity context;
+ // this allows caller to do extra assertions and bailout steps at the right time
+ deliverer.run();
+ }
+ }
+
+ private MutableList<Object> getPublishTags(final Subscription<?> s, final Entity source) {
+ return MutableList.builder()
+ .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags)
+ .add(s.subscriberExecutionManagerTag)
+ .add(BrooklynTaskTags.SENSOR_TAG)
+ // associate the publish event with the publisher (though on init it might be triggered by subscriber)
+ .addIfNotNull(source!=null ? BrooklynTaskTags.tagForTargetEntity(source) : null)
+ .build();
}
protected boolean includeDescriptionForSensorTask(SensorEvent<?> event) {
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java b/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java
index dee0700..d9da6ae 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java
@@ -139,9 +139,18 @@ public final class AttributeMap {
}
public <T> T update(AttributeSensor<T> attribute, T newValue) {
- T oldValue = updateWithoutPublishing(attribute, newValue);
- entity.emitInternal(attribute, newValue);
- return oldValue;
+ // 2017-10 this was unsynched which meant if two threads updated
+ // the last publication would not correspond to the last value.
+ // could introduce deadlock but emit internal and publish should
+ // not seek any locks. _subscribe_ and _delivery_ might, but they
+ // won't be in this block. an issue with _subscribe-and-get-initial_
+ // should be resolved by initial subscription queueing the publication
+ // to a context where locks are not held.
+ synchronized (values) {
+ T oldValue = updateWithoutPublishing(attribute, newValue);
+ entity.emitInternal(attribute, newValue);
+ return oldValue;
+ }
}
public <T> T updateWithoutPublishing(AttributeSensor<T> attribute, T newValue) {
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java b/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
index e7dc626..4b991ad 100644
--- a/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
@@ -110,7 +110,7 @@ public class EffectorSayHiTest extends BrooklynAppUnitTestSupport {
.get( Effectors.invocation(e, MyEntity.SAY_HI_1, ImmutableMap.of("name", "Bob", "greeting", "hi")) ), "hi Bob");
}
- @Test(invocationCount=100)
+ @Test
public void testInvocationGetImmediately() throws Exception {
assertEquals(((EntityInternal)e).getExecutionContext()
.getImmediately( Effectors.invocation(e, MyEntity.SAY_HI_1, ImmutableMap.of("name", "Bob", "greeting", "hi")) ).get(), "hi Bob");
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
index ea91f36..8ec2794 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
@@ -34,6 +34,8 @@ import org.apache.brooklyn.core.test.policy.TestEnricher;
import org.apache.brooklyn.core.test.policy.TestPolicy;
import org.apache.brooklyn.entity.group.BasicGroup;
import org.apache.brooklyn.test.Asserts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -44,7 +46,9 @@ import com.google.common.collect.Iterables;
public class EntitySubscriptionTest extends BrooklynAppUnitTestSupport {
- // TODO Duplication between this and PolicySubscriptionTest
+ // TODO Duplication between this and PolicySubscriptionTest and LocalSubscriptionManagerTest
+
+ private static final Logger log = LoggerFactory.getLogger(EntitySubscriptionTest.class);
private static final long SHORT_WAIT_MS = 100;
@@ -221,6 +225,7 @@ public class EntitySubscriptionTest extends BrooklynAppUnitTestSupport {
}
@Test
+ @SuppressWarnings("unused")
public void testUnsubscribeUsingHandleStopsEvents() {
SubscriptionHandle handle1 = entity.subscriptions().subscribe(observedEntity, TestEntity.SEQUENCE, listener);
SubscriptionHandle handle2 = entity.subscriptions().subscribe(observedEntity, TestEntity.NAME, listener);
@@ -300,6 +305,8 @@ public class EntitySubscriptionTest extends BrooklynAppUnitTestSupport {
@Test
public void testContextEntityOnSubscriptionCallbackTask() {
+ log.info("Observing "+observedEntity+" from "+entity);
+
observedEntity.sensors().set(TestEntity.NAME, "myval");
entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), observedEntity, TestEntity.NAME, listener);
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
index 4e47090..1373545 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.brooklyn.api.entity.EntitySpec;
@@ -32,12 +33,22 @@ import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
import org.apache.brooklyn.api.mgmt.SubscriptionManager;
import org.apache.brooklyn.api.sensor.SensorEvent;
import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.core.entity.RecordingSensorEventListener;
+import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
+import org.apache.brooklyn.core.sensor.BasicSensorEvent;
import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.entity.group.BasicGroup;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.time.Duration;
+import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
/**
* testing the {@link SubscriptionManager} and associated classes.
*/
@@ -170,4 +181,84 @@ public class LocalSubscriptionManagerTest extends BrooklynAppUnitTestSupport {
if (threadException.get() != null) throw threadException.get();
}
+ @Test
+ // same test as in PolicySubscriptionTest, but for entities / simpler
+ public void testSubscriptionReceivesInitialValueEventsInOrder() {
+ RecordingSensorEventListener<Object> listener = new RecordingSensorEventListener<>();
+
+ entity.sensors().set(TestEntity.NAME, "myname");
+ entity.sensors().set(TestEntity.SEQUENCE, 123);
+ entity.sensors().emit(TestEntity.MY_NOTIF, -1);
+
+ // delivery should be in subscription order, so 123 then 456
+ entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SEQUENCE, listener);
+ // wait for the above delivery - otherwise it might get dropped
+ Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), () -> {
+ Asserts.assertSize(listener.getEvents(), 1); });
+ entity.sensors().set(TestEntity.SEQUENCE, 456);
+
+ // notifications don't have "initial value" so don't get -1
+ entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.MY_NOTIF, listener);
+ // but do get 1, after 456
+ entity.sensors().emit(TestEntity.MY_NOTIF, 1);
+
+ // STOPPING and myname received, in subscription order, after everything else
+ entity.sensors().set(TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.STOPPING);
+ entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SERVICE_STATE_ACTUAL, listener);
+ entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.NAME, listener);
+
+ Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), new Runnable() {
+ @Override public void run() {
+ Asserts.assertEquals(listener.getEvents(), ImmutableList.of(
+ new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity, 123),
+ new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity, 456),
+ new BasicSensorEvent<Integer>(TestEntity.MY_NOTIF, entity, 1),
+ new BasicSensorEvent<Lifecycle>(TestEntity.SERVICE_STATE_ACTUAL, entity, Lifecycle.STOPPING),
+ new BasicSensorEvent<String>(TestEntity.NAME, entity, "myname")),
+ "actually got: "+listener.getEvents());
+ }});
+ }
+
+ @Test
+ public void testNotificationOrderMatchesSetValueOrderWhenSynched() {
+ RecordingSensorEventListener<Object> listener = new RecordingSensorEventListener<>();
+
+ AtomicInteger count = new AtomicInteger();
+ Runnable set = () -> {
+ synchronized (count) {
+ entity.sensors().set(TestEntity.SEQUENCE, count.incrementAndGet());
+ }
+ };
+ entity.subscriptions().subscribe(ImmutableMap.of(), entity, TestEntity.SEQUENCE, listener);
+ for (int i=0; i<10; i++) {
+ new Thread(set).start();
+ }
+
+ Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), () -> {
+ Asserts.assertSize(listener.getEvents(), 10); });
+ for (int i=0; i<10; i++) {
+ Assert.assertEquals(listener.getEvents().get(i).getValue(), i+1);
+ }
+ }
+
+ @Test
+ public void testNotificationOrderMatchesSetValueOrderWhenNotSynched() {
+ RecordingSensorEventListener<Object> listener = new RecordingSensorEventListener<>();
+
+ AtomicInteger count = new AtomicInteger();
+ Runnable set = () -> {
+ // as this is not synched, the sets may interleave
+ entity.sensors().set(TestEntity.SEQUENCE, count.incrementAndGet());
+ };
+ entity.subscriptions().subscribe(ImmutableMap.of(), entity, TestEntity.SEQUENCE, listener);
+ for (int i=0; i<10; i++) {
+ new Thread(set).start();
+ }
+
+ Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), () -> {
+ Asserts.assertSize(listener.getEvents(), 10); });
+ // all we expect for sure is that the last value is whatever the sensor is at the end - internal update and publish is mutexed
+ Assert.assertEquals(listener.getEvents().get(9).getValue(), entity.sensors().get(TestEntity.SEQUENCE));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java b/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
index 0f3310e..4d733e6 100644
--- a/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
@@ -24,12 +24,15 @@ import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
import org.apache.brooklyn.api.policy.PolicySpec;
import org.apache.brooklyn.core.entity.RecordingSensorEventListener;
+import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
import org.apache.brooklyn.core.location.SimulatedLocation;
import org.apache.brooklyn.core.policy.AbstractPolicy;
import org.apache.brooklyn.core.sensor.BasicSensorEvent;
import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.time.Duration;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -102,6 +105,7 @@ public class PolicySubscriptionTest extends BrooklynAppUnitTestSupport {
}
@Test
+ @SuppressWarnings("unused")
public void testUnsubscribeUsingHandleStopsEvents() throws Exception {
SubscriptionHandle handle1 = policy.subscriptions().subscribe(entity, TestEntity.SEQUENCE, listener);
SubscriptionHandle handle2 = policy.subscriptions().subscribe(entity, TestEntity.NAME, listener);
@@ -122,18 +126,37 @@ public class PolicySubscriptionTest extends BrooklynAppUnitTestSupport {
}
@Test
- public void testSubscriptionReceivesInitialValueEvents() {
- entity.sensors().set(TestEntity.SEQUENCE, 123);
+ public void testSubscriptionReceivesInitialValueEventsInOrder() {
entity.sensors().set(TestEntity.NAME, "myname");
-
+ entity.sensors().set(TestEntity.SEQUENCE, 123);
+ entity.sensors().emit(TestEntity.MY_NOTIF, -1);
+
+ // delivery should be in subscription order, so 123 then 456
policy.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SEQUENCE, listener);
+ // wait for the above delivery - otherwise it might get dropped
+ Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), () -> {
+ Asserts.assertSize(listener.getEvents(), 1); });
+ entity.sensors().set(TestEntity.SEQUENCE, 456);
+
+ // notifications don't have "initial value" so don't get -1
+ policy.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.MY_NOTIF, listener);
+ // but do get 1, after 456
+ entity.sensors().emit(TestEntity.MY_NOTIF, 1);
+
+ // STOPPING and myname received, in subscription order, after everything else
+ entity.sensors().set(TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.STOPPING);
+ policy.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SERVICE_STATE_ACTUAL, listener);
policy.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.NAME, listener);
- Asserts.succeedsEventually(new Runnable() {
+ Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), new Runnable() {
@Override public void run() {
assertEquals(listener.getEvents(), ImmutableList.of(
new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity, 123),
- new BasicSensorEvent<String>(TestEntity.NAME, entity, "myname")));
+ new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity, 456),
+ new BasicSensorEvent<Integer>(TestEntity.MY_NOTIF, entity, 1),
+ new BasicSensorEvent<Lifecycle>(TestEntity.SERVICE_STATE_ACTUAL, entity, Lifecycle.STOPPING),
+ new BasicSensorEvent<String>(TestEntity.NAME, entity, "myname")),
+ "actually got: "+listener.getEvents());
}});
}
@@ -147,7 +170,7 @@ public class PolicySubscriptionTest extends BrooklynAppUnitTestSupport {
Asserts.succeedsContinually(ImmutableMap.of("timeout", SHORT_WAIT_MS), new Runnable() {
@Override public void run() {
- assertEquals(listener.getEvents(), ImmutableList.of());
+ Asserts.assertSize(listener.getEvents(), 0);
}});
}