You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/11/21 09:11:04 UTC
svn commit: r477567 [2/2] - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/store/kahadaptor/
main/java/org/apache/activemq/store/rapid/
test/java/org/apache/activemq/perf/
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java?view=diff&rev=477567&r1=477566&r2=477567
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Tue Nov 21 00:11:03 2006
@@ -15,32 +15,27 @@
package org.apache.activemq.store.rapid;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.activeio.journal.active.Location;
import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.Store;
-import org.apache.activemq.kaha.StringMarshaller;
+import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.transaction.Synchronization;
-import org.apache.activemq.util.SubscriptionKey;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.activemq.store.kahadaptor.ConsumerMessageRef;
+import org.apache.activemq.store.kahadaptor.ConsumerMessageRefMarshaller;
+import org.apache.activemq.store.kahadaptor.TopicSubAck;
+import org.apache.activemq.store.kahadaptor.TopicSubContainer;
/**
@@ -50,78 +45,61 @@
*/
public class RapidTopicMessageStore extends RapidMessageStore implements TopicMessageStore{
- private static final Log log=LogFactory.getLog(RapidTopicMessageStore.class);
- private HashMap ackedLastAckLocations=new HashMap();
- private final MapContainer subscriberContainer;
- private final MapContainer ackContainer;
- private final Store store;
- private Map subscriberAcks=new ConcurrentHashMap();
-
- public RapidTopicMessageStore(RapidPersistenceAdapter adapter,ActiveMQTopic destination,
- MapContainer messageContainer,MapContainer subsContainer,MapContainer ackContainer) throws IOException{
- super(adapter,destination,messageContainer);
- this.subscriberContainer=subsContainer;
+ private ListContainer ackContainer;
+ private Map subscriberContainer;
+ private Store store;
+ private Map subscriberMessages=new ConcurrentHashMap();
+
+ public RapidTopicMessageStore(RapidPersistenceAdapter adapter, Store store,ListContainer messageContainer,ListContainer ackContainer,
+ MapContainer subsContainer,ActiveMQDestination destination,int maximumCacheSize) throws IOException{
+ super(adapter,destination,messageContainer,maximumCacheSize);
+ this.store=store; // kept so addSubscriberMessageContainer can open per-subscription lists
this.ackContainer=ackContainer;
- this.store=adapter.getStore();
+ subscriberContainer=subsContainer; // subscription key -> SubscriptionInfo
+ // open a message-reference container for every subscription key already present in subsContainer
for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){
Object key=i.next();
- addSubscriberAckContainer(key);
+ addSubscriberMessageContainer(key);
}
}
- public void recoverSubscription(String clientId,String subscriptionName,final MessageRecoveryListener listener)
- throws Exception{
- String key=getSubscriptionKey(clientId,subscriptionName);
- ListContainer list=(ListContainer)subscriberAcks.get(key);
- if(list!=null){
- for(Iterator i=list.iterator();i.hasNext();){
- Object msg=messageContainer.get(i.next());
- if(msg!=null){
- if(msg.getClass()==String.class){
- listener.recoverMessageReference((String)msg);
- }else{
- listener.recoverMessage((Message)msg);
- }
- }
- listener.finished();
+ public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{ // stores one message and queues a reference to it for every registered subscriber
+ int subscriberCount=subscriberMessages.size();
+ if(subscriberCount>0){ // with no subscriber containers registered, nothing is written at all
+ final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired()); // the returned Location is what readCommand later uses to load the message
+ final RapidMessageReference md = new RapidMessageReference(message, location);
+ StoreEntry messageEntry=messageContainer.placeLast(md);
+ TopicSubAck tsa=new TopicSubAck(); // single ack record shared by all subscribers of this message
+ tsa.setCount(subscriberCount); // one outstanding acknowledgement per current subscriber
+ tsa.setMessageEntry(messageEntry);
+ StoreEntry ackEntry=ackContainer.placeLast(tsa);
+ for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
+ TopicSubContainer container=(TopicSubContainer)i.next();
+ ConsumerMessageRef ref=new ConsumerMessageRef(); // per-subscriber pointer to the message entry and its ack record
+ ref.setAckEntry(ackEntry);
+ ref.setMessageEntry(messageEntry);
+ container.getListContainer().add(ref);
}
- }else{
- listener.finished();
}
}
- public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
- MessageRecoveryListener listener) throws Exception{
- String key=getSubscriptionKey(clientId,subscriptionName);
- ListContainer list=(ListContainer)subscriberAcks.get(key);
- if(list!=null){
- boolean startFound=false;
- int count=0;
- for(Iterator i=list.iterator();i.hasNext()&&count<maxReturned;){
- Object msg=messageContainer.get(i.next());
- if(msg!=null){
- if(msg.getClass()==String.class){
- String ref=msg.toString();
- if(startFound||lastMessageId==null){
- listener.recoverMessageReference(ref);
- count++;
- }else if(!startFound||ref.equals(lastMessageId.toString())){
- startFound=true;
- }
+ public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
+ MessageId messageId) throws IOException{ // NOTE(review): messageId is never read; the head of the subscriber's list is what gets acknowledged
+ String subcriberId=getSubscriptionKey(clientId,subscriptionName);
+ TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
+ if(container!=null){ // unknown subscriptions are ignored silently
+ ConsumerMessageRef ref=(ConsumerMessageRef)container.getListContainer().removeFirst();
+ if(ref!=null){
+ TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
+ if(tsa!=null){
+ if(tsa.decrementCount()<=0){ // last outstanding subscriber: drop the ack record and the message entry
+ ackContainer.remove(ref.getAckEntry());
+ messageContainer.remove(tsa.getMessageEntry());
}else{
- Message message=(Message)msg;
- if(startFound||lastMessageId==null){
- listener.recoverMessage(message);
- count++;
- }else if(!startFound&&message.getMessageId().equals(lastMessageId)){
- startFound=true;
- }
+ ackContainer.update(ref.getAckEntry(),tsa); // write the decremented count back to the ack container
}
}
- listener.finished();
}
- }else{
- listener.finished();
}
}
@@ -129,7 +107,7 @@
return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
}
- public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+ public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
throws IOException{
SubscriptionInfo info=new SubscriptionInfo();
info.setDestination(destination);
@@ -142,122 +120,82 @@
if(!subscriberContainer.containsKey(key)){
subscriberContainer.put(key,info);
}
- addSubscriberAckContainer(key);
- }
-
- public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
- int subscriberCount=subscriberAcks.size();
- if(subscriberCount>0){
- String id=message.getMessageId().toString();
- ackContainer.put(id,new AtomicInteger(subscriberCount));
- for(Iterator i=subscriberAcks.keySet().iterator();i.hasNext();){
- Object key=i.next();
- ListContainer container=store.getListContainer(key,"durable-subs");
- container.add(id);
- }
- super.addMessage(context,message);
- }
+ addSubscriberMessageContainer(key);
}
- /**
- */
- public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,final MessageId messageId)
- throws IOException{
- final boolean debug=log.isDebugEnabled();
- JournalTopicAck ack=new JournalTopicAck();
- ack.setDestination(destination);
- ack.setMessageId(messageId);
- ack.setMessageSequenceId(messageId.getBrokerSequenceId());
- ack.setSubscritionName(subscriptionName);
- ack.setClientId(clientId);
- ack.setTransactionId(context.getTransaction()!=null?context.getTransaction().getTransactionId():null);
- final Location location=peristenceAdapter.writeCommand(ack,false);
- final SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
- if(!context.isInTransaction()){
- if(debug)
- log.debug("Journalled acknowledge for: "+messageId+", at: "+location);
- acknowledge(messageId,location,key);
- }else{
- if(debug)
- log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location);
- synchronized(this){
- inFlightTxLocations.add(location);
- }
- transactionStore.acknowledge(this,ack,location);
- context.getTransaction().addSynchronization(new Synchronization(){
-
- public void afterCommit() throws Exception{
- if(debug)
- log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location);
- synchronized(RapidTopicMessageStore.this){
- inFlightTxLocations.remove(location);
- acknowledge(messageId,location,key);
- }
- }
-
- public void afterRollback() throws Exception{
- if(debug)
- log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location);
- synchronized(RapidTopicMessageStore.this){
- inFlightTxLocations.remove(location);
+ /**
+ * Deletes a subscription and releases its claim on every message it has not acknowledged yet.
+ * For each pending reference the shared ack count is decremented; when it reaches zero the ack
+ * record and the message entry are removed, otherwise the decremented record is written back.
+ * Calling this for a subscription that has no registered container is a no-op.
+ *
+ * @param clientId client id of the subscription
+ * @param subscriptionName name of the subscription
+ */
+ public synchronized void deleteSubscription(String clientId,String subscriptionName){
+ String key=getSubscriptionKey(clientId,subscriptionName);
+ subscriberContainer.remove(key);
+ // remove (rather than get) the container: addMessage sizes its ack count from subscriberMessages
+ // and adds a reference to every container in it, so a deleted subscriber must not stay registered
+ TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
+ if(container==null){
+ return; // nothing registered under this key
+ }
+ for(Iterator i=container.getListContainer().iterator();i.hasNext();){
+ ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
+ if(ref!=null){
+ TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
+ if(tsa!=null){
+ if(tsa.decrementCount()<=0){
+ ackContainer.remove(ref.getAckEntry());
+ messageContainer.remove(tsa.getMessageEntry());
+ }else{
+ ackContainer.update(ref.getAckEntry(),tsa);
}
}
- });
+ }
}
}
- public void replayAcknowledge(ConnectionContext context,String clientId,String subscritionName,MessageId messageId){
- try{
- synchronized(this){
- String subcriberId=getSubscriptionKey(clientId,subscritionName);
- String id=messageId.toString();
- ListContainer container=(ListContainer)subscriberAcks.get(subcriberId);
- if(container!=null){
- // container.remove(id);
- container.removeFirst();
- AtomicInteger count=(AtomicInteger)ackContainer.remove(id);
- if(count!=null){
- if(count.decrementAndGet()>0){
- ackContainer.put(id,count);
- }else{
- // no more references to message messageContainer so remove it
- messageContainer.remove(messageId.toString());
- }
- }
+ public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
+ throws Exception{ // replays every pending message of the subscription to the listener, in list order
+ String key=getSubscriptionKey(clientId,subscriptionName);
+ TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
+ if(container!=null){
+ for(Iterator i=container.getListContainer().iterator();i.hasNext();){
+ ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
+ RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(ref
+ .getMessageEntry()); // null when the message entry is no longer in messageContainer
+ if(messageReference!=null){
+ Message m=(Message)peristenceAdapter.readCommand(messageReference.getLocation()); // load the full message from the recorded location
+ listener.recoverMessage(m);
}
}
- }catch(Throwable e){
- log.debug("Could not replay acknowledge for message '"+messageId
- +"'. Message may have already been acknowledged. reason: "+e);
}
+ listener.finished(); // always signalled, including for an unknown subscription
}
- /**
- * @param messageId
- * @param location
- * @param key
- */
- private void acknowledge(MessageId messageId,Location location,SubscriptionKey key){
- synchronized(this){
- lastLocation=location;
- ackedLastAckLocations.put(key,messageId);
- String subcriberId=getSubscriptionKey(key.getClientId(),key.getSubscriptionName());
- String id=messageId.toString();
- ListContainer container=(ListContainer)subscriberAcks.get(subcriberId);
- if(container!=null){
- // container.remove(id);
- container.removeFirst();
- AtomicInteger count=(AtomicInteger)ackContainer.remove(id);
- if(count!=null){
- if(count.decrementAndGet()>0){
- ackContainer.put(id,count);
- }else{
- // no more references to message messageContainer so remove it
- messageContainer.remove(messageId.toString());
+ public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
+ MessageRecoveryListener listener) throws Exception{ // delivers the next batch of pending messages, resuming after the entry remembered from the previous call
+ String key=getSubscriptionKey(clientId,subscriptionName);
+ TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
+ if(container!=null){
+ int count=0; // messages actually handed to the listener in this batch
+ StoreEntry entry=container.getBatchEntry(); // position stored by setBatchEntry below, or null
+ if(entry==null){
+ entry=container.getListContainer().getFirst(); // no remembered position: start at the head of the list
+ }else{
+ entry=container.getListContainer().refresh(entry); // presumably re-reads the entry's current state - confirm against ListContainer
+ entry=container.getListContainer().getNext(entry); // continue with the entry after the remembered one
+ }
+ if(entry!=null){
+ do{
+ ConsumerMessageRef consumerRef=(ConsumerMessageRef)container.getListContainer().get(entry);
+ RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(consumerRef
+ .getMessageEntry());
+ if(messageReference!=null){
+ Message m=(Message)peristenceAdapter.readCommand(messageReference.getLocation());
+ listener.recoverMessage(m);
+ count++;
}
- }
+ container.setBatchEntry(entry); // remember the position even when the message itself was missing
+ entry=container.getListContainer().getNext(entry);
+ }while(entry!=null&&count<maxReturned); // do/while: at least one entry is examined before maxReturned is checked
}
}
+ listener.finished(); // always signalled, including for an unknown subscription
+ }
+
+
+ public SubscriptionInfo[] getAllSubscriptions() throws IOException{ // snapshot of every SubscriptionInfo value held in subscriberContainer
+ return (SubscriptionInfo[])subscriberContainer.values().toArray(
+ new SubscriptionInfo[subscriberContainer.size()]); // array pre-sized from the map's size at the time of the call
}
protected String getSubscriptionKey(String clientId,String subscriberName){
@@ -266,73 +204,102 @@
return result;
}
- public Location checkpoint() throws IOException{
- ArrayList cpAckedLastAckLocations;
- // swap out the hash maps..
- synchronized(this){
- cpAckedLastAckLocations=new ArrayList(this.ackedLastAckLocations.values());
- this.ackedLastAckLocations=new HashMap();
- }
- Location rc=super.checkpoint();
- if(!cpAckedLastAckLocations.isEmpty()){
- Collections.sort(cpAckedLastAckLocations);
- Location t=(Location)cpAckedLastAckLocations.get(0);
- if(rc==null||t.compareTo(rc)<0){
- rc=t;
- }
- }
- return rc;
+ protected void addSubscriberMessageContainer(Object key) throws IOException{ // registers the message-reference list for one subscription key
+ ListContainer container=store.getListContainer(key,"topic-subs"); // list obtained from the store under the subscription key in "topic-subs"
+ Marshaller marshaller=new ConsumerMessageRefMarshaller(); // list elements are ConsumerMessageRef instances
+ container.setMarshaller(marshaller);
+ TopicSubContainer tsc=new TopicSubContainer(container);
+ subscriberMessages.put(key,tsc); // replaces any container previously registered under the same key
}
- public void deleteSubscription(String clientId,String subscriptionName) throws IOException{
- String key=getSubscriptionKey(clientId,subscriptionName);
- subscriberContainer.remove(key);
- ListContainer list=(ListContainer)subscriberAcks.get(key);
- for(Iterator i=list.iterator();i.hasNext();){
- String id=i.next().toString();
- AtomicInteger count=(AtomicInteger)ackContainer.remove(id);
- if(count!=null){
- if(count.decrementAndGet()>0){
- ackContainer.put(id,count);
- }else{
- // no more references to message messageContainer so remove it
- messageContainer.remove(id);
- }
- }
- }
+ /**
+ * Reports how many message references are still queued for a subscription.
+ *
+ * @param clientId client id of the subscription
+ * @param subscriberName name of the subscription
+ * @return size of the subscription's reference list, or 0 when no container is registered for the key
+ * @throws IOException declared on the signature; nothing in this body throws it directly
+ */
+ public int getMessageCount(String clientId,String subscriberName) throws IOException{
+ String key=getSubscriptionKey(clientId,subscriberName);
+ TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
+ // every other lookup in this class tolerates an unknown key; report 0 instead of a NullPointerException
+ return container==null?0:container.getListContainer().size();
}
- public SubscriptionInfo[] getAllSubscriptions() throws IOException{
- return (SubscriptionInfo[])subscriberContainer.values().toArray(
- new SubscriptionInfo[subscriberContainer.size()]);
+ /**
+ * Message references are not supported by this store: every call fails.
+ *
+ * @param context unused
+ * @param messageId unused
+ * @param expirationTime unused
+ * @param messageRef unused
+ * @throws IOException always, with the message "Not supported"
+ * @see org.apache.activemq.store.MessageStore#addMessageReference(org.apache.activemq.broker.ConnectionContext,
+ * org.apache.activemq.command.MessageId, long, java.lang.String)
+ */
+ public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
+ throws IOException{
+ throw new IOException("Not supported");
}
- protected void addSubscriberAckContainer(Object key) throws IOException{
- ListContainer container=store.getListContainer(key,"durable-subs");
- Marshaller marshaller=new StringMarshaller();
- container.setMarshaller(marshaller);
- subscriberAcks.put(key,container);
+
+
+ /**
+ * No message references are kept by this store, so there is never one to return.
+ *
+ * @param identity unused
+ * @return always null
+ * @throws IOException declared on the signature; never thrown by this body
+ * @see org.apache.activemq.store.MessageStore#getMessageReference(org.apache.activemq.command.MessageId)
+ */
+ public String getMessageReference(MessageId identity) throws IOException{
+ return null;
}
+
- public int getMessageCount(String clientId,String subscriberName) throws IOException{
- String key=getSubscriptionKey(clientId,subscriberName);
- ListContainer list=(ListContainer)subscriberAcks.get(key);
- return list.size();
+ /**
+ * Clears the message container, the ack container and the reference list of every
+ * registered subscriber. The subscriptions themselves stay registered.
+ *
+ * @param context unused
+ * @throws IOException declared on the signature
+ * @see org.apache.activemq.store.MessageStore#removeAllMessages(org.apache.activemq.broker.ConnectionContext)
+ */
+ public synchronized void removeAllMessages(ConnectionContext context) throws IOException{
+ messageContainer.clear();
+ ackContainer.clear();
+ for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
+ TopicSubContainer container=(TopicSubContainer)i.next();
+ container.getListContainer().clear(); // empty the list but keep the container registered
+ }
}
- public void resetBatching(String clientId,String subscriptionName,MessageId nextId){
+
+ public synchronized void resetBatching(String clientId,String subscriptionName){ // resets the batching state of one subscription; unknown subscriptions are ignored
+ String key=getSubscriptionKey(clientId,subscriptionName);
+ TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key);
+ if(topicSubContainer!=null){
+ topicSubContainer.reset(); // presumably clears the batch entry read by recoverNextMessages - confirm in TopicSubContainer
+ }
}
-
- public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,MessageRecoveryListener listener) throws Exception{
-
-
+
+ public Location checkpoint() throws IOException{ // this store reports no checkpoint location of its own
+ return null; // NOTE(review): how callers treat a null Location is not visible here - confirm against RapidPersistenceAdapter
}
-
- public void resetBatching(String clientId,String subscriptionName){
-
-
+
+ public synchronized void replayAcknowledge(ConnectionContext context,String clientId,String subscriptionName,MessageId messageId){ // same steps as acknowledge(); NOTE(review): messageId is never read here either
+ String subcriberId=getSubscriptionKey(clientId,subscriptionName);
+ TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
+ if(container!=null){ // unknown subscriptions are ignored silently
+ ConsumerMessageRef ref=(ConsumerMessageRef)container.getListContainer().removeFirst(); // always consumes the head of the subscriber's list
+ if(ref!=null){
+ TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
+ if(tsa!=null){
+ if(tsa.decrementCount()<=0){ // last outstanding subscriber: drop the ack record and the message entry
+ ackContainer.remove(ref.getAckEntry());
+ messageContainer.remove(tsa.getMessageEntry());
+ }else{
+ ackContainer.update(ref.getAckEntry(),tsa); // write the decremented count back to the ack container
+ }
+ }
+ }
+ }
}
-}
\ No newline at end of file
+}
+
+
+
+
+
+
+
+
+
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/RapidStoreQueueTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/RapidStoreQueueTest.java?view=auto&rev=477567
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/RapidStoreQueueTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/RapidStoreQueueTest.java Tue Nov 21 00:11:03 2006
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.perf;
+
+import java.io.File;
+import org.apache.activeio.journal.active.JournalImpl;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.rapid.RapidPersistenceAdapter;
+/**
+ * Runs the SimpleQueueTest scenario against a broker whose persistence adapter is a
+ * RapidPersistenceAdapter backed by a journal under activemq-data/perfTest/journal.
+ *
+ * @version $Revision: 1.3 $
+ */
+public class RapidStoreQueueTest extends SimpleQueueTest{
+
+
+ protected void configureBroker(BrokerService answer) throws Exception{ // replaces the broker's persistence with the rapid store before the test runs
+
+ File dataFileDir = new File("activemq-data/perfTest"); // relative path, resolved against the working directory
+ File journalDir = new File(dataFileDir, "journal").getCanonicalFile();
+ JournalImpl journal = new JournalImpl(journalDir, 3, 1024*1024*20); // presumably 3 log files of 20 MiB each - confirm against the JournalImpl constructor
+
+ RapidPersistenceAdapter adaptor = new RapidPersistenceAdapter(journal,answer.getTaskRunnerFactory());
+
+
+ answer.setPersistenceAdapter(adaptor);
+ answer.addConnector(bindAddress);
+ answer.setDeleteAllMessagesOnStartup(true); // the broker is asked to discard stored messages when it starts
+
+ }
+
+}
Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/RapidStoreQueueTest.java
------------------------------------------------------------------------------
svn:eol-style = native