You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/06/16 16:27:15 UTC

activemq git commit: Fix test failure in CI

Repository: activemq
Updated Branches:
  refs/heads/master 5ba867908 -> 9ac5f8347


Fix test failure in CI

Has race condition on the ArrayList it uses to track subs and 
Fix the unreliable sleep used to track locked messages in subs
Ensure Broker is shut down on test completion.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9ac5f834
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9ac5f834
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9ac5f834

Branch: refs/heads/master
Commit: 9ac5f83473a64a61f8928b678645b67c83f870a2
Parents: 5ba8679
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Jun 16 12:26:50 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Jun 16 12:26:50 2016 -0400

----------------------------------------------------------------------
 .../region/SubscriptionAddRemoveQueueTest.java  | 147 +++++++++----------
 1 file changed, 70 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/9ac5f834/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
index 207ecda..cea40d5 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
@@ -1,20 +1,4 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -32,14 +16,19 @@
  */
 package org.apache.activemq.broker.region;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.jms.InvalidSelectorException;
 import javax.management.ObjectName;
 
@@ -61,28 +50,31 @@ import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.thread.TaskRunnerFactory;
-import junit.framework.TestCase;
-
-public class SubscriptionAddRemoveQueueTest extends TestCase {
-
-    Queue queue;
-
-    ConsumerInfo info = new ConsumerInfo();
-    List<SimpleImmediateDispatchSubscription> subs = new ArrayList<SimpleImmediateDispatchSubscription>();
-    ConnectionContext context = new ConnectionContext();
-    ProducerBrokerExchange producerBrokerExchange = new ProducerBrokerExchange();
-    ProducerInfo producerInfo = new ProducerInfo();
-    ProducerState producerState = new ProducerState(producerInfo);
-    ActiveMQDestination destination = new ActiveMQQueue("TEST");
-    int numSubscriptions = 1000;
-    boolean working = true;
-    int senders = 20;
-
-
-    @Override
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SubscriptionAddRemoveQueueTest {
+
+    private BrokerService brokerService;
+    private Queue queue;
+    private ConsumerInfo info = new ConsumerInfo();
+    private List<SimpleImmediateDispatchSubscription> subs = new ArrayList<SimpleImmediateDispatchSubscription>();
+    private ConnectionContext context = new ConnectionContext();
+    private ProducerBrokerExchange producerBrokerExchange = new ProducerBrokerExchange();
+    private ProducerInfo producerInfo = new ProducerInfo();
+    private ProducerState producerState = new ProducerState(producerInfo);
+    private ActiveMQDestination destination = new ActiveMQQueue("TEST");
+    private int numSubscriptions = 1000;
+    private boolean working = true;
+    private int senders = 20;
+
+    @Before
     public void setUp() throws Exception {
-        BrokerService brokerService = new BrokerService();
+        brokerService = new BrokerService();
         brokerService.start();
+
         DestinationStatistics parentStats = new DestinationStatistics();
         parentStats.setEnabled(true);
 
@@ -99,6 +91,15 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
         queue.initialize();
     }
 
+    @After
+    public void tearDown() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+        }
+    }
+
+    @Test(timeout = 120000)
     public void testNoDispatchToRemovedConsumers() throws Exception {
         final AtomicInteger producerId = new AtomicInteger();
         Runnable sender = new Runnable() {
@@ -134,21 +135,34 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
             }
         };
 
