You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/10/13 10:57:07 UTC

svn commit: r703975 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors: AbstractPendingMessageCursor.java AbstractStoreCursor.java PendingMessageCursor.java StoreDurableSubscriberCursor.java TopicStorePrefetch.java

Author: rajdavies
Date: Mon Oct 13 01:57:07 2008
New Revision: 703975

URL: http://svn.apache.org/viewvc?rev=703975&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1971

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=703975&r1=703974&r2=703975&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Mon Oct 13 01:57:07 2008
@@ -246,15 +246,7 @@
         return false;
     }
     
-    /**
-     * Mark a message as already dispatched
-     * @param message
-     */
-    public void dispatched(MessageReference message) {
-    	//add it to the audit
-    	isDuplicate(message.getMessageId());
-    }
-    
+       
     /**
      * set the audit
      * @param audit

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=703975&r1=703974&r2=703975&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Mon Oct 13 01:57:07 2008
@@ -37,11 +37,11 @@
 public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener, UsageListener {
     private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class);
     protected final Destination regionDestination;
-    protected final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
+    private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
+    private Iterator<Entry<MessageId, Message>> iterator = null;
     protected boolean cacheEnabled=false;
     protected boolean batchResetNeeded = true;
     protected boolean storeHasMessages = false;
-    protected Iterator<Entry<MessageId, Message>> iterator = null;
     protected int size;
     
     protected AbstractStoreCursor(Destination destination) {
@@ -84,6 +84,7 @@
             }
             message.incrementReferenceCount();
             batchList.put(message.getMessageId(), message);
+            clearIterator(true);
         } else {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Ignoring batched duplicated from store: " + message);
@@ -102,11 +103,25 @@
                 throw new RuntimeException(e);
             }
         }
-        this.iterator = this.batchList.entrySet().iterator();
+        clearIterator(true);
     }
     
-    public void release() {
+    public synchronized void release() {
+        clearIterator(false);
+    }
+    
+    private synchronized void clearIterator(boolean ensureIterator) {
+        boolean haveIterator = this.iterator != null;
         this.iterator=null;
+        if(haveIterator&&ensureIterator) {
+            ensureIterator();
+        }
+    }
+    
+    private synchronized void ensureIterator() {
+        if(this.iterator==null) {
+            this.iterator=this.batchList.entrySet().iterator();
+        }
     }
 
 
@@ -117,16 +132,12 @@
         if (batchList.isEmpty()) {
             try {
                 fillBatch();
-                this.iterator = this.batchList.entrySet().iterator();
             } catch (Exception e) {
                 LOG.error("Failed to fill batch", e);
                 throw new RuntimeException(e);
             }
-        }else {
-            if (this.iterator==null) {
-                this.iterator=this.batchList.entrySet().iterator();
-            }
         }
+        ensureIterator();
         return this.iterator.hasNext();
     }
     
@@ -192,6 +203,7 @@
             msg.decrementReferenceCount();
         }
         batchList.clear();
+        clearIterator(false);
         batchResetNeeded = true;
         this.cacheEnabled=false;
         if (isStarted()) { 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?rev=703975&r1=703974&r2=703975&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Mon Oct 13 01:57:07 2008
@@ -255,11 +255,6 @@
      */
     public boolean isTransient();
     
-    /**
-     * Mark a message as already dispatched
-     * @param message
-     */
-    public void dispatched(MessageReference message);
     
     /**
      * set the audit

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=703975&r1=703974&r2=703975&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Mon Oct 13 01:57:07 2008
@@ -300,17 +300,6 @@
         }
     }
     
-    /**
-     * Mark a message as already dispatched
-     * @param message
-     */
-    public synchronized void dispatched(MessageReference message) {
-        super.dispatched(message);
-        for (PendingMessageCursor cursor : storePrefetches) {
-            cursor.dispatched(message);
-        }
-    }
-
     protected synchronized PendingMessageCursor getNextCursor() throws Exception {
         if (currentCursor == null || currentCursor.isEmpty()) {
             currentCursor = null;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=703975&r1=703974&r2=703975&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Mon Oct 13 01:57:07 2008
@@ -64,20 +64,7 @@
         throw new RuntimeException("Not supported");
     }
     
-    /**
-     * Mark a message as already dispatched
-     * @param message
-     */  
-    public synchronized void dispatched(MessageReference message) {
-        if (this.audit != null) {
-            isDuplicate(message.getMessageId());
-            Message removed = this.batchList.remove(message.getMessageId());
-            if (removed != null) {
-                removed.decrementReferenceCount();
-            }
-        }
-    }
-    
+        
     public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
         MessageEvaluationContext messageEvaluationContext = new NonCachedMessageEvaluationContext();
         messageEvaluationContext.setMessageReference(message);