You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/08/08 20:58:13 UTC

svn commit: r563982 [21/32] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/blob/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jm...

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Wed Aug  8 11:56:59 2007
@@ -32,52 +32,54 @@
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.TopicReferenceStore;
 
-public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore{
+public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore {
 
     protected ListContainer<TopicSubAck> ackContainer;
     private Map subscriberContainer;
     private Store store;
-    protected Map subscriberMessages=new ConcurrentHashMap();
+    protected Map subscriberMessages = new ConcurrentHashMap();
 
-    public KahaTopicReferenceStore(Store store,KahaReferenceStoreAdapter adapter,MapContainer messageContainer,ListContainer ackContainer,
-            MapContainer subsContainer,ActiveMQDestination destination) throws IOException{
-        super(adapter,messageContainer,destination);
-        this.store=store;
-        this.ackContainer=ackContainer;
-        subscriberContainer=subsContainer;
+    public KahaTopicReferenceStore(Store store, KahaReferenceStoreAdapter adapter,
+                                   MapContainer messageContainer, ListContainer ackContainer,
+                                   MapContainer subsContainer, ActiveMQDestination destination)
+        throws IOException {
+        super(adapter, messageContainer, destination);
+        this.store = store;
+        this.ackContainer = ackContainer;
+        subscriberContainer = subsContainer;
         // load all the Ack containers
-        for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){
-            String key=(String) i.next();
+        for (Iterator i = subscriberContainer.keySet().iterator(); i.hasNext();) {
+            String key = (String)i.next();
             addSubscriberMessageContainer(key);
         }
     }
 
-    protected MessageId getMessageId(Object object){
+    protected MessageId getMessageId(Object object) {
         return new MessageId(((ReferenceRecord)object).getMessageId());
     }
 
-    public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
+    public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
         throw new RuntimeException("Use addMessageReference instead");
     }
 
-    public synchronized Message getMessage(MessageId identity) throws IOException{
+    public synchronized Message getMessage(MessageId identity) throws IOException {
         throw new RuntimeException("Use addMessageReference instead");
     }
 
-    
-    public void addMessageReference(final ConnectionContext context,final MessageId messageId,final ReferenceData data){
-        final ReferenceRecord record=new ReferenceRecord(messageId.toString(),data);
-        final int subscriberCount=subscriberMessages.size();
-        if(subscriberCount>0){
-            final StoreEntry messageEntry=messageContainer.place(messageId,record);
+    public void addMessageReference(final ConnectionContext context, final MessageId messageId,
+                                    final ReferenceData data) {
+        final ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
+        final int subscriberCount = subscriberMessages.size();
+        if (subscriberCount > 0) {
+            final StoreEntry messageEntry = messageContainer.place(messageId, record);
             addInterest(record);
-            final TopicSubAck tsa=new TopicSubAck();
+            final TopicSubAck tsa = new TopicSubAck();
             tsa.setCount(subscriberCount);
             tsa.setMessageEntry(messageEntry);
-            final StoreEntry ackEntry=ackContainer.placeLast(tsa);
-            for(final Iterator i=subscriberMessages.values().iterator();i.hasNext();){
-                final TopicSubContainer container=(TopicSubContainer)i.next();
-                final ConsumerMessageRef ref=new ConsumerMessageRef();
+            final StoreEntry ackEntry = ackContainer.placeLast(tsa);
+            for (final Iterator i = subscriberMessages.values().iterator(); i.hasNext();) {
+                final TopicSubContainer container = (TopicSubContainer)i.next();
+                final ConsumerMessageRef ref = new ConsumerMessageRef();
                 ref.setAckEntry(ackEntry);
                 ref.setMessageEntry(messageEntry);
                 ref.setMessageId(messageId);
@@ -86,154 +88,152 @@
         }
     }
 
-    public ReferenceData getMessageReference(final MessageId identity) throws IOException{
-        final ReferenceRecord result=messageContainer.get(identity);
-        if(result==null)
+    public ReferenceData getMessageReference(final MessageId identity) throws IOException {
+        final ReferenceRecord result = messageContainer.get(identity);
+        if (result == null)
             return null;
         return result.getData();
     }
 
-    public void addReferenceFileIdsInUse(){
-        for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
-            TopicSubAck subAck=(TopicSubAck)ackContainer.get(entry);
-            if(subAck.getCount()>0){
-                ReferenceRecord rr=(ReferenceRecord)messageContainer.getValue(subAck.getMessageEntry());
+    public void addReferenceFileIdsInUse() {
+        for (StoreEntry entry = ackContainer.getFirst(); entry != null; entry = ackContainer.getNext(entry)) {
+            TopicSubAck subAck = (TopicSubAck)ackContainer.get(entry);
+            if (subAck.getCount() > 0) {
+                ReferenceRecord rr = (ReferenceRecord)messageContainer.getValue(subAck.getMessageEntry());
                 addInterest(rr);
             }
         }
     }
 
-    protected ListContainer addSubscriberMessageContainer(String key) throws IOException{
-        ListContainer container=store.getListContainer(destination,"topic-subs-references-"+key);
-        Marshaller marshaller=new ConsumerMessageRefMarshaller();
+    protected ListContainer addSubscriberMessageContainer(String key) throws IOException {
+        ListContainer container = store.getListContainer(destination, "topic-subs-references-" + key);
+        Marshaller marshaller = new ConsumerMessageRefMarshaller();
         container.setMarshaller(marshaller);
-        TopicSubContainer tsc=new TopicSubContainer(container);
-        subscriberMessages.put(key,tsc);
+        TopicSubContainer tsc = new TopicSubContainer(container);
+        subscriberMessages.put(key, tsc);
         return container;
     }
 
-    public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
-            MessageId messageId) throws IOException{
-        String key=getSubscriptionKey(clientId,subscriptionName);
-        
-        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
-        if(container!=null){
-            ConsumerMessageRef ref=container.remove(messageId);
-            if(ref!=null){
-                TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
-                if(tsa!=null){
-                    if(tsa.decrementCount()<=0){
-                        StoreEntry entry=ref.getAckEntry();
-                        entry=ackContainer.refresh(entry);
+    public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
+                                         MessageId messageId) throws IOException {
+        String key = getSubscriptionKey(clientId, subscriptionName);
+
+        TopicSubContainer container = (TopicSubContainer)subscriberMessages.get(key);
+        if (container != null) {
+            ConsumerMessageRef ref = container.remove(messageId);
+            if (ref != null) {
+                TopicSubAck tsa = (TopicSubAck)ackContainer.get(ref.getAckEntry());
+                if (tsa != null) {
+                    if (tsa.decrementCount() <= 0) {
+                        StoreEntry entry = ref.getAckEntry();
+                        entry = ackContainer.refresh(entry);
                         ackContainer.remove(entry);
-                        ReferenceRecord rr=messageContainer.get(messageId);
-                        if(rr!=null){
-                            entry=tsa.getMessageEntry();
-                            entry=messageContainer.refresh(entry);
+                        ReferenceRecord rr = messageContainer.get(messageId);
+                        if (rr != null) {
+                            entry = tsa.getMessageEntry();
+                            entry = messageContainer.refresh(entry);
                             messageContainer.remove(entry);
                             removeInterest(rr);
                         }
-                    }else{
-                       
-                        ackContainer.update(ref.getAckEntry(),tsa);
+                    } else {
+
+                        ackContainer.update(ref.getAckEntry(), tsa);
                     }
                 }
             }
         }
     }
 
-    public synchronized void addSubsciption(SubscriptionInfo info,boolean retroactive)
-            throws IOException{
-        String key=getSubscriptionKey(info.getClientId(), info.getSubscriptionName());
+    public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
+        String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName());
         // if already exists - won't add it again as it causes data files
         // to hang around
-        if(!subscriberContainer.containsKey(key)){
-            subscriberContainer.put(key,info);
+        if (!subscriberContainer.containsKey(key)) {
+            subscriberContainer.put(key, info);
             adapter.addSubscriberState(info);
         }
         // add the subscriber
-        ListContainer container=addSubscriberMessageContainer(key);
-        if(retroactive){
+        ListContainer container = addSubscriberMessageContainer(key);
+        if (retroactive) {
             /*
-            for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
-                TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
-                ConsumerMessageRef ref=new ConsumerMessageRef();
-                ref.setAckEntry(entry);
-                ref.setMessageEntry(tsa.getMessageEntry());
-                container.add(ref);
-            }
-            */
+             * for(StoreEntry
+             * entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
+             * TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
+             * ConsumerMessageRef ref=new ConsumerMessageRef();
+             * ref.setAckEntry(entry);
+             * ref.setMessageEntry(tsa.getMessageEntry()); container.add(ref); }
+             */
         }
     }
 
-    public synchronized void deleteSubscription(String clientId,String subscriptionName) throws IOException{
-        SubscriptionInfo info = lookupSubscription(clientId,subscriptionName);
+    public synchronized void deleteSubscription(String clientId, String subscriptionName) throws IOException {
+        SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
         if (info != null) {
             adapter.removeSubscriberState(info);
         }
-        String key=getSubscriptionKey(clientId,subscriptionName);
+        String key = getSubscriptionKey(clientId, subscriptionName);
         removeSubscriberMessageContainer(key);
     }
 
-    public SubscriptionInfo[] getAllSubscriptions() throws IOException{
-        return (SubscriptionInfo[])subscriberContainer.values().toArray(
-                new SubscriptionInfo[subscriberContainer.size()]);
-    }
-
-    public int getMessageCount(String clientId,String subscriberName) throws IOException{
-        String key=getSubscriptionKey(clientId,subscriberName);
-        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
-        return  container != null ? container.size() : 0;
-    }
-
-    public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
-        return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
-    }
-
-    public synchronized void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
-            MessageRecoveryListener listener) throws Exception{
-        String key=getSubscriptionKey(clientId,subscriptionName);
-        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
-        if(container!=null){
-            int count=0;
-            StoreEntry entry=container.getBatchEntry();
-            if(entry==null){
-                entry=container.getEntry();
-            }else{
-                entry=container.refreshEntry(entry);
-                if(entry!=null){
-                    entry=container.getNextEntry(entry);
+    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+        return (SubscriptionInfo[])subscriberContainer.values()
+            .toArray(new SubscriptionInfo[subscriberContainer.size()]);
+    }
+
+    public int getMessageCount(String clientId, String subscriberName) throws IOException {
+        String key = getSubscriptionKey(clientId, subscriberName);
+        TopicSubContainer container = (TopicSubContainer)subscriberMessages.get(key);
+        return container != null ? container.size() : 0;
+    }
+
+    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
+        return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName));
+    }
+
+    public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
+                                                 MessageRecoveryListener listener) throws Exception {
+        String key = getSubscriptionKey(clientId, subscriptionName);
+        TopicSubContainer container = (TopicSubContainer)subscriberMessages.get(key);
+        if (container != null) {
+            int count = 0;
+            StoreEntry entry = container.getBatchEntry();
+            if (entry == null) {
+                entry = container.getEntry();
+            } else {
+                entry = container.refreshEntry(entry);
+                if (entry != null) {
+                    entry = container.getNextEntry(entry);
                 }
             }
-           
-            if(entry!=null){
-                do{
-                    ConsumerMessageRef consumerRef=container.get(entry);
-                    ReferenceRecord msg=messageContainer.getValue(consumerRef.getMessageEntry());
-                    if(msg!=null){
-                        recoverReference(listener,msg);
+
+            if (entry != null) {
+                do {
+                    ConsumerMessageRef consumerRef = container.get(entry);
+                    ReferenceRecord msg = messageContainer.getValue(consumerRef.getMessageEntry());
+                    if (msg != null) {
+                        recoverReference(listener, msg);
                         count++;
-                        container.setBatchEntry(msg.getMessageId(),entry);
-                    }else {
+                        container.setBatchEntry(msg.getMessageId(), entry);
+                    } else {
                         container.reset();
                     }
-                    
-                    entry=container.getNextEntry(entry);
-                }while(entry!=null&&count<maxReturned&&listener.hasSpace());
+
+                    entry = container.getNextEntry(entry);
+                } while (entry != null && count < maxReturned && listener.hasSpace());
             }
         }
     }
 
-    public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
-            throws Exception{
-        String key=getSubscriptionKey(clientId,subscriptionName);
-        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
-        if(container!=null){
-            for(Iterator i=container.iterator();i.hasNext();){
-                ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
-                ReferenceRecord msg=messageContainer.get(ref.getMessageEntry());
-                if(msg!=null){
-                    if(!recoverReference(listener,msg)){
+    public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
+        throws Exception {
+        String key = getSubscriptionKey(clientId, subscriptionName);
+        TopicSubContainer container = (TopicSubContainer)subscriberMessages.get(key);
+        if (container != null) {
+            for (Iterator i = container.iterator(); i.hasNext();) {
+                ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
+                ReferenceRecord msg = messageContainer.get(ref.getMessageEntry());
+                if (msg != null) {
+                    if (!recoverReference(listener, msg)) {
                         break;
                     }
                 }
@@ -241,37 +241,37 @@
         }
     }
 
-    public synchronized void resetBatching(String clientId,String subscriptionName){
-        String key=getSubscriptionKey(clientId,subscriptionName);
-        TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key);
-        if(topicSubContainer!=null){
+    public synchronized void resetBatching(String clientId, String subscriptionName) {
+        String key = getSubscriptionKey(clientId, subscriptionName);
+        TopicSubContainer topicSubContainer = (TopicSubContainer)subscriberMessages.get(key);
+        if (topicSubContainer != null) {
             topicSubContainer.reset();
         }
     }
 
-    protected void removeSubscriberMessageContainer(String key) throws IOException{
+    protected void removeSubscriberMessageContainer(String key) throws IOException {
         subscriberContainer.remove(key);
-        TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
-        for(Iterator i=container.iterator();i.hasNext();){
-            ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
-            if(ref!=null){
-                TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
-                if(tsa!=null){
-                    if(tsa.decrementCount()<=0){
+        TopicSubContainer container = (TopicSubContainer)subscriberMessages.remove(key);
+        for (Iterator i = container.iterator(); i.hasNext();) {
+            ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
+            if (ref != null) {
+                TopicSubAck tsa = (TopicSubAck)ackContainer.get(ref.getAckEntry());
+                if (tsa != null) {
+                    if (tsa.decrementCount() <= 0) {
                         ackContainer.remove(ref.getAckEntry());
                         messageContainer.remove(tsa.getMessageEntry());
-                    }else{
-                        ackContainer.update(ref.getAckEntry(),tsa);
+                    } else {
+                        ackContainer.update(ref.getAckEntry(), tsa);
                     }
                 }
             }
         }
-        store.deleteListContainer(destination,"topic-subs-references-"+key);
+        store.deleteListContainer(destination, "topic-subs-references-" + key);
     }
 
-    protected String getSubscriptionKey(String clientId,String subscriberName){
-        String result=clientId+":";
-        result+=subscriberName!=null?subscriberName:"NOT_SET";
+    protected String getSubscriptionKey(String clientId, String subscriberName) {
+        String result = clientId + ":";
+        result += subscriberName != null ? subscriberName : "NOT_SET";
         return result;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java Wed Aug  8 11:56:59 2007
@@ -26,78 +26,79 @@
 import org.apache.activemq.store.MessageStore;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 /**
  * Stores a messages/acknowledgements for a transaction
  * 
  * @version $Revision: 1.4 $
  */
-class KahaTransaction{
-    private static final Log log=LogFactory.getLog(KahaTransaction.class);
-    protected List list=new ArrayList();
-
-    
-     void add(KahaMessageStore store,BaseCommand command){
-        TxCommand tx=new TxCommand();
+class KahaTransaction {
+    private static final Log log = LogFactory.getLog(KahaTransaction.class);
+    protected List list = new ArrayList();
+
+    void add(KahaMessageStore store, BaseCommand command) {
+        TxCommand tx = new TxCommand();
         tx.setCommand(command);
         tx.setMessageStoreKey(store.getId());
         list.add(tx);
     }
 
-    Message[] getMessages(){
-        List result=new ArrayList();
-        for(int i=0;i<list.size();i++){
-            TxCommand command=(TxCommand) list.get(i);
-            if(command.isAdd()){
+    Message[] getMessages() {
+        List result = new ArrayList();
+        for (int i = 0; i < list.size(); i++) {
+            TxCommand command = (TxCommand)list.get(i);
+            if (command.isAdd()) {
                 result.add(command.getCommand());
             }
         }
-        Message[] messages=new Message[result.size()];
-        return (Message[]) result.toArray(messages);
+        Message[] messages = new Message[result.size()];
+        return (Message[])result.toArray(messages);
     }
 
-    MessageAck[] getAcks(){
-        List result=new ArrayList();
-        for(int i=0;i<list.size();i++){
-            TxCommand command=(TxCommand) list.get(i);
-            if(command.isRemove()){
+    MessageAck[] getAcks() {
+        List result = new ArrayList();
+        for (int i = 0; i < list.size(); i++) {
+            TxCommand command = (TxCommand)list.get(i);
+            if (command.isRemove()) {
                 result.add(command.getCommand());
             }
         }
-        MessageAck[] acks=new MessageAck[result.size()];
-        return (MessageAck[]) result.toArray(acks);
+        MessageAck[] acks = new MessageAck[result.size()];
+        return (MessageAck[])result.toArray(acks);
     }
 
-    void prepare(){}
+    void prepare() {
+    }
 
-    void rollback(){
+    void rollback() {
         list.clear();
     }
 
     /**
      * @throws IOException
      */
-    void commit(KahaTransactionStore transactionStore) throws IOException{
-        for(int i=0;i<list.size();i++){
-            TxCommand command=(TxCommand) list.get(i);
-            MessageStore ms=transactionStore.getStoreById(command.getMessageStoreKey());
-            if(command.isAdd()){
-                ms.addMessage(null,(Message) command.getCommand());
+    void commit(KahaTransactionStore transactionStore) throws IOException {
+        for (int i = 0; i < list.size(); i++) {
+            TxCommand command = (TxCommand)list.get(i);
+            MessageStore ms = transactionStore.getStoreById(command.getMessageStoreKey());
+            if (command.isAdd()) {
+                ms.addMessage(null, (Message)command.getCommand());
             }
         }
-        for(int i=0;i<list.size();i++){
-            TxCommand command=(TxCommand) list.get(i);
-            MessageStore ms=transactionStore.getStoreById(command.getMessageStoreKey());
-            if(command.isRemove()){
-                ms.removeMessage(null,(MessageAck) command.getCommand());
+        for (int i = 0; i < list.size(); i++) {
+            TxCommand command = (TxCommand)list.get(i);
+            MessageStore ms = transactionStore.getStoreById(command.getMessageStoreKey());
+            if (command.isRemove()) {
+                ms.removeMessage(null, (MessageAck)command.getCommand());
             }
         }
     }
-    
-    List getList(){
+
+    List getList() {
         return new ArrayList(list);
     }
-    
-    void setList(List list){
+
+    void setList(List list) {
         this.list = list;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java Wed Aug  8 11:56:59 2007
@@ -33,42 +33,43 @@
 import org.apache.activemq.store.TransactionRecoveryListener;
 import org.apache.activemq.store.TransactionStore;
 import java.util.concurrent.ConcurrentHashMap;
+
 /**
- * Provides a TransactionStore implementation that can create transaction aware MessageStore objects from non
- * transaction aware MessageStore objects.
+ * Provides a TransactionStore implementation that can create transaction aware
+ * MessageStore objects from non transaction aware MessageStore objects.
  * 
  * @version $Revision: 1.4 $
  */
-public class KahaTransactionStore implements TransactionStore{
-    private Map transactions=new ConcurrentHashMap();
+public class KahaTransactionStore implements TransactionStore {
+    private Map transactions = new ConcurrentHashMap();
     private Map prepared;
     private KahaPersistenceAdapter adaptor;
 
-    KahaTransactionStore(KahaPersistenceAdapter adaptor,Map preparedMap){
-        this.adaptor=adaptor;
-        this.prepared=preparedMap;
+    KahaTransactionStore(KahaPersistenceAdapter adaptor, Map preparedMap) {
+        this.adaptor = adaptor;
+        this.prepared = preparedMap;
     }
 
-    public MessageStore proxy(MessageStore messageStore){
-        return new ProxyMessageStore(messageStore){
-            public void addMessage(ConnectionContext context,final Message send) throws IOException{
-                KahaTransactionStore.this.addMessage(getDelegate(),send);
+    public MessageStore proxy(MessageStore messageStore) {
+        return new ProxyMessageStore(messageStore) {
+            public void addMessage(ConnectionContext context, final Message send) throws IOException {
+                KahaTransactionStore.this.addMessage(getDelegate(), send);
             }
 
-            public void removeMessage(ConnectionContext context,final MessageAck ack) throws IOException{
-                KahaTransactionStore.this.removeMessage(getDelegate(),ack);
+            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
+                KahaTransactionStore.this.removeMessage(getDelegate(), ack);
             }
         };
     }
 
-    public TopicMessageStore proxy(TopicMessageStore messageStore){
-        return new ProxyTopicMessageStore(messageStore){
-            public void addMessage(ConnectionContext context,final Message send) throws IOException{
-                KahaTransactionStore.this.addMessage(getDelegate(),send);
+    public TopicMessageStore proxy(TopicMessageStore messageStore) {
+        return new ProxyTopicMessageStore(messageStore) {
+            public void addMessage(ConnectionContext context, final Message send) throws IOException {
+                KahaTransactionStore.this.addMessage(getDelegate(), send);
             }
 
-            public void removeMessage(ConnectionContext context,final MessageAck ack) throws IOException{
-                KahaTransactionStore.this.removeMessage(getDelegate(),ack);
+            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
+                KahaTransactionStore.this.removeMessage(getDelegate(), ack);
             }
         };
     }
@@ -76,11 +77,11 @@
     /**
      * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
      */
-    public void prepare(TransactionId txid){
-        KahaTransaction tx=getTx(txid);
-        if(tx!=null){
+    public void prepare(TransactionId txid) {
+        KahaTransaction tx = getTx(txid);
+        if (tx != null) {
             tx.prepare();
-            prepared.put(txid,tx);
+            prepared.put(txid, tx);
         }
     }
 
@@ -88,9 +89,9 @@
      * @throws XAException
      * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
      */
-    public void commit(TransactionId txid,boolean wasPrepared) throws IOException{
-        KahaTransaction tx=getTx(txid);
-        if(tx!=null){
+    public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
+        KahaTransaction tx = getTx(txid);
+        if (tx != null) {
             tx.commit(this);
             removeTx(txid);
         }
@@ -99,24 +100,26 @@
     /**
      * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
      */
-    public void rollback(TransactionId txid){
-        KahaTransaction tx=getTx(txid);
-        if(tx!=null){
+    public void rollback(TransactionId txid) {
+        KahaTransaction tx = getTx(txid);
+        if (tx != null) {
             tx.rollback();
             removeTx(txid);
         }
     }
 
-    public void start() throws Exception{}
+    public void start() throws Exception {
+    }
 
-    public void stop() throws Exception{}
+    public void stop() throws Exception {
+    }
 
-    synchronized public void recover(TransactionRecoveryListener listener) throws IOException{
-        for(Iterator i=prepared.entrySet().iterator();i.hasNext();){
-            Map.Entry entry=(Entry) i.next();
-            XATransactionId xid=(XATransactionId) entry.getKey();
-            KahaTransaction kt=(KahaTransaction) entry.getValue();
-            listener.recover(xid,kt.getMessages(),kt.getAcks());
+    synchronized public void recover(TransactionRecoveryListener listener) throws IOException {
+        for (Iterator i = prepared.entrySet().iterator(); i.hasNext();) {
+            Map.Entry entry = (Entry)i.next();
+            XATransactionId xid = (XATransactionId)entry.getKey();
+            KahaTransaction kt = (KahaTransaction)entry.getValue();
+            listener.recover(xid, kt.getMessages(), kt.getAcks());
         }
     }
 
@@ -124,12 +127,12 @@
      * @param message
      * @throws IOException
      */
-    void addMessage(final MessageStore destination,final Message message) throws IOException{
-        if(message.isInTransaction()){
-            KahaTransaction tx=getOrCreateTx(message.getTransactionId());
-            tx.add((KahaMessageStore) destination,message);
-        }else{
-            destination.addMessage(null,message);
+    void addMessage(final MessageStore destination, final Message message) throws IOException {
+        if (message.isInTransaction()) {
+            KahaTransaction tx = getOrCreateTx(message.getTransactionId());
+            tx.add((KahaMessageStore)destination, message);
+        } else {
+            destination.addMessage(null, message);
         }
     }
 
@@ -137,43 +140,43 @@
      * @param ack
      * @throws IOException
      */
-    final void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException{
-        if(ack.isInTransaction()){
-            KahaTransaction tx=getOrCreateTx(ack.getTransactionId());
-            tx.add((KahaMessageStore) destination,ack);
-        }else{
-            destination.removeMessage(null,ack);
+    final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
+        if (ack.isInTransaction()) {
+            KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
+            tx.add((KahaMessageStore)destination, ack);
+        } else {
+            destination.removeMessage(null, ack);
         }
     }
 
-    protected synchronized KahaTransaction getTx(TransactionId key){
-        KahaTransaction result=(KahaTransaction) transactions.get(key);
-        if(result==null){
-            result=(KahaTransaction) prepared.get(key);
+    protected synchronized KahaTransaction getTx(TransactionId key) {
+        KahaTransaction result = (KahaTransaction)transactions.get(key);
+        if (result == null) {
+            result = (KahaTransaction)prepared.get(key);
         }
         return result;
     }
 
-    protected synchronized KahaTransaction getOrCreateTx(TransactionId key){
-        KahaTransaction result=(KahaTransaction) transactions.get(key);
-        if(result==null){
-            result=new KahaTransaction();
-            transactions.put(key,result);
+    protected synchronized KahaTransaction getOrCreateTx(TransactionId key) {
+        KahaTransaction result = (KahaTransaction)transactions.get(key);
+        if (result == null) {
+            result = new KahaTransaction();
+            transactions.put(key, result);
         }
         return result;
     }
 
-    protected synchronized void removeTx(TransactionId key){
+    protected synchronized void removeTx(TransactionId key) {
         transactions.remove(key);
         prepared.remove(key);
     }
 
-    public void delete(){
+    public void delete() {
         transactions.clear();
         prepared.clear();
     }
 
-    protected MessageStore getStoreById(Object id){
+    protected MessageStore getStoreById(Object id) {
         return adaptor.retrieveMessageStore(id);
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecord.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecord.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecord.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecord.java Wed Aug  8 11:56:59 2007
@@ -16,52 +16,48 @@
 
 import org.apache.activemq.store.ReferenceStore.ReferenceData;
 
-public class ReferenceRecord{
+public class ReferenceRecord {
 
     private String messageId;
     private ReferenceData data;
 
-    public ReferenceRecord(){
+    public ReferenceRecord() {
     }
 
-    public ReferenceRecord(String messageId,ReferenceData data){
-        this.messageId=messageId;
-        this.data=data;
+    public ReferenceRecord(String messageId, ReferenceData data) {
+        this.messageId = messageId;
+        this.data = data;
     }
 
-    
     /**
      * @return the data
      */
-    public ReferenceData getData(){
+    public ReferenceData getData() {
         return this.data;
     }
 
-    
     /**
      * @param data the data to set
      */
-    public void setData(ReferenceData data){
-        this.data=data;
+    public void setData(ReferenceData data) {
+        this.data = data;
     }
 
-    
     /**
      * @return the messageId
      */
-    public String getMessageId(){
+    public String getMessageId() {
         return this.messageId;
     }
 
-    
     /**
      * @param messageId the messageId to set
      */
-    public void setMessageId(String messageId){
-        this.messageId=messageId;
+    public void setMessageId(String messageId) {
+        this.messageId = messageId;
     }
-    
+
     public String toString() {
-        return "ReferenceRecord(id=" + messageId + ",data=" + data +")";
+        return "ReferenceRecord(id=" + messageId + ",data=" + data + ")";
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecordMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecordMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecordMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecordMarshaller.java Wed Aug  8 11:56:59 2007
@@ -20,10 +20,10 @@
 import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.store.ReferenceStore.ReferenceData;
 
-public class ReferenceRecordMarshaller implements Marshaller<ReferenceRecord>{
+public class ReferenceRecordMarshaller implements Marshaller<ReferenceRecord> {
 
-    public ReferenceRecord readPayload(DataInput dataIn) throws IOException{
-        ReferenceRecord rr=new ReferenceRecord();
+    public ReferenceRecord readPayload(DataInput dataIn) throws IOException {
+        ReferenceRecord rr = new ReferenceRecord();
         rr.setMessageId(dataIn.readUTF());
         ReferenceData referenceData = new ReferenceData();
         referenceData.setFileId(dataIn.readInt());
@@ -37,9 +37,10 @@
      * @param object
      * @param dataOut
      * @throws IOException
-     * @see org.apache.activemq.kaha.Marshaller#writePayload(java.lang.Object, java.io.DataOutput)
+     * @see org.apache.activemq.kaha.Marshaller#writePayload(java.lang.Object,
+     *      java.io.DataOutput)
      */
-    public void writePayload(ReferenceRecord rr,DataOutput dataOut) throws IOException{
+    public void writePayload(ReferenceRecord rr, DataOutput dataOut) throws IOException {
         dataOut.writeUTF(rr.getMessageId());
         dataOut.writeInt(rr.getData().getFileId());
         dataOut.writeInt(rr.getData().getOffset());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/StoreEntryMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/StoreEntryMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/StoreEntryMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/StoreEntryMarshaller.java Wed Aug  8 11:56:59 2007
@@ -22,22 +22,21 @@
 import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.impl.index.IndexItem;
 
-
 /**
  * Marshall a TopicSubAck
+ * 
  * @version $Revision: 1.10 $
  */
-public class StoreEntryMarshaller implements Marshaller{
-   
+public class StoreEntryMarshaller implements Marshaller {
+
+    public void writePayload(Object object, DataOutput dataOut) throws IOException {
+        IndexItem item = (IndexItem)object;
+        dataOut.writeLong(item.getOffset());
+        item.write(dataOut);
 
-    public void writePayload(Object object,DataOutput dataOut) throws IOException{
-       IndexItem item = (IndexItem)object;
-       dataOut.writeLong(item.getOffset());
-       item.write(dataOut);
-       
     }
 
-    public Object readPayload(DataInput dataIn) throws IOException{
+    public Object readPayload(DataInput dataIn) throws IOException {
         IndexItem item = new IndexItem();
         item.setOffset(dataIn.readLong());
         item.read(dataIn);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAck.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAck.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAck.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAck.java Wed Aug  8 11:56:59 2007
@@ -21,32 +21,32 @@
  * 
  * @version $Revision: 1.10 $
  */
-public class TopicSubAck{
+public class TopicSubAck {
 
-    private int count =0;
+    private int count = 0;
     private StoreEntry messageEntry;
 
     /**
      * @return the count
      */
-    public int getCount(){
+    public int getCount() {
         return this.count;
     }
 
     /**
      * @param count the count to set
      */
-    public void setCount(int count){
-        this.count=count;
+    public void setCount(int count) {
+        this.count = count;
     }
-    
+
     /**
      * @return the value of the count after it's decremented
      */
     public int decrementCount() {
         return --count;
     }
-    
+
     /**
      * @return the value of the count after it's incremented
      */
@@ -54,21 +54,18 @@
         return ++count;
     }
 
-    
     /**
      * @return the messageEntry
      */
-    public StoreEntry getMessageEntry(){
+    public StoreEntry getMessageEntry() {
         return this.messageEntry;
     }
 
-    
     /**
      * @param messageEntry the messageEntry to set
      */
-    public void setMessageEntry(StoreEntry storeEntry){
-        this.messageEntry=storeEntry;
+    public void setMessageEntry(StoreEntry storeEntry) {
+        this.messageEntry = storeEntry;
     }
 
-   
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAckMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAckMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAckMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAckMarshaller.java Wed Aug  8 11:56:59 2007
@@ -22,24 +22,23 @@
 import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.impl.index.IndexItem;
 
-
 /**
  * Marshall a TopicSubAck
+ * 
  * @version $Revision: 1.10 $
  */
-public class TopicSubAckMarshaller implements Marshaller{
-   
+public class TopicSubAckMarshaller implements Marshaller {
+
+    public void writePayload(Object object, DataOutput dataOut) throws IOException {
+        TopicSubAck tsa = (TopicSubAck)object;
+        dataOut.writeInt(tsa.getCount());
+        IndexItem item = (IndexItem)tsa.getMessageEntry();
+        dataOut.writeLong(item.getOffset());
+        item.write(dataOut);
 
-    public void writePayload(Object object,DataOutput dataOut) throws IOException{
-       TopicSubAck tsa = (TopicSubAck) object;
-       dataOut.writeInt(tsa.getCount());
-       IndexItem item = (IndexItem)tsa.getMessageEntry();
-       dataOut.writeLong(item.getOffset());
-       item.write(dataOut);
-       
     }
 
-    public Object readPayload(DataInput dataIn) throws IOException{
+    public Object readPayload(DataInput dataIn) throws IOException {
         TopicSubAck tsa = new TopicSubAck();
         int count = dataIn.readInt();
         tsa.setCount(count);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java Wed Aug  8 11:56:59 2007
@@ -21,13 +21,13 @@
 
 /**
  * Holds information for the subscriber
- *
+ * 
  * @version $Revision: 1.10 $
  */
 public class TopicSubContainer {
     private transient ListContainer listContainer;
     private transient StoreEntry batchEntry;
-    
+
     public TopicSubContainer(ListContainer container) {
         this.listContainer = container;
     }
@@ -40,10 +40,10 @@
     }
 
     /**
-     * @param id 
+     * @param id
      * @param batchEntry the batchEntry to set
      */
-    public void setBatchEntry(String id,StoreEntry batchEntry) {
+    public void setBatchEntry(String id, StoreEntry batchEntry) {
         this.batchEntry = batchEntry;
     }
 
@@ -59,28 +59,28 @@
         return listContainer.placeLast(ref);
     }
 
-    public ConsumerMessageRef remove(MessageId id){
-        ConsumerMessageRef result=null;
-        if(!listContainer.isEmpty()){
-            StoreEntry entry=listContainer.getFirst();
-            while(entry!=null){
-                ConsumerMessageRef ref=(ConsumerMessageRef)listContainer.get(entry);
+    public ConsumerMessageRef remove(MessageId id) {
+        ConsumerMessageRef result = null;
+        if (!listContainer.isEmpty()) {
+            StoreEntry entry = listContainer.getFirst();
+            while (entry != null) {
+                ConsumerMessageRef ref = (ConsumerMessageRef)listContainer.get(entry);
                 listContainer.remove(entry);
-                if(listContainer!=null&&batchEntry!=null&&(listContainer.isEmpty()||batchEntry.equals(entry))){
+                if (listContainer != null && batchEntry != null && (listContainer.isEmpty() || batchEntry.equals(entry))) {
                     reset();
                 }
-                if(ref!=null&&ref.getMessageId().equals(id)){
-                    result=ref;
+                if (ref != null && ref.getMessageId().equals(id)) {
+                    result = ref;
                     break;
                 }
-                entry=listContainer.getFirst();
+                entry = listContainer.getFirst();
             }
         }
         return result;
     }
 
     public ConsumerMessageRef get(StoreEntry entry) {
-        return (ConsumerMessageRef) listContainer.get(entry);
+        return (ConsumerMessageRef)listContainer.get(entry);
     }
 
     public StoreEntry getEntry() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java Wed Aug  8 11:56:59 2007
@@ -29,22 +29,24 @@
 
 /**
  * Marshall a Transaction
+ * 
  * @version $Revision: 1.10 $
  */
-public class TransactionMarshaller implements Marshaller{
-    
+public class TransactionMarshaller implements Marshaller {
+
     private WireFormat wireFormat;
-    public TransactionMarshaller(WireFormat wireFormat){
+
+    public TransactionMarshaller(WireFormat wireFormat) {
         this.wireFormat = wireFormat;
-      
+
     }
-    
-    public void writePayload(Object object,DataOutput dataOut) throws IOException{
-        KahaTransaction kt = (KahaTransaction) object;
+
+    public void writePayload(Object object, DataOutput dataOut) throws IOException {
+        KahaTransaction kt = (KahaTransaction)object;
         List list = kt.getList();
         dataOut.writeInt(list.size());
-        for (int i = 0; i < list.size(); i++){
-            TxCommand tx = (TxCommand) list.get(i);
+        for (int i = 0; i < list.size(); i++) {
+            TxCommand tx = (TxCommand)list.get(i);
             Object key = tx.getMessageStoreKey();
             ByteSequence packet = wireFormat.marshal(key);
             dataOut.writeInt(packet.length);
@@ -53,31 +55,30 @@
             packet = wireFormat.marshal(command);
             dataOut.writeInt(packet.length);
             dataOut.write(packet.data, packet.offset, packet.length);
-            
+
         }
-       }
+    }
 
-   
-    public Object readPayload(DataInput dataIn) throws IOException{
+    public Object readPayload(DataInput dataIn) throws IOException {
         KahaTransaction result = new KahaTransaction();
         List list = new ArrayList();
         result.setList(list);
-        int number=dataIn.readInt();
-        for (int i = 0; i < number; i++){
+        int number = dataIn.readInt();
+        for (int i = 0; i < number; i++) {
             TxCommand command = new TxCommand();
             int size = dataIn.readInt();
-            byte[] data=new byte[size];
+            byte[] data = new byte[size];
             dataIn.readFully(data);
-            Object key =  wireFormat.unmarshal(new ByteSequence(data));
+            Object key = wireFormat.unmarshal(new ByteSequence(data));
             command.setMessageStoreKey(key);
             size = dataIn.readInt();
-            data=new byte[size];
+            data = new byte[size];
             dataIn.readFully(data);
-            BaseCommand bc =  (BaseCommand) wireFormat.unmarshal(new ByteSequence(data));
+            BaseCommand bc = (BaseCommand)wireFormat.unmarshal(new ByteSequence(data));
             command.setCommand(bc);
             list.add(command);
         }
         return result;
-       
+
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java Wed Aug  8 11:56:59 2007
@@ -19,58 +19,55 @@
 import org.apache.activemq.command.BaseCommand;
 import org.apache.activemq.command.CommandTypes;
 
-
 /**
- * Base class for  messages/acknowledgements for a transaction
+ * Base class for messages/acknowledgements for a transaction
  * 
  * @version $Revision: 1.4 $
  */
 class TxCommand {
-        protected Object messageStoreKey;
-        protected BaseCommand command;
+    protected Object messageStoreKey;
+    protected BaseCommand command;
 
-        /**
-         * @return Returns the messageStoreKey.
-         */
-        public Object getMessageStoreKey(){
-            return messageStoreKey;
-        }
-
-        /**
-         * @param messageStoreKey The messageStoreKey to set.
-         */
-        public void setMessageStoreKey(Object messageStoreKey){
-            this.messageStoreKey=messageStoreKey;
-        }
-
-        /**
-         * @return Returns the command.
-         */
-        public BaseCommand getCommand(){
-            return command;
-        }
-
-        /**
-         * @param command The command to set.
-         */
-        public void setCommand(BaseCommand command){
-            this.command=command;
-        }
-        
-        /**
-         * @return true if a Message command
-         */
-        public boolean isAdd(){
-            return command != null && command.getDataStructureType() != CommandTypes.MESSAGE_ACK;
-        }
-        
-        /**
-         * @return true if a MessageAck command
-         */
-        public boolean isRemove(){
-            return command != null && command.getDataStructureType() == CommandTypes.MESSAGE_ACK;
-        }
+    /**
+     * @return Returns the messageStoreKey.
+     */
+    public Object getMessageStoreKey() {
+        return messageStoreKey;
+    }
+
+    /**
+     * @param messageStoreKey The messageStoreKey to set.
+     */
+    public void setMessageStoreKey(Object messageStoreKey) {
+        this.messageStoreKey = messageStoreKey;
+    }
+
+    /**
+     * @return Returns the command.
+     */
+    public BaseCommand getCommand() {
+        return command;
+    }
+
+    /**
+     * @param command The command to set.
+     */
+    public void setCommand(BaseCommand command) {
+        this.command = command;
+    }
+
+    /**
+     * @return true if a Message command
+     */
+    public boolean isAdd() {
+        return command != null && command.getDataStructureType() != CommandTypes.MESSAGE_ACK;
+    }
+
+    /**
+     * @return true if a MessageAck command
+     */
+    public boolean isRemove() {
+        return command != null && command.getDataStructureType() == CommandTypes.MESSAGE_ACK;
+    }
 
-  
-    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java Wed Aug  8 11:56:59 2007
@@ -31,128 +31,132 @@
 import org.apache.activemq.store.MessageStore;
 
 /**
- * An implementation of {@link org.apache.activemq.store.MessageStore} which uses a
+ * An implementation of {@link org.apache.activemq.store.MessageStore} which
+ * uses an in-memory map to hold the messages.
  * 
  * @version $Revision: 1.7 $
  */
-public class MemoryMessageStore implements MessageStore{
+public class MemoryMessageStore implements MessageStore {
 
     protected final ActiveMQDestination destination;
     protected final Map messageTable;
     protected MessageId lastBatchId;
 
-    public MemoryMessageStore(ActiveMQDestination destination){
-        this(destination,new LinkedHashMap());
+    public MemoryMessageStore(ActiveMQDestination destination) {
+        this(destination, new LinkedHashMap());
     }
 
-    public MemoryMessageStore(ActiveMQDestination destination,Map messageTable){
-        this.destination=destination;
-        this.messageTable=Collections.synchronizedMap(messageTable);
+    public MemoryMessageStore(ActiveMQDestination destination, Map messageTable) {
+        this.destination = destination;
+        this.messageTable = Collections.synchronizedMap(messageTable);
     }
 
-    public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
-        synchronized(messageTable){
-            messageTable.put(message.getMessageId(),message);
+    public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
+        synchronized (messageTable) {
+            messageTable.put(message.getMessageId(), message);
         }
     }
 
-//    public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
-//            throws IOException{
-//        synchronized(messageTable){
-//            messageTable.put(messageId,messageRef);
-//        }
-//    }
+    // public void addMessageReference(ConnectionContext context,MessageId
+    // messageId,long expirationTime,String messageRef)
+    // throws IOException{
+    // synchronized(messageTable){
+    // messageTable.put(messageId,messageRef);
+    // }
+    // }
 
-    public Message getMessage(MessageId identity) throws IOException{
+    public Message getMessage(MessageId identity) throws IOException {
         return (Message)messageTable.get(identity);
     }
 
-//    public String getMessageReference(MessageId identity) throws IOException{
-//        return (String)messageTable.get(identity);
-//    }
+    // public String getMessageReference(MessageId identity) throws IOException{
+    // return (String)messageTable.get(identity);
+    // }
 
-    public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
+    public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
         removeMessage(ack.getLastMessageId());
     }
 
-    public void removeMessage(MessageId msgId) throws IOException{
-        synchronized(messageTable){
+    public void removeMessage(MessageId msgId) throws IOException {
+        synchronized (messageTable) {
             messageTable.remove(msgId);
-            if((lastBatchId!=null && lastBatchId.equals(msgId)) || messageTable.isEmpty()){
-                lastBatchId=null;
+            if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) {
+                lastBatchId = null;
             }
         }
     }
 
-    public void recover(MessageRecoveryListener listener) throws Exception{
-        // the message table is a synchronizedMap - so just have to synchronize here
-        synchronized(messageTable){
-            for(Iterator iter=messageTable.values().iterator();iter.hasNext();){
-                Object msg=(Object)iter.next();
-                if(msg.getClass()==MessageId.class){
+    public void recover(MessageRecoveryListener listener) throws Exception {
+        // the message table is a synchronizedMap - so just have to synchronize
+        // here
+        synchronized (messageTable) {
+            for (Iterator iter = messageTable.values().iterator(); iter.hasNext();) {
+                Object msg = (Object)iter.next();
+                if (msg.getClass() == MessageId.class) {
                     listener.recoverMessageReference((MessageId)msg);
-                }else{
+                } else {
                     listener.recoverMessage((Message)msg);
                 }
             }
         }
     }
 
-    public void start(){
+    public void start() {
     }
 
-    public void stop(){
+    public void stop() {
     }
 
-    public void removeAllMessages(ConnectionContext context) throws IOException{
-        synchronized(messageTable){
+    public void removeAllMessages(ConnectionContext context) throws IOException {
+        synchronized (messageTable) {
             messageTable.clear();
         }
     }
 
-    public ActiveMQDestination getDestination(){
+    public ActiveMQDestination getDestination() {
         return destination;
     }
 
-    public void delete(){
-        synchronized(messageTable){
+    public void delete() {
+        synchronized (messageTable) {
             messageTable.clear();
         }
     }
 
     /**
-     * @param usageManager The UsageManager that is controlling the destination's memory usage.
+     * @param usageManager The UsageManager that is controlling the
+     *                destination's memory usage.
      */
-    public void setUsageManager(UsageManager usageManager){
+    public void setUsageManager(UsageManager usageManager) {
     }
 
-    public int getMessageCount(){
+    public int getMessageCount() {
         return messageTable.size();
     }
 
-    public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
-        synchronized(messageTable){
-            boolean pastLackBatch=lastBatchId==null;
-            int count=0;
-            for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
-                Map.Entry entry=(Entry)iter.next();
-                if(pastLackBatch){
+    public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
+        synchronized (messageTable) {
+            boolean pastLackBatch = lastBatchId == null;
+            int count = 0;
+            for (Iterator iter = messageTable.entrySet().iterator(); iter.hasNext();) {
+                Map.Entry entry = (Entry)iter.next();
+                if (pastLackBatch) {
                     count++;
-                    Object msg=entry.getValue();
-                    lastBatchId=(MessageId)entry.getKey();
-                    if(msg.getClass()==MessageId.class){
+                    Object msg = entry.getValue();
+                    lastBatchId = (MessageId)entry.getKey();
+                    if (msg.getClass() == MessageId.class) {
                         listener.recoverMessageReference((MessageId)msg);
-                    }else{
+                    } else {
                         listener.recoverMessage((Message)msg);
                     }
-                }else{
-                    pastLackBatch=entry.getKey().equals(lastBatchId);
+                } else {
+                    pastLackBatch = entry.getKey().equals(lastBatchId);
                 }
             }
         }
     }
 
-    public void resetBatching(){
-        lastBatchId=null;
+    public void resetBatching() {
+        lastBatchId = null;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java Wed Aug  8 11:56:59 2007
@@ -37,7 +37,6 @@
 
 /**
  * @org.apache.xbean.XBean
- * 
  * @version $Revision: 1.4 $
  */
 public class MemoryPersistenceAdapter implements PersistenceAdapter {
@@ -47,14 +46,14 @@
     ConcurrentHashMap topics = new ConcurrentHashMap();
     ConcurrentHashMap queues = new ConcurrentHashMap();
     private boolean useExternalMessageReferences;
-    
+
     public Set getDestinations() {
-        Set rc = new HashSet(queues.size()+topics.size());
+        Set rc = new HashSet(queues.size() + topics.size());
         for (Iterator iter = queues.keySet().iterator(); iter.hasNext();) {
-            rc.add( iter.next() );
+            rc.add(iter.next());
         }
         for (Iterator iter = topics.keySet().iterator(); iter.hasNext();) {
-            rc.add( iter.next() );
+            rc.add(iter.next());
         }
         return rc;
     }
@@ -62,12 +61,12 @@
     public static MemoryPersistenceAdapter newInstance(File file) {
         return new MemoryPersistenceAdapter();
     }
-    
+
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
         MessageStore rc = (MessageStore)queues.get(destination);
-        if(rc==null) {
+        if (rc == null) {
             rc = new MemoryMessageStore(destination);
-            if( transactionStore !=null ) {
+            if (transactionStore != null) {
                 rc = transactionStore.proxy(rc);
             }
             queues.put(destination, rc);
@@ -77,9 +76,9 @@
 
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
         TopicMessageStore rc = (TopicMessageStore)topics.get(destination);
-        if(rc==null) {
+        if (rc == null) {
             rc = new MemoryTopicMessageStore(destination);
-            if( transactionStore !=null ) {
+            if (transactionStore != null) {
                 rc = transactionStore.proxy(rc);
             }
             topics.put(destination, rc);
@@ -88,7 +87,7 @@
     }
 
     public TransactionStore createTransactionStore() throws IOException {
-        if( transactionStore==null ) {
+        if (transactionStore == null) {
             transactionStore = new MemoryTransactionStore();
         }
         return transactionStore;
@@ -108,7 +107,7 @@
 
     public void stop() throws Exception {
     }
-    
+
     public long getLastMessageBrokerSequenceId() throws IOException {
         return 0;
     }
@@ -142,28 +141,29 @@
 
     protected MemoryMessageStore asMemoryMessageStore(Object value) {
         if (value instanceof MemoryMessageStore) {
-            return (MemoryMessageStore) value;
+            return (MemoryMessageStore)value;
         }
         log.warn("Expected an instance of MemoryMessageStore but was: " + value);
         return null;
     }
 
     /**
-     * @param usageManager The UsageManager that is controlling the broker's memory usage.
+     * @param usageManager The UsageManager that is controlling the broker's
+     *                memory usage.
      */
     public void setUsageManager(UsageManager usageManager) {
     }
-    
-    public String toString(){
+
+    public String toString() {
         return "MemoryPersistenceAdapter";
     }
 
-    public void setBrokerName(String brokerName){        
+    public void setBrokerName(String brokerName) {
     }
 
-    public void setDirectory(File dir){        
+    public void setDirectory(File dir) {
     }
 
-    public void checkpoint(boolean sync) throws IOException{        
+    public void checkpoint(boolean sync) throws IOException {
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java Wed Aug  8 11:56:59 2007
@@ -33,104 +33,100 @@
 /**
  * @version $Revision: 1.5 $
  */
-public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore{
+public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore {
 
     private Map subscriberDatabase;
     private Map topicSubMap;
 
-    public MemoryTopicMessageStore(ActiveMQDestination destination){
-        this(destination,new LRUCache(100,100,0.75f,false),makeMap());
+    public MemoryTopicMessageStore(ActiveMQDestination destination) {
+        this(destination, new LRUCache(100, 100, 0.75f, false), makeMap());
     }
 
-    protected static Map makeMap(){
+    protected static Map makeMap() {
         return Collections.synchronizedMap(new HashMap());
     }
 
-    public MemoryTopicMessageStore(ActiveMQDestination destination,Map messageTable,Map subscriberDatabase){
-        super(destination,messageTable);
-        this.subscriberDatabase=subscriberDatabase;
-        this.topicSubMap=makeMap();
+    public MemoryTopicMessageStore(ActiveMQDestination destination, Map messageTable, Map subscriberDatabase) {
+        super(destination, messageTable);
+        this.subscriberDatabase = subscriberDatabase;
+        this.topicSubMap = makeMap();
     }
 
-    public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
-        super.addMessage(context,message);
-        for(Iterator i=topicSubMap.values().iterator();i.hasNext();){
-            MemoryTopicSub sub=(MemoryTopicSub)i.next();
-            sub.addMessage(message.getMessageId(),message);
+    public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
+        super.addMessage(context, message);
+        for (Iterator i = topicSubMap.values().iterator(); i.hasNext();) {
+            MemoryTopicSub sub = (MemoryTopicSub)i.next();
+            sub.addMessage(message.getMessageId(), message);
         }
     }
 
-    public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
-            MessageId messageId) throws IOException{
-        SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
-        MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(key);
-        if(sub!=null){
+    public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
+        SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
+        MemoryTopicSub sub = (MemoryTopicSub)topicSubMap.get(key);
+        if (sub != null) {
             sub.removeMessage(messageId);
         }
     }
 
-    public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
-        return (SubscriptionInfo)subscriberDatabase.get(new SubscriptionKey(clientId,subscriptionName));
+    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
+        return (SubscriptionInfo)subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName));
     }
 
-    public synchronized void addSubsciption(SubscriptionInfo info,boolean retroactive)
-            throws IOException{
-        SubscriptionKey key=new SubscriptionKey(info);
-        MemoryTopicSub sub=new MemoryTopicSub();
-        topicSubMap.put(key,sub);
-        if(retroactive){
-            for(Iterator i=messageTable.entrySet().iterator();i.hasNext();){
-                Map.Entry entry=(Entry)i.next();
-                sub.addMessage((MessageId)entry.getKey(),(Message)entry.getValue());
+    public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
+        SubscriptionKey key = new SubscriptionKey(info);
+        MemoryTopicSub sub = new MemoryTopicSub();
+        topicSubMap.put(key, sub);
+        if (retroactive) {
+            for (Iterator i = messageTable.entrySet().iterator(); i.hasNext();) {
+                Map.Entry entry = (Entry)i.next();
+                sub.addMessage((MessageId)entry.getKey(), (Message)entry.getValue());
             }
         }
-        subscriberDatabase.put(key,info);
+        subscriberDatabase.put(key, info);
     }
 
-    public void deleteSubscription(String clientId,String subscriptionName){
-        org.apache.activemq.util.SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
+    public void deleteSubscription(String clientId, String subscriptionName) {
+        org.apache.activemq.util.SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
         subscriberDatabase.remove(key);
         topicSubMap.remove(key);
     }
 
-    public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
-            throws Exception{
-        MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriptionName));
-        if(sub!=null){
+    public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
+        MemoryTopicSub sub = (MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
+        if (sub != null) {
             sub.recoverSubscription(listener);
         }
     }
 
-    public void delete(){
+    public void delete() {
         super.delete();
         subscriberDatabase.clear();
         topicSubMap.clear();
     }
 
-    public SubscriptionInfo[] getAllSubscriptions() throws IOException{
+    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
         return (SubscriptionInfo[])subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
     }
 
-    public synchronized int getMessageCount(String clientId,String subscriberName) throws IOException{
-        int result=0;
-        MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriberName));
-        if(sub!=null){
-            result=sub.size();
+    public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException {
+        int result = 0;
+        MemoryTopicSub sub = (MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId, subscriberName));
+        if (sub != null) {
+            result = sub.size();
         }
         return result;
     }
 
-    public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
-            MessageRecoveryListener listener) throws Exception{
-        MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriptionName));
-        if(sub!=null){
-            sub.recoverNextMessages(maxReturned,listener);
+    public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
+        MemoryTopicSub sub = (MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
+        if (sub != null) {
+            sub.recoverNextMessages(maxReturned, listener);
         }
     }
 
-    public void resetBatching(String clientId,String subscriptionName){
-        MemoryTopicSub sub=(MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId,subscriptionName));
-        if(sub!=null){
+    public void resetBatching(String clientId, String subscriptionName) {
+        MemoryTopicSub sub = (MemoryTopicSub)topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
+        if (sub != null) {
             sub.resetBatching();
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java Wed Aug  8 11:56:59 2007
@@ -27,65 +27,66 @@
  * 
  * @version $Revision: 1.7 $
  */
-class MemoryTopicSub{
+class MemoryTopicSub {
 
-    private Map map=new LinkedHashMap();
+    private Map map = new LinkedHashMap();
     private MessageId lastBatch;
 
-    void addMessage(MessageId id,Message message){
-        map.put(id,message);
+    void addMessage(MessageId id, Message message) {
+        map.put(id, message);
     }
 
-    void removeMessage(MessageId id){
+    void removeMessage(MessageId id) {
         map.remove(id);
         if (map.isEmpty()) {
-            lastBatch=null;
+            lastBatch = null;
         }
     }
 
-    int size(){
+    int size() {
         return map.size();
     }
 
-    void recoverSubscription(MessageRecoveryListener listener) throws Exception{
-        for(Iterator iter=map.entrySet().iterator();iter.hasNext();){
-            Map.Entry entry=(Entry)iter.next();
-            Object msg=entry.getValue();
-            if(msg.getClass()==MessageId.class){
+    void recoverSubscription(MessageRecoveryListener listener) throws Exception {
+        for (Iterator iter = map.entrySet().iterator(); iter.hasNext();) {
+            Map.Entry entry = (Entry)iter.next();
+            Object msg = entry.getValue();
+            if (msg.getClass() == MessageId.class) {
                 listener.recoverMessageReference((MessageId)msg);
-            }else{
+            } else {
                 listener.recoverMessage((Message)msg);
             }
         }
     }
 
-    void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
-        boolean pastLackBatch=lastBatch==null;
-        MessageId lastId=null;
-        // the message table is a synchronizedMap - so just have to synchronize here
-        int count=0;
-        for(Iterator iter=map.entrySet().iterator();iter.hasNext()&&count<maxReturned;){
-            Map.Entry entry=(Entry)iter.next();
-            if(pastLackBatch){
+    void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
+        boolean pastLackBatch = lastBatch == null;
+        MessageId lastId = null;
+        // the message table is a synchronizedMap - so just have to synchronize
+        // here
+        int count = 0;
+        for (Iterator iter = map.entrySet().iterator(); iter.hasNext() && count < maxReturned;) {
+            Map.Entry entry = (Entry)iter.next();
+            if (pastLackBatch) {
                 count++;
-                Object msg=entry.getValue();
-                lastId=(MessageId)entry.getKey();
-                if(msg.getClass()==MessageId.class){
+                Object msg = entry.getValue();
+                lastId = (MessageId)entry.getKey();
+                if (msg.getClass() == MessageId.class) {
                     listener.recoverMessageReference((MessageId)msg);
-                }else{
+                } else {
                     listener.recoverMessage((Message)msg);
                 }
-            }else{
-                pastLackBatch=entry.getKey().equals(lastBatch);
+            } else {
+                pastLackBatch = entry.getKey().equals(lastBatch);
             }
         }
-        if(lastId!=null){
-            lastBatch=lastId;
+        if (lastId != null) {
+            lastBatch = lastId;
         }
 
     }
 
-    void resetBatching(){
-        lastBatch=null;
+    void resetBatching() {
+        lastBatch = null;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java Wed Aug  8 11:56:59 2007
@@ -65,20 +65,20 @@
 
         public Message[] getMessages() {
             Message rc[] = new Message[messages.size()];
-            int count=0;
+            int count = 0;
             for (Iterator iter = messages.iterator(); iter.hasNext();) {
-                AddMessageCommand cmd = (AddMessageCommand) iter.next();
-                rc[count++] = cmd.getMessage(); 
+                AddMessageCommand cmd = (AddMessageCommand)iter.next();
+                rc[count++] = cmd.getMessage();
             }
             return rc;
         }
 
         public MessageAck[] getAcks() {
             MessageAck rc[] = new MessageAck[acks.size()];
-            int count=0;
+            int count = 0;
             for (Iterator iter = acks.iterator(); iter.hasNext();) {
-                RemoveMessageCommand cmd = (RemoveMessageCommand) iter.next();
-                rc[count++] = cmd.getMessageAck(); 
+                RemoveMessageCommand cmd = (RemoveMessageCommand)iter.next();
+                rc[count++] = cmd.getMessageAck();
             }
             return rc;
         }
@@ -87,14 +87,14 @@
          * @throws IOException
          */
         public void commit() throws IOException {
-            // Do all the message adds.            
+            // Do all the message adds.
             for (Iterator iter = messages.iterator(); iter.hasNext();) {
-                AddMessageCommand cmd = (AddMessageCommand) iter.next();
+                AddMessageCommand cmd = (AddMessageCommand)iter.next();
                 cmd.run();
             }
             // And removes..
             for (Iterator iter = acks.iterator(); iter.hasNext();) {
-                RemoveMessageCommand cmd = (RemoveMessageCommand) iter.next();
+                RemoveMessageCommand cmd = (RemoveMessageCommand)iter.next();
                 cmd.run();
             }
         }
@@ -102,49 +102,52 @@
 
     public interface AddMessageCommand {
         Message getMessage();
+
         void run() throws IOException;
     }
 
     public interface RemoveMessageCommand {
         MessageAck getMessageAck();
+
         void run() throws IOException;
     }
 
     public MessageStore proxy(MessageStore messageStore) {
-	    return new ProxyMessageStore(messageStore) {
-	        public void addMessage(ConnectionContext context, final Message send) throws IOException {
-	            MemoryTransactionStore.this.addMessage(getDelegate(), send);
-	        }
-	
-	        public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
-	            MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
-	        }
-	    };
+        return new ProxyMessageStore(messageStore) {
+            public void addMessage(ConnectionContext context, final Message send) throws IOException {
+                MemoryTransactionStore.this.addMessage(getDelegate(), send);
+            }
+
+            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
+                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
+            }
+        };
     }
 
     public TopicMessageStore proxy(TopicMessageStore messageStore) {
-	    return new ProxyTopicMessageStore(messageStore) {
-	        public void addMessage(ConnectionContext context, final Message send) throws IOException {
-	            MemoryTransactionStore.this.addMessage(getDelegate(), send);
-	        }
-	        public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
-	            MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
-	        }
-	    };
+        return new ProxyTopicMessageStore(messageStore) {
+            public void addMessage(ConnectionContext context, final Message send) throws IOException {
+                MemoryTransactionStore.this.addMessage(getDelegate(), send);
+            }
+
+            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
+                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
+            }
+        };
     }
 
     /**
      * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
      */
     public void prepare(TransactionId txid) {
-        Tx tx = (Tx) inflightTransactions.remove(txid);
+        Tx tx = (Tx)inflightTransactions.remove(txid);
         if (tx == null)
             return;
         preparedTransactions.put(txid, tx);
     }
 
     public Tx getTx(Object txid) {
-        Tx tx = (Tx) inflightTransactions.get(txid);
+        Tx tx = (Tx)inflightTransactions.get(txid);
         if (tx == null) {
             tx = new Tx();
             inflightTransactions.put(txid, tx);
@@ -157,18 +160,18 @@
      * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
      */
     public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
-        
+
         Tx tx;
-        if( wasPrepared ) {
-            tx = (Tx) preparedTransactions.remove(txid);
+        if (wasPrepared) {
+            tx = (Tx)preparedTransactions.remove(txid);
         } else {
-            tx = (Tx) inflightTransactions.remove(txid);
+            tx = (Tx)inflightTransactions.remove(txid);
         }
-        
-        if( tx == null )
+
+        if (tx == null)
             return;
         tx.commit();
-        
+
     }
 
     /**
@@ -187,14 +190,14 @@
 
     synchronized public void recover(TransactionRecoveryListener listener) throws IOException {
         // All the inflight transactions get rolled back..
-        inflightTransactions.clear();        
+        inflightTransactions.clear();
         this.doingRecover = true;
         try {
-	        for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
-	            Object txid = (Object) iter.next();
-                Tx tx = (Tx) preparedTransactions.get(txid);
-                listener.recover((XATransactionId) txid, tx.getMessages(), tx.getAcks());
-	        }
+            for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
+                Object txid = (Object)iter.next();
+                Tx tx = (Tx)preparedTransactions.get(txid);
+                listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
+            }
         } finally {
             this.doingRecover = false;
         }
@@ -205,16 +208,17 @@
      * @throws IOException
      */
     void addMessage(final MessageStore destination, final Message message) throws IOException {
-        
-        if( doingRecover )
+
+        if (doingRecover)
             return;
-        
-        if (message.getTransactionId()!=null) {
+
+        if (message.getTransactionId() != null) {
             Tx tx = getTx(message.getTransactionId());
             tx.add(new AddMessageCommand() {
                 public Message getMessage() {
                     return message;
                 }
+
                 public void run() throws IOException {
                     destination.addMessage(null, message);
                 }
@@ -223,13 +227,13 @@
             destination.addMessage(null, message);
         }
     }
-    
+
     /**
      * @param ack
      * @throws IOException
      */
-    final void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException {
-        if( doingRecover )
+    final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
+        if (doingRecover)
             return;
 
         if (ack.isInTransaction()) {
@@ -238,6 +242,7 @@
                 public MessageAck getMessageAck() {
                     return ack;
                 }
+
                 public void run() throws IOException {
                     destination.removeMessage(null, ack);
                 }
@@ -250,7 +255,7 @@
     public void delete() {
         inflightTransactions.clear();
         preparedTransactions.clear();
-        doingRecover=false;
+        doingRecover = false;
     }
-    
+
 }