You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2016/02/01 18:48:40 UTC

[10/50] brooklyn-server git commit: Issue 327: fix ConcurrentModificationException in subscriptions

Issue 327: fix ConcurrentModificationException in subscriptions

Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/cfce4732
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/cfce4732
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/cfce4732

Branch: refs/heads/0.4.0
Commit: cfce47322c63fcf023b90223de36a9e3537527d6
Parents: 114e99f
Author: Aled Sage <al...@gmail.com>
Authored: Thu Sep 27 23:34:44 2012 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Thu Sep 27 23:34:44 2012 +0100

----------------------------------------------------------------------
 .../internal/LocalSubscriptionManager.java      |   2 +-
 .../LocalSubscriptionManagerTest.groovy         |  91 -----------
 .../internal/LocalSubscriptionManagerTest.java  | 154 +++++++++++++++++++
 3 files changed, 155 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cfce4732/core/src/main/java/brooklyn/management/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/management/internal/LocalSubscriptionManager.java b/core/src/main/java/brooklyn/management/internal/LocalSubscriptionManager.java
index cfe49e8..7d48850 100644
--- a/core/src/main/java/brooklyn/management/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/brooklyn/management/internal/LocalSubscriptionManager.java
@@ -207,7 +207,7 @@ public class LocalSubscriptionManager implements SubscriptionManager {
         return (Set<SubscriptionHandle>) ((Set<?>) elvis(subscriptionsBySubscriber.get(subscriber), Collections.emptySet()));
     }
 
-    public Set<SubscriptionHandle> getSubscriptionsForEntitySensor(Entity source, Sensor<?> sensor) {
+    public synchronized Set<SubscriptionHandle> getSubscriptionsForEntitySensor(Entity source, Sensor<?> sensor) {
         Set<SubscriptionHandle> subscriptions = new LinkedHashSet<SubscriptionHandle>();
         subscriptions.addAll(elvis(subscriptionsByToken.get(makeEntitySensorToken(source, sensor)), Collections.emptySet()));
         subscriptions.addAll(elvis(subscriptionsByToken.get(makeEntitySensorToken(null, sensor)), Collections.emptySet()));

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cfce4732/core/src/test/java/brooklyn/management/internal/LocalSubscriptionManagerTest.groovy
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/management/internal/LocalSubscriptionManagerTest.groovy b/core/src/test/java/brooklyn/management/internal/LocalSubscriptionManagerTest.groovy
deleted file mode 100644
index d64d368..0000000
--- a/core/src/test/java/brooklyn/management/internal/LocalSubscriptionManagerTest.groovy
+++ /dev/null
@@ -1,91 +0,0 @@
-package brooklyn.management.internal;
-
-import static org.testng.Assert.*
-
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.TimeUnit
-
-import org.testng.annotations.Test
-
-import brooklyn.entity.basic.AbstractGroup
-import brooklyn.event.SensorEvent
-import brooklyn.event.SensorEventListener
-import brooklyn.test.entity.TestApplication
-import brooklyn.test.entity.TestEntity
-
-/**
- * testing the {@link SubscriptionManager} and associated classes.
- */
-public class LocalSubscriptionManagerTest {
-    
-    private static final int TIMEOUT_MS = 5000;
-    
-    @Test
-    public void testSubscribeToEntityAttributeChange() {
-        TestApplication app = new TestApplication()
-        TestEntity entity = new TestEntity([owner:app])
-        CountDownLatch latch = new CountDownLatch(1)
-        app.subscribe(entity, TestEntity.SEQUENCE, { latch.countDown() } as SensorEventListener) 
-        entity.setSequenceValue(1234)
-        if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
-            fail "Timeout waiting for Event on TestEntity listener"
-        }
-    }
-    
-    @Test
-    public void testSubscribeToEntityWithAttributeWildcard() {
-        TestApplication app = new TestApplication()
-        TestEntity entity = new TestEntity([owner:app])
-        CountDownLatch latch = new CountDownLatch(1)
-        app.subscribe(entity, null, { latch.countDown() } as SensorEventListener) 
-        entity.setSequenceValue(1234)
-        if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
-            fail "Timeout waiting for Event on TestEntity listener"
-        }
-    }
-    
-    @Test
-    public void testSubscribeToAttributeChangeWithEntityWildcard() {
-        TestApplication app = new TestApplication()
-        TestEntity entity = new TestEntity([owner:app])
-        CountDownLatch latch = new CountDownLatch(1)
-        app.subscribe(null, TestEntity.SEQUENCE, { latch.countDown() } as SensorEventListener) 
-        entity.setSequenceValue(1234)
-        if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
-            fail "Timeout waiting for Event on TestEntity listener"
-        }
-    }
-    
-    @Test
-    public void testSubscribeToChildAttributeChange() {
-        TestApplication app = new TestApplication()
-        TestEntity child = new TestEntity([owner:app])
-        CountDownLatch latch = new CountDownLatch(1)
-        app.subscribeToChildren(app, TestEntity.SEQUENCE, { latch.countDown() } as SensorEventListener) 
-        child.setSequenceValue(1234)
-        if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
-            fail "Timeout waiting for Event on child TestEntity listener"
-        }
-    }
-    
-    @Test
-    public void testSubscribeToMemberAttributeChange() {
-        TestApplication app = new TestApplication()
-        AbstractGroup group = new AbstractGroup([owner:app]) {}
-        TestEntity member = new TestEntity([owner:app])
-        group.addMember(member);
-
-        List<SensorEvent<Integer>> events = []
-        CountDownLatch latch = new CountDownLatch(1)
-        app.subscribeToMembers(group, TestEntity.SEQUENCE, { events.add(it); latch.countDown() } as SensorEventListener)
-        member.setAttribute(TestEntity.SEQUENCE, 123)
-
-        if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
-            fail "Timeout waiting for Event on parent TestEntity listener"
-        }
-        assertEquals(events.size(), 1)
-        assertEquals(events.getAt(0).value, 123)
-        assertEquals(events.getAt(0).sensor, TestEntity.SEQUENCE)
-        assertEquals(events.getAt(0).source.id, member.id)
-    }
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cfce4732/core/src/test/java/brooklyn/management/internal/LocalSubscriptionManagerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/management/internal/LocalSubscriptionManagerTest.java b/core/src/test/java/brooklyn/management/internal/LocalSubscriptionManagerTest.java
new file mode 100644
index 0000000..8eda1d8
--- /dev/null
+++ b/core/src/test/java/brooklyn/management/internal/LocalSubscriptionManagerTest.java
@@ -0,0 +1,154 @@
+package brooklyn.management.internal;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.BasicGroup;
+import brooklyn.event.SensorEvent;
+import brooklyn.event.SensorEventListener;
+import brooklyn.management.SubscriptionHandle;
+import brooklyn.management.SubscriptionManager;
+import brooklyn.test.entity.TestApplication;
+import brooklyn.test.entity.TestEntity;
+
+/**
+ * testing the {@link SubscriptionManager} and associated classes.
+ */
+public class LocalSubscriptionManagerTest {
+    
+    private static final int TIMEOUT_MS = 5000;
+    
+    private TestApplication app;
+    
+    @BeforeMethod
+    public void setUp() throws Exception {
+        app = new TestApplication();
+    }
+    
+    @Test
+    public void testSubscribeToEntityAttributeChange() throws Exception {
+        TestEntity entity = new TestEntity(app);
+        final CountDownLatch latch = new CountDownLatch(1);
+        app.subscribe(entity, TestEntity.SEQUENCE, new SensorEventListener<Object>() {
+                @Override public void onEvent(SensorEvent<Object> event) {
+                    latch.countDown();
+                }});
+        entity.setSequenceValue(1234);
+        if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+            fail("Timeout waiting for Event on TestEntity listener");
+        }
+    }
+    
+    @Test
+    public void testSubscribeToEntityWithAttributeWildcard() throws Exception {
+        TestEntity entity = new TestEntity(app);
+        final CountDownLatch latch = new CountDownLatch(1);
+        app.subscribe(entity, null, new SensorEventListener<Object>() {
+            @Override public void onEvent(SensorEvent<Object> event) {
+                latch.countDown();
+            }});
+        entity.setSequenceValue(1234);
+        if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+            fail("Timeout waiting for Event on TestEntity listener");
+        }
+    }
+    
+    @Test
+    public void testSubscribeToAttributeChangeWithEntityWildcard() throws Exception {
+        TestEntity entity = new TestEntity(app);
+        final CountDownLatch latch = new CountDownLatch(1);
+        app.subscribe(null, TestEntity.SEQUENCE, new SensorEventListener<Object>() {
+                @Override public void onEvent(SensorEvent<Object> event) {
+                    latch.countDown();
+                }});
+        entity.setSequenceValue(1234);
+        if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+            fail("Timeout waiting for Event on TestEntity listener");
+        }
+    }
+    
+    @Test
+    public void testSubscribeToChildAttributeChange() throws Exception {
+        TestEntity child = new TestEntity(app);
+        final CountDownLatch latch = new CountDownLatch(1);
+        app.subscribeToChildren(app, TestEntity.SEQUENCE, new SensorEventListener<Object>() {
+            @Override public void onEvent(SensorEvent<Object> event) {
+                latch.countDown();
+            }});
+        child.setSequenceValue(1234);
+        if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+            fail("Timeout waiting for Event on child TestEntity listener");
+        }
+    }
+    
+    @Test
+    public void testSubscribeToMemberAttributeChange() throws Exception {
+        BasicGroup group = new BasicGroup(app);
+        TestEntity member = new TestEntity(app);
+        group.addMember(member);
+
+        final List<SensorEvent<Integer>> events = new CopyOnWriteArrayList<SensorEvent<Integer>>();
+        final CountDownLatch latch = new CountDownLatch(1);
+        app.subscribeToMembers(group, TestEntity.SEQUENCE, new SensorEventListener<Integer>() {
+            @Override public void onEvent(SensorEvent<Integer> event) {
+                events.add(event);
+                latch.countDown();
+            }});
+        member.setAttribute(TestEntity.SEQUENCE, 123);
+
+        if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+            fail("Timeout waiting for Event on parent TestEntity listener");
+        }
+        assertEquals(events.size(), 1);
+        assertEquals(events.get(0).getValue(), (Integer)123);
+        assertEquals(events.get(0).getSensor(), TestEntity.SEQUENCE);
+        assertEquals(events.get(0).getSource().getId(), member.getId());
+    }
+    
+    // Regression test for ConcurrentModificationException in issue #327
+    @Test(groups="Integration")
+    public void testConcurrentSubscribingAndPublishing() throws Exception {
+        final AtomicReference<Exception> threadException = new AtomicReference<Exception>();
+        TestEntity entity = new TestEntity(app);
+        
+        // Repeatedly subscribe and unsubscribe, so listener-set constantly changing while publishing to it.
+        // First create a stable listener so it is always the same listener-set object.
+        Thread thread = new Thread() {
+            public void run() {
+                try {
+                    SensorEventListener<Object> noopListener = new SensorEventListener<Object>() {
+                        @Override public void onEvent(SensorEvent<Object> event) {
+                        }
+                    };
+                    app.getSubscriptionContext().subscribe(null, TestEntity.SEQUENCE, noopListener);
+                    while (!Thread.currentThread().isInterrupted()) {
+                        SubscriptionHandle handle = app.getSubscriptionContext().subscribe(null, TestEntity.SEQUENCE, noopListener);
+                        app.getSubscriptionContext().unsubscribe(handle);
+                    }
+                } catch (Exception e) {
+                    threadException.set(e);
+                }
+            }
+        };
+        
+        try {
+            thread.start();
+            for (int i = 0; i < 10000; i++) {
+                entity.setAttribute(TestEntity.SEQUENCE, i);
+            }
+        } finally {
+            thread.interrupt();
+        }
+
+        if (threadException.get() != null) throw threadException.get();
+    }
+}