-        for (int i=0;i<numSubscriptions; i++) {
+        for (int i = 0; i < numSubscriptions; i++) {
             SimpleImmediateDispatchSubscription sub = new SimpleImmediateDispatchSubscription();
             subs.add(sub);
             queue.addSubscription(context, sub);
         }
+
         assertEquals("there are X subscriptions", numSubscriptions, queue.getDestinationStatistics().getConsumers().getCount());
         ExecutorService executor = Executors.newCachedThreadPool();
-        for (int i=0; i<senders ; i++) {
+        for (int i = 0; i < senders; i++) {
             executor.submit(sender);
         }
 
-        Thread.sleep(1000);
-        for (SimpleImmediateDispatchSubscription sub : subs) {
-            assertTrue("There are some locked messages in the subscription", hasSomeLocks(sub.dispatched));
-        }
+        assertTrue("All subs should have some locks", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                boolean allHaveLocks = true;
+
+                for (SimpleImmediateDispatchSubscription sub : subs) {
+                    if (!hasSomeLocks(sub.dispatched)) {
+                        allHaveLocks = false;
+                        break;
+                    }
+                }
+
+                return allHaveLocks;
+            }
+        }));
 
         Future<?> result = executor.submit(subRemover);
         result.get();
@@ -158,12 +172,11 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
         for (SimpleImmediateDispatchSubscription sub : subs) {
             assertTrue("There are no locked messages in any removed subscriptions", !hasSomeLocks(sub.dispatched));
         }
-
     }
 
     private boolean hasSomeLocks(List<MessageReference> dispatched) {
         boolean hasLock = false;
-        for (MessageReference mr: dispatched) {
+        for (MessageReference mr : dispatched) {
             QueueMessageReference qmr = (QueueMessageReference) mr;
             if (qmr.getLockOwner() != null) {
                 hasLock = true;
@@ -173,21 +186,19 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
         return hasLock;
     }
 
-    public class SimpleImmediateDispatchSubscription implements Subscription, LockOwner {
+    private class SimpleImmediateDispatchSubscription implements Subscription, LockOwner {
 
         private SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics();
-        List<MessageReference> dispatched =
-                Collections.synchronizedList(new ArrayList<MessageReference>());
+        List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
 
         @Override
-        public void acknowledge(ConnectionContext context, MessageAck ack)
-                throws Exception {
+        public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
         }
 
         @Override
         public void add(MessageReference node) throws Exception {
             // immediate dispatch
-            QueueMessageReference  qmr = (QueueMessageReference)node;
+            QueueMessageReference qmr = (QueueMessageReference) node;
             qmr.lock(this);
             dispatched.add(qmr);
         }
@@ -234,8 +245,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
         }
 
         @Override
-        public void add(ConnectionContext context, Destination destination)
-                throws Exception {
+        public void add(ConnectionContext context, Destination destination) throws Exception {
         }
 
         @Override
@@ -331,13 +341,8 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
             return false;
         }
 
-        public boolean isSlave() {
-            return false;
-        }
-
         @Override
-        public boolean matches(MessageReference node,
-                MessageEvaluationContext context) throws IOException {
+        public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
             return true;
         }
 
@@ -347,13 +352,11 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
         }
 
         @Override
-        public void processMessageDispatchNotification(
-                MessageDispatchNotification mdn) throws Exception {
+        public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
         }
 
         @Override
-        public Response pullMessage(ConnectionContext context, MessagePull pull)
-                throws Exception {
+        public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
             return null;
         }
 
@@ -363,8 +366,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
         }
 
         @Override
-        public List<MessageReference> remove(ConnectionContext context,
-                Destination destination) throws Exception {
+        public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
             return new ArrayList<MessageReference>(dispatched);
         }
 
@@ -373,8 +375,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
         }
 
         @Override
-        public void setSelector(String selector)
-                throws InvalidSelectorException, UnsupportedOperationException {
+        public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException {
         }
 
         @Override
@@ -382,8 +383,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
         }
 
         @Override
-        public boolean addRecoveredMessage(ConnectionContext context,
-                MessageReference message) throws Exception {
+        public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception {
             return false;
         }
 
@@ -402,12 +402,6 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
             return false;
         }
 
-        public void addDestination(Destination destination) {
-        }
-
-        public void removeDestination(Destination destination) {
-        }
-
         @Override
         public int countBeforeFull() {
             return 10;
@@ -422,6 +416,5 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
         public long getInFlightMessageSize() {
             return subscriptionStatistics.getInflightMessageSize().getTotalSize();
         }
-
     }
 }