You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/05/23 12:30:09 UTC
activemq git commit: AMQ-4181 - revert mod to
testQueueBrowserWith2ConsumersInterleaved which cause intermittent ci failure
- browse is a snapshot at time of creation. tidy up some gaps in pagein logic
sync
Repository: activemq
Updated Branches:
refs/heads/master b1d8e66ea -> b4e35fe8a
AMQ-4181 - revert mod to testQueueBrowserWith2ConsumersInterleaved which cause intermittent ci failure - browse is a snapshot at time of creation. tidy up some gaps in pagein logic sync
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b4e35fe8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b4e35fe8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b4e35fe8
Branch: refs/heads/master
Commit: b4e35fe8a355c0fd5fe8935ab583e2886121946b
Parents: b1d8e66
Author: gtully <ga...@gmail.com>
Authored: Mon May 23 13:29:25 2016 +0100
Committer: gtully <ga...@gmail.com>
Committed: Mon May 23 13:29:39 2016 +0100
----------------------------------------------------------------------
.../apache/activemq/broker/region/Queue.java | 37 ++++++++------------
.../org/apache/activemq/broker/BrokerTest.java | 9 ++++-
2 files changed, 23 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/b4e35fe8/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index f025998..318f558 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -410,14 +410,6 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
browser.incrementQueueRef();
}
- void done() {
- try {
- browser.decrementQueueRef();
- } catch (Exception e) {
- LOG.warn("decrement ref on browser: " + browser, e);
- }
- }
-
public QueueBrowserSubscription getBrowser() {
return browser;
}
@@ -1602,12 +1594,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
pagedInPendingDispatchLock.readLock().unlock();
}
- // Perhaps we should page always into the pagedInPendingDispatch
- // list if
- // !messages.isEmpty(), and then if
- // !pagedInPendingDispatch.isEmpty()
- // then we do a dispatch.
- boolean hasBrowsers = browserDispatches.size() > 0;
+ boolean hasBrowsers = !browserDispatches.isEmpty();
if (pageInMoreMessages || hasBrowsers || !dispatchPendingList.hasRedeliveries()) {
try {
@@ -1618,12 +1605,12 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
}
if (hasBrowsers) {
- PendingList alreadyDispatchedMessages = isPrioritizedMessages() ?
+ PendingList messagesInMemory = isPrioritizedMessages() ?
new PrioritizedPendingList() : new OrderedPendingList();
pagedInMessagesLock.readLock().lock();
- try{
- alreadyDispatchedMessages.addAll(pagedInMessages);
- }finally {
+ try {
+ messagesInMemory.addAll(pagedInMessages);
+ } finally {
pagedInMessagesLock.readLock().unlock();
}
@@ -1636,9 +1623,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
QueueBrowserSubscription browser = browserDispatch.getBrowser();
- LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, alreadyDispatchedMessages.size());
+ LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, messagesInMemory.size());
boolean added = false;
- for (MessageReference node : alreadyDispatchedMessages) {
+ for (MessageReference node : messagesInMemory) {
if (!((QueueMessageReference)node).isAcked() && !browser.isDuplicate(node.getMessageId()) && !browser.atMax()) {
msgContext.setMessageReference(node);
if (browser.matches(node, msgContext)) {
@@ -1902,7 +1889,13 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
List<QueueMessageReference> result = null;
PendingList resultList = null;
- int toPageIn = Math.min(maxPageSize, messages.size());
+ int toPageIn = maxPageSize;
+ messagesLock.readLock().lock();
+ try {
+ toPageIn = Math.min(toPageIn, messages.size());
+ } finally {
+ messagesLock.readLock().unlock();
+ }
int pagedInPendingSize = 0;
pagedInPendingDispatchLock.readLock().lock();
try {
@@ -1913,7 +1906,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
if (isLazyDispatch() && !force) {
// Only page in the minimum number of messages which can be
// dispatched immediately.
- toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
+ toPageIn = Math.min(toPageIn, getConsumerMessageCountBeforeFull());
}
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/b4e35fe8/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
index 769cdbf..200d215 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
@@ -265,7 +265,9 @@ public class BrokerTest extends BrokerTestSupport {
messages.add(m1);
}
- for (int i = 0; i < 4; i++) {
+ // a browse is a snapshot - only guarantee to see messages produced before
+ // the browser
+ for (int i = 0; i < 1; i++) {
Message m1 = messages.get(i);
Message m2 = receiveMessage(connection2);
assertNotNull("m2 is null for index: " + i, m2);
@@ -275,6 +277,11 @@ public class BrokerTest extends BrokerTestSupport {
assertNoMessagesLeft(connection1);
assertNoMessagesLeft(connection2);
+
+ connection1.request(closeConnectionInfo(connectionInfo1));
+ connection1.stop();
+ connection2.request(closeConnectionInfo(connectionInfo2));
+ connection2.stop();
}