You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/12/28 21:50:05 UTC
svn commit: r490794 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory:
MemoryTopicMessageStore.java MemoryTopicSub.java
Author: rajdavies
Date: Thu Dec 28 12:50:04 2006
New Revision: 490794
URL: http://svn.apache.org/viewvc?view=rev&rev=490794
Log:
Provide a more robust implementation, and one that deletes messages after they have been consumed
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java (with props)
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?view=diff&rev=490794&r1=490793&r2=490794
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java Thu Dec 28 12:50:04 2006
@@ -18,7 +18,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.activemq.broker.ConnectionContext;
@@ -28,6 +27,7 @@
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.SubscriptionKey;
/**
@@ -35,42 +35,45 @@
*/
public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore{
- private Map ackDatabase;
private Map subscriberDatabase;
- private Map batchDatabase;
- MessageId lastMessageId;
+ private Map topicSubMap;
public MemoryTopicMessageStore(ActiveMQDestination destination){
- this(destination,new LinkedHashMap(),makeMap(),makeMap(),makeMap());
+ this(destination,new LRUCache(100,100,0.75f,false),makeMap());
}
protected static Map makeMap(){
return Collections.synchronizedMap(new HashMap());
}
- public MemoryTopicMessageStore(ActiveMQDestination destination,Map messageTable,Map subscriberDatabase,
- Map ackDatabase, Map batchDatabase){
+ public MemoryTopicMessageStore(ActiveMQDestination destination,Map messageTable,Map subscriberDatabase){
super(destination,messageTable);
this.subscriberDatabase=subscriberDatabase;
- this.ackDatabase=ackDatabase;
- this.batchDatabase=batchDatabase;
+ this.topicSubMap=makeMap();
}
public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
super.addMessage(context,message);
- lastMessageId=message.getMessageId();
+ for(Iterator i=topicSubMap.values().iterator();i.hasNext();){
+ MemoryTopicSub sub=(MemoryTopicSub)i.next();
+ sub.addMessage(message.getMessageId(),message);
+ }
}
- public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,MessageId messageId)
- throws IOException{
- ackDatabase.put(new SubscriptionKey(clientId,subscriptionName),messageId);
+ public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
+ MessageId messageId) throws IOException{
+ SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
+ MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(key);
+ if(sub!=null){
+ sub.removeMessage(messageId);
+ }
}
public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
return (SubscriptionInfo)subscriberDatabase.get(new SubscriptionKey(clientId,subscriptionName));
}
- public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+ public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
throws IOException{
SubscriptionInfo info=new SubscriptionInfo();
info.setDestination(destination);
@@ -78,112 +81,62 @@
info.setSelector(selector);
info.setSubcriptionName(subscriptionName);
SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
- subscriberDatabase.put(key,info);
- MessageId l=retroactive?null:lastMessageId;
- if(l!=null){
- ackDatabase.put(key,l);
+ MemoryTopicSub sub=new MemoryTopicSub();
+ topicSubMap.put(key,sub);
+ if(retroactive){
+ for(Iterator i=messageTable.entrySet().iterator();i.hasNext();){
+ Map.Entry entry=(Entry)i.next();
+ sub.addMessage((MessageId)entry.getKey(),(Message)entry.getValue());
+ }
}
+ subscriberDatabase.put(key,info);
}
public void deleteSubscription(String clientId,String subscriptionName){
org.apache.activemq.util.SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
- ackDatabase.remove(key);
subscriberDatabase.remove(key);
+ topicSubMap.remove(key);
}
public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
throws Exception{
- MessageId lastAck=(MessageId)ackDatabase.get(new SubscriptionKey(clientId,subscriptionName));
- boolean pastLastAck=lastAck==null;
- // the message table is a synchronizedMap - so just have to synchronize here
- synchronized(messageTable){
- for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
- Map.Entry entry=(Entry)iter.next();
- if(pastLastAck){
- Object msg=entry.getValue();
- if(msg.getClass()==String.class){
- listener.recoverMessageReference((String)msg);
- }else{
- listener.recoverMessage((Message)msg);
- }
- }else{
- pastLastAck=entry.getKey().equals(lastAck);
- }
- }
- listener.finished();
+ MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriptionName));
+ if(sub!=null){
+ sub.recoverSubscription(listener);
}
-
}
-
public void delete(){
super.delete();
- ackDatabase.clear();
subscriberDatabase.clear();
- lastMessageId=null;
+ topicSubMap.clear();
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException{
return (SubscriptionInfo[])subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
}
-
- public int getMessageCount(String clientId,String subscriberName) throws IOException{
+ public synchronized int getMessageCount(String clientId,String subscriberName) throws IOException{
int result=0;
- MessageId lastAck=(MessageId)ackDatabase.get(new SubscriptionKey(clientId,subscriberName));
- // the message table is a synchronizedMap - so just have to synchronize here
- synchronized(messageTable){
- result=messageTable.size();
- if(lastAck!=null){
- for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
- Map.Entry entry=(Entry)iter.next();
- if(entry.getKey().equals(lastAck)){
- break;
- }
- result--;
- }
- }
+ MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriberName));
+ if(sub!=null){
+ result=sub.size();
}
return result;
}
-
public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
MessageRecoveryListener listener) throws Exception{
- SubscriptionKey key = new SubscriptionKey(clientId,subscriptionName);
- MessageId lastBatch = (MessageId)batchDatabase.get(key);
- if (lastBatch==null) {
- //if last batch null - start from last ack
- lastBatch = (MessageId)ackDatabase.get(key);
- }
- boolean pastLackBatch=lastBatch==null;
- MessageId lastId = null;
- // the message table is a synchronizedMap - so just have to synchronize here
- int count = 0;
- synchronized(messageTable){
- for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext() &&count < maxReturned ;){
- Map.Entry entry=(Entry)iter.next();
- if(pastLackBatch){
- count++;
- Object msg=entry.getValue();
- lastId = (MessageId)entry.getKey();
- if(msg.getClass()==String.class){
- listener.recoverMessageReference((String)msg);
- }else{
- listener.recoverMessage((Message)msg);
- }
- }else{
- pastLackBatch=entry.getKey().equals(lastBatch);
- }
- }
- if (lastId != null) {
- batchDatabase.put(key,lastId);
- }
- listener.finished();
+ MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriptionName));
+ if(sub!=null){
+ sub.recoverNextMessages(maxReturned,listener);
}
}
public void resetBatching(String clientId,String subscriptionName){
- batchDatabase.remove(new SubscriptionKey(clientId,subscriptionName));
+ MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriptionName));
+ if(sub!=null){
+ sub.resetBatching();
+ }
}
}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java?view=auto&rev=490794
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java Thu Dec 28 12:50:04 2006
@@ -0,0 +1,89 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.store.memory;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageRecoveryListener;
+
+/**
+ * A holder for a durable subscriber: the messages added for it and not yet removed, kept in insertion order,
+ * plus the id of the last message handed out by recoverNextMessages (the batch cursor).
+ * @version $Revision: 1.7 $
+ */
+class MemoryTopicSub{
+
+ private Map map=new LinkedHashMap(); // MessageId -> Message, iterated in insertion order; no locking in this class
+ private MessageId lastBatch; // last id delivered by recoverNextMessages; null means start at the first entry
+
+ void addMessage(MessageId id,Message message){ // store the message under its id
+ map.put(id,message);
+ }
+
+ void removeMessage(MessageId id){ // drop the entry for id (no-op if absent); lastBatch is left untouched
+ map.remove(id);
+ }
+
+ int size(){ // number of messages currently held
+ return map.size();
+ }
+
+ void recoverSubscription(MessageRecoveryListener listener) throws Exception{ // replay every held entry in order
+ for(Iterator iter=map.entrySet().iterator();iter.hasNext();){
+ Map.Entry entry=(Entry)iter.next();
+ Object msg=entry.getValue();
+ if(msg.getClass()==String.class){ // NOTE(review): addMessage only takes Message, so a String value looks impossible - confirm
+ listener.recoverMessageReference((String)msg);
+ }else{
+ listener.recoverMessage((Message)msg);
+ }
+ }
+ listener.finished(); // always signalled, even when nothing was held
+ }
+
+ void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+ boolean pastLackBatch=lastBatch==null; // true once iteration has passed the lastBatch entry
+ MessageId lastId=null;
+ // NOTE(review): if the lastBatch entry was removed, pastLackBatch never turns true and nothing is recovered until resetBatching()
+ int count=0; // entries delivered during this call
+ for(Iterator iter=map.entrySet().iterator();iter.hasNext()&&count<maxReturned;){
+ Map.Entry entry=(Entry)iter.next();
+ if(pastLackBatch){
+ count++;
+ Object msg=entry.getValue();
+ lastId=(MessageId)entry.getKey();
+ if(msg.getClass()==String.class){
+ listener.recoverMessageReference((String)msg);
+ }else{
+ listener.recoverMessage((Message)msg);
+ }
+ }else{
+ pastLackBatch=entry.getKey().equals(lastBatch); // the lastBatch entry itself is skipped, not redelivered
+ }
+ }
+ if(lastId!=null){ // advance the cursor only if something was delivered
+ lastBatch=lastId;
+ }
+ listener.finished();
+ }
+
+ void resetBatching(){ // the next recoverNextMessages call starts again from the first entry
+ lastBatch=null;
+ }
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
------------------------------------------------------------------------------
svn:executable = *