You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/12/28 21:50:05 UTC

svn commit: r490794 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory: MemoryTopicMessageStore.java MemoryTopicSub.java

Author: rajdavies
Date: Thu Dec 28 12:50:04 2006
New Revision: 490794

URL: http://svn.apache.org/viewvc?view=rev&rev=490794
Log:
Provide a more robust implementation, and one that deletes messages after they have been consumed

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java   (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?view=diff&rev=490794&r1=490793&r2=490794
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java Thu Dec 28 12:50:04 2006
@@ -18,7 +18,6 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import org.apache.activemq.broker.ConnectionContext;
@@ -28,6 +27,7 @@
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.util.LRUCache;
 import org.apache.activemq.util.SubscriptionKey;
 
 /**
@@ -35,42 +35,45 @@
  */
 public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore{
 
-    private Map ackDatabase;
     private Map subscriberDatabase;
-    private Map batchDatabase;
-    MessageId lastMessageId;
+    private Map topicSubMap;
 
     public MemoryTopicMessageStore(ActiveMQDestination destination){
-        this(destination,new LinkedHashMap(),makeMap(),makeMap(),makeMap());
+        this(destination,new LRUCache(100,100,0.75f,false),makeMap()); // message table is now an LRUCache; NOTE(review): if the 100 is a capacity bound, older messages may be evicted - confirm against LRUCache
     }
 
     protected static Map makeMap(){ // factory for the synchronized HashMaps used for the subscriber and per-subscription tables
         return Collections.synchronizedMap(new HashMap());
     }
 
-    public MemoryTopicMessageStore(ActiveMQDestination destination,Map messageTable,Map subscriberDatabase,
-            Map ackDatabase, Map batchDatabase){
+    public MemoryTopicMessageStore(ActiveMQDestination destination,Map messageTable,Map subscriberDatabase){ // ack/batch maps are no longer parameters; that state now lives in MemoryTopicSub
         super(destination,messageTable);
         this.subscriberDatabase=subscriberDatabase;
-        this.ackDatabase=ackDatabase;
-        this.batchDatabase=batchDatabase;
+        this.topicSubMap=makeMap(); // SubscriptionKey -> MemoryTopicSub; always created here, callers cannot inject it
     }
 
     public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{ // stores the message, then queues it on every registered subscription
         super.addMessage(context,message);
-        lastMessageId=message.getMessageId();
+        for(Iterator i=topicSubMap.values().iterator();i.hasNext();){ // NOTE(review): iterating a synchronizedMap view without holding the map's lock; deleteSubscription()/delete() are not synchronized on this store, so a concurrent removal may throw ConcurrentModificationException
+            MemoryTopicSub sub=(MemoryTopicSub)i.next();
+            sub.addMessage(message.getMessageId(),message); // each subscription keeps its own pending entry for the message
+        }
     }
 
-    public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,MessageId messageId)
-            throws IOException{
-        ackDatabase.put(new SubscriptionKey(clientId,subscriptionName),messageId);
+    public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
+            MessageId messageId) throws IOException{ // removes the acked message from the named subscription's pending set
+        SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
+        MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(key);
+        if(sub!=null){ // acks for an unknown subscription are silently ignored
+            sub.removeMessage(messageId); // only this subscription's entry is dropped; nothing here touches messageTable
+        }
     }
 
     public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{ // returns the registered SubscriptionInfo, or null when the key is not in subscriberDatabase
         return (SubscriptionInfo)subscriberDatabase.get(new SubscriptionKey(clientId,subscriptionName));
     }
 
-    public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+    public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
             throws IOException{ // registers a durable subscription and creates its (initially empty) pending-message holder
         SubscriptionInfo info=new SubscriptionInfo();
         info.setDestination(destination);
@@ -78,112 +81,62 @@
         info.setSelector(selector);
         info.setSubcriptionName(subscriptionName);
         SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
-        subscriberDatabase.put(key,info);
-        MessageId l=retroactive?null:lastMessageId;
-        if(l!=null){
-            ackDatabase.put(key,l);
+        MemoryTopicSub sub=new MemoryTopicSub();
+        topicSubMap.put(key,sub); // replaces any existing holder for the same key, discarding its pending messages
+        if(retroactive){ // retroactive subscriptions are seeded with everything currently in the message table
+            for(Iterator i=messageTable.entrySet().iterator();i.hasNext();){ // NOTE(review): the removed code iterated messageTable inside synchronized(messageTable); this loop does not - confirm that is safe
+                Map.Entry entry=(Entry)i.next();
+                sub.addMessage((MessageId)entry.getKey(),(Message)entry.getValue());
+            }
         }
+        subscriberDatabase.put(key,info); // the info is now published after the holder has been populated
     }
 
     public void deleteSubscription(String clientId,String subscriptionName){ // NOTE(review): not synchronized on the store, unlike addSubsciption/acknowledge/addMessage
         org.apache.activemq.util.SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
-        ackDatabase.remove(key);
         subscriberDatabase.remove(key);
+        topicSubMap.remove(key); // drops the holder together with any messages still pending for this subscription
     }
 
     public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
             throws Exception{ // replays every message still pending for the named subscription to the listener
-        MessageId lastAck=(MessageId)ackDatabase.get(new SubscriptionKey(clientId,subscriptionName));
-        boolean pastLastAck=lastAck==null;
-        // the message table is a synchronizedMap - so just have to synchronize here
-        synchronized(messageTable){
-            for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
-                Map.Entry entry=(Entry)iter.next();
-                if(pastLastAck){
-                    Object msg=entry.getValue();
-                    if(msg.getClass()==String.class){
-                        listener.recoverMessageReference((String)msg);
-                    }else{
-                        listener.recoverMessage((Message)msg);
-                    }
-                }else{
-                    pastLastAck=entry.getKey().equals(lastAck);
-                }
-            }
-            listener.finished();
+        MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriptionName));
+        if(sub!=null){ // NOTE(review): for an unknown subscription listener.finished() is never called, whereas the removed code always called it
+            sub.recoverSubscription(listener); // NOTE(review): runs without any lock while addMessage/acknowledge may mutate the same holder
         }
-       
     }
 
-   
     public void delete(){ // empties the store: messages (via super), subscription infos and per-subscription pending sets
         super.delete();
-        ackDatabase.clear();
         subscriberDatabase.clear();
-        lastMessageId=null;
+        topicSubMap.clear(); // NOTE(review): this method is not synchronized on the store, unlike addMessage which iterates this map
     }
 
     public SubscriptionInfo[] getAllSubscriptions() throws IOException{ // returns an array copy of every SubscriptionInfo currently in subscriberDatabase
         return (SubscriptionInfo[])subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
     }
 
-
-    public int getMessageCount(String clientId,String subscriberName) throws IOException{
+    public synchronized int getMessageCount(String clientId,String subscriberName) throws IOException{ // number of messages still pending for the named subscription
         int result=0;
-        MessageId lastAck=(MessageId)ackDatabase.get(new SubscriptionKey(clientId,subscriberName));
-        // the message table is a synchronizedMap - so just have to synchronize here
-        synchronized(messageTable){
-            result=messageTable.size();
-            if(lastAck!=null){
-                for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
-                    Map.Entry entry=(Entry)iter.next();
-                    if(entry.getKey().equals(lastAck)){
-                        break;
-                    }
-                    result--;
-                }
-            }
+        MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriberName));
+        if(sub!=null){ // an unknown subscription reports 0
+            result=sub.size(); // the count is now simply the size of the subscription's own pending map
         }
         return result;
     }
 
