You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/10/23 18:52:27 UTC

svn commit: r707415 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/cursors/ test/java/org/apache/activemq/usecases/

Author: gtully
Date: Thu Oct 23 09:52:27 2008
New Revision: 707415

URL: http://svn.apache.org/viewvc?rev=707415&view=rev
Log:
fix AMQ-1984 - hanging consumer receive after multiple consumer disconnect/reconnect

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=707415&r1=707414&r2=707415&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Thu Oct 23 09:52:27 2008
@@ -104,6 +104,7 @@
             }
         }
         clearIterator(true);
+        size();
     }
     
     public synchronized void release() {
@@ -166,7 +167,7 @@
 
     public final synchronized void remove() {
         size--;
-        if (size==0 && isStarted() && cacheEnabled) {
+        if (size==0 && isStarted() && useCache) {
             cacheEnabled=true;
         }
         if (iterator!=null) {
@@ -177,6 +178,7 @@
     public final synchronized void remove(MessageReference node) {
         size--;
         cacheEnabled=false;
+        batchList.remove(node.getMessageId());
     }
     
            
@@ -234,7 +236,8 @@
     }
     
     public final synchronized boolean isEmpty() {
-        return size <= 0;
+        // negative means more messages added to store through queue.send since last reset
+        return size == 0;
     }
 
     public final synchronized boolean hasMessagesBufferedToDeliver() {
@@ -242,12 +245,10 @@
     }
 
     public final synchronized int size() {
-        if (isStarted()) {
-            return size;
+        if (size < 0) {
+            this.size = getStoreSize();
         }
-        this.size = getStoreSize();
         return size;
-        
     }
     
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=707415&r1=707414&r2=707415&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Thu Oct 23 09:52:27 2008
@@ -122,18 +122,13 @@
     }
 
     public synchronized boolean hasNext() {
-
-        boolean result = true;//pendingCount > 0;
-        if (result) {
-            try {
-                currentCursor = getNextCursor();
-            } catch (Exception e) {
-                LOG.error("Failed to get current cursor ", e);
-                throw new RuntimeException(e);
-            }
-            result = currentCursor != null ? currentCursor.hasNext() : false;
-        }
-        return result;
+        try {
+            getNextCursor();
+        } catch (Exception e) {
+            LOG.error("Failed to get current cursor ", e);
+            throw new RuntimeException(e);
+       }
+       return currentCursor != null ? currentCursor.hasNext() : false;
     }
 
     public synchronized MessageReference next() {
@@ -160,6 +155,7 @@
     public synchronized void reset() {
         nonPersistent.reset();
         persistent.reset();
+        pendingCount = persistent.size() + nonPersistent.size();        
     }
     
     public void release() {
@@ -169,11 +165,15 @@
 
 
     public synchronized int size() {
+        if (pendingCount < 0) {
+            pendingCount = persistent.size() + nonPersistent.size();
+        }
         return pendingCount;
     }
 
     public synchronized boolean isEmpty() {
-        return pendingCount <= 0;
+        // if negative, more messages arrived in store since last reset so non empty
+        return pendingCount == 0;
     }
 
     /**
@@ -259,6 +259,7 @@
         if (nonPersistent != null) {
             nonPersistent.gc();
         }
+        pendingCount = persistent.size() + nonPersistent.size();
     }
 
     public void setSystemUsage(SystemUsage usageManager) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java?rev=707415&r1=707414&r2=707415&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java Thu Oct 23 09:52:27 2008
@@ -61,7 +61,6 @@
         // give the acks a chance to flow
         Thread.sleep(2000);
         Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next(); 
-        System.err.println(internalQueue);
         
         assertTrue("All messages are consumed and acked from source:" + internalQueue, internalQueue.getMessages().isEmpty());
         assertEquals("messages source:" + internalQueue, 0, internalQueue.getDestinationStatistics().getMessages().getCount());