You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/05/23 12:30:09 UTC

activemq git commit: AMQ-4181 - revert the modification to testQueueBrowserWith2ConsumersInterleaved, which caused intermittent CI failures - a browse is a snapshot at the time of creation. Tidy up some gaps in the page-in logic synchronization.

Repository: activemq
Updated Branches:
  refs/heads/master b1d8e66ea -> b4e35fe8a


AMQ-4181 - revert the modification to testQueueBrowserWith2ConsumersInterleaved, which caused intermittent CI failures - a browse is a snapshot at the time of creation. Tidy up some gaps in the page-in logic synchronization.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b4e35fe8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b4e35fe8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b4e35fe8

Branch: refs/heads/master
Commit: b4e35fe8a355c0fd5fe8935ab583e2886121946b
Parents: b1d8e66
Author: gtully <ga...@gmail.com>
Authored: Mon May 23 13:29:25 2016 +0100
Committer: gtully <ga...@gmail.com>
Committed: Mon May 23 13:29:39 2016 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    | 37 ++++++++------------
 .../org/apache/activemq/broker/BrokerTest.java  |  9 ++++-
 2 files changed, 23 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b4e35fe8/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index f025998..318f558 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -410,14 +410,6 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
             browser.incrementQueueRef();
         }
 
-        void done() {
-            try {
-                browser.decrementQueueRef();
-            } catch (Exception e) {
-                LOG.warn("decrement ref on browser: " + browser, e);
-            }
-        }
-
         public QueueBrowserSubscription getBrowser() {
             return browser;
         }
@@ -1602,12 +1594,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                 pagedInPendingDispatchLock.readLock().unlock();
             }
 
-            // Perhaps we should page always into the pagedInPendingDispatch
-            // list if
-            // !messages.isEmpty(), and then if
-            // !pagedInPendingDispatch.isEmpty()
-            // then we do a dispatch.
-            boolean hasBrowsers = browserDispatches.size() > 0;
+            boolean hasBrowsers = !browserDispatches.isEmpty();
 
             if (pageInMoreMessages || hasBrowsers || !dispatchPendingList.hasRedeliveries()) {
                 try {
@@ -1618,12 +1605,12 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
             }
 
             if (hasBrowsers) {
-                PendingList alreadyDispatchedMessages = isPrioritizedMessages() ?
+                PendingList messagesInMemory = isPrioritizedMessages() ?
                         new PrioritizedPendingList() : new OrderedPendingList();
                 pagedInMessagesLock.readLock().lock();
-                try{
-                    alreadyDispatchedMessages.addAll(pagedInMessages);
-                }finally {
+                try {
+                    messagesInMemory.addAll(pagedInMessages);
+                } finally {
                     pagedInMessagesLock.readLock().unlock();
                 }
 
@@ -1636,9 +1623,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
 
                         QueueBrowserSubscription browser = browserDispatch.getBrowser();
 
-                        LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, alreadyDispatchedMessages.size());
+                        LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, messagesInMemory.size());
                         boolean added = false;
-                        for (MessageReference node : alreadyDispatchedMessages) {
+                        for (MessageReference node : messagesInMemory) {
                             if (!((QueueMessageReference)node).isAcked() && !browser.isDuplicate(node.getMessageId()) && !browser.atMax()) {
                                 msgContext.setMessageReference(node);
                                 if (browser.matches(node, msgContext)) {
@@ -1902,7 +1889,13 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
         List<QueueMessageReference> result = null;
         PendingList resultList = null;
 
-        int toPageIn = Math.min(maxPageSize, messages.size());
+        int toPageIn = maxPageSize;
+        messagesLock.readLock().lock();
+        try {
+            toPageIn = Math.min(toPageIn, messages.size());
+        } finally {
+            messagesLock.readLock().unlock();
+        }
         int pagedInPendingSize = 0;
         pagedInPendingDispatchLock.readLock().lock();
         try {
@@ -1913,7 +1906,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
         if (isLazyDispatch() && !force) {
             // Only page in the minimum number of messages which can be
             // dispatched immediately.
-            toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
+            toPageIn = Math.min(toPageIn, getConsumerMessageCountBeforeFull());
         }
 
         if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/b4e35fe8/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
index 769cdbf..200d215 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
@@ -265,7 +265,9 @@ public class BrokerTest extends BrokerTestSupport {
             messages.add(m1);
         }
 
-        for (int i = 0; i < 4; i++) {
+        // a browse is a snapshot - only guarantee to see messages produced before
+        // the browser
+        for (int i = 0; i < 1; i++) {
             Message m1 = messages.get(i);
             Message m2 = receiveMessage(connection2);
             assertNotNull("m2 is null for index: " + i, m2);
@@ -275,6 +277,11 @@ public class BrokerTest extends BrokerTestSupport {
 
         assertNoMessagesLeft(connection1);
         assertNoMessagesLeft(connection2);
+
+        connection1.request(closeConnectionInfo(connectionInfo1));
+        connection1.stop();
+        connection2.request(closeConnectionInfo(connectionInfo2));
+        connection2.stop();
     }