You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/12/29 21:21:11 UTC
svn commit: r491089 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store:
kahadaptor/KahaTopicMessageStore.java kahadaptor/TopicSubContainer.java
memory/MemoryTopicSub.java rapid/RapidTopicMessageStore.java
Author: rajdavies
Date: Fri Dec 29 12:21:10 2006
New Revision: 491089
URL: http://svn.apache.org/viewvc?view=rev&rev=491089
Log:
Clear the last batch id when there are no more messages left to dispatch to a durable subscriber.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=491089&r1=491088&r2=491089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Fri Dec 29 12:21:10 2006
@@ -21,7 +21,6 @@
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.ListContainer;
@@ -29,10 +28,8 @@
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreEntry;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.rapid.RapidMessageReference;
/**
* @version $Revision: 1.5 $
@@ -70,7 +67,7 @@
ConsumerMessageRef ref=new ConsumerMessageRef();
ref.setAckEntry(ackEntry);
ref.setMessageEntry(messageEntry);
- container.getListContainer().add(ref);
+ container.add(ref);
}
}
}
@@ -80,7 +77,10 @@
String subcriberId=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
if(container!=null){
- ConsumerMessageRef ref=(ConsumerMessageRef)container.getListContainer().removeFirst();
+ ConsumerMessageRef ref=container.remove();
+ if(container.isEmpty()){
+ container.reset();
+ }
if(ref!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
if(tsa!=null){
@@ -112,7 +112,7 @@
if(!subscriberContainer.containsKey(key)){
subscriberContainer.put(key,info);
}
- //add the subscriber
+ // add the subscriber
ListContainer container=addSubscriberMessageContainer(key);
if(retroactive){
for(StoreEntry entry=ackContainer.getFirst();entry!=null;){
@@ -135,7 +135,7 @@
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
if(container!=null){
- for(Iterator i=container.getListContainer().iterator();i.hasNext();){
+ for(Iterator i=container.iterator();i.hasNext();){
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
Object msg=messageContainer.get(ref.getMessageEntry());
if(msg!=null){
@@ -158,14 +158,16 @@
int count=0;
StoreEntry entry=container.getBatchEntry();
if(entry==null){
- entry=container.getListContainer().getFirst();
+ entry=container.getEntry();
}else{
- entry=container.getListContainer().refresh(entry);
- entry=container.getListContainer().getNext(entry);
+ entry=container.refreshEntry(entry);
+ if(entry!=null){
+ entry=container.getNextEntry(entry);
+ }
}
if(entry!=null){
do{
- ConsumerMessageRef consumerRef=(ConsumerMessageRef)container.getListContainer().get(entry);
+ ConsumerMessageRef consumerRef=container.get(entry);
Object msg=messageContainer.get(consumerRef.getMessageEntry());
if(msg!=null){
if(msg.getClass()==String.class){
@@ -178,7 +180,7 @@
count++;
}
container.setBatchEntry(entry);
- entry=container.getListContainer().getNext(entry);
+ entry=container.getNextEntry(entry);
}while(entry!=null&&count<maxReturned&&listener.hasSpace());
}
}
@@ -210,11 +212,11 @@
subscriberMessages.put(key,tsc);
return container;
}
-
- protected void removeSubscriberMessageContainer(Object key) throws IOException {
+
+ protected void removeSubscriberMessageContainer(Object key) throws IOException{
subscriberContainer.remove(key);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
- for(Iterator i=container.getListContainer().iterator();i.hasNext();){
+ for(Iterator i=container.iterator();i.hasNext();){
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
if(ref!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
@@ -234,7 +236,7 @@
public int getMessageCount(String clientId,String subscriberName) throws IOException{
String key=getSubscriptionKey(clientId,subscriberName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
- return container.getListContainer().size();
+ return container.size();
}
/**
@@ -251,8 +253,6 @@
messageContainer.add(messageRef);
}
-
-
/**
* @param identity
* @return String
@@ -263,7 +263,6 @@
return null;
}
-
/**
* @param context
* @throws IOException
@@ -274,11 +273,10 @@
ackContainer.clear();
for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
TopicSubContainer container=(TopicSubContainer)i.next();
- container.getListContainer().clear();
+ container.clear();
}
}
-
public synchronized void resetBatching(String clientId,String subscriptionName){
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key);
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java?view=diff&rev=491089&r1=491088&r2=491089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java Fri Dec 29 12:21:10 2006
@@ -14,6 +14,7 @@
package org.apache.activemq.store.kahadaptor;
+import java.util.Iterator;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.StoreEntry;
@@ -44,22 +45,54 @@
this.batchEntry=batchEntry;
}
- /**
- * @return the listContainer
- */
- public ListContainer getListContainer(){
- return this.listContainer;
+
+ public void reset() {
+ batchEntry = null;
+ }
+
+ public boolean isEmpty() {
+ return listContainer.isEmpty();
}
- /**
- * @param listContainer the listContainer to set
- */
- public void setListContainer(ListContainer container){
- this.listContainer=container;
+ public void add(ConsumerMessageRef ref) {
+ listContainer.add(ref);
}
- public void reset() {
- batchEntry = null;
+ public ConsumerMessageRef remove() {
+ ConsumerMessageRef result = (ConsumerMessageRef)listContainer.removeFirst();
+ if (listContainer.isEmpty()) {
+ reset();
+ }
+ return result;
+ }
+
+ public ConsumerMessageRef get(StoreEntry entry) {
+ return (ConsumerMessageRef)listContainer.get(entry);
+ }
+
+ public StoreEntry getEntry() {
+ return listContainer.getFirst();
+ }
+
+ public StoreEntry refreshEntry(StoreEntry entry) {
+ return listContainer.refresh(entry);
+ }
+
+ public StoreEntry getNextEntry(StoreEntry entry) {
+ return listContainer.getNext(entry);
+ }
+
+ public Iterator iterator() {
+ return listContainer.iterator();
+ }
+
+ public int size() {
+ return listContainer.size();
+ }
+
+ public void clear() {
+ reset();
+ listContainer.clear();
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java?view=diff&rev=491089&r1=491088&r2=491089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java Fri Dec 29 12:21:10 2006
@@ -38,6 +38,9 @@
void removeMessage(MessageId id){
map.remove(id);
+ if (map.isEmpty()) {
+ lastBatch=null;
+ }
}
int size(){
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java?view=diff&rev=491089&r1=491088&r2=491089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Fri Dec 29 12:21:10 2006
@@ -22,7 +22,6 @@
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.ListContainer;
@@ -78,7 +77,7 @@
ConsumerMessageRef ref=new ConsumerMessageRef();
ref.setAckEntry(ackEntry);
ref.setMessageEntry(messageEntry);
- container.getListContainer().add(ref);
+ container.add(ref);
}
}
}
@@ -88,7 +87,7 @@
String subcriberId=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
if(container!=null){
- ConsumerMessageRef ref=(ConsumerMessageRef)container.getListContainer().removeFirst();
+ ConsumerMessageRef ref=(ConsumerMessageRef)container.remove();
if(ref!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
if(tsa!=null){
@@ -142,7 +141,7 @@
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
if(container!=null){
- for(Iterator i=container.getListContainer().iterator();i.hasNext();){
+ for(Iterator i=container.iterator();i.hasNext();){
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(ref
.getMessageEntry());
@@ -163,14 +162,14 @@
int count=0;
StoreEntry entry=container.getBatchEntry();
if(entry==null){
- entry=container.getListContainer().getFirst();
+ entry=container.getEntry();
}else{
- entry=container.getListContainer().refresh(entry);
- entry=container.getListContainer().getNext(entry);
+ entry=container.refreshEntry(entry);
+ entry=container.getNextEntry(entry);
}
if(entry!=null){
do{
- ConsumerMessageRef consumerRef=(ConsumerMessageRef)container.getListContainer().get(entry);
+ ConsumerMessageRef consumerRef=container.get(entry);
RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(consumerRef
.getMessageEntry());
if(messageReference!=null){
@@ -179,7 +178,7 @@
count++;
}
container.setBatchEntry(entry);
- entry=container.getListContainer().getNext(entry);
+ entry=container.getNextEntry(entry);
}while(entry!=null&&count<maxReturned && listener.hasSpace());
}
}
@@ -210,7 +209,7 @@
protected void removeSubscriberMessageContainer(Object key) throws IOException {
subscriberContainer.remove(key);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
- for(Iterator i=container.getListContainer().iterator();i.hasNext();){
+ for(Iterator i=container.iterator();i.hasNext();){
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
if(ref!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
@@ -230,7 +229,7 @@
public int getMessageCount(String clientId,String subscriberName) throws IOException{
String key=getSubscriptionKey(clientId,subscriberName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
- return container.getListContainer().size();
+ return container.size();
}
/**
@@ -271,7 +270,7 @@
ackContainer.clear();
for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
TopicSubContainer container=(TopicSubContainer)i.next();
- container.getListContainer().clear();
+ container.clear();
}
}
@@ -294,7 +293,7 @@
String subcriberId=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
if(container!=null){
- ConsumerMessageRef ref=(ConsumerMessageRef)container.getListContainer().removeFirst();
+ ConsumerMessageRef ref=(ConsumerMessageRef)container.remove();
if(ref!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
if(tsa!=null){