You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/08/08 20:58:13 UTC
svn commit: r563982 [21/32] - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/
main/java/org/apache/activemq/blob/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/jm...
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Wed Aug 8 11:56:59 2007
@@ -32,52 +32,54 @@
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicReferenceStore;
-public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore{
+public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore {
protected ListContainer<TopicSubAck> ackContainer;
private Map subscriberContainer;
private Store store;
- protected Map subscriberMessages=new ConcurrentHashMap();
+ protected Map subscriberMessages = new ConcurrentHashMap();
- public KahaTopicReferenceStore(Store store,KahaReferenceStoreAdapter adapter,MapContainer messageContainer,ListContainer ackContainer,
- MapContainer subsContainer,ActiveMQDestination destination) throws IOException{
- super(adapter,messageContainer,destination);
- this.store=store;
- this.ackContainer=ackContainer;
- subscriberContainer=subsContainer;
+ public KahaTopicReferenceStore(Store store, KahaReferenceStoreAdapter adapter,
+ MapContainer messageContainer, ListContainer ackContainer,
+ MapContainer subsContainer, ActiveMQDestination destination)
+ throws IOException {
+ super(adapter, messageContainer, destination);
+ this.store = store;
+ this.ackContainer = ackContainer;
+ subscriberContainer = subsContainer;
// load all the Ack containers
- for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){
- String key=(String) i.next();
+ for (Iterator i = subscriberContainer.keySet().iterator(); i.hasNext();) {
+ String key = (String)i.next();
addSubscriberMessageContainer(key);
}
}
- protected MessageId getMessageId(Object object){
+ protected MessageId getMessageId(Object object) {
return new MessageId(((ReferenceRecord)object).getMessageId());
}
- public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
+ public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
throw new RuntimeException("Use addMessageReference instead");
}
- public synchronized Message getMessage(MessageId identity) throws IOException{
+ public synchronized Message getMessage(MessageId identity) throws IOException {
throw new RuntimeException("Use addMessageReference instead");
}
-
- public void addMessageReference(final ConnectionContext context,final MessageId messageId,final ReferenceData data){
- final ReferenceRecord record=new ReferenceRecord(messageId.toString(),data);
- final int subscriberCount=subscriberMessages.size();
- if(subscriberCount>0){
- final StoreEntry messageEntry=messageContainer.place(messageId,record);
+ public void addMessageReference(final ConnectionContext context, final MessageId messageId,
+ final ReferenceData data) {
+ final ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
+ final int subscriberCount = subscriberMessages.size();
+ if (subscriberCount > 0) {
+ final StoreEntry messageEntry = messageContainer.place(messageId, record);
addInterest(record);
- final TopicSubAck tsa=new TopicSubAck();
+ final TopicSubAck tsa = new TopicSubAck();
tsa.setCount(subscriberCount);
tsa.setMessageEntry(messageEntry);
- final StoreEntry ackEntry=ackContainer.placeLast(tsa);
- for(final Iterator i=subscriberMessages.values().iterator();i.hasNext();){
- final TopicSubContainer container=(TopicSubContainer)i.next();
- final ConsumerMessageRef ref=new ConsumerMessageRef();
+ final StoreEntry ackEntry = ackContainer.placeLast(tsa);
+ for (final Iterator i = subscriberMessages.values().iterator(); i.hasNext();) {
+ final TopicSubContainer container = (TopicSubContainer)i.next();
+ final ConsumerMessageRef ref = new ConsumerMessageRef();
ref.setAckEntry(ackEntry);
ref.setMessageEntry(messageEntry);
ref.setMessageId(messageId);
@@ -86,154 +88,152 @@
}
}
- public ReferenceData getMessageReference(final MessageId identity) throws IOException{
- final ReferenceRecord result=messageContainer.get(identity);
- if(result==null)
+ public ReferenceData getMessageReference(final MessageId identity) throws IOException {
+ final ReferenceRecord result = messageContainer.get(identity);
+ if (result == null)
return null;
return result.getData();
}
- public void addReferenceFileIdsInUse(){
- for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
- TopicSubAck subAck=(TopicSubAck)ackContainer.get(entry);
- if(subAck.getCount()>0){
- ReferenceRecord rr=(ReferenceRecord)messageContainer.getValue(subAck.getMessageEntry());
+ public void addReferenceFileIdsInUse() {
+ for (StoreEntry entry = ackContainer.getFirst(); entry != null; entry = ackContainer.getNext(entry)) {
+ TopicSubAck subAck = (TopicSubAck)ackContainer.get(entry);
+ if (subAck.getCount() > 0) {
+ ReferenceRecord rr = (ReferenceRecord)messageContainer.getValue(subAck.getMessageEntry());
addInterest(rr);
}
}
}
- protected ListContainer addSubscriberMessageContainer(String key) throws IOException{
- ListContainer container=store.getListContainer(destination,"topic-subs-references-"+key);
- Marshaller marshaller=new ConsumerMessageRefMarshaller();
+ protected ListContainer addSubscriberMessageContainer(String key) throws IOException {
+ ListContainer container = store.getListContainer(destination, "topic-subs-references-" + key);
+ Marshaller marshaller = new ConsumerMessageRefMarshaller();
container.setMarshaller(marshaller);
- TopicSubContainer tsc=new TopicSubContainer(container);
- subscriberMessages.put(key,tsc);
+ TopicSubContainer tsc = new TopicSubContainer(container);
+ subscriberMessages.put(key, tsc);
return container;
}
- public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
- MessageId messageId) throws IOException{
- String key=getSubscriptionKey(clientId,subscriptionName);
-
- TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
- if(container!=null){
- ConsumerMessageRef ref=container.remove(messageId);
- if(ref!=null){
- TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
- if(tsa!=null){
- if(tsa.decrementCount()<=0){
- StoreEntry entry=ref.getAckEntry();
- entry=ackContainer.refresh(entry);
+ public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
+ MessageId messageId) throws IOException {
+ String key = getSubscriptionKey(clientId, subscriptionName);
+
+ TopicSubContainer container = (TopicSubContainer)subscriberMessages.get(key);
+ if (container != null) {
+ ConsumerMessageRef ref = container.remove(messageId);
+ if (ref != null) {
+ TopicSubAck tsa = (TopicSubAck)ackContainer.get(ref.getAckEntry());
+ if (tsa != null) {
+ if (tsa.decrementCount() <= 0) {
+ StoreEntry entry = ref.getAckEntry();
+ entry = ackContainer.refresh(entry);
ackContainer.remove(entry);
- ReferenceRecord rr=messageContainer.get(messageId);
- if(rr!=null){
- entry=tsa.getMessageEntry();
- entry=messageContainer.refresh(entry);
+ ReferenceRecord rr = messageContainer.get(messageId);
+ if (rr != null) {
+ entry = tsa.getMessageEntry();
+ entry = messageContainer.refresh(entry);
messageContainer.remove(entry);
removeInterest(rr);
}
- }else{
-
- ackContainer.update(ref.getAckEntry(),tsa);
+ } else {
+
+ ackContainer.update(ref.getAckEntry(), tsa);
}
}
}
}
}
- public synchronized void addSubsciption(SubscriptionInfo info,boolean retroactive)
- throws IOException{
- String key=getSubscriptionKey(info.getClientId(), info.getSubscriptionName());
+ public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
+ String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName());
// if already exists - won't add it again as it causes data files
// to hang around
- if(!subscriberContainer.containsKey(key)){
- subscriberContainer.put(key,info);
+ if (!subscriberContainer.containsKey(key)) {
+ subscriberContainer.put(key, info);
adapter.addSubscriberState(info);
}
// add the subscriber
- ListContainer container=addSubscriberMessageContainer(key);
- if(retroactive){
+ ListContainer container = addSubscriberMessageContainer(key);
+ if (retroactive) {
/*
- for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
- TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
- ConsumerMessageRef ref=new ConsumerMessageRef();
- ref.setAckEntry(entry);
- ref.setMessageEntry(tsa.getMessageEntry());
- container.add(ref);
- }
- */
+ * for(StoreEntry
+ * entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
+ * TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
+ * ConsumerMessageRef ref=new ConsumerMessageRef();
+ * ref.setAckEntry(entry);
+ * ref.setMessageEntry(tsa.getMessageEntry()); container.add(ref); }
+ */
}
}
- public synchronized void deleteSubscription(String clientId,String subscriptionName) throws IOException{
- SubscriptionInfo info = lookupSubscription(clientId,subscriptionName);
+ public synchronized void deleteSubscription(String clientId, String subscriptionName) throws IOException {
+ SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
if (info != null) {
adapter.removeSubscriberState(info);
}
- String key=getSubscriptionKey(clientId,subscriptionName);
+ String key = getSubscriptionKey(clientId, subscriptionName);
removeSubscriberMessageContainer(key);
}
- public SubscriptionInfo[] getAllSubscriptions() throws IOException{
- return (SubscriptionInfo[])subscriberContainer.values().toArray(
- new SubscriptionInfo[subscriberContainer.size()]);
- }
-
- public int getMessageCount(String clientId,String subscriberName) throws IOException{
- String key=getSubscriptionKey(clientId,subscriberName);
- TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
- return container != null ? container.size() : 0;
- }
-
- public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
- return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
- }
-
- public synchronized void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
- MessageRecoveryListener listener) throws Exception{
- String key=getSubscriptionKey(clientId,subscriptionName);
- TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
- if(container!=null){
- int count=0;
- StoreEntry entry=container.getBatchEntry();
- if(entry==null){
- entry=container.getEntry();
- }else{
- entry=container.refreshEntry(entry);
- if(entry!=null){
- entry=container.getNextEntry(entry);
+ public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+ return (SubscriptionInfo[])subscriberContainer.values()
+ .toArray(new SubscriptionInfo[subscriberContainer.size()]);
+ }
+
+ public int getMessageCount(String clientId, String subscriberName) throws IOException {
+ String key = getSubscriptionKey(clientId, subscriberName);
+ TopicSubContainer container = (TopicSubContainer)subscriberMessages.get(key);
+ return container != null ? container.size() : 0;
+ }
+
+ public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
+ return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName));
+ }
+
+ public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
+ MessageRecoveryListener listener) throws Exception {
+ String key = getSubscriptionKey(clientId, subscriptionName);
+ TopicSubContainer container = (TopicSubContainer)subscriberMessages.get(key);
+ if (container != null) {
+ int count = 0;
+ StoreEntry entry = container.getBatchEntry();
+ if (entry == null) {
+ entry = container.getEntry();
+ } else {
+ entry = container.refreshEntry(entry);
+ if (entry != null) {
+ entry = container.getNextEntry(entry);
}
}
-
- if(entry!=null){
- do{
- ConsumerMessageRef consumerRef=container.get(entry);
- ReferenceRecord msg=messageContainer.getValue(consumerRef.getMessageEntry());
- if(msg!=null){
- recoverReference(listener,msg);
+
+ if (entry != null) {
+ do {
+ ConsumerMessageRef consumerRef = container.get(entry);
+ ReferenceRecord msg = messageContainer.getValue(consumerRef.getMessageEntry());
+ if (msg != null) {
+ recoverReference(listener, msg);
count++;
- container.setBatchEntry(msg.getMessageId(),entry);
- }else {
+ container.setBatchEntry(msg.getMessageId(), entry);
+ } else {
container.reset();
}
-
- entry=container.getNextEntry(entry);
- }while(entry!=null&&count<maxReturned&&listener.hasSpace());
+
+ entry = container.getNextEntry(entry);
+ } while (entry != null && count < maxReturned && listener.hasSpace());
}
}
}
- public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
- throws Exception{
- String key=getSubscriptionKey(clientId,subscriptionName);
- TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
- if(container!=null){
- for(Iterator i=container.iterator();i.hasNext();){
- ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
- ReferenceRecord msg=messageContainer.get(ref.getMessageEntry());
- if(msg!=null){
- if(!recoverReference(listener,msg)){
+ public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
+ throws Exception {
+ String key = getSubscriptionKey(clientId, subscriptionName);
+ TopicSubContainer container = (TopicSubContainer)subscriberMessages.get(key);
+ if (container != null) {
+ for (Iterator i = container.iterator(); i.hasNext();) {
+ ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
+ ReferenceRecord msg = messageContainer.get(ref.getMessageEntry());
+ if (msg != null) {
+ if (!recoverReference(listener, msg)) {
break;
}
}
@@ -241,37 +241,37 @@
}
}
- public synchronized void resetBatching(String clientId,String subscriptionName){
- String key=getSubscriptionKey(clientId,subscriptionName);
- TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key);
- if(topicSubContainer!=null){
+ public synchronized void resetBatching(String clientId, String subscriptionName) {
+ String key = getSubscriptionKey(clientId, subscriptionName);
+ TopicSubContainer topicSubContainer = (TopicSubContainer)subscriberMessages.get(key);
+ if (topicSubContainer != null) {
topicSubContainer.reset();
}
}
- protected void removeSubscriberMessageContainer(String key) throws IOException{
+ protected void removeSubscriberMessageContainer(String key) throws IOException {
subscriberContainer.remove(key);
- TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
- for(Iterator i=container.iterator();i.hasNext();){
- ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
- if(ref!=null){
- TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
- if(tsa!=null){
- if(tsa.decrementCount()<=0){
+ TopicSubContainer container = (TopicSubContainer)subscriberMessages.remove(key);
+ for (Iterator i = container.iterator(); i.hasNext();) {
+ ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
+ if (ref != null) {
+ TopicSubAck tsa = (TopicSubAck)ackContainer.get(ref.getAckEntry());
+ if (tsa != null) {
+ if (tsa.decrementCount() <= 0) {
ackContainer.remove(ref.getAckEntry());
messageContainer.remove(tsa.getMessageEntry());
- }else{
- ackContainer.update(ref.getAckEntry(),tsa);
+ } else {
+ ackContainer.update(ref.getAckEntry(), tsa);
}
}
}
}
- store.deleteListContainer(destination,"topic-subs-references-"+key);
+ store.deleteListContainer(destination, "topic-subs-references-" + key);
}
- protected String getSubscriptionKey(String clientId,String subscriberName){
- String result=clientId+":";
- result+=subscriberName!=null?subscriberName:"NOT_SET";
+ protected String getSubscriptionKey(String clientId, String subscriberName) {
+ String result = clientId + ":";
+ result += subscriberName != null ? subscriberName : "NOT_SET";
return result;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java Wed Aug 8 11:56:59 2007
@@ -26,78 +26,79 @@
import org.apache.activemq.store.MessageStore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
/**
* Stores a messages/acknowledgements for a transaction
*
* @version $Revision: 1.4 $
*/
-class KahaTransaction{
- private static final Log log=LogFactory.getLog(KahaTransaction.class);
- protected List list=new ArrayList();
-
-
- void add(KahaMessageStore store,BaseCommand command){
- TxCommand tx=new TxCommand();
+class KahaTransaction {
+ private static final Log log = LogFactory.getLog(KahaTransaction.class);
+ protected List list = new ArrayList();
+
+ void add(KahaMessageStore store, BaseCommand command) {
+ TxCommand tx = new TxCommand();
tx.setCommand(command);
tx.setMessageStoreKey(store.getId());
list.add(tx);
}
- Message[] getMessages(){
- List result=new ArrayList();
- for(int i=0;i<list.size();i++){
- TxCommand command=(TxCommand) list.get(i);
- if(command.isAdd()){
+ Message[] getMessages() {
+ List result = new ArrayList();
+ for (int i = 0; i < list.size(); i++) {
+ TxCommand command = (TxCommand)list.get(i);
+ if (command.isAdd()) {
result.add(command.getCommand());
}
}
- Message[] messages=new Message[result.size()];
- return (Message[]) result.toArray(messages);
+ Message[] messages = new Message[result.size()];
+ return (Message[])result.toArray(messages);
}
- MessageAck[] getAcks(){
- List result=new ArrayList();
- for(int i=0;i<list.size();i++){
- TxCommand command=(TxCommand) list.get(i);
- if(command.isRemove()){
+ MessageAck[] getAcks() {
+ List result = new ArrayList();
+ for (int i = 0; i < list.size(); i++) {
+ TxCommand command = (TxCommand)list.get(i);
+ if (command.isRemove()) {
result.add(command.getCommand());
}
}
- MessageAck[] acks=new MessageAck[result.size()];
- return (MessageAck[]) result.toArray(acks);
+ MessageAck[] acks = new MessageAck[result.size()];
+ return (MessageAck[])result.toArray(acks);
}
- void prepare(){}
+ void prepare() {
+ }
- void rollback(){
+ void rollback() {
list.clear();
}
/**
* @throws IOException
*/
- void commit(KahaTransactionStore transactionStore) throws IOException{
- for(int i=0;i<list.size();i++){
- TxCommand command=(TxCommand) list.get(i);
- MessageStore ms=transactionStore.getStoreById(command.getMessageStoreKey());
- if(command.isAdd()){
- ms.addMessage(null,(Message) command.getCommand());
+ void commit(KahaTransactionStore transactionStore) throws IOException {
+ for (int i = 0; i < list.size(); i++) {
+ TxCommand command = (TxCommand)list.get(i);
+ MessageStore ms = transactionStore.getStoreById(command.getMessageStoreKey());
+ if (command.isAdd()) {
+ ms.addMessage(null, (Message)command.getCommand());
}
}
- for(int i=0;i<list.size();i++){
- TxCommand command=(TxCommand) list.get(i);
- MessageStore ms=transactionStore.getStoreById(command.getMessageStoreKey());
- if(command.isRemove()){
- ms.removeMessage(null,(MessageAck) command.getCommand());
+ for (int i = 0; i < list.size(); i++) {
+ TxCommand command = (TxCommand)list.get(i);
+ MessageStore ms = transactionStore.getStoreById(command.getMessageStoreKey());
+ if (command.isRemove()) {
+ ms.removeMessage(null, (MessageAck)command.getCommand());
}
}
}
-
- List getList(){
+
+ List getList() {
return new ArrayList(list);
}
-
- void setList(List list){
+
+ void setList(List list) {
this.list = list;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java Wed Aug 8 11:56:59 2007
@@ -33,42 +33,43 @@
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import java.util.concurrent.ConcurrentHashMap;
+
/**
- * Provides a TransactionStore implementation that can create transaction aware MessageStore objects from non
- * transaction aware MessageStore objects.
+ * Provides a TransactionStore implementation that can create transaction aware
+ * MessageStore objects from non transaction aware MessageStore objects.
*
* @version $Revision: 1.4 $
*/
-public class KahaTransactionStore implements TransactionStore{
- private Map transactions=new ConcurrentHashMap();
+public class KahaTransactionStore implements TransactionStore {
+ private Map transactions = new ConcurrentHashMap();
private Map prepared;
private KahaPersistenceAdapter adaptor;
- KahaTransactionStore(KahaPersistenceAdapter adaptor,Map preparedMap){
- this.adaptor=adaptor;
- this.prepared=preparedMap;
+ KahaTransactionStore(KahaPersistenceAdapter adaptor, Map preparedMap) {
+ this.adaptor = adaptor;
+ this.prepared = preparedMap;
}
- public MessageStore proxy(MessageStore messageStore){
- return new ProxyMessageStore(messageStore){
- public void addMessage(ConnectionContext context,final Message send) throws IOException{
- KahaTransactionStore.this.addMessage(getDelegate(),send);
+ public MessageStore proxy(MessageStore messageStore) {
+ return new ProxyMessageStore(messageStore) {
+ public void addMessage(ConnectionContext context, final Message send) throws IOException {
+ KahaTransactionStore.this.addMessage(getDelegate(), send);
}
- public void removeMessage(ConnectionContext context,final MessageAck ack) throws IOException{
- KahaTransactionStore.this.removeMessage(getDelegate(),ack);
+ public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
+ KahaTransactionStore.this.removeMessage(getDelegate(), ack);
}
};
}
- public TopicMessageStore proxy(TopicMessageStore messageStore){
- return new ProxyTopicMessageStore(messageStore){
- public void addMessage(ConnectionContext context,final Message send) throws IOException{
- KahaTransactionStore.this.addMessage(getDelegate(),send);
+ public TopicMessageStore proxy(TopicMessageStore messageStore) {
+ return new ProxyTopicMessageStore(messageStore) {
+ public void addMessage(ConnectionContext context, final Message send) throws IOException {
+ KahaTransactionStore.this.addMessage(getDelegate(), send);
}
- public void removeMessage(ConnectionContext context,final MessageAck ack) throws IOException{
- KahaTransactionStore.this.removeMessage(getDelegate(),ack);
+ public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
+ KahaTransactionStore.this.removeMessage(getDelegate(), ack);
}
};
}
@@ -76,11 +77,11 @@
/**
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
- public void prepare(TransactionId txid){
- KahaTransaction tx=getTx(txid);
- if(tx!=null){
+ public void prepare(TransactionId txid) {
+ KahaTransaction tx = getTx(txid);
+ if (tx != null) {
tx.prepare();
- prepared.put(txid,tx);
+ prepared.put(txid, tx);
}
}
@@ -88,9 +89,9 @@
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
- public void commit(TransactionId txid,boolean wasPrepared) throws IOException{
- KahaTransaction tx=getTx(txid);
- if(tx!=null){
+ public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
+ KahaTransaction tx = getTx(txid);
+ if (tx != null) {
tx.commit(this);
removeTx(txid);
}
@@ -99,24 +100,26 @@
/**
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
- public void rollback(TransactionId txid){
- KahaTransaction tx=getTx(txid);
- if(tx!=null){
+ public void rollback(TransactionId txid) {
+ KahaTransaction tx = getTx(txid);
+ if (tx != null) {
tx.rollback();
removeTx(txid);
}
}
- public void start() throws Exception{}
+ public void start() throws Exception {
+ }
- public void stop() throws Exception{}
+ public void stop() throws Exception {
+ }
- synchronized public void recover(TransactionRecoveryListener listener) throws IOException{
- for(Iterator i=prepared.entrySet().iterator();i.hasNext();){
- Map.Entry entry=(Entry) i.next();
- XATransactionId xid=(XATransactionId) entry.getKey();
- KahaTransaction kt=(KahaTransaction) entry.getValue();
- listener.recover(xid,kt.getMessages(),kt.getAcks());
+ synchronized public void recover(TransactionRecoveryListener listener) throws IOException {
+ for (Iterator i = prepared.entrySet().iterator(); i.hasNext();) {
+ Map.Entry entry = (Entry)i.next();
+ XATransactionId xid = (XATransactionId)entry.getKey();
+ KahaTransaction kt = (KahaTransaction)entry.getValue();
+ listener.recover(xid, kt.getMessages(), kt.getAcks());
}
}
@@ -124,12 +127,12 @@
* @param message
* @throws IOException
*/
- void addMessage(final MessageStore destination,final Message message) throws IOException{
- if(message.isInTransaction()){
- KahaTransaction tx=getOrCreateTx(message.getTransactionId());
- tx.add((KahaMessageStore) destination,message);
- }else{
- destination.addMessage(null,message);
+ void addMessage(final MessageStore destination, final Message message) throws IOException {
+ if (message.isInTransaction()) {
+ KahaTransaction tx = getOrCreateTx(message.getTransactionId());
+ tx.add((KahaMessageStore)destination, message);
+ } else {
+ destination.addMessage(null, message);
}
}
@@ -137,43 +140,43 @@
* @param ack
* @throws IOException
*/
- final void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException{
- if(ack.isInTransaction()){
- KahaTransaction tx=getOrCreateTx(ack.getTransactionId());
- tx.add((KahaMessageStore) destination,ack);
- }else{
- destination.removeMessage(null,ack);
+ final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
+ if (ack.isInTransaction()) {
+ KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
+ tx.add((KahaMessageStore)destination, ack);
+ } else {
+ destination.removeMessage(null, ack);
}
}
- protected synchronized KahaTransaction getTx(TransactionId key){
- KahaTransaction result=(KahaTransaction) transactions.get(key);
- if(result==null){
- result=(KahaTransaction) prepared.get(key);
+ protected synchronized KahaTransaction getTx(TransactionId key) {
+ KahaTransaction result = (KahaTransaction)transactions.get(key);
+ if (result == null) {
+ result = (KahaTransaction)prepared.get(key);
}
return result;
}
- protected synchronized KahaTransaction getOrCreateTx(TransactionId key){
- KahaTransaction result=(KahaTransaction) transactions.get(key);
- if(result==null){
- result=new KahaTransaction();
- transactions.put(key,result);
+ protected synchronized KahaTransaction getOrCreateTx(TransactionId key) {
+ KahaTransaction result = (KahaTransaction)transactions.get(key);
+ if (result == null) {
+ result = new KahaTransaction();
+ transactions.put(key, result);
}
return result;
}
- protected synchronized void removeTx(TransactionId key){
+ protected synchronized void removeTx(TransactionId key) {
transactions.remove(key);
prepared.remove(key);
}
- public void delete(){
+ public void delete() {
transactions.clear();
prepared.clear();
}
- protected MessageStore getStoreById(Object id){
+ protected MessageStore getStoreById(Object id) {
return adaptor.retrieveMessageStore(id);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecord.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecord.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecord.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecord.java Wed Aug 8 11:56:59 2007
@@ -16,52 +16,48 @@
import org.apache.activemq.store.ReferenceStore.ReferenceData;
-public class ReferenceRecord{
+public class ReferenceRecord {
private String messageId;
private ReferenceData data;
- public ReferenceRecord(){
+ public ReferenceRecord() {
}
- public ReferenceRecord(String messageId,ReferenceData data){
- this.messageId=messageId;
- this.data=data;
+ public ReferenceRecord(String messageId, ReferenceData data) {
+ this.messageId = messageId;
+ this.data = data;
}
-
/**
* @return the data
*/
- public ReferenceData getData(){
+ public ReferenceData getData() {
return this.data;
}
-
/**
* @param data the data to set
*/
- public void setData(ReferenceData data){
- this.data=data;
+ public void setData(ReferenceData data) {
+ this.data = data;
}
-
/**
* @return the messageId
*/
- public String getMessageId(){
+ public String getMessageId() {
return this.messageId;
}
-
/**
* @param messageId the messageId to set
*/
- public void setMessageId(String messageId){
- this.messageId=messageId;
+ public void setMessageId(String messageId) {
+ this.messageId = messageId;
}
-
+
public String toString() {
- return "ReferenceRecord(id=" + messageId + ",data=" + data +")";
+ return "ReferenceRecord(id=" + messageId + ",data=" + data + ")";
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecordMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecordMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecordMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecordMarshaller.java Wed Aug 8 11:56:59 2007
@@ -20,10 +20,10 @@
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.store.ReferenceStore.ReferenceData;
-public class ReferenceRecordMarshaller implements Marshaller<ReferenceRecord>{
+public class ReferenceRecordMarshaller implements Marshaller<ReferenceRecord> {
- public ReferenceRecord readPayload(DataInput dataIn) throws IOException{
- ReferenceRecord rr=new ReferenceRecord();
+ public ReferenceRecord readPayload(DataInput dataIn) throws IOException {
+ ReferenceRecord rr = new ReferenceRecord();
rr.setMessageId(dataIn.readUTF());
ReferenceData referenceData = new ReferenceData();
referenceData.setFileId(dataIn.readInt());
@@ -37,9 +37,10 @@
* @param object
* @param dataOut
* @throws IOException
- * @see org.apache.activemq.kaha.Marshaller#writePayload(java.lang.Object, java.io.DataOutput)
+ * @see org.apache.activemq.kaha.Marshaller#writePayload(java.lang.Object,
+ * java.io.DataOutput)
*/
- public void writePayload(ReferenceRecord rr,DataOutput dataOut) throws IOException{
+ public void writePayload(ReferenceRecord rr, DataOutput dataOut) throws IOException {
dataOut.writeUTF(rr.getMessageId());
dataOut.writeInt(rr.getData().getFileId());
dataOut.writeInt(rr.getData().getOffset());
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/StoreEntryMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/StoreEntryMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/StoreEntryMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/StoreEntryMarshaller.java Wed Aug 8 11:56:59 2007
@@ -22,22 +22,21 @@
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.impl.index.IndexItem;
-
/**
* Marshall a TopicSubAck
+ *
* @version $Revision: 1.10 $
*/
-public class StoreEntryMarshaller implements Marshaller{
-
+public class StoreEntryMarshaller implements Marshaller {
+
+ public void writePayload(Object object, DataOutput dataOut) throws IOException {
+ IndexItem item = (IndexItem)object;
+ dataOut.writeLong(item.getOffset());
+ item.write(dataOut);
- public void writePayload(Object object,DataOutput dataOut) throws IOException{
- IndexItem item = (IndexItem)object;
- dataOut.writeLong(item.getOffset());
- item.write(dataOut);
-
}
- public Object readPayload(DataInput dataIn) throws IOException{
+ public Object readPayload(DataInput dataIn) throws IOException {
IndexItem item = new IndexItem();
item.setOffset(dataIn.readLong());
item.read(dataIn);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAck.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAck.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAck.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAck.java Wed Aug 8 11:56:59 2007
@@ -21,32 +21,32 @@
*
* @version $Revision: 1.10 $
*/
-public class TopicSubAck{
+public class TopicSubAck {
- private int count =0;
+ private int count = 0;
private StoreEntry messageEntry;
/**
* @return the count
*/
- public int getCount(){
+ public int getCount() {
return this.count;
}
/**
* @param count the count to set
*/
- public void setCount(int count){
- this.count=count;
+ public void setCount(int count) {
+ this.count = count;
}
-
+
/**
* @return the value of the count after it's decremented
*/
public int decrementCount() {
return --count;
}
-
+
/**
* @return the value of the count after it's incremented
*/
@@ -54,21 +54,18 @@
return ++count;
}
-
/**
* @return the messageEntry
*/
- public StoreEntry getMessageEntry(){
+ public StoreEntry getMessageEntry() {
return this.messageEntry;
}
-
/**
* @param messageEntry the messageEntry to set
*/
- public void setMessageEntry(StoreEntry storeEntry){
- this.messageEntry=storeEntry;
+ public void setMessageEntry(StoreEntry storeEntry) {
+ this.messageEntry = storeEntry;
}
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAckMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAckMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAckMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAckMarshaller.java Wed Aug 8 11:56:59 2007
@@ -22,24 +22,23 @@
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.impl.index.IndexItem;
-
/**
* Marshall a TopicSubAck
+ *
* @version $Revision: 1.10 $
*/
-public class TopicSubAckMarshaller implements Marshaller{
-
+public class TopicSubAckMarshaller implements Marshaller {
+
+ public void writePayload(Object object, DataOutput dataOut) throws IOException {
+ TopicSubAck tsa = (TopicSubAck)object;
+ dataOut.writeInt(tsa.getCount());
+ IndexItem item = (IndexItem)tsa.getMessageEntry();
+ dataOut.writeLong(item.getOffset());
+ item.write(dataOut);
- public void writePayload(Object object,DataOutput dataOut) throws IOException{
- TopicSubAck tsa = (TopicSubAck) object;
- dataOut.writeInt(tsa.getCount());
- IndexItem item = (IndexItem)tsa.getMessageEntry();
- dataOut.writeLong(item.getOffset());
- item.write(dataOut);
-
}
- public Object readPayload(DataInput dataIn) throws IOException{
+ public Object readPayload(DataInput dataIn) throws IOException {
TopicSubAck tsa = new TopicSubAck();
int count = dataIn.readInt();
tsa.setCount(count);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java Wed Aug 8 11:56:59 2007
@@ -21,13 +21,13 @@
/**
* Holds information for the subscriber
- *
+ *
* @version $Revision: 1.10 $
*/
public class TopicSubContainer {
private transient ListContainer listContainer;
private transient StoreEntry batchEntry;
-
+
public TopicSubContainer(ListContainer container) {
this.listContainer = container;
}
@@ -40,10 +40,10 @@
}
/**
- * @param id
+ * @param id
* @param batchEntry the batchEntry to set
*/
- public void setBatchEntry(String id,StoreEntry batchEntry) {
+ public void setBatchEntry(String id, StoreEntry batchEntry) {
this.batchEntry = batchEntry;
}
@@ -59,28 +59,28 @@
return listContainer.placeLast(ref);
}
- public ConsumerMessageRef remove(MessageId id){
- ConsumerMessageRef result=null;
- if(!listContainer.isEmpty()){
- StoreEntry entry=listContainer.getFirst();
- while(entry!=null){
- ConsumerMessageRef ref=(ConsumerMessageRef)listContainer.get(entry);
+ public ConsumerMessageRef remove(MessageId id) {
+ ConsumerMessageRef result = null;
+ if (!listContainer.isEmpty()) {
+ StoreEntry entry = listContainer.getFirst();
+ while (entry != null) {
+ ConsumerMessageRef ref = (ConsumerMessageRef)listContainer.get(entry);
listContainer.remove(entry);
- if(listContainer!=null&&batchEntry!=null&&(listContainer.isEmpty()||batchEntry.equals(entry))){
+ if (listContainer != null && batchEntry != null && (listContainer.isEmpty() || batchEntry.equals(entry))) {
reset();
}
- if(ref!=null&&ref.getMessageId().equals(id)){
- result=ref;
+ if (ref != null && ref.getMessageId().equals(id)) {
+ result = ref;
break;
}
- entry=listContainer.getFirst();
+ entry = listContainer.getFirst();
}
}
return result;
}
public ConsumerMessageRef get(StoreEntry entry) {
- return (ConsumerMessageRef) listContainer.get(entry);
+ return (ConsumerMessageRef)listContainer.get(entry);
}
public StoreEntry getEntry() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java Wed Aug 8 11:56:59 2007
@@ -29,22 +29,24 @@
/**
* Marshall a Transaction
+ *
* @version $Revision: 1.10 $
*/
-public class TransactionMarshaller implements Marshaller{
-
+public class TransactionMarshaller implements Marshaller {
+
private WireFormat wireFormat;
- public TransactionMarshaller(WireFormat wireFormat){
+
+ public TransactionMarshaller(WireFormat wireFormat) {
this.wireFormat = wireFormat;
-
+
}
-
- public void writePayload(Object object,DataOutput dataOut) throws IOException{
- KahaTransaction kt = (KahaTransaction) object;
+
+ public void writePayload(Object object, DataOutput dataOut) throws IOException {
+ KahaTransaction kt = (KahaTransaction)object;
List list = kt.getList();
dataOut.writeInt(list.size());
- for (int i = 0; i < list.size(); i++){
- TxCommand tx = (TxCommand) list.get(i);
+ for (int i = 0; i < list.size(); i++) {
+ TxCommand tx = (TxCommand)list.get(i);
Object key = tx.getMessageStoreKey();
ByteSequence packet = wireFormat.marshal(key);
dataOut.writeInt(packet.length);
@@ -53,31 +55,30 @@
packet = wireFormat.marshal(command);
dataOut.writeInt(packet.length);
dataOut.write(packet.data, packet.offset, packet.length);
-
+
}
- }
+ }
-
- public Object readPayload(DataInput dataIn) throws IOException{
+ public Object readPayload(DataInput dataIn) throws IOException {
KahaTransaction result = new KahaTransaction();
List list = new ArrayList();
result.setList(list);
- int number=dataIn.readInt();
- for (int i = 0; i < number; i++){
+ int number = dataIn.readInt();
+ for (int i = 0; i < number; i++) {
TxCommand command = new TxCommand();
int size = dataIn.readInt();
- byte[] data=new byte[size];
+ byte[] data = new byte[size];
dataIn.readFully(data);
- Object key = wireFormat.unmarshal(new ByteSequence(data));
+ Object key = wireFormat.unmarshal(new ByteSequence(data));
command.setMessageStoreKey(key);
size = dataIn.readInt();
- data=new byte[size];
+ data = new byte[size];
dataIn.readFully(data);
- BaseCommand bc = (BaseCommand) wireFormat.unmarshal(new ByteSequence(data));
+ BaseCommand bc = (BaseCommand)wireFormat.unmarshal(new ByteSequence(data));
command.setCommand(bc);
list.add(command);
}
return result;
-
+
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java Wed Aug 8 11:56:59 2007
@@ -19,58 +19,55 @@
import org.apache.activemq.command.BaseCommand;
import org.apache.activemq.command.CommandTypes;
-
/**
- * Base class for messages/acknowledgements for a transaction
+ * Base class for messages/acknowledgements for a transaction
*
* @version $Revision: 1.4 $
*/
class TxCommand {
- protected Object messageStoreKey;
- protected BaseCommand command;
+ protected Object messageStoreKey;
+ protected BaseCommand command;
- /**
- * @return Returns the messageStoreKey.
- */
- public Object getMessageStoreKey(){
- return messageStoreKey;
- }
-
- /**
- * @param messageStoreKey The messageStoreKey to set.
- */
- public void setMessageStoreKey(Object messageStoreKey){
- this.messageStoreKey=messageStoreKey;
- }
-
- /**
- * @return Returns the command.
- */
- public BaseCommand getCommand(){
- return command;
- }
-
- /**
- * @param command The command to set.
- */
- public void setCommand(BaseCommand command){
- this.command=command;
- }
-
- /**
- * @return true if a Message command
- */
- public boolean isAdd(){
- return command != null && command.getDataStructureType() != CommandTypes.MESSAGE_ACK;
- }
-
- /**
- * @return true if a MessageAck command
- */
- public boolean isRemove(){
- return command != null && command.getDataStructureType() == CommandTypes.MESSAGE_ACK;
- }
+ /**
+ * @return Returns the messageStoreKey.
+ */
+ public Object getMessageStoreKey() {
+ return messageStoreKey;
+ }
+
+ /**
+ * @param messageStoreKey The messageStoreKey to set.
+ */
+ public void setMessageStoreKey(Object messageStoreKey) {
+ this.messageStoreKey = messageStoreKey;
+ }
+
+ /**
+ * @return Returns the command.
+ */
+ public BaseCommand getCommand() {
+ return command;
+ }
+
+ /**
+ * @param command The command to set.
+ */
+ public void setCommand(BaseCommand command) {
+ this.command = command;
+ }
+
+ /**
+ * @return true if a Message command
+ */
+ public boolean isAdd() {
+ return command != null && command.getDataStructureType() != CommandTypes.MESSAGE_ACK;
+ }
+
+ /**
+ * @return true if a MessageAck command
+ */
+ public boolean isRemove() {
+ return command != null && command.getDataStructureType() == CommandTypes.MESSAGE_ACK;
+ }
-
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java Wed Aug 8 11:56:59 2007
@@ -31,128 +31,132 @@
import org.apache.activemq.store.MessageStore;
/**
- * An implementation of {@link org.apache.activemq.store.MessageStore} which uses a
+ * An implementation of {@link org.apache.activemq.store.MessageStore} which
+ * uses a
*
* @version $Revision: 1.7 $
*/
-public class MemoryMessageStore implements MessageStore{
+public class MemoryMessageStore implements MessageStore {
protected final ActiveMQDestination destination;
protected final Map messageTable;
protected MessageId lastBatchId;
- public MemoryMessageStore(ActiveMQDestination destination){
- this(destination,new LinkedHashMap());
+ public MemoryMessageStore(ActiveMQDestination destination) {
+ this(destination, new LinkedHashMap());
}
- public MemoryMessageStore(ActiveMQDestination destination,Map messageTable){
- this.destination=destination;
- this.messageTable=Collections.synchronizedMap(messageTable);
+ public MemoryMessageStore(ActiveMQDestination destination, Map messageTable) {
+ this.destination = destination;
+ this.messageTable = Collections.synchronizedMap(messageTable);
}
- public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
- synchronized(messageTable){
- messageTable.put(message.getMessageId(),message);
+ public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
+ synchronized (messageTable) {
+ messageTable.put(message.getMessageId(), message);
}
}
-// public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
-// throws IOException{
-// synchronized(messageTable){
-// messageTable.put(messageId,messageRef);
-// }
-// }
+ // public void addMessageReference(ConnectionContext context,MessageId
+ // messageId,long expirationTime,String messageRef)
+ // throws IOException{
+ // synchronized(messageTable){
+ // messageTable.put(messageId,messageRef);
+ // }
+ // }
- public Message getMessage(MessageId identity) throws IOException{
+ public Message getMessage(MessageId identity) throws IOException {
return (Message)messageTable.get(identity);
}
-// public String getMessageReference(MessageId identity) throws IOException{
-// return (String)messageTable.get(identity);
-// }
+ // public String getMessageReference(MessageId identity) throws IOException{
+ // return (String)messageTable.get(identity);
+ // }
- public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
+ public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
removeMessage(ack.getLastMessageId());
}
- public void removeMessage(MessageId msgId) throws IOException{
- synchronized(messageTable){
+ public void removeMessage(MessageId msgId) throws IOException {
+ synchronized (messageTable) {
messageTable.remove(msgId);
- if((lastBatchId!=null && lastBatchId.equals(msgId)) || messageTable.isEmpty()){
- lastBatchId=null;
+ if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) {
+ lastBatchId = null;
}
}
}
- public void recover(MessageRecoveryListener listener) throws Exception{
- // the message table is a synchronizedMap - so just have to synchronize here
- synchronized(messageTable){
- for(Iterator iter=messageTable.values().iterator();iter.hasNext();){
- Object msg=(Object)iter.next();
- if(msg.getClass()==MessageId.class){
+ public void recover(MessageRecoveryListener listener) throws Exception {
+ // the message table is a synchronizedMap - so just have to synchronize
+ // here
+ synchronized (messageTable) {
+ for (Iterator iter = messageTable.values().iterator(); iter.hasNext();) {
+ Object msg = (Object)iter.next();
+ if (msg.getClass() == MessageId.class) {
listener.recoverMessageReference((MessageId)msg);
- }else{
+ } else {
listener.recoverMessage((Message)msg);
}
}
}
}
- public void start(){
+ public void start() {
}
- public void stop(){
+ public void stop() {
}
- public void removeAllMessages(ConnectionContext context) throws IOException{
- synchronized(messageTable){
+ public void removeAllMessages(ConnectionContext context) throws IOException {
+ synchronized (messageTable) {
messageTable.clear();
}
}
- public ActiveMQDestination getDestination(){
+ public ActiveMQDestination getDestination() {
return destination;
}
- public void delete(){
- synchronized(messageTable){
+ public void delete() {
+ synchronized (messageTable) {
messageTable.clear();
}
}
/**
- * @param usageManager The UsageManager that is controlling the destination's memory usage.
+ * @param usageManager The UsageManager that is controlling the
+ * destination's memory usage.
*/
- public void setUsageManager(UsageManager usageManager){
+ public void setUsageManager(UsageManager usageManager) {
}
- public int getMessageCount(){
+ public int getMessageCount() {
return messageTable.size();
}
- public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
- synchronized(messageTable){
- boolean pastLackBatch=lastBatchId==null;
- int count=0;
- for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
- Map.Entry entry=(Entry)iter.next();
- if(pastLackBatch){
+ public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
+ synchronized (messageTable) {
+ boolean pastLackBatch = lastBatchId == null;
+ int count = 0;
+ for (Iterator iter = messageTable.entrySet().iterator(); iter.hasNext();) {
+ Map.Entry entry = (Entry)iter.next();
+ if (pastLackBatch) {
count++;
- Object msg=entry.getValue();
- lastBatchId=(MessageId)entry.getKey();
- if(msg.getClass()==MessageId.class){
+ Object msg = entry.getValue();
+ lastBatchId = (MessageId)entry.getKey();
+ if (msg.getClass() == MessageId.class) {
listener.recoverMessageReference((MessageId)msg);
- }else{
+ } else {
listener.recoverMessage((Message)msg);
}
- }else{
- pastLackBatch=entry.getKey().equals(lastBatchId);
+ } else {
+ pastLackBatch = entry.getKey().equals(lastBatchId);
}
}
}
}
- public void resetBatching(){
- lastBatchId=null;
+ public void resetBatching() {
+ lastBatchId = null;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java Wed Aug 8 11:56:59 2007
@@ -37,7 +37,6 @@
/**
* @org.apache.xbean.XBean
- *
* @version $Revision: 1.4 $
*/
public class MemoryPersistenceAdapter implements PersistenceAdapter {
@@ -47,14 +46,14 @@
ConcurrentHashMap topics = new ConcurrentHashMap();
ConcurrentHashMap queues = new ConcurrentHashMap();
private boolean useExternalMessageReferences;
-
+
public Set getDestinations() {
- Set rc = new HashSet(queues.size()+topics.size());
+ Set rc = new HashSet(queues.size() + topics.size());
for (Iterator iter = queues.keySet().iterator(); iter.hasNext();) {
- rc.add( iter.next() );
+ rc.add(iter.next());
}
for (Iterator iter = topics.keySet().iterator(); iter.hasNext();) {
- rc.add( iter.next() );
+ rc.add(iter.next());
}
return rc;
}
@@ -62,12 +61,12 @@
public static MemoryPersistenceAdapter newInstance(File file) {
return new MemoryPersistenceAdapter();
}
-
+
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
MessageStore rc = (MessageStore)queues.get(destination);
- if(rc==null) {
+ if (rc == null) {
rc = new MemoryMessageStore(destination);
- if( transactionStore !=null ) {
+ if (transactionStore != null) {
rc = transactionStore.proxy(rc);
}
queues.put(destination, rc);
@@ -77,9 +76,9 @@
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
TopicMessageStore rc = (TopicMessageStore)topics.get(destination);
- if(rc==null) {
+ if (rc == null) {
rc = new MemoryTopicMessageStore(destination);
- if( transactionStore !=null ) {
+ if (transactionStore != null) {
rc = transactionStore.proxy(rc);
}
topics.put(destination, rc);
@@ -88,7 +87,7 @@
}
public TransactionStore createTransactionStore() throws IOException {
- if( transactionStore==null ) {
+ if (transactionStore == null) {
transactionStore = new MemoryTransactionStore();
}
return transactionStore;
@@ -108,7 +107,7 @@
public void stop() throws Exception {
}
-
+
public long getLastMessageBrokerSequenceId() throws IOException {
return 0;
}
@@ -142,28 +141,29 @@
protected MemoryMessageStore asMemoryMessageStore(Object value) {
if (value instanceof MemoryMessageStore) {
- return (MemoryMessageStore) value;
+ return (MemoryMessageStore)value;
}
log.warn("Expected an instance of MemoryMessageStore but was: " + value);
return null;
}
/**
- * @param usageManager The UsageManager that is controlling the broker's memory usage.
+ * @param usageManager The UsageManager that is controlling the broker's
+ * memory usage.
*/
public void setUsageManager(UsageManager usageManager) {
}
-
- public String toString(){
+
+ public String toString() {
return "MemoryPersistenceAdapter";
}
- public void setBrokerName(String brokerName){
+ public void setBrokerName(String brokerName) {
}
- public void setDirectory(File dir){
+ public void setDirectory(File dir) {
}
- public void checkpoint(boolean sync) throws IOException{
+ public void checkpoint(boolean sync) throws IOException {
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java Wed Aug 8 11:56:59 2007
@@ -33,104 +33,100 @@
/**
* @version $Revision: 1.5 $
*/
-public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore{
+public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore {
private Map subscriberDatabase;
private Map topicSubMap;
- public MemoryTopicMessageStore(ActiveMQDestination destination){
- this(destination,new LRUCache(100,100,0.75f,false),makeMap());
+ public MemoryTopicMessageStore(ActiveMQDestination destination) {
+ this(destination, new LRUCache(100, 100, 0.75f, false), makeMap());
}
- protected static Map makeMap(){
+ protected static Map makeMap() {
return Collections.synchronizedMap(new HashMap());
}
- public MemoryTopicMessageStore(ActiveMQDestination destination,Map messageTable,Map subscriberDatabase){
- super(destination,messageTable);
- this.subscriberDatabase=subscriberDatabase;
- this.topicSubMap=makeMap();
+ public MemoryTopicMessageStore(ActiveMQDestination destination, Map messageTable, Map subscriberDatabase) {
+ super(destination, messageTable);
+ this.subscriberDatabase = subscriberDatabase;
+ this.topicSubMap = makeMap();
}
- public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
- super.addMessage(context,message);
- for(Iterator i=topicSubMap.values().iterator();i.hasNext();){
- MemoryTopicSub sub=(MemoryTopicSub)i.next();
- sub.addMessage(message.getMessageId(),message);
+ public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
+ super.addMessage(context, message);
+ for (Iterator i = topicSubMap.values().iterator(); i.hasNext();) {
+ MemoryTopicSub sub = (MemoryTopicSub)i.next();
+ sub.addMessage(message.getMessageId(), message);
}
}
- public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
- MessageId messageId) throws IOException{
- SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
- MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(key);
- if(sub!=null){
+ public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
+ SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
+ MemoryTopicSub sub = (MemoryTopicSub)topicSubMap.get(key);
+ if (sub != null) {
sub.removeMessage(messageId);
}
}
- public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
- return (SubscriptionInfo)subscriberDatabase.get(new SubscriptionKey(clientId,subscriptionName));
+ public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
+ return (SubscriptionInfo)subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName));
}
- public synchronized void addSubsciption(SubscriptionInfo info,boolean retroactive)
- throws IOException{
- SubscriptionKey key=new SubscriptionKey(info);
- MemoryTopicSub sub=new MemoryTopicSub();
- topicSubMap.put(key,sub);
- if(retroactive){
- for(Iterator i=messageTable.entrySet().iterator();i.hasNext();){
- Map.Entry entry=(Entry)i.next();
- sub.addMessage((MessageId)entry.getKey(),(Message)entry.getValue());
+ public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
+ SubscriptionKey key = new SubscriptionKey(info);
+ MemoryTopicSub sub = new MemoryTopicSub();
+ topicSubMap.put(key, sub);
+ if (retroactive) {
+ for (Iterator i = messageTable.entrySet().iterator(); i.hasNext();) {
+ Map.Entry entry = (Entry)i.next();
+ sub.addMessage((MessageId)entry.getKey(), (Message)entry.getValue());
}
}
- subscriberDatabase.put(key,info);
+ subscriberDatabase.put(key, info);
}
- public void deleteSubscription(String clientId,String subscriptionName){
- org.apache.activemq.util.SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
+ public void deleteSubscription(String clientId, String subscriptionName) {
+ org.apache.activemq.util.SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
subscriberDatabase.remove(key);
topicSubMap.remove(key);
}
- public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
- throws Exception{
- MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriptionName));
- if(sub!=null){
+ public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
+ MemoryTopicSub sub = (MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
+ if (sub != null) {
sub.recoverSubscription(listener);
}
}
- public void delete(){
+ public void delete() {
super.delete();
subscriberDatabase.clear();
topicSubMap.clear();
}
- public SubscriptionInfo[] getAllSubscriptions() throws IOException{
+ public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return (SubscriptionInfo[])subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
}
- public synchronized int getMessageCount(String clientId,String subscriberName) throws IOException{
- int result=0;
- MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriberName));
- if(sub!=null){
- result=sub.size();
+ public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException {
+ int result = 0;
+ MemoryTopicSub sub = (MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId, subscriberName));
+ if (sub != null) {
+ result = sub.size();
}
return result;
}
- public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
- MessageRecoveryListener listener) throws Exception{
- MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriptionName));
- if(sub!=null){
- sub.recoverNextMessages(maxReturned,listener);
+ public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
+ MemoryTopicSub sub = (MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
+ if (sub != null) {
+ sub.recoverNextMessages(maxReturned, listener);
}
}
- public void resetBatching(String clientId,String subscriptionName){
- MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriptionName));
- if(sub!=null){
+ public void resetBatching(String clientId, String subscriptionName) {
+ MemoryTopicSub sub = (MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
+ if (sub != null) {
sub.resetBatching();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java Wed Aug 8 11:56:59 2007
@@ -27,65 +27,66 @@
*
* @version $Revision: 1.7 $
*/
-class MemoryTopicSub{
+class MemoryTopicSub {
- private Map map=new LinkedHashMap();
+ private Map map = new LinkedHashMap();
private MessageId lastBatch;
- void addMessage(MessageId id,Message message){
- map.put(id,message);
+ void addMessage(MessageId id, Message message) {
+ map.put(id, message);
}
- void removeMessage(MessageId id){
+ void removeMessage(MessageId id) {
map.remove(id);
if (map.isEmpty()) {
- lastBatch=null;
+ lastBatch = null;
}
}
- int size(){
+ int size() {
return map.size();
}
- void recoverSubscription(MessageRecoveryListener listener) throws Exception{
- for(Iterator iter=map.entrySet().iterator();iter.hasNext();){
- Map.Entry entry=(Entry)iter.next();
- Object msg=entry.getValue();
- if(msg.getClass()==MessageId.class){
+ void recoverSubscription(MessageRecoveryListener listener) throws Exception {
+ for (Iterator iter = map.entrySet().iterator(); iter.hasNext();) {
+ Map.Entry entry = (Entry)iter.next();
+ Object msg = entry.getValue();
+ if (msg.getClass() == MessageId.class) {
listener.recoverMessageReference((MessageId)msg);
- }else{
+ } else {
listener.recoverMessage((Message)msg);
}
}
}
- void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
- boolean pastLackBatch=lastBatch==null;
- MessageId lastId=null;
- // the message table is a synchronizedMap - so just have to synchronize here
- int count=0;
- for(Iterator iter=map.entrySet().iterator();iter.hasNext()&&count<maxReturned;){
- Map.Entry entry=(Entry)iter.next();
- if(pastLackBatch){
+ void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
+ boolean pastLackBatch = lastBatch == null;
+ MessageId lastId = null;
+ // the message table is a synchronizedMap - so just have to synchronize
+ // here
+ int count = 0;
+ for (Iterator iter = map.entrySet().iterator(); iter.hasNext() && count < maxReturned;) {
+ Map.Entry entry = (Entry)iter.next();
+ if (pastLackBatch) {
count++;
- Object msg=entry.getValue();
- lastId=(MessageId)entry.getKey();
- if(msg.getClass()==MessageId.class){
+ Object msg = entry.getValue();
+ lastId = (MessageId)entry.getKey();
+ if (msg.getClass() == MessageId.class) {
listener.recoverMessageReference((MessageId)msg);
- }else{
+ } else {
listener.recoverMessage((Message)msg);
}
- }else{
- pastLackBatch=entry.getKey().equals(lastBatch);
+ } else {
+ pastLackBatch = entry.getKey().equals(lastBatch);
}
}
- if(lastId!=null){
- lastBatch=lastId;
+ if (lastId != null) {
+ lastBatch = lastId;
}
}
- void resetBatching(){
- lastBatch=null;
+ void resetBatching() {
+ lastBatch = null;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java Wed Aug 8 11:56:59 2007
@@ -65,20 +65,20 @@
public Message[] getMessages() {
Message rc[] = new Message[messages.size()];
- int count=0;
+ int count = 0;
for (Iterator iter = messages.iterator(); iter.hasNext();) {
- AddMessageCommand cmd = (AddMessageCommand) iter.next();
- rc[count++] = cmd.getMessage();
+ AddMessageCommand cmd = (AddMessageCommand)iter.next();
+ rc[count++] = cmd.getMessage();
}
return rc;
}
public MessageAck[] getAcks() {
MessageAck rc[] = new MessageAck[acks.size()];
- int count=0;
+ int count = 0;
for (Iterator iter = acks.iterator(); iter.hasNext();) {
- RemoveMessageCommand cmd = (RemoveMessageCommand) iter.next();
- rc[count++] = cmd.getMessageAck();
+ RemoveMessageCommand cmd = (RemoveMessageCommand)iter.next();
+ rc[count++] = cmd.getMessageAck();
}
return rc;
}
@@ -87,14 +87,14 @@
* @throws IOException
*/
public void commit() throws IOException {
- // Do all the message adds.
+ // Do all the message adds.
for (Iterator iter = messages.iterator(); iter.hasNext();) {
- AddMessageCommand cmd = (AddMessageCommand) iter.next();
+ AddMessageCommand cmd = (AddMessageCommand)iter.next();
cmd.run();
}
// And removes..
for (Iterator iter = acks.iterator(); iter.hasNext();) {
- RemoveMessageCommand cmd = (RemoveMessageCommand) iter.next();
+ RemoveMessageCommand cmd = (RemoveMessageCommand)iter.next();
cmd.run();
}
}
@@ -102,49 +102,52 @@
public interface AddMessageCommand {
Message getMessage();
+
void run() throws IOException;
}
public interface RemoveMessageCommand {
MessageAck getMessageAck();
+
void run() throws IOException;
}
public MessageStore proxy(MessageStore messageStore) {
- return new ProxyMessageStore(messageStore) {
- public void addMessage(ConnectionContext context, final Message send) throws IOException {
- MemoryTransactionStore.this.addMessage(getDelegate(), send);
- }
-
- public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
- MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
- }
- };
+ return new ProxyMessageStore(messageStore) {
+ public void addMessage(ConnectionContext context, final Message send) throws IOException {
+ MemoryTransactionStore.this.addMessage(getDelegate(), send);
+ }
+
+ public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
+ MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
+ }
+ };
}
public TopicMessageStore proxy(TopicMessageStore messageStore) {
- return new ProxyTopicMessageStore(messageStore) {
- public void addMessage(ConnectionContext context, final Message send) throws IOException {
- MemoryTransactionStore.this.addMessage(getDelegate(), send);
- }
- public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
- MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
- }
- };
+ return new ProxyTopicMessageStore(messageStore) {
+ public void addMessage(ConnectionContext context, final Message send) throws IOException {
+ MemoryTransactionStore.this.addMessage(getDelegate(), send);
+ }
+
+ public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
+ MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
+ }
+ };
}
/**
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
public void prepare(TransactionId txid) {
- Tx tx = (Tx) inflightTransactions.remove(txid);
+ Tx tx = (Tx)inflightTransactions.remove(txid);
if (tx == null)
return;
preparedTransactions.put(txid, tx);
}
public Tx getTx(Object txid) {
- Tx tx = (Tx) inflightTransactions.get(txid);
+ Tx tx = (Tx)inflightTransactions.get(txid);
if (tx == null) {
tx = new Tx();
inflightTransactions.put(txid, tx);
@@ -157,18 +160,18 @@
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
-
+
Tx tx;
- if( wasPrepared ) {
- tx = (Tx) preparedTransactions.remove(txid);
+ if (wasPrepared) {
+ tx = (Tx)preparedTransactions.remove(txid);
} else {
- tx = (Tx) inflightTransactions.remove(txid);
+ tx = (Tx)inflightTransactions.remove(txid);
}
-
- if( tx == null )
+
+ if (tx == null)
return;
tx.commit();
-
+
}
/**
@@ -187,14 +190,14 @@
synchronized public void recover(TransactionRecoveryListener listener) throws IOException {
// All the inflight transactions get rolled back..
- inflightTransactions.clear();
+ inflightTransactions.clear();
this.doingRecover = true;
try {
- for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
- Object txid = (Object) iter.next();
- Tx tx = (Tx) preparedTransactions.get(txid);
- listener.recover((XATransactionId) txid, tx.getMessages(), tx.getAcks());
- }
+ for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
+ Object txid = (Object)iter.next();
+ Tx tx = (Tx)preparedTransactions.get(txid);
+ listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
+ }
} finally {
this.doingRecover = false;
}
@@ -205,16 +208,17 @@
* @throws IOException
*/
void addMessage(final MessageStore destination, final Message message) throws IOException {
-
- if( doingRecover )
+
+ if (doingRecover)
return;
-
- if (message.getTransactionId()!=null) {
+
+ if (message.getTransactionId() != null) {
Tx tx = getTx(message.getTransactionId());
tx.add(new AddMessageCommand() {
public Message getMessage() {
return message;
}
+
public void run() throws IOException {
destination.addMessage(null, message);
}
@@ -223,13 +227,13 @@
destination.addMessage(null, message);
}
}
-
+
/**
* @param ack
* @throws IOException
*/
- final void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException {
- if( doingRecover )
+ final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
+ if (doingRecover)
return;
if (ack.isInTransaction()) {
@@ -238,6 +242,7 @@
public MessageAck getMessageAck() {
return ack;
}
+
public void run() throws IOException {
destination.removeMessage(null, ack);
}
@@ -250,7 +255,7 @@
public void delete() {
inflightTransactions.clear();
preparedTransactions.clear();
- doingRecover=false;
+ doingRecover = false;
}
-
+
}