You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/12/28 21:48:05 UTC
svn commit: r490792 -
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
Author: rajdavies
Date: Thu Dec 28 12:48:04 2006
New Revision: 490792
URL: http://svn.apache.org/viewvc?view=rev&rev=490792
Log:
Transactions, when recovered, need to be recovered in prepare order:
a) order determined by an unsorted map iterator might confuse people
b) brokerSequence can get out of step for messages recovered then committed - e.g.
recovery on startup
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java?view=diff&rev=490792&r1=490791&r2=490792
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java Thu Dec 28 12:48:04 2006
@@ -40,6 +40,7 @@
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.Map;
/**
@@ -53,7 +54,7 @@
// The prepared XA transactions.
private TransactionStore transactionStore;
- private ConcurrentHashMap xaTransactions = new ConcurrentHashMap();
+ private Map xaTransactions = new LinkedHashMap();
public TransactionBroker(Broker next, TransactionStore transactionStore) {
super(next);
@@ -70,8 +71,7 @@
* Recovers any prepared transactions.
*/
public void start() throws Exception {
- next.start();
- transactionStore.start();
+ transactionStore.start();
try {
final ConnectionContext context = new ConnectionContext();
context.setBroker(this);
@@ -99,6 +99,7 @@
Throwable cause = e.getCause();
throw IOExceptionSupport.create("Recovery Failed: "+cause.getMessage(), cause);
}
+ next.start();
}
public void stop() throws Exception {
@@ -114,33 +115,36 @@
//////////////////////////////////////////////////////////////////////////////
public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
ArrayList txs = new ArrayList();
- for (Iterator iter = xaTransactions.values().iterator(); iter.hasNext();) {
- Transaction tx = (Transaction) iter.next();
- if( tx.isPrepared() )
- txs.add(tx.getTransactionId());
+ synchronized(xaTransactions){
+ for(Iterator iter=xaTransactions.values().iterator();iter.hasNext();){
+ Transaction tx=(Transaction)iter.next();
+ if(tx.isPrepared())
+ txs.add(tx.getTransactionId());
+ }
}
XATransactionId rc[] = new XATransactionId[txs.size()];
txs.toArray(rc);
return rc;
}
- public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
-
+ public void beginTransaction(ConnectionContext context,TransactionId xid) throws Exception{
// the transaction may have already been started.
- if( xid.isXATransaction() ) {
- Transaction transaction = (Transaction)xaTransactions.get(xid);
- if( transaction != null )
- return;
- transaction = new XATransaction(transactionStore, (XATransactionId)xid, this);
- xaTransactions.put(xid, transaction);
- } else {
- Map transactionMap = context.getTransactions();
- Transaction transaction = (Transaction)transactionMap.get(xid);
- if( transaction != null )
+ if(xid.isXATransaction()){
+ Transaction transaction=null;
+ synchronized(xaTransactions){
+ transaction=(Transaction)xaTransactions.get(xid);
+ if(transaction!=null)
+ return;
+ transaction=new XATransaction(transactionStore,(XATransactionId)xid,this);
+ xaTransactions.put(xid,transaction);
+ }
+ }else{
+ Map transactionMap=context.getTransactions();
+ Transaction transaction=(Transaction)transactionMap.get(xid);
+ if(transaction!=null)
throw new JMSException("Transaction '"+xid+"' has already been started.");
-
- transaction = new LocalTransaction(transactionStore, (LocalTransactionId)xid, context);
- transactionMap.put(xid, transaction);
+ transaction=new LocalTransaction(transactionStore,(LocalTransactionId)xid,context);
+ transactionMap.put(xid,transaction);
}
}
@@ -215,24 +219,28 @@
// Implementation help methods.
//
//////////////////////////////////////////////////////////////////////////////
- public Transaction getTransaction(ConnectionContext context, TransactionId xid, boolean mightBePrepared) throws JMSException, XAException {
- Map transactionMap = xid.isXATransaction() ? xaTransactions : context.getTransactions();
- Transaction transaction = (Transaction)transactionMap.get(xid);
-
- if( transaction != null )
+ public Transaction getTransaction(ConnectionContext context,TransactionId xid,boolean mightBePrepared)
+ throws JMSException,XAException{
+ Map transactionMap=null;
+ synchronized(xaTransactions){
+ transactionMap=xid.isXATransaction()?xaTransactions:context.getTransactions();
+ }
+ Transaction transaction=(Transaction)transactionMap.get(xid);
+ if(transaction!=null)
return transaction;
-
- if( xid.isXATransaction() ) {
- XAException e = new XAException("Transaction '" + xid + "' has not been started.");
- e.errorCode = XAException.XAER_NOTA;
+ if(xid.isXATransaction()){
+ XAException e=new XAException("Transaction '"+xid+"' has not been started.");
+ e.errorCode=XAException.XAER_NOTA;
throw e;
- } else {
- throw new JMSException("Transaction '" + xid + "' has not been started.");
+ }else{
+ throw new JMSException("Transaction '"+xid+"' has not been started.");
}
}
- public void removeTransaction(XATransactionId xid) {
- xaTransactions.remove(xid);
+ public void removeTransaction(XATransactionId xid){
+ synchronized(xaTransactions){
+ xaTransactions.remove(xid);
+ }
}
}