-    
     public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
             MessageRecoveryListener listener) throws Exception{ // delivers the next batch (at most maxReturned) of pending messages for the named subscription
-        SubscriptionKey key = new SubscriptionKey(clientId,subscriptionName);
-        MessageId lastBatch = (MessageId)batchDatabase.get(key);
-        if (lastBatch==null) {
-            //if last batch null - start from last ack
-            lastBatch = (MessageId)ackDatabase.get(key);
-        }
-        boolean pastLackBatch=lastBatch==null;
-        MessageId lastId = null;
-        // the message table is a synchronizedMap - so just have to synchronize here
-        int count = 0;
-        synchronized(messageTable){
-            for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext() &&count < maxReturned ;){
-                Map.Entry entry=(Entry)iter.next();
-                if(pastLackBatch){
-                    count++;
-                    Object msg=entry.getValue();
-                    lastId = (MessageId)entry.getKey();
-                    if(msg.getClass()==String.class){
-                        listener.recoverMessageReference((String)msg);
-                    }else{
-                        listener.recoverMessage((Message)msg);
-                    }
-                }else{
-                    pastLackBatch=entry.getKey().equals(lastBatch);
-                }
-            }
-            if (lastId != null) {
-                batchDatabase.put(key,lastId);
-            }
-            listener.finished();
+        MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriptionName));
+        if(sub!=null){ // NOTE(review): for an unknown subscription listener.finished() is never called, whereas the removed code always called it
+            sub.recoverNextMessages(maxReturned,listener); // the batch cursor is kept inside the MemoryTopicSub; NOTE(review): called without any lock
         }
     }
 
     public void resetBatching(String clientId,String subscriptionName){ // makes the next recoverNextMessages call for this subscription start from its first pending message
-        batchDatabase.remove(new SubscriptionKey(clientId,subscriptionName));
+        MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriptionName));
+        if(sub!=null){ // nothing to reset for an unknown subscription
+            sub.resetBatching();
+        }
     }
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java?view=auto&rev=490794
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java Thu Dec 28 12:50:04 2006
@@ -0,0 +1,89 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.store.memory;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageRecoveryListener;
+
+/**
+ * Holds the messages still pending for one durable subscriber, in insertion order, plus a batch cursor.
+ * This class does no locking of its own.
+ * @version $Revision: 1.7 $
+ */
+class MemoryTopicSub{
+
+    private Map map=new LinkedHashMap(); // MessageId -> Message, iterated in insertion order
+    private MessageId lastBatch; // key of the last entry handed out by recoverNextMessages; null means start from the first entry
+
+    void addMessage(MessageId id,Message message){ // appends a pending message (re-putting an existing id keeps its original position)
+        map.put(id,message);
+    }
+
+    void removeMessage(MessageId id){ // NOTE(review): if id equals lastBatch the cursor entry disappears, and recoverNextMessages then skips everything until resetBatching() is called
+        map.remove(id);
+    }
+
+    int size(){ // number of messages still pending
+        return map.size();
+    }
+
+    void recoverSubscription(MessageRecoveryListener listener) throws Exception{ // replays every pending entry to the listener, then signals finished()
+        for(Iterator iter=map.entrySet().iterator();iter.hasNext();){
+            Map.Entry entry=(Entry)iter.next();
+            Object msg=entry.getValue();
+            if(msg.getClass()==String.class){ // NOTE(review): addMessage only ever stores Message values, so this String branch looks unreachable here
+                listener.recoverMessageReference((String)msg);
+            }else{
+                listener.recoverMessage((Message)msg);
+            }
+        }
+        listener.finished();
+    }
+
+    void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ // delivers up to maxReturned entries that follow lastBatch, then advances the cursor
+        boolean pastLackBatch=lastBatch==null; // with no cursor, delivery starts at the first entry
+        MessageId lastId=null;
+        // NOTE(review): stale comment removed - map is a plain LinkedHashMap and nothing is synchronized in this method
+        int count=0;
+        for(Iterator iter=map.entrySet().iterator();iter.hasNext()&&count<maxReturned;){ // stops once maxReturned entries have been delivered
+            Map.Entry entry=(Entry)iter.next();
+            if(pastLackBatch){
+                count++;
+                Object msg=entry.getValue();
+                lastId=(MessageId)entry.getKey();
+                if(msg.getClass()==String.class){ // NOTE(review): same apparently unreachable String branch as in recoverSubscription
+                    listener.recoverMessageReference((String)msg);
+                }else{
+                    listener.recoverMessage((Message)msg);
+                }
+            }else{
+                pastLackBatch=entry.getKey().equals(lastBatch); // still skipping entries up to and including the previous batch's last id
+            }
+        }
+        if(lastId!=null){ // only move the cursor when something was actually delivered
+            lastBatch=lastId;
+        }
+        listener.finished();
+    }
+
+    void resetBatching(){ // clears the cursor so the next batch starts from the first pending entry
+        lastBatch=null;
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
------------------------------------------------------------------------------
    svn:executable = *