You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/07/18 22:34:20 UTC

svn commit: r557391 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store: amq/AMQPersistenceAdapter.java kahadaptor/KahaReferenceStoreAdapter.java

Author: rajdavies
Date: Wed Jul 18 13:34:19 2007
New Revision: 557391

URL: http://svn.apache.org/viewvc?view=rev&rev=557391
Log:
Persist in-progress XA transactions in order to speed up recovery.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?view=diff&rev=557391&r1=557390&r2=557391
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Wed Jul 18 13:34:19 2007
@@ -11,7 +11,6 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-
 package org.apache.activemq.store.amq;
 
 import java.io.File;
@@ -49,8 +48,6 @@
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TopicReferenceStore;
 import org.apache.activemq.store.TransactionStore;
-import org.apache.activemq.store.amq.AMQTransactionStore.Tx;
-import org.apache.activemq.store.amq.AMQTransactionStore.TxOperation;
 import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
 import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.thread.Scheduler;
@@ -180,11 +177,8 @@
 // The following was attempting to reduce startup times by avoiding the log 
 // file scanning that recovery performs.  The problem with it is that XA transactions
 // only live in transaction log and are not stored in the reference store, but they still
-// need to be recovered when the broker starts up.  Perhaps on a graceful shutdown we 
-// should record all the in flight XA transactions to a file to avoid having to scan 
-// the entire transaction log.  For now going to comment this bit out.        
-//    
-        /*
+// need to be recovered when the broker starts up.  
+        
         if(referenceStoreAdapter.isStoreValid()==false){
             log.warn("The ReferenceStore is not valid - recovering ...");
             recover();
@@ -192,10 +186,11 @@
         }else {
            Location location=writeTraceMessage("RECOVERED "+new Date(),true);
             asyncDataManager.setMark(location,true);
+            //recover transactions
+            getTransactionStore().setPreparedTransactions(referenceStoreAdapter.retrievePreparedState());
        }
-        */
-        recover();
         
+              
         // Do a checkpoint periodically.
         periodicCheckpointTask=new Runnable(){
 
@@ -237,6 +232,7 @@
         synchronized(this){
             checkpointTask.shutdown();
         }
+        referenceStoreAdapter.savePreparedState(getTransactionStore().getPreparedTransactions());
         queues.clear();
         topics.clear();
         IOException firstException=null;
@@ -355,13 +351,15 @@
         return destinations;
     }
 
-    private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException{
+    MessageStore createMessageStore(ActiveMQDestination destination) throws IOException{
         if(destination.isQueue()){
             return createQueueMessageStore((ActiveMQQueue)destination);
         }else{
             return createTopicMessageStore((ActiveMQTopic)destination);
         }
     }
+    
+    
 
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
         AMQMessageStore store=queues.get(destination);
@@ -494,28 +492,16 @@
                             break;
                         case JournalTransaction.XA_COMMIT:
                         case JournalTransaction.LOCAL_COMMIT:
-                            Tx tx=transactionStore.replayCommit(command.getTransactionId(),command.getWasPrepared());
+                            AMQTx tx=transactionStore.replayCommit(command.getTransactionId(),command.getWasPrepared());
                             if(tx==null)
                                 break; // We may be trying to replay a commit that
                             // was already committed.
                             // Replay the committed operations.
                             tx.getOperations();
                             for(Iterator iter=tx.getOperations().iterator();iter.hasNext();){
-                                TxOperation op=(TxOperation)iter.next();
-                                if(op.operationType==TxOperation.ADD_OPERATION_TYPE){
-                                    if(op.store.replayAddMessage(context,(Message)op.data,op.location))
-                                        redoCounter++;
-                                }
-                                if(op.operationType==TxOperation.REMOVE_OPERATION_TYPE){
-                                    if(op.store.replayRemoveMessage(context,(MessageAck)op.data))
-                                        redoCounter++;
-                                }
-                                if(op.operationType==TxOperation.ACK_OPERATION_TYPE){
-                                    JournalTopicAck ack=(JournalTopicAck)op.data;
-                                    if(((AMQTopicMessageStore)op.store).replayAcknowledge(context,ack.getClientId(),ack
-                                            .getSubscritionName(),ack.getMessageId())){
-                                        redoCounter++;
-                                    }
+                                AMQTxOperation op=(AMQTxOperation)iter.next();
+                                if (op.replay(this,context)) {
+                                    redoCounter++;
                                 }
                             }
                             break;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?view=diff&rev=557391&r1=557390&r2=557391
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Wed Jul 18 13:34:19 2007
@@ -20,8 +20,10 @@
 import java.io.*;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -29,6 +31,7 @@
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.kaha.CommandMarshaller;
 import org.apache.activemq.kaha.ContainerId;
 import org.apache.activemq.kaha.ListContainer;
@@ -41,30 +44,33 @@
 import org.apache.activemq.store.ReferenceStoreAdapter;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TopicReferenceStore;
+import org.apache.activemq.store.amq.AMQTx;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.codehaus.groovy.antlr.treewalker.PreOrderTraversal;
 
-public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter {
-    private static final Log log = LogFactory.getLog(KahaPersistenceAdapter.class);
-   private static final String STORE_STATE = "store-state";
-   private static final String RECORD_REFERENCES = "record-references";
+public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter{
+
+    private static final Log log=LogFactory.getLog(KahaPersistenceAdapter.class);
+    private static final String STORE_STATE="store-state";
+    private static final String RECORD_REFERENCES="record-references";
+    private static final String TRANSACTIONS="transactions-state";
     private MapContainer stateMap;
-	private Map<Integer,AtomicInteger>recordReferences = new HashMap<Integer,AtomicInteger>();
+    private MapContainer preparedTransactions;
+    private Map<Integer,AtomicInteger> recordReferences=new HashMap<Integer,AtomicInteger>();
     private ListContainer durableSubscribers;
     private boolean storeValid;
     private Store stateStore;
 
-	
     public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
-    	throw new RuntimeException("Use createQueueReferenceStore instead");
+        throw new RuntimeException("Use createQueueReferenceStore instead");
     }
 
     public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{
-    	throw new RuntimeException("Use createTopicReferenceStore instead");
+        throw new RuntimeException("Use createTopicReferenceStore instead");
     }
-    
-    @Override
-    public synchronized void start() throws Exception{
+
+    @Override public synchronized void start() throws Exception{
         super.start();
         Store store=getStateStore();
         boolean empty=store.getMapContainerIds().isEmpty();
@@ -82,43 +88,44 @@
             }
         }
         stateMap.put(STORE_STATE,new AtomicBoolean());
-        durableSubscribers = store.getListContainer("durableSubscribers");
+        durableSubscribers=store.getListContainer("durableSubscribers");
         durableSubscribers.setMarshaller(new CommandMarshaller());
+        preparedTransactions=store.getMapContainer("transactions",TRANSACTIONS,false);
+        //need to set the Marshallers here
+        preparedTransactions.setKeyMarshaller(Store.CommandMarshaller);
+        preparedTransactions.setValueMarshaller(new AMQTxMarshaller(wireFormat));
     }
-       
-    @Override
-    public synchronized void stop() throws Exception {
+
+    @Override public synchronized void stop() throws Exception{
         stateMap.put(RECORD_REFERENCES,recordReferences);
         stateMap.put(STORE_STATE,new AtomicBoolean(true));
-        if (this.stateStore != null) {
+        if(this.stateStore!=null){
             this.stateStore.close();
-            this.stateStore = null;
-            this.stateMap = null;
+            this.stateStore=null;
+            this.stateMap=null;
         }
-        super.stop();        
+        super.stop();
     }
-    
-    
-    public boolean isStoreValid() {
+
+    public boolean isStoreValid(){
         return storeValid;
     }
-    
 
-	public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException {
-		ReferenceStore rc=(ReferenceStore)queues.get(destination);
+    public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException{
+        ReferenceStore rc=(ReferenceStore)queues.get(destination);
         if(rc==null){
             rc=new KahaReferenceStore(this,getMapReferenceContainer(destination,"queue-data"),destination);
             messageStores.put(destination,rc);
-//            if(transactionStore!=null){
-//                rc=transactionStore.proxy(rc);
-//            }
+            //            if(transactionStore!=null){
+            //                rc=transactionStore.proxy(rc);
+            //            }
             queues.put(destination,rc);
         }
         return rc;
-	}
+    }
 
-	public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException {
-		TopicReferenceStore rc=(TopicReferenceStore)topics.get(destination);
+    public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException{
+        TopicReferenceStore rc=(TopicReferenceStore)topics.get(destination);
         if(rc==null){
             Store store=getStore();
             MapContainer messageContainer=getMapReferenceContainer(destination,"topic-data");
@@ -127,54 +134,52 @@
             ackContainer.setMarshaller(new TopicSubAckMarshaller());
             rc=new KahaTopicReferenceStore(store,this,messageContainer,ackContainer,subsContainer,destination);
             messageStores.put(destination,rc);
-//            if(transactionStore!=null){
-//                rc=transactionStore.proxy(rc);
-//            }
+            //            if(transactionStore!=null){
+            //                rc=transactionStore.proxy(rc);
+            //            }
             topics.put(destination,rc);
         }
         return rc;
-	}
+    }
 
-	public void buildReferenceFileIdsInUse() throws IOException {
-		
-        recordReferences = new HashMap<Integer,AtomicInteger>();
-		
-		Set<ActiveMQDestination> destinations = getDestinations();
-		for (ActiveMQDestination destination : destinations) {
-			if( destination.isQueue() ) {
-				KahaReferenceStore store = (KahaReferenceStore) createQueueReferenceStore((ActiveMQQueue) destination);
-				store.addReferenceFileIdsInUse();
-			} else {
-				KahaTopicReferenceStore store = (KahaTopicReferenceStore) createTopicReferenceStore((ActiveMQTopic) destination);
-				store.addReferenceFileIdsInUse();
-			}
-        }		
-	}
-    
-        
-    protected MapContainer<MessageId,ReferenceRecord> getMapReferenceContainer(Object id,String containerName) throws IOException{
+    public void buildReferenceFileIdsInUse() throws IOException{
+        recordReferences=new HashMap<Integer,AtomicInteger>();
+        Set<ActiveMQDestination> destinations=getDestinations();
+        for(ActiveMQDestination destination:destinations){
+            if(destination.isQueue()){
+                KahaReferenceStore store=(KahaReferenceStore)createQueueReferenceStore((ActiveMQQueue)destination);
+                store.addReferenceFileIdsInUse();
+            }else{
+                KahaTopicReferenceStore store=(KahaTopicReferenceStore)createTopicReferenceStore((ActiveMQTopic)destination);
+                store.addReferenceFileIdsInUse();
+            }
+        }
+    }
+
+    protected MapContainer<MessageId,ReferenceRecord> getMapReferenceContainer(Object id,String containerName)
+            throws IOException{
         Store store=getStore();
-        MapContainer<MessageId, ReferenceRecord> container=store.getMapContainer(id,containerName);
+        MapContainer<MessageId,ReferenceRecord> container=store.getMapContainer(id,containerName);
         container.setKeyMarshaller(new MessageIdMarshaller());
-        container.setValueMarshaller(new ReferenceRecordMarshaller());        
+        container.setValueMarshaller(new ReferenceRecordMarshaller());
         container.load();
         return container;
     }
-    
-    synchronized void addInterestInRecordFile(int recordNumber) {
-        Integer key = Integer.valueOf(recordNumber);
-        AtomicInteger rr = recordReferences.get(key);
-        if (rr == null) {
-            rr = new AtomicInteger();
+
+    synchronized void addInterestInRecordFile(int recordNumber){
+        Integer key=Integer.valueOf(recordNumber);
+        AtomicInteger rr=recordReferences.get(key);
+        if(rr==null){
+            rr=new AtomicInteger();
             recordReferences.put(key,rr);
         }
         rr.incrementAndGet();
     }
-    
-    synchronized void removeInterestInRecordFile(int recordNumber) {
-        Integer key = Integer.valueOf(recordNumber);
-        AtomicInteger rr = recordReferences.get(key);
-        if (rr != null && rr.decrementAndGet() <= 0) {
+
+    synchronized void removeInterestInRecordFile(int recordNumber){
+        Integer key=Integer.valueOf(recordNumber);
+        AtomicInteger rr=recordReferences.get(key);
+        if(rr!=null&&rr.decrementAndGet()<=0){
             recordReferences.remove(key);
         }
     }
@@ -196,28 +201,45 @@
     public void clearMessages() throws IOException{
         deleteAllMessages();
     }
-    
+
     /**
      * 
      * @throws IOException 
      * @see org.apache.activemq.store.ReferenceStoreAdapter#recoverState()
      */
     public void recoverState() throws IOException{
-        for (Iterator i = durableSubscribers.iterator();i.hasNext();) {
-            SubscriptionInfo info = (SubscriptionInfo)i.next();
-            TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
+        for(Iterator i=durableSubscribers.iterator();i.hasNext();){
+            SubscriptionInfo info=(SubscriptionInfo)i.next();
+            TopicReferenceStore ts=createTopicReferenceStore((ActiveMQTopic)info.getDestination());
             ts.addSubsciption(info.getClientId(),info.getSubcriptionName(),info.getSelector(),false);
         }
-        
     }
-    
-    @Override
-    public synchronized void setDirectory(File directory){
-        File file = new File(directory,"data");
+
+    public Map<TransactionId,AMQTx> retrievePreparedState() throws IOException{
+        Map<TransactionId,AMQTx> result=new HashMap<TransactionId,AMQTx>();
+        preparedTransactions.load();
+        for(Iterator i=preparedTransactions.keySet().iterator();i.hasNext();){
+            TransactionId key=(TransactionId)i.next();
+            AMQTx value=(AMQTx)preparedTransactions.get(key);
+            result.put(key,value);
+        }
+        return result;
+    }
+
+    public void savePreparedState(Map<TransactionId,AMQTx> map) throws IOException{
+        preparedTransactions.clear();
+        for(Iterator<Map.Entry<TransactionId,AMQTx>> iter=map.entrySet().iterator();iter.hasNext();){
+            Map.Entry<TransactionId,AMQTx> entry=iter.next();
+            preparedTransactions.put(entry.getKey(),entry.getValue());
+        }
+    }
+
+    @Override public synchronized void setDirectory(File directory){
+        File file=new File(directory,"data");
         super.setDirectory(file);
         this.stateStore=createStateStore(directory);
     }
-    
+
     protected synchronized Store getStateStore() throws IOException{
         if(this.stateStore==null){
             File stateDirectory=new File(getDirectory(),"kr-state");
@@ -226,8 +248,8 @@
         }
         return this.stateStore;
     }
-    
-    private Store createStateStore(File directory) {
+
+    private Store createStateStore(File directory){
         File stateDirectory=new File(directory,"state");
         stateDirectory.mkdirs();
         try{
@@ -236,19 +258,15 @@
             log.error("Failed to create the state store",e);
         }
         return null;
-        
     }
-    
-    protected void addSubscriberState(SubscriptionInfo info) throws IOException {
+
+    protected void addSubscriberState(SubscriptionInfo info) throws IOException{
         durableSubscribers.add(info);
     }
-    
-    protected void removeSubscriberState(SubscriptionInfo info) {
+
+    protected void removeSubscriberState(SubscriptionInfo info){
         durableSubscribers.remove(info);
     }
-
-   
-        
 }