You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/12/28 21:51:22 UTC
svn commit: r490795 -
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java
Author: rajdavies
Date: Thu Dec 28 12:51:22 2006
New Revision: 490795
URL: http://svn.apache.org/viewvc?view=rev&rev=490795
Log:
Transactions, when recovered, need to be recovered in prepare order:
a) order determined by an unsorted map iterator might confuse people
b) brokerSequence can get out of step for messages recovered then committed - e.g.
recovery on startup
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java?view=diff&rev=490795&r1=490794&r2=490795
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java Thu Dec 28 12:51:22 2006
@@ -21,6 +21,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
import javax.transaction.xa.XAException;
@@ -34,15 +37,14 @@
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
-import java.util.concurrent.ConcurrentHashMap;
/**
*/
public class JournalTransactionStore implements TransactionStore {
private final JournalPersistenceAdapter peristenceAdapter;
- ConcurrentHashMap inflightTransactions = new ConcurrentHashMap();
- ConcurrentHashMap preparedTransactions = new ConcurrentHashMap();
+ Map inflightTransactions = new LinkedHashMap();
+ Map preparedTransactions = new LinkedHashMap();
private boolean doingRecover;
@@ -128,30 +130,43 @@
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
- public void prepare(TransactionId txid) throws IOException {
- Tx tx = (Tx) inflightTransactions.remove(txid);
- if (tx == null)
+ public void prepare(TransactionId txid) throws IOException{
+ Tx tx=null;
+ synchronized(inflightTransactions){
+ tx=(Tx)inflightTransactions.remove(txid);
+ }
+ if(tx==null)
return;
- peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false), true);
- preparedTransactions.put(txid, tx);
+ peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE,txid,false),true);
+ synchronized(preparedTransactions){
+ preparedTransactions.put(txid,tx);
+ }
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
- public void replayPrepare(TransactionId txid) throws IOException {
- Tx tx = (Tx) inflightTransactions.remove(txid);
- if (tx == null)
+ public void replayPrepare(TransactionId txid) throws IOException{
+ Tx tx=null;
+ synchronized(inflightTransactions){
+ tx=(Tx)inflightTransactions.remove(txid);
+ }
+ if(tx==null)
return;
- preparedTransactions.put(txid, tx);
+ synchronized(preparedTransactions){
+ preparedTransactions.put(txid,tx);
+ }
}
- public Tx getTx(Object txid, RecordLocation location) {
- Tx tx = (Tx) inflightTransactions.get(txid);
- if (tx == null) {
- tx = new Tx(location);
- inflightTransactions.put(txid, tx);
+ public Tx getTx(Object txid,RecordLocation location){
+ Tx tx=null;
+ synchronized(inflightTransactions){
+ tx=(Tx)inflightTransactions.get(txid);
+ }
+ if(tx==null){
+ tx=new Tx(location);
+ inflightTransactions.put(txid,tx);
}
return tx;
}
@@ -160,22 +175,23 @@
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
- public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
+ public void commit(TransactionId txid,boolean wasPrepared) throws IOException{
Tx tx;
- if (wasPrepared) {
- tx = (Tx) preparedTransactions.remove(txid);
- } else {
- tx = (Tx) inflightTransactions.remove(txid);
+ if(wasPrepared){
+ synchronized(preparedTransactions){
+ tx=(Tx)preparedTransactions.remove(txid);
+ }
+ }else{
+ synchronized(inflightTransactions){
+ tx=(Tx)inflightTransactions.remove(txid);
+ }
}
-
- if (tx == null)
+ if(tx==null)
return;
-
- if (txid.isXATransaction()) {
- peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid, wasPrepared),
- true);
- } else {
- peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, wasPrepared),
+ if(txid.isXATransaction()){
+ peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT,txid,wasPrepared),true);
+ }else{
+ peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT,txid,wasPrepared),
true);
}
}
@@ -184,11 +200,15 @@
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
- public Tx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException {
- if (wasPrepared) {
- return (Tx) preparedTransactions.remove(txid);
- } else {
- return (Tx) inflightTransactions.remove(txid);
+ public Tx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException{
+ if(wasPrepared){
+ synchronized(preparedTransactions){
+ return (Tx)preparedTransactions.remove(txid);
+ }
+ }else{
+ synchronized(inflightTransactions){
+ return (Tx)inflightTransactions.remove(txid);
+ }
}
}
@@ -196,31 +216,39 @@
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
- public void rollback(TransactionId txid) throws IOException {
-
- Tx tx = (Tx) inflightTransactions.remove(txid);
- if (tx != null)
- tx = (Tx) preparedTransactions.remove(txid);
-
- if (tx != null) {
- if (txid.isXATransaction()) {
- peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid, false),
- true);
- } else {
- peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK, txid, false),
+ public void rollback(TransactionId txid) throws IOException{
+ Tx tx=null;
+ synchronized(inflightTransactions){
+ tx=(Tx)inflightTransactions.remove(txid);
+ }
+ if(tx!=null)
+ synchronized(preparedTransactions){
+ tx=(Tx)preparedTransactions.remove(txid);
+ }
+ if(tx!=null){
+ if(txid.isXATransaction()){
+ peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK,txid,false),true);
+ }else{
+ peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,txid,false),
true);
}
}
-
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
- public void replayRollback(TransactionId txid) throws IOException {
- if (inflightTransactions.remove(txid) != null)
- preparedTransactions.remove(txid);
+ public void replayRollback(TransactionId txid) throws IOException{
+ boolean inflight=false;
+ synchronized(inflightTransactions){
+ inflight=inflightTransactions.remove(txid)!=null;
+ }
+ if(inflight){
+ synchronized(preparedTransactions){
+ preparedTransactions.remove(txid);
+ }
+ }
}
public void start() throws Exception {
@@ -229,18 +257,24 @@
public void stop() throws Exception {
}
- synchronized public void recover(TransactionRecoveryListener listener) throws IOException {
+ synchronized public void recover(TransactionRecoveryListener listener) throws IOException{
// All the in-flight transactions get rolled back..
- inflightTransactions.clear();
- this.doingRecover = true;
- try {
- for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
- Object txid = (Object) iter.next();
- Tx tx = (Tx) preparedTransactions.get(txid);
- listener.recover((XATransactionId) txid, tx.getMessages(), tx.getAcks());
+ synchronized(inflightTransactions){
+ inflightTransactions.clear();
+ }
+ this.doingRecover=true;
+ try{
+ Map txs=null;
+ synchronized(preparedTransactions){
+ txs=new LinkedHashMap(preparedTransactions);
+ }
+ for(Iterator iter=txs.keySet().iterator();iter.hasNext();){
+ Object txid=(Object)iter.next();
+ Tx tx=(Tx)txs.get(txid);
+ listener.recover((XATransactionId)txid,tx.getMessages(),tx.getAcks());
}
- } finally {
- this.doingRecover = false;
+ }finally{
+ this.doingRecover=false;
}
}
@@ -269,30 +303,32 @@
}
- public RecordLocation checkpoint() throws IOException {
-
+ public RecordLocation checkpoint() throws IOException{
// Nothing really to checkpoint.. since, we don't
// checkpoint tx operations in to long term store until they are committed.
-
// But we keep track of the first location of an operation
// that was associated with an active tx. The journal can not
- // roll over active tx records.
- RecordLocation rc = null;
- for (Iterator iter = inflightTransactions.values().iterator(); iter.hasNext();) {
- Tx tx = (Tx) iter.next();
- RecordLocation location = tx.location;
- if (rc == null || rc.compareTo(location) < 0) {
- rc = location;
- }
- }
- for (Iterator iter = preparedTransactions.values().iterator(); iter.hasNext();) {
- Tx tx = (Tx) iter.next();
- RecordLocation location = tx.location;
- if (rc == null || rc.compareTo(location) < 0) {
- rc = location;
+ // roll over active tx records.
+ RecordLocation rc=null;
+ synchronized(inflightTransactions){
+ for(Iterator iter=inflightTransactions.values().iterator();iter.hasNext();){
+ Tx tx=(Tx)iter.next();
+ RecordLocation location=tx.location;
+ if(rc==null||rc.compareTo(location)<0){
+ rc=location;
+ }
+ }
+ }
+ synchronized(preparedTransactions){
+ for(Iterator iter=preparedTransactions.values().iterator();iter.hasNext();){
+ Tx tx=(Tx)iter.next();
+ RecordLocation location=tx.location;
+ if(rc==null||rc.compareTo(location)<0){
+ rc=location;
+ }
}
+ return rc;
}
- return rc;
}
public boolean isDoingRecover() {