You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/12/29 21:21:11 UTC

svn commit: r491089 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store: kahadaptor/KahaTopicMessageStore.java kahadaptor/TopicSubContainer.java memory/MemoryTopicSub.java rapid/RapidTopicMessageStore.java

Author: rajdavies
Date: Fri Dec 29 12:21:10 2006
New Revision: 491089

URL: http://svn.apache.org/viewvc?view=rev&rev=491089
Log:
Clear the last batch id when there are no more messages left to dispatch to a durable subscriber.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=491089&r1=491088&r2=491089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Fri Dec 29 12:21:10 2006
@@ -21,7 +21,6 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.kaha.ListContainer;
@@ -29,10 +28,8 @@
 import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.kaha.StoreEntry;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.rapid.RapidMessageReference;
 
 /**
  * @version $Revision: 1.5 $
@@ -70,7 +67,7 @@
                 ConsumerMessageRef ref=new ConsumerMessageRef();
                 ref.setAckEntry(ackEntry);
                 ref.setMessageEntry(messageEntry);
-                container.getListContainer().add(ref);
+                container.add(ref);
             }
         }
     }
@@ -80,7 +77,10 @@
         String subcriberId=getSubscriptionKey(clientId,subscriptionName);
         TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
         if(container!=null){
-            ConsumerMessageRef ref=(ConsumerMessageRef)container.getListContainer().removeFirst();
+            ConsumerMessageRef ref=container.remove();
+            if(container.isEmpty()){
+                container.reset();
+            }
             if(ref!=null){
                 TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
                 if(tsa!=null){
@@ -112,7 +112,7 @@
         if(!subscriberContainer.containsKey(key)){
             subscriberContainer.put(key,info);
         }
-        //add the subscriber
+        // add the subscriber
         ListContainer container=addSubscriberMessageContainer(key);
         if(retroactive){
             for(StoreEntry entry=ackContainer.getFirst();entry!=null;){
@@ -135,7 +135,7 @@
         String key=getSubscriptionKey(clientId,subscriptionName);
         TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
         if(container!=null){
-            for(Iterator i=container.getListContainer().iterator();i.hasNext();){
+            for(Iterator i=container.iterator();i.hasNext();){
                 ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
                 Object msg=messageContainer.get(ref.getMessageEntry());
                 if(msg!=null){
@@ -158,14 +158,16 @@
             int count=0;
             StoreEntry entry=container.getBatchEntry();
             if(entry==null){
-                entry=container.getListContainer().getFirst();
+                entry=container.getEntry();
             }else{
-                entry=container.getListContainer().refresh(entry);
-                entry=container.getListContainer().getNext(entry);
+                entry=container.refreshEntry(entry);
+                if(entry!=null){
+                    entry=container.getNextEntry(entry);
+                }
             }
             if(entry!=null){
                 do{
-                    ConsumerMessageRef consumerRef=(ConsumerMessageRef)container.getListContainer().get(entry);
+                    ConsumerMessageRef consumerRef=container.get(entry);
                     Object msg=messageContainer.get(consumerRef.getMessageEntry());
                     if(msg!=null){
                         if(msg.getClass()==String.class){
@@ -178,7 +180,7 @@
                         count++;
                     }
                     container.setBatchEntry(entry);
-                    entry=container.getListContainer().getNext(entry);
+                    entry=container.getNextEntry(entry);
                 }while(entry!=null&&count<maxReturned&&listener.hasSpace());
             }
         }
@@ -210,11 +212,11 @@
         subscriberMessages.put(key,tsc);
         return container;
     }
-    
-    protected void removeSubscriberMessageContainer(Object key) throws IOException {
+
+    protected void removeSubscriberMessageContainer(Object key) throws IOException{
         subscriberContainer.remove(key);
         TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
-        for(Iterator i=container.getListContainer().iterator();i.hasNext();){
+        for(Iterator i=container.iterator();i.hasNext();){
             ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
             if(ref!=null){
                 TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
@@ -234,7 +236,7 @@
     public int getMessageCount(String clientId,String subscriberName) throws IOException{
         String key=getSubscriptionKey(clientId,subscriberName);
         TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
-        return container.getListContainer().size();
+        return container.size();
     }
 
     /**
@@ -251,8 +253,6 @@
         messageContainer.add(messageRef);
     }
 
-   
-
     /**
      * @param identity
      * @return String
@@ -263,7 +263,6 @@
         return null;
     }
 
-  
     /**
      * @param context
      * @throws IOException
@@ -274,11 +273,10 @@
         ackContainer.clear();
         for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
             TopicSubContainer container=(TopicSubContainer)i.next();
-            container.getListContainer().clear();
+            container.clear();
         }
     }
 
-   
     public synchronized void resetBatching(String clientId,String subscriptionName){
         String key=getSubscriptionKey(clientId,subscriptionName);
         TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java?view=diff&rev=491089&r1=491088&r2=491089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java Fri Dec 29 12:21:10 2006
@@ -14,6 +14,7 @@
 
 package org.apache.activemq.store.kahadaptor;
 
+import java.util.Iterator;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.StoreEntry;
 
@@ -44,22 +45,54 @@
         this.batchEntry=batchEntry;
     }
     
-    /**
-     * @return the listContainer
-     */
-     public ListContainer getListContainer(){
-        return this.listContainer;
+    
+     public void reset() {
+        batchEntry = null;
+    }
+     
+    public boolean isEmpty() {
+        return listContainer.isEmpty();
     }
     
-    /**
-     * @param listContainer the listContainer to set
-     */
-     public void setListContainer(ListContainer container){
-        this.listContainer=container;
+    public void add(ConsumerMessageRef ref) {
+        listContainer.add(ref);
     }
     
-     public void reset() {
-        batchEntry = null;
+    public ConsumerMessageRef remove() {
+        ConsumerMessageRef result =  (ConsumerMessageRef)listContainer.removeFirst();
+        if (listContainer.isEmpty()) {
+            reset();
+        }
+        return result;
+    }
+    
+    public ConsumerMessageRef get(StoreEntry entry) {
+        return (ConsumerMessageRef)listContainer.get(entry);
+    }
+    
+    public StoreEntry getEntry() {
+        return listContainer.getFirst();
+    }
+    
+    public StoreEntry refreshEntry(StoreEntry entry) {
+        return listContainer.refresh(entry);
+    }
+    
+    public StoreEntry getNextEntry(StoreEntry entry) {
+        return listContainer.getNext(entry);
+    }
+        
+    public Iterator iterator() {
+        return listContainer.iterator();
+    }
+    
+    public int size() {
+        return listContainer.size();
+    }
+    
+    public void clear() {
+        reset();
+        listContainer.clear();
     }
    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java?view=diff&rev=491089&r1=491088&r2=491089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java Fri Dec 29 12:21:10 2006
@@ -38,6 +38,9 @@
 
     void removeMessage(MessageId id){
         map.remove(id);
+        if (map.isEmpty()) {
+            lastBatch=null;
+        }
     }
 
     int size(){

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java?view=diff&rev=491089&r1=491088&r2=491089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Fri Dec 29 12:21:10 2006
@@ -22,7 +22,6 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.kaha.ListContainer;
@@ -78,7 +77,7 @@
                 ConsumerMessageRef ref=new ConsumerMessageRef();
                 ref.setAckEntry(ackEntry);
                 ref.setMessageEntry(messageEntry);
-                container.getListContainer().add(ref);
+                container.add(ref);
             }
         }
     }
@@ -88,7 +87,7 @@
         String subcriberId=getSubscriptionKey(clientId,subscriptionName);
         TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
         if(container!=null){
-            ConsumerMessageRef ref=(ConsumerMessageRef)container.getListContainer().removeFirst();
+            ConsumerMessageRef ref=(ConsumerMessageRef)container.remove();
             if(ref!=null){
                 TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
                 if(tsa!=null){
@@ -142,7 +141,7 @@
         String key=getSubscriptionKey(clientId,subscriptionName);
         TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
         if(container!=null){
-            for(Iterator i=container.getListContainer().iterator();i.hasNext();){
+            for(Iterator i=container.iterator();i.hasNext();){
                 ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
                 RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(ref
                         .getMessageEntry());
@@ -163,14 +162,14 @@
             int count=0;
             StoreEntry entry=container.getBatchEntry();
             if(entry==null){
-                entry=container.getListContainer().getFirst();
+                entry=container.getEntry();
             }else{
-                entry=container.getListContainer().refresh(entry);
-                entry=container.getListContainer().getNext(entry);
+                entry=container.refreshEntry(entry);
+                entry=container.getNextEntry(entry);
             }
             if(entry!=null){
                 do{
-                    ConsumerMessageRef consumerRef=(ConsumerMessageRef)container.getListContainer().get(entry);
+                    ConsumerMessageRef consumerRef=container.get(entry);
                     RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(consumerRef
                             .getMessageEntry());
                     if(messageReference!=null){
@@ -179,7 +178,7 @@
                         count++;
                     }
                     container.setBatchEntry(entry);
-                    entry=container.getListContainer().getNext(entry);
+                    entry=container.getNextEntry(entry);
                 }while(entry!=null&&count<maxReturned && listener.hasSpace());
             }
         }
@@ -210,7 +209,7 @@
     protected void removeSubscriberMessageContainer(Object key) throws IOException {
         subscriberContainer.remove(key);
         TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
-        for(Iterator i=container.getListContainer().iterator();i.hasNext();){
+        for(Iterator i=container.iterator();i.hasNext();){
             ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
             if(ref!=null){
                 TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
@@ -230,7 +229,7 @@
     public int getMessageCount(String clientId,String subscriberName) throws IOException{
         String key=getSubscriptionKey(clientId,subscriberName);
         TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
-        return container.getListContainer().size();
+        return container.size();
     }
 
     /**
@@ -271,7 +270,7 @@
         ackContainer.clear();
         for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
             TopicSubContainer container=(TopicSubContainer)i.next();
-            container.getListContainer().clear();
+            container.clear();
         }
     }
 
@@ -294,7 +293,7 @@
         String subcriberId=getSubscriptionKey(clientId,subscriptionName);
         TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
         if(container!=null){
-            ConsumerMessageRef ref=(ConsumerMessageRef)container.getListContainer().removeFirst();
+            ConsumerMessageRef ref=(ConsumerMessageRef)container.remove();
             if(ref!=null){
                 TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
                 if(tsa!=null){