You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/08/15 16:29:28 UTC

svn commit: r686236 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: AbstractSubscription.java BaseDestination.java Queue.java Subscription.java Topic.java policy/PolicyEntry.java

Author: rajdavies
Date: Fri Aug 15 07:29:27 2008
New Revision: 686236

URL: http://svn.apache.org/viewvc?rev=686236&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1878

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=686236&r1=686235&r2=686236&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java Fri Aug 15 07:29:27 2008
@@ -198,7 +198,7 @@
     public void addDestination(Destination destination) {
         
     }
-    
+       
     
     /**
      * Remove a destination
@@ -207,6 +207,10 @@
     public void removeDestination(Destination destination) {
         
     }
+    
+    public int countBeforeFull() {
+        return getDispatchedQueueSize() - info.getPrefetchSize();
+    }
 
     protected void doAddRecoveredMessage(MessageReference message) throws Exception {
         add(message);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=686236&r1=686235&r2=686236&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Fri Aug 15 07:29:27 2008
@@ -34,10 +34,10 @@
  */
 public abstract class BaseDestination implements Destination {
     /**
-     * The default number of messages to page in to the destination
+     * The maximum number of messages to page in to the destination
      * from persistent storage
      */
-    public static final int DEFAULT_PAGE_SIZE=200;
+    public static final int MAX_PAGE_SIZE=200;
    
     protected final ActiveMQDestination destination;
     protected final Broker broker;
@@ -48,7 +48,7 @@
     private int maxProducersToAudit=1024;
     private int maxAuditDepth=2048;
     private boolean enableAudit=true;
-    private int maxPageSize=DEFAULT_PAGE_SIZE;
+    private int maxPageSize=MAX_PAGE_SIZE;
     private boolean useCache=true;
     private int minimumMessageSize=1024;
     private boolean lazyDispatch=false;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=686236&r1=686235&r2=686236&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Aug 15 07:29:27 2008
@@ -228,7 +228,6 @@
             // duplicates
             // etc.
             doPageIn(false);
-//            msgContext.setDestination(destination);
 
             synchronized (pagedInMessages) {
                 RecoveryDispatch rd = new RecoveryDispatch();
@@ -240,13 +239,17 @@
             if( sub instanceof QueueBrowserSubscription ) {
                 ((QueueBrowserSubscription)sub).incrementQueueRef();
             }
-            
+            if (!this.optimizedDispatch) {
+                    wakeup();
+            }
         }finally {
             dispatchLock.unlock();
         }
+        if (this.optimizedDispatch) {
         // Outside of dispatchLock() to maintain the lock hierarchy of
         // iteratingMutex -> dispatchLock. - see https://issues.apache.org/activemq/browse/AMQ-1878
-        wakeup();
+            wakeup();
+        }
     }
 
     public void removeSubscription(ConnectionContext context, Subscription sub)
@@ -300,12 +303,17 @@
             if (consumers.isEmpty()) {
                 messages.gc();
             }
+            if (!this.optimizedDispatch) {
+                wakeup();
+            }
         }finally {
             dispatchLock.unlock();
         }
-        // Outside of dispatchLock() to maintain the lock hierarchy of
-        // iteratingMutex -> dispatchLock. - see https://issues.apache.org/activemq/browse/AMQ-1878
-        wakeup();
+        if (this.optimizedDispatch) {
+            // Outside of dispatchLock() to maintain the lock hierarchy of
+            // iteratingMutex -> dispatchLock. - see https://issues.apache.org/activemq/browse/AMQ-1878
+            wakeup();
+        }
     }
 
     public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
@@ -1099,6 +1107,7 @@
         dispatchLock.lock();
         try{
             int toPageIn = (getMaxPageSize()+(int)destinationStatistics.getInflight().getCount()) - pagedInMessages.size();
+            toPageIn = Math.min(toPageIn,getMaxPageSize());
             if (isLazyDispatch()&& !force) {
              // Only page in the minimum number of messages which can be dispatched immediately.
              toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
@@ -1142,7 +1151,7 @@
         dispatchLock.lock();
         try {
             if(!pagedInPendingDispatch.isEmpty()) {
-//                System.out.println(getName()+": dispatching from pending: "+pagedInPendingDispatch.size());
+ //              System.out.println(getName()+": dispatching from pending: "+pagedInPendingDispatch.size());
                 // Try to first dispatch anything that had not been dispatched before.
                 pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
 //                System.out.println(getName()+": new pending list1: "+pagedInPendingDispatch.size());
@@ -1237,9 +1246,8 @@
         boolean zeroPrefetch = false;
         synchronized (consumers) {
             for (Subscription s : consumers) {
-            	PrefetchSubscription ps = (PrefetchSubscription) s;
-            	zeroPrefetch |= ps.getPrefetchSize() == 0;
-            	int countBeforeFull = ps.countBeforeFull();
+            	zeroPrefetch |= s.getPrefetchSize() == 0;
+            	int countBeforeFull = s.countBeforeFull();
                 total += countBeforeFull;
             }
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=686236&r1=686235&r2=686236&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java Fri Aug 15 07:29:27 2008
@@ -216,4 +216,9 @@
      * @return true if a browser
      */
     boolean isBrowser();
+    
+    /**
+     * @return the number of messages this subscription can accept before its full
+     */
+    int countBeforeFull();
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=686236&r1=686235&r2=686236&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Aug 15 07:29:27 2008
@@ -220,11 +220,8 @@
                             if (subscription.matches(message, msgContext)) {
                                 subscription.add(message);
                             }
-                        } catch (InterruptedException e) {
-                            Thread.currentThread().interrupt();
                         } catch (IOException e) {
-                            // TODO: Need to handle this better.
-                            e.printStackTrace();
+                           LOG.error("Failed to recover this message " + message);
                         }
                         return true;
                     }
@@ -570,7 +567,6 @@
                     return;
                 }
             }
-            
             MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
             msgContext.setDestination(destination);
             msgContext.setMessageReference(message);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=686236&r1=686235&r2=686236&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Fri Aug 15 07:29:27 2008
@@ -58,7 +58,7 @@
     private boolean enableAudit=true;
     private boolean producerFlowControl = true;
     private boolean optimizedDispatch=false;
-    private int maxPageSize=100;
+    private int maxPageSize=BaseDestination.MAX_PAGE_SIZE;
     private boolean useCache=true;
     private long minimumMessageSize=1024;
     private boolean useConsumerPriority=true;