You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/03/07 20:18:28 UTC

svn commit: r383969 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: DurableTopicSubscription.java PrefetchSubscription.java QueueBrowserSubscription.java QueueSubscription.java

Author: chirino
Date: Tue Mar  7 11:18:28 2006
New Revision: 383969

URL: http://svn.apache.org/viewcvs?rev=383969&view=rev
Log:
Renamed matched to pending, delivered to prefetchExtension, and aknowledgedCounter to dequeueCounter.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=383969&r1=383968&r2=383969&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Tue Mar  7 11:18:28 2006
@@ -97,12 +97,12 @@
             
             iter.remove();
         }
-        for (Iterator iter = matched.iterator(); iter.hasNext();) {
+        for (Iterator iter = pending.iterator(); iter.hasNext();) {
             MessageReference node = (MessageReference) iter.next();
             // node.decrementTargetCount();
             iter.remove();
         }        
-        delivered=0;
+        prefetchExtension=0;
     }
 
     protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
@@ -156,8 +156,8 @@
             " consumer="+info.getConsumerId()+
             ", destinations="+destinations.size()+
             ", dispatched="+dispatched.size()+
-            ", delivered="+this.delivered+
-            ", matched="+this.matched.size();
+            ", delivered="+this.prefetchExtension+
+            ", pending="+this.pending.size();
     }
 
     public String getClientId() {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=383969&r1=383968&r2=383969&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Tue Mar  7 11:18:28 2006
@@ -42,17 +42,17 @@
 abstract public class PrefetchSubscription extends AbstractSubscription{
     
     static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
-    final protected LinkedList matched=new LinkedList();
+    final protected LinkedList pending=new LinkedList();
     final protected LinkedList dispatched=new LinkedList();
     
-    protected int delivered=0;
+    protected int prefetchExtension=0;
     int preLoadLimit=1024*100;
     int preLoadSize=0;
     boolean dispatching=false;
     
     long enqueueCounter;
     long dispatchCounter;
-    long aknowledgedCounter;
+    long dequeueCounter;
     
     public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info)
                     throws InvalidSelectorException{
@@ -64,15 +64,15 @@
         if(!isFull()&&!isSlaveBroker()){
             dispatch(node);
         }else{
-            synchronized(matched){
-                matched.addLast(node);
+            synchronized(pending){
+                pending.addLast(node);
             }
         }
     }
 
     public void processMessageDispatchNotification(MessageDispatchNotification mdn){
-        synchronized(matched){
-            for(Iterator i=matched.iterator();i.hasNext();){
+        synchronized(pending){
+            for(Iterator i=pending.iterator();i.hasNext();){
                 MessageReference node=(MessageReference) i.next();
                 if(node.getMessageId().equals(mdn.getMessageId())){
                     i.remove();
@@ -106,16 +106,16 @@
                 if(inAckRange){
                     // Don't remove the nodes until we are committed.
                     if(!context.isInTransaction()){
-                    	aknowledgedCounter++;
+                    	dequeueCounter++;
                         iter.remove();
                     }else{
                         // setup a Synchronization to remove nodes from the dispatched list.
                         context.getTransaction().addSynchronization(new Synchronization(){
                             public void afterCommit() throws Exception{
                                 synchronized(PrefetchSubscription.this){
-                                	aknowledgedCounter++;
+                                	dequeueCounter++;
                                     dispatched.remove(node);
-                                    delivered--;
+                                    prefetchExtension--;
                                 }
                             }
                         });
@@ -124,9 +124,9 @@
                     acknowledge(context,ack,node);
                     if(ack.getLastMessageId().equals(messageId)){
                         if(context.isInTransaction())
-                            delivered=Math.max(delivered,index+1);
+                            prefetchExtension=Math.max(prefetchExtension,index+1);
                         else
-                            delivered=Math.max(0,delivered-(index+1));
+                            prefetchExtension=Math.max(0,prefetchExtension-(index+1));
                         if(wasFull&&!isFull()){
                             dispatchMatched();
                         }
@@ -144,7 +144,7 @@
             for(Iterator iter=dispatched.iterator();iter.hasNext();index++){
                 final MessageReference node=(MessageReference) iter.next();
                 if(ack.getLastMessageId().equals(node.getMessageId())){
-                    delivered=Math.max(delivered,index+1);
+                    prefetchExtension=Math.max(prefetchExtension,index+1);
                     if(wasFull&&!isFull()){
                         dispatchMatched();
                     }
@@ -184,11 +184,11 @@
                         node.decrementReferenceCount();
                     }
                     iter.remove();
-                    aknowledgedCounter++;
+                    dequeueCounter++;
                     index++;
                     acknowledge(context,ack,node);
                     if(ack.getLastMessageId().equals(messageId)){
-                        delivered=Math.max(0,delivered-(index+1));
+                        prefetchExtension=Math.max(0,prefetchExtension-(index+1));
                         if(wasFull&&!isFull()){
                             dispatchMatched();
                         }
@@ -202,11 +202,11 @@
     }
 
     protected boolean isFull(){
-        return dispatched.size()-delivered>=info.getPrefetchSize()||preLoadSize>preLoadLimit;
+        return dispatched.size()-prefetchExtension>=info.getPrefetchSize()||preLoadSize>preLoadLimit;
     }
     
     synchronized public int getPendingQueueSize(){
-        return matched.size();
+        return pending.size();
     }
     
     synchronized public int getDispatchedQueueSize(){
@@ -214,7 +214,7 @@
     }
     
     synchronized public long getDequeueCounter(){
-        return aknowledgedCounter;
+        return dequeueCounter;
     }
     
     synchronized public long getDispatchedCounter() {
@@ -230,7 +230,7 @@
         if(!dispatching){
             dispatching=true;
             try{
-                for(Iterator iter=matched.iterator();iter.hasNext()&&!isFull();){
+                for(Iterator iter=pending.iterator();iter.hasNext()&&!isFull();){
                     MessageReference node=(MessageReference) iter.next();
                     iter.remove();
                     dispatch(node);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?rev=383969&r1=383968&r2=383969&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java Tue Mar  7 11:18:28 2006
@@ -45,8 +45,8 @@
             " consumer="+info.getConsumerId()+
             ", destinations="+destinations.size()+
             ", dispatched="+dispatched.size()+
-            ", delivered="+this.delivered+
-            ", matched="+this.matched.size();
+            ", delivered="+this.prefetchExtension+
+            ", pending="+this.pending.size();
     }
 
     public void browseDone() throws Exception {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=383969&r1=383968&r2=383969&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Tue Mar  7 11:18:28 2006
@@ -126,8 +126,8 @@
             " consumer="+info.getConsumerId()+
             ", destinations="+destinations.size()+
             ", dispatched="+dispatched.size()+
-            ", delivered="+this.delivered+
-            ", matched="+this.matched.size();
+            ", delivered="+this.prefetchExtension+
+            ", pending="+this.pending.size();
     }
 
     public int getLockPriority() {