You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/10/23 18:52:27 UTC
svn commit: r707415 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/cursors/
test/java/org/apache/activemq/usecases/
Author: gtully
Date: Thu Oct 23 09:52:27 2008
New Revision: 707415
URL: http://svn.apache.org/viewvc?rev=707415&view=rev
Log:
fix AMQ-1984 - hanging consumer receive after multiple consumer disconnect/reconnect
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=707415&r1=707414&r2=707415&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Thu Oct 23 09:52:27 2008
@@ -104,6 +104,7 @@
}
}
clearIterator(true);
+ size();
}
public synchronized void release() {
@@ -166,7 +167,7 @@
public final synchronized void remove() {
size--;
- if (size==0 && isStarted() && cacheEnabled) {
+ if (size==0 && isStarted() && useCache) {
cacheEnabled=true;
}
if (iterator!=null) {
@@ -177,6 +178,7 @@
public final synchronized void remove(MessageReference node) {
size--;
cacheEnabled=false;
+ batchList.remove(node.getMessageId());
}
@@ -234,7 +236,8 @@
}
public final synchronized boolean isEmpty() {
- return size <= 0;
+ // negative means more messages added to store through queue.send since last reset
+ return size == 0;
}
public final synchronized boolean hasMessagesBufferedToDeliver() {
@@ -242,12 +245,10 @@
}
public final synchronized int size() {
- if (isStarted()) {
- return size;
+ if (size < 0) {
+ this.size = getStoreSize();
}
- this.size = getStoreSize();
return size;
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=707415&r1=707414&r2=707415&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Thu Oct 23 09:52:27 2008
@@ -122,18 +122,13 @@
}
public synchronized boolean hasNext() {
-
- boolean result = true;//pendingCount > 0;
- if (result) {
- try {
- currentCursor = getNextCursor();
- } catch (Exception e) {
- LOG.error("Failed to get current cursor ", e);
- throw new RuntimeException(e);
- }
- result = currentCursor != null ? currentCursor.hasNext() : false;
- }
- return result;
+ try {
+ getNextCursor();
+ } catch (Exception e) {
+ LOG.error("Failed to get current cursor ", e);
+ throw new RuntimeException(e);
+ }
+ return currentCursor != null ? currentCursor.hasNext() : false;
}
public synchronized MessageReference next() {
@@ -160,6 +155,7 @@
public synchronized void reset() {
nonPersistent.reset();
persistent.reset();
+ pendingCount = persistent.size() + nonPersistent.size();
}
public void release() {
@@ -169,11 +165,15 @@
public synchronized int size() {
+ if (pendingCount < 0) {
+ pendingCount = persistent.size() + nonPersistent.size();
+ }
return pendingCount;
}
public synchronized boolean isEmpty() {
- return pendingCount <= 0;
+ // if negative, more messages arrived in store since last reset so non empty
+ return pendingCount == 0;
}
/**
@@ -259,6 +259,7 @@
if (nonPersistent != null) {
nonPersistent.gc();
}
+ pendingCount = persistent.size() + nonPersistent.size();
}
public void setSystemUsage(SystemUsage usageManager) {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java?rev=707415&r1=707414&r2=707415&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java Thu Oct 23 09:52:27 2008
@@ -61,7 +61,6 @@
// give the acks a chance to flow
Thread.sleep(2000);
Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next();
- System.err.println(internalQueue);
assertTrue("All messages are consumed and acked from source:" + internalQueue, internalQueue.getMessages().isEmpty());
assertEquals("messages source:" + internalQueue, 0, internalQueue.getDestinationStatistics().getMessages().getCount());