You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/12/28 21:48:05 UTC

svn commit: r490792 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java

Author: rajdavies
Date: Thu Dec 28 12:48:04 2006
New Revision: 490792

URL: http://svn.apache.org/viewvc?view=rev&rev=490792
Log:
Transactions, when recovered, need to be recovered in prepare order:
a) an order determined by an unsorted map iterator might confuse people
b) brokerSequence can get out of step for messages that are recovered and then
committed - e.g. during recovery on startup

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java?view=diff&rev=490792&r1=490791&r2=490792
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java Thu Dec 28 12:48:04 2006
@@ -40,6 +40,7 @@
 
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 /**
@@ -53,7 +54,7 @@
 
     // The prepared XA transactions.
     private TransactionStore transactionStore;
-    private ConcurrentHashMap xaTransactions = new ConcurrentHashMap();
+    private Map xaTransactions = new LinkedHashMap();
 
     public TransactionBroker(Broker next, TransactionStore transactionStore) {
         super(next);
@@ -70,8 +71,7 @@
      * Recovers any prepared transactions.
      */
     public void start() throws Exception {
-        next.start();
-        transactionStore.start();
+        transactionStore.start();  
         try {
             final ConnectionContext context = new ConnectionContext();
             context.setBroker(this);
@@ -99,6 +99,7 @@
             Throwable cause = e.getCause();
             throw IOExceptionSupport.create("Recovery Failed: "+cause.getMessage(), cause);
         }
+        next.start();
     }
     
     public void stop() throws Exception {
@@ -114,33 +115,36 @@
     //////////////////////////////////////////////////////////////////////////////
     public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
         ArrayList txs = new ArrayList();
-        for (Iterator iter = xaTransactions.values().iterator(); iter.hasNext();) {
-            Transaction tx = (Transaction) iter.next();
-            if( tx.isPrepared() )
-                txs.add(tx.getTransactionId());
+        synchronized(xaTransactions){
+            for(Iterator iter=xaTransactions.values().iterator();iter.hasNext();){
+                Transaction tx=(Transaction)iter.next();
+                if(tx.isPrepared())
+                    txs.add(tx.getTransactionId());
+            }
         }
         XATransactionId rc[] = new XATransactionId[txs.size()];
         txs.toArray(rc);
         return rc;
     }
 
-    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
-        
+    public void beginTransaction(ConnectionContext context,TransactionId xid) throws Exception{
         // the transaction may have already been started.
-        if( xid.isXATransaction() ) {
-            Transaction transaction = (Transaction)xaTransactions.get(xid);
-            if( transaction != null  )
-                return;
-            transaction = new XATransaction(transactionStore, (XATransactionId)xid, this);
-            xaTransactions.put(xid, transaction);
-        } else {
-            Map transactionMap = context.getTransactions();        
-            Transaction transaction = (Transaction)transactionMap.get(xid);
-            if( transaction != null  )
+        if(xid.isXATransaction()){
+            Transaction transaction=null;
+            synchronized(xaTransactions){
+                transaction=(Transaction)xaTransactions.get(xid);
+                if(transaction!=null)
+                    return;
+                transaction=new XATransaction(transactionStore,(XATransactionId)xid,this);
+                xaTransactions.put(xid,transaction);
+            }
+        }else{
+            Map transactionMap=context.getTransactions();
+            Transaction transaction=(Transaction)transactionMap.get(xid);
+            if(transaction!=null)
                 throw new JMSException("Transaction '"+xid+"' has already been started.");
-            
-            transaction = new LocalTransaction(transactionStore, (LocalTransactionId)xid, context);
-            transactionMap.put(xid, transaction);
+            transaction=new LocalTransaction(transactionStore,(LocalTransactionId)xid,context);
+            transactionMap.put(xid,transaction);
         }
     }
 
@@ -215,24 +219,28 @@
     // Implementation help methods.
     //
     //////////////////////////////////////////////////////////////////////////////
-    public Transaction getTransaction(ConnectionContext context, TransactionId xid, boolean mightBePrepared) throws JMSException, XAException {
-        Map transactionMap = xid.isXATransaction() ? xaTransactions : context.getTransactions();        
-        Transaction transaction = (Transaction)transactionMap.get(xid);
-
-        if( transaction != null  )
+    public Transaction getTransaction(ConnectionContext context,TransactionId xid,boolean mightBePrepared)
+            throws JMSException,XAException{
+        Map transactionMap=null;
+        synchronized(xaTransactions){
+            transactionMap=xid.isXATransaction()?xaTransactions:context.getTransactions();
+        }
+        Transaction transaction=(Transaction)transactionMap.get(xid);
+        if(transaction!=null)
             return transaction;
-        
-        if( xid.isXATransaction() ) {
-            XAException e = new XAException("Transaction '" + xid + "' has not been started.");
-            e.errorCode = XAException.XAER_NOTA;
+        if(xid.isXATransaction()){
+            XAException e=new XAException("Transaction '"+xid+"' has not been started.");
+            e.errorCode=XAException.XAER_NOTA;
             throw e;
-        } else {
-            throw new JMSException("Transaction '" + xid + "' has not been started.");
+        }else{
+            throw new JMSException("Transaction '"+xid+"' has not been started.");
         }
     }
 
-    public void removeTransaction(XATransactionId xid) {
-        xaTransactions.remove(xid);
+    public void removeTransaction(XATransactionId xid){
+        synchronized(xaTransactions){
+            xaTransactions.remove(xid);
+        }
     }
 
 }