You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/01/23 21:08:34 UTC

svn commit: r614645 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ test/java/org/apache/activemq/bugs/

Author: rajdavies
Date: Wed Jan 23 12:08:27 2008
New Revision: 614645

URL: http://svn.apache.org/viewvc?rev=614645&view=rev
Log:
resolution for https://issues.apache.org/activemq/browse/AMQ-1566

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=614645&r1=614644&r2=614645&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Wed Jan 23 12:08:27 2008
@@ -99,6 +99,12 @@
             this.active = true;
             this.context = context;
             this.info = info;
+            int prefetch = info.getPrefetchSize();
+            if (prefetch>0) {
+            prefetch += prefetch/2;
+            }
+            int depth = Math.max(prefetch, this.pending.getMaxAuditDepth());
+            this.pending.setMaxAuditDepth(depth);
             if (!keepDurableSubsActive) {
                 for (Iterator<Destination> iter = destinations.values()
                         .iterator(); iter.hasNext();) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=614645&r1=614644&r2=614645&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Jan 23 12:08:27 2008
@@ -127,25 +127,29 @@
     }
 
     public void add(MessageReference node) throws Exception {
-        boolean pendingEmpty = false;
-        synchronized(pendingLock) {
-            pendingEmpty = pending.isEmpty();
-        }
-        enqueueCounter++;
-        if (optimizedDispatch && !isFull() && pendingEmpty && !isSlave()) {
-            dispatch(node);
-        } else {
-            optimizePrefetch();
-            synchronized(pendingLock) {
-                if (pending.isEmpty() && LOG.isDebugEnabled()) {
-                    LOG.debug("Prefetch limit.");
-                }
-                pending.addMessageLast(node);
-               
-            }
-            dispatchPending();
-        }
-    }
+		boolean pendingEmpty = false;
+		boolean dispatchPending = false;
+		synchronized (pendingLock) {
+			pendingEmpty = pending.isEmpty();
+			enqueueCounter++;
+			if (optimizedDispatch && !isFull() && pendingEmpty && !isSlave()) {
+				pending.dispatched(node);
+				dispatch(node);
+			} else {
+				optimizePrefetch();
+				synchronized (pendingLock) {
+					if (pending.isEmpty() && LOG.isDebugEnabled()) {
+						LOG.debug("Prefetch limit.");
+					}
+					pending.addMessageLast(node);
+					dispatchPending = true;
+				}
+			}
+		}
+		if (dispatchPending) {
+			dispatchPending();
+		}
+	}
 
     public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
         synchronized(pendingLock) {
@@ -511,8 +515,7 @@
         final Message message = node.getMessage();
         if (message == null) {
             return false;
-        }
-                
+        }         
         // Make sure we can dispatch a message.
         if (canDispatch(node) && !isSlave()) {
             MessageDispatch md = createMessageDispatch(node, message);
@@ -520,11 +523,6 @@
             if (node != QueueMessageReference.NULL_MESSAGE) {
                 dispatchCounter++;
                 dispatched.add(node);
-                if(pending != null) {
-                   synchronized(pendingLock) {
-                        pending.dispatched(message);
-                    }
-                }
             } else {
                 prefetchExtension = Math.max(0, prefetchExtension - 1);
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=614645&r1=614644&r2=614645&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Wed Jan 23 12:08:27 2008
@@ -245,9 +245,26 @@
      * Mark a message as already dispatched
      * @param message
      */
-    public void dispatched(MessageReference message) {   
+    public void dispatched(MessageReference message) {
+    	//add it to the audit
+    	isDuplicate(message.getMessageId());
+    }
+    
+    /**
+     * set the audit
+     * @param audit
+     */
+    public void setMessageAudit(ActiveMQMessageAudit audit) {
+    	this.audit=audit;
+    }
+    
+    
+    /**
+     * @return the audit
+     */
+    public ActiveMQMessageAudit getMessageAudit() {
+    	return audit;
     }
-
 
     protected synchronized boolean  isDuplicate(MessageId messageId) {
         if (!this.enableAudit || this.audit==null) {
@@ -265,6 +282,4 @@
     protected synchronized boolean isStarted() {
         return started;
     }
-  
-   
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?rev=614645&r1=614644&r2=614645&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Wed Jan 23 12:08:27 2008
@@ -19,10 +19,12 @@
 import java.io.IOException;
 import java.util.LinkedList;
 
+import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.usage.SystemUsage;
 
 /**
@@ -253,6 +255,18 @@
      * @param message
      */
     public void dispatched(MessageReference message);
+    
+    /**
+     * set the audit
+     * @param audit
+     */
+    public void setMessageAudit(ActiveMQMessageAudit audit);
+    
+    
+    /**
+     * @return the audit - could be null
+     */
+    public ActiveMQMessageAudit getMessageAudit();
 
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=614645&r1=614644&r2=614645&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Wed Jan 23 12:08:27 2008
@@ -74,6 +74,7 @@
             started = true;
             super.start();
             for (PendingMessageCursor tsp : storePrefetches) {
+            	tsp.setMessageAudit(getMessageAudit());
                 tsp.start();
             }
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=614645&r1=614644&r2=614645&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Wed Jan 23 12:08:27 2008
@@ -66,7 +66,9 @@
             nonPersistent.setMaxAuditDepth(getMaxAuditDepth());
             nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit());
         }
+        nonPersistent.setMessageAudit(getMessageAudit());
         nonPersistent.start();
+        persistent.setMessageAudit(getMessageAudit());
         persistent.start();
         pendingCount = persistent.size() + nonPersistent.size();
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=614645&r1=614644&r2=614645&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Wed Jan 23 12:08:27 2008
@@ -30,7 +30,6 @@
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.usage.Usage;
 import org.apache.activemq.usage.UsageListener;
 import org.apache.commons.logging.Log;
@@ -51,7 +50,7 @@
     private String subscriberName;
     private Destination regionDestination;
     private boolean batchResetNeeded = true;
-    private boolean storeMayHaveMoreMessages = true;
+    private boolean storeHasMessages = false;
     private boolean started;
     private final Subscription subscription;
    
@@ -74,8 +73,8 @@
         if (!started) {
             started = true;
             super.start();
+            this.storeHasMessages = getStoreSize() > 0;
             getSystemUsage().getMemoryUsage().addUsageListener(this);
-            safeFillBatch();
         }
     }
 
@@ -104,14 +103,14 @@
 
     public synchronized void addMessageLast(MessageReference node) throws Exception {
         if (node != null) {
-            storeMayHaveMoreMessages=true;
+        	storeHasMessages=true;
             node.decrementReferenceCount();
         }
     }
 
     public synchronized void addMessageFirst(MessageReference node) throws Exception {
         if (node != null) {
-            storeMayHaveMoreMessages=true;
+        	storeHasMessages=true;
             node.decrementReferenceCount();
             rollback(node.getMessageId());
         }
@@ -168,8 +167,6 @@
                    
                 }
                 batchList.put(message.getMessageId(), message);
-            }else {
-                this.storeMayHaveMoreMessages=true;
             }
         }
         return true;
@@ -208,14 +205,13 @@
         if (batchResetNeeded) {
             this.store.resetBatching(clientId, subscriberName);
             this.batchResetNeeded = false;
-            this.storeMayHaveMoreMessages = true;
         }
-        while (this.batchList.isEmpty() && this.storeMayHaveMoreMessages) {
-            this.storeMayHaveMoreMessages = false;
+        while (this.batchList.isEmpty() && this.storeHasMessages) {
+            this.storeHasMessages = false;
             this.store.recoverNextMessages(clientId, subscriberName,
                     maxBatchSize, this);
             if (!this.batchList.isEmpty()) {
-                this.storeMayHaveMoreMessages=true;
+                this.storeHasMessages=true;
             }
         }
     }
@@ -240,7 +236,7 @@
     
     public void onUsageChanged(Usage usage, int oldPercentUsage,int newPercentUsage) {
         if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
-            storeMayHaveMoreMessages = true;
+        	storeHasMessages = true;
             try {
                 fillBatch();
             } catch (Exception e) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java?rev=614645&r1=614644&r2=614645&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java Wed Jan 23 12:08:27 2008
@@ -39,7 +39,7 @@
  */
 public class JmsDurableTopicSlowReceiveTest extends JmsTopicSendReceiveTest {
     
-    static final int NMSG = 100;
+    static final int NMSG = 200;
     static final int MSIZE = 256000;
     private static final transient Log LOG = LogFactory.getLog(JmsDurableTopicSlowReceiveTest.class);
     private static final String COUNT_PROPERY_NAME = "count";



Re: svn commit: r614645 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ test/java/org/apache/activemq/bugs/

Posted by Glen Mazza <gl...@verizon.net>.
Am Mittwoch, den 23.01.2008, 20:08 +0000 schrieb rajdavies@apache.org:

> Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
> URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=614645&r1=614644&r2=614645&view=diff
> ==============================================================================
> --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
> +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Wed Jan 23 12:08:27 2008

I think it would be more readable if this class were actually abstract, no?  Presently it is instantiable.


> }
> Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
> URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=614645&r1=614644&r2=614645&view=diff
> ==============================================================================
> --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
> +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Wed Jan 23 12:08:27 2008
> @@ -74,6 +74,7 @@
>              started = true;
>              super.start();
>              for (PendingMessageCursor tsp : storePrefetches) {
> +            	tsp.setMessageAudit(getMessageAudit());
>                  tsp.start();
>              }
>          }

This class has a "private boolean started" value -- I think it would be
better if it could rely on the "started/isStarted()" values already in
its AbstractPendingMessageCursor base class.  Having a "started" value
in both classes can be confusing, especially since some of the
public/protected methods in the parent abstract class rely on a
"started" variable.

Also, in each of the subclasses of AbstractPendingMessageCursor, in the
method start(), the code sets the value of started=true *before*
actually doing any initialization--you may wish to consider moving the
started=true assignments to after the initialization, in case something
goes wrong with that process.



> 
> Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
> URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=614645&r1=614644&r2=614645&view=diff
> ==============================================================================
> --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original)
> +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Wed Jan 23 12:08:27 2008
> @@ -66,7 +66,9 @@
>              nonPersistent.setMaxAuditDepth(getMaxAuditDepth());
>              nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit());
>          }
> +        nonPersistent.setMessageAudit(getMessageAudit());
>          nonPersistent.start();
> +        persistent.setMessageAudit(getMessageAudit());
>          persistent.start();
>          pendingCount = persistent.size() + nonPersistent.size();
>      }
> 


In this class, in the addMessageLast() and addMessageFirst() methods,
the code adds to the nonPersistent cursor (and increments the pending
count) if and only if the StoreQueueCursor has been started, but in
these same methods the code adds to the Persistent cursor regardless of
whether the StoreQueueCursor has been started.  Is this difference
intentional?


> Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
> URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=614645&r1=614644&r2=614645&view=diff
> ==============================================================================
>  
>      public synchronized void addMessageLast(MessageReference node) throws Exception {
>          if (node != null) {
> -            storeMayHaveMoreMessages=true;
> +        	storeHasMessages=true;
>              node.decrementReferenceCount();
>          }
>      }
>  
>      public synchronized void addMessageFirst(MessageReference node) throws Exception {
>          if (node != null) {
> -            storeMayHaveMoreMessages=true;
> +        	storeHasMessages=true;
>              node.decrementReferenceCount();
>              rollback(node.getMessageId());
>          }

> @@ -240,7 +236,7 @@
>      
>      public void onUsageChanged(Usage usage, int oldPercentUsage,int newPercentUsage) {
>          if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
> -            storeMayHaveMoreMessages = true;
> +        	storeHasMessages = true;
>              try {
>                  fillBatch();
>              } catch (Exception e) {
> 

>>From the above it looks like you might be having problems with inserting
spaces for tabs (or vice-versa) within your IDE.

Also, in this class, a couple of things look strange with
onUsageChanged:

    public void onUsageChanged(Usage usage, int oldPercentUsage,int
newPercentUsage) {
        if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90)
{
        	storeHasMessages = true;
            try {
                fillBatch();
            } catch (Exception e) {
                LOG.error("Failed to fill batch ", e);
            }
        }
    }

1.) I'm not certain here what the code is doing, but shouldn't it be
newPercentUsage > oldPercentUsage?

2.) General comment--AFAICT storeHasMessages is very accurately set by
addMessageLast() and addMessageFirst(); it would seem to be a potential
source of bugs by having this listener also set it.

Regards,
Glen