You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2016/02/01 18:48:40 UTC
[10/50] brooklyn-server git commit: Issue 327: fix
ConcurrentModificationException in subscriptions
Issue 327: fix ConcurrentModificationException in subscriptions
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/cfce4732
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/cfce4732
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/cfce4732
Branch: refs/heads/0.4.0
Commit: cfce47322c63fcf023b90223de36a9e3537527d6
Parents: 114e99f
Author: Aled Sage <al...@gmail.com>
Authored: Thu Sep 27 23:34:44 2012 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Thu Sep 27 23:34:44 2012 +0100
----------------------------------------------------------------------
.../internal/LocalSubscriptionManager.java | 2 +-
.../LocalSubscriptionManagerTest.groovy | 91 -----------
.../internal/LocalSubscriptionManagerTest.java | 154 +++++++++++++++++++
3 files changed, 155 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cfce4732/core/src/main/java/brooklyn/management/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/management/internal/LocalSubscriptionManager.java b/core/src/main/java/brooklyn/management/internal/LocalSubscriptionManager.java
index cfe49e8..7d48850 100644
--- a/core/src/main/java/brooklyn/management/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/brooklyn/management/internal/LocalSubscriptionManager.java
@@ -207,7 +207,7 @@ public class LocalSubscriptionManager implements SubscriptionManager {
return (Set<SubscriptionHandle>) ((Set<?>) elvis(subscriptionsBySubscriber.get(subscriber), Collections.emptySet()));
}
- public Set<SubscriptionHandle> getSubscriptionsForEntitySensor(Entity source, Sensor<?> sensor) {
+ public synchronized Set<SubscriptionHandle> getSubscriptionsForEntitySensor(Entity source, Sensor<?> sensor) {
Set<SubscriptionHandle> subscriptions = new LinkedHashSet<SubscriptionHandle>();
subscriptions.addAll(elvis(subscriptionsByToken.get(makeEntitySensorToken(source, sensor)), Collections.emptySet()));
subscriptions.addAll(elvis(subscriptionsByToken.get(makeEntitySensorToken(null, sensor)), Collections.emptySet()));
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cfce4732/core/src/test/java/brooklyn/management/internal/LocalSubscriptionManagerTest.groovy
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/management/internal/LocalSubscriptionManagerTest.groovy b/core/src/test/java/brooklyn/management/internal/LocalSubscriptionManagerTest.groovy
deleted file mode 100644
index d64d368..0000000
--- a/core/src/test/java/brooklyn/management/internal/LocalSubscriptionManagerTest.groovy
+++ /dev/null
@@ -1,91 +0,0 @@
-package brooklyn.management.internal;
-
-import static org.testng.Assert.*
-
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.TimeUnit
-
-import org.testng.annotations.Test
-
-import brooklyn.entity.basic.AbstractGroup
-import brooklyn.event.SensorEvent
-import brooklyn.event.SensorEventListener
-import brooklyn.test.entity.TestApplication
-import brooklyn.test.entity.TestEntity
-
-/**
- * testing the {@link SubscriptionManager} and associated classes.
- */
-public class LocalSubscriptionManagerTest {
-
- private static final int TIMEOUT_MS = 5000;
-
- @Test
- public void testSubscribeToEntityAttributeChange() {
- TestApplication app = new TestApplication()
- TestEntity entity = new TestEntity([owner:app])
- CountDownLatch latch = new CountDownLatch(1)
- app.subscribe(entity, TestEntity.SEQUENCE, { latch.countDown() } as SensorEventListener)
- entity.setSequenceValue(1234)
- if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
- fail "Timeout waiting for Event on TestEntity listener"
- }
- }
-
- @Test
- public void testSubscribeToEntityWithAttributeWildcard() {
- TestApplication app = new TestApplication()
- TestEntity entity = new TestEntity([owner:app])
- CountDownLatch latch = new CountDownLatch(1)
- app.subscribe(entity, null, { latch.countDown() } as SensorEventListener)
- entity.setSequenceValue(1234)
- if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
- fail "Timeout waiting for Event on TestEntity listener"
- }
- }
-
- @Test
- public void testSubscribeToAttributeChangeWithEntityWildcard() {
- TestApplication app = new TestApplication()
- TestEntity entity = new TestEntity([owner:app])
- CountDownLatch latch = new CountDownLatch(1)
- app.subscribe(null, TestEntity.SEQUENCE, { latch.countDown() } as SensorEventListener)
- entity.setSequenceValue(1234)
- if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
- fail "Timeout waiting for Event on TestEntity listener"
- }
- }
-
- @Test
- public void testSubscribeToChildAttributeChange() {
- TestApplication app = new TestApplication()
- TestEntity child = new TestEntity([owner:app])
- CountDownLatch latch = new CountDownLatch(1)
- app.subscribeToChildren(app, TestEntity.SEQUENCE, { latch.countDown() } as SensorEventListener)
- child.setSequenceValue(1234)
- if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
- fail "Timeout waiting for Event on child TestEntity listener"
- }
- }
-
- @Test
- public void testSubscribeToMemberAttributeChange() {
- TestApplication app = new TestApplication()
- AbstractGroup group = new AbstractGroup([owner:app]) {}
- TestEntity member = new TestEntity([owner:app])
- group.addMember(member);
-
- List<SensorEvent<Integer>> events = []
- CountDownLatch latch = new CountDownLatch(1)
- app.subscribeToMembers(group, TestEntity.SEQUENCE, { events.add(it); latch.countDown() } as SensorEventListener)
- member.setAttribute(TestEntity.SEQUENCE, 123)
-
- if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
- fail "Timeout waiting for Event on parent TestEntity listener"
- }
- assertEquals(events.size(), 1)
- assertEquals(events.getAt(0).value, 123)
- assertEquals(events.getAt(0).sensor, TestEntity.SEQUENCE)
- assertEquals(events.getAt(0).source.id, member.id)
- }
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cfce4732/core/src/test/java/brooklyn/management/internal/LocalSubscriptionManagerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/management/internal/LocalSubscriptionManagerTest.java b/core/src/test/java/brooklyn/management/internal/LocalSubscriptionManagerTest.java
new file mode 100644
index 0000000..8eda1d8
--- /dev/null
+++ b/core/src/test/java/brooklyn/management/internal/LocalSubscriptionManagerTest.java
@@ -0,0 +1,154 @@
+package brooklyn.management.internal;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.BasicGroup;
+import brooklyn.event.SensorEvent;
+import brooklyn.event.SensorEventListener;
+import brooklyn.management.SubscriptionHandle;
+import brooklyn.management.SubscriptionManager;
+import brooklyn.test.entity.TestApplication;
+import brooklyn.test.entity.TestEntity;
+
+/**
+ * testing the {@link SubscriptionManager} and associated classes.
+ */
+public class LocalSubscriptionManagerTest {
+
+ private static final int TIMEOUT_MS = 5000;
+
+ private TestApplication app;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ app = new TestApplication();
+ }
+
+ @Test
+ public void testSubscribeToEntityAttributeChange() throws Exception {
+ TestEntity entity = new TestEntity(app);
+ final CountDownLatch latch = new CountDownLatch(1);
+ app.subscribe(entity, TestEntity.SEQUENCE, new SensorEventListener<Object>() {
+ @Override public void onEvent(SensorEvent<Object> event) {
+ latch.countDown();
+ }});
+ entity.setSequenceValue(1234);
+ if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+ fail("Timeout waiting for Event on TestEntity listener");
+ }
+ }
+
+ @Test
+ public void testSubscribeToEntityWithAttributeWildcard() throws Exception {
+ TestEntity entity = new TestEntity(app);
+ final CountDownLatch latch = new CountDownLatch(1);
+ app.subscribe(entity, null, new SensorEventListener<Object>() {
+ @Override public void onEvent(SensorEvent<Object> event) {
+ latch.countDown();
+ }});
+ entity.setSequenceValue(1234);
+ if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+ fail("Timeout waiting for Event on TestEntity listener");
+ }
+ }
+
+ @Test
+ public void testSubscribeToAttributeChangeWithEntityWildcard() throws Exception {
+ TestEntity entity = new TestEntity(app);
+ final CountDownLatch latch = new CountDownLatch(1);
+ app.subscribe(null, TestEntity.SEQUENCE, new SensorEventListener<Object>() {
+ @Override public void onEvent(SensorEvent<Object> event) {
+ latch.countDown();
+ }});
+ entity.setSequenceValue(1234);
+ if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+ fail("Timeout waiting for Event on TestEntity listener");
+ }
+ }
+
+ @Test
+ public void testSubscribeToChildAttributeChange() throws Exception {
+ TestEntity child = new TestEntity(app);
+ final CountDownLatch latch = new CountDownLatch(1);
+ app.subscribeToChildren(app, TestEntity.SEQUENCE, new SensorEventListener<Object>() {
+ @Override public void onEvent(SensorEvent<Object> event) {
+ latch.countDown();
+ }});
+ child.setSequenceValue(1234);
+ if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+ fail("Timeout waiting for Event on child TestEntity listener");
+ }
+ }
+
+ @Test
+ public void testSubscribeToMemberAttributeChange() throws Exception {
+ BasicGroup group = new BasicGroup(app);
+ TestEntity member = new TestEntity(app);
+ group.addMember(member);
+
+ final List<SensorEvent<Integer>> events = new CopyOnWriteArrayList<SensorEvent<Integer>>();
+ final CountDownLatch latch = new CountDownLatch(1);
+ app.subscribeToMembers(group, TestEntity.SEQUENCE, new SensorEventListener<Integer>() {
+ @Override public void onEvent(SensorEvent<Integer> event) {
+ events.add(event);
+ latch.countDown();
+ }});
+ member.setAttribute(TestEntity.SEQUENCE, 123);
+
+ if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+ fail("Timeout waiting for Event on parent TestEntity listener");
+ }
+ assertEquals(events.size(), 1);
+ assertEquals(events.get(0).getValue(), (Integer)123);
+ assertEquals(events.get(0).getSensor(), TestEntity.SEQUENCE);
+ assertEquals(events.get(0).getSource().getId(), member.getId());
+ }
+
+ // Regression test for ConcurrentModificationException in issue #327
+ @Test(groups="Integration")
+ public void testConcurrentSubscribingAndPublishing() throws Exception {
+ final AtomicReference<Exception> threadException = new AtomicReference<Exception>();
+ TestEntity entity = new TestEntity(app);
+
+ // Repeatedly subscribe and unsubscribe, so listener-set constantly changing while publishing to it.
+ // First create a stable listener so it is always the same listener-set object.
+ Thread thread = new Thread() {
+ public void run() {
+ try {
+ SensorEventListener<Object> noopListener = new SensorEventListener<Object>() {
+ @Override public void onEvent(SensorEvent<Object> event) {
+ }
+ };
+ app.getSubscriptionContext().subscribe(null, TestEntity.SEQUENCE, noopListener);
+ while (!Thread.currentThread().isInterrupted()) {
+ SubscriptionHandle handle = app.getSubscriptionContext().subscribe(null, TestEntity.SEQUENCE, noopListener);
+ app.getSubscriptionContext().unsubscribe(handle);
+ }
+ } catch (Exception e) {
+ threadException.set(e);
+ }
+ }
+ };
+
+ try {
+ thread.start();
+ for (int i = 0; i < 10000; i++) {
+ entity.setAttribute(TestEntity.SEQUENCE, i);
+ }
+ } finally {
+ thread.interrupt();
+ }
+
+ if (threadException.get() != null) throw threadException.get();
+ }
+}