You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/12/28 21:51:22 UTC

svn commit: r490795 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java

Author: rajdavies
Date: Thu Dec 28 12:51:22 2006
New Revision: 490795

URL: http://svn.apache.org/viewvc?view=rev&rev=490795
Log:
Transactions, when recovered, need to be recovered in the order in which they were prepared:
a) the order produced by an unsorted map iterator is arbitrary and might confuse people
b) brokerSequence can get out of step for messages that are recovered and then committed - e.g.
during recovery on startup

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java?view=diff&rev=490795&r1=490794&r2=490795
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java Thu Dec 28 12:51:22 2006
@@ -21,6 +21,9 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 
 import javax.transaction.xa.XAException;
 
@@ -34,15 +37,14 @@
 import org.apache.activemq.store.TransactionRecoveryListener;
 import org.apache.activemq.store.TransactionStore;
 
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  */
 public class JournalTransactionStore implements TransactionStore {
 
     private final JournalPersistenceAdapter peristenceAdapter;
-    ConcurrentHashMap inflightTransactions = new ConcurrentHashMap();
-    ConcurrentHashMap preparedTransactions = new ConcurrentHashMap();
+    Map inflightTransactions = new LinkedHashMap();
+    Map preparedTransactions = new LinkedHashMap();
     private boolean doingRecover;
 
     
@@ -128,30 +130,43 @@
      * @throws IOException
      * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
      */
-    public void prepare(TransactionId txid) throws IOException {
-        Tx tx = (Tx) inflightTransactions.remove(txid);
-        if (tx == null)
+    public void prepare(TransactionId txid) throws IOException{
+        Tx tx=null;
+        synchronized(inflightTransactions){
+            tx=(Tx)inflightTransactions.remove(txid);
+        }
+        if(tx==null)
             return;
-        peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false), true);
-        preparedTransactions.put(txid, tx);
+        peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE,txid,false),true);
+        synchronized(preparedTransactions){
+            preparedTransactions.put(txid,tx);
+        }
     }
     
     /**
      * @throws IOException
      * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
      */
-    public void replayPrepare(TransactionId txid) throws IOException {
-        Tx tx = (Tx) inflightTransactions.remove(txid);
-        if (tx == null)
+    public void replayPrepare(TransactionId txid) throws IOException{
+        Tx tx=null;
+        synchronized(inflightTransactions){
+            tx=(Tx)inflightTransactions.remove(txid);
+        }
+        if(tx==null)
             return;
-        preparedTransactions.put(txid, tx);
+        synchronized(preparedTransactions){
+            preparedTransactions.put(txid,tx);
+        }
     }
 
-    public Tx getTx(Object txid, RecordLocation location) {
-        Tx tx = (Tx) inflightTransactions.get(txid);
-        if (tx == null) {
-            tx = new Tx(location);
-            inflightTransactions.put(txid, tx);
+    public Tx getTx(Object txid,RecordLocation location){
+        Tx tx=null;
+        synchronized(inflightTransactions){
+            tx=(Tx)inflightTransactions.get(txid);
+        }
+        if(tx==null){
+            tx=new Tx(location);
+            inflightTransactions.put(txid,tx);
         }
         return tx;
     }
@@ -160,22 +175,23 @@
      * @throws XAException
      * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
      */
-    public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
+    public void commit(TransactionId txid,boolean wasPrepared) throws IOException{
         Tx tx;
-        if (wasPrepared) {
-            tx = (Tx) preparedTransactions.remove(txid);
-        } else {
-            tx = (Tx) inflightTransactions.remove(txid);
+        if(wasPrepared){
+            synchronized(preparedTransactions){
+                tx=(Tx)preparedTransactions.remove(txid);
+            }
+        }else{
+            synchronized(inflightTransactions){
+                tx=(Tx)inflightTransactions.remove(txid);
+            }
         }
-
-        if (tx == null)
+        if(tx==null)
             return;
-
-        if (txid.isXATransaction()) {
-            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid, wasPrepared),
-                    true);
-        } else {
-            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, wasPrepared),
+        if(txid.isXATransaction()){
+            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT,txid,wasPrepared),true);
+        }else{
+            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT,txid,wasPrepared),
                     true);
         }
     }
@@ -184,11 +200,15 @@
      * @throws XAException
      * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
      */
-    public Tx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException {
-        if (wasPrepared) {
-            return (Tx) preparedTransactions.remove(txid);
-        } else {
-            return (Tx) inflightTransactions.remove(txid);
+    public Tx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException{
+        if(wasPrepared){
+            synchronized(preparedTransactions){
+                return (Tx)preparedTransactions.remove(txid);
+            }
+        }else{
+            synchronized(inflightTransactions){
+                return (Tx)inflightTransactions.remove(txid);
+            }
         }
     }
 
@@ -196,31 +216,39 @@
      * @throws IOException
      * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
      */
-    public void rollback(TransactionId txid) throws IOException {
-
-        Tx tx = (Tx) inflightTransactions.remove(txid);
-        if (tx != null)
-            tx = (Tx) preparedTransactions.remove(txid);
-
-        if (tx != null) {
-            if (txid.isXATransaction()) {
-                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid, false),
-                        true);
-            } else {
-                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK, txid, false),
+    public void rollback(TransactionId txid) throws IOException{
+        Tx tx=null;
+        synchronized(inflightTransactions){
+            tx=(Tx)inflightTransactions.remove(txid);
+        }
+        if(tx!=null)
+            synchronized(preparedTransactions){
+                tx=(Tx)preparedTransactions.remove(txid);
+            }
+        if(tx!=null){
+            if(txid.isXATransaction()){
+                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK,txid,false),true);
+            }else{
+                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,txid,false),
                         true);
             }
         }
-
     }
 
     /**
      * @throws IOException
      * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
      */
-    public void replayRollback(TransactionId txid) throws IOException {
-        if (inflightTransactions.remove(txid) != null)
-            preparedTransactions.remove(txid);
+    public void replayRollback(TransactionId txid) throws IOException{
+        boolean inflight=false;
+        synchronized(inflightTransactions){
+            inflight=inflightTransactions.remove(txid)!=null;
+        }
+        if(inflight){
+            synchronized(preparedTransactions){
+                preparedTransactions.remove(txid);
+            }
+        }
     }
     
     public void start() throws Exception {
@@ -229,18 +257,24 @@
     public void stop() throws Exception {
     }
     
-    synchronized public void recover(TransactionRecoveryListener listener) throws IOException {
+    synchronized public void recover(TransactionRecoveryListener listener) throws IOException{
         // All the in-flight transactions get rolled back..
-        inflightTransactions.clear();
-        this.doingRecover = true;
-        try {
-            for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
-                Object txid = (Object) iter.next();
-                Tx tx = (Tx) preparedTransactions.get(txid);
-                listener.recover((XATransactionId) txid, tx.getMessages(), tx.getAcks());
+        synchronized(inflightTransactions){
+            inflightTransactions.clear();
+        }
+        this.doingRecover=true;
+        try{
+            Map txs=null;
+            synchronized(preparedTransactions){
+                txs=new LinkedHashMap(preparedTransactions);
+            }
+            for(Iterator iter=txs.keySet().iterator();iter.hasNext();){
+                Object txid=(Object)iter.next();
+                Tx tx=(Tx)txs.get(txid);
+                listener.recover((XATransactionId)txid,tx.getMessages(),tx.getAcks());
             }
-        } finally {
-            this.doingRecover = false;
+        }finally{
+            this.doingRecover=false;
         }
     }
 
@@ -269,30 +303,32 @@
     }
 
 
-    public RecordLocation checkpoint() throws IOException {
-        
+    public RecordLocation checkpoint() throws IOException{
         // Nothing really to checkpoint.. since, we don't
         // checkpoint tx operations in to long term store until they are committed.
-
         // But we keep track of the first location of an operation
         // that was associated with an active tx. The journal can not
-        // roll over active tx records.        
-        RecordLocation rc = null;
-        for (Iterator iter = inflightTransactions.values().iterator(); iter.hasNext();) {
-            Tx tx = (Tx) iter.next();
-            RecordLocation location = tx.location;
-            if (rc == null || rc.compareTo(location) < 0) {
-                rc = location;
-            }
-        }
-        for (Iterator iter = preparedTransactions.values().iterator(); iter.hasNext();) {
-            Tx tx = (Tx) iter.next();
-            RecordLocation location = tx.location;
-            if (rc == null || rc.compareTo(location) < 0) {
-                rc = location;
+        // roll over active tx records.
+        RecordLocation rc=null;
+        synchronized(inflightTransactions){
+            for(Iterator iter=inflightTransactions.values().iterator();iter.hasNext();){
+                Tx tx=(Tx)iter.next();
+                RecordLocation location=tx.location;
+                if(rc==null||rc.compareTo(location)<0){
+                    rc=location;
+                }
+            }
+        }
+        synchronized(preparedTransactions){
+            for(Iterator iter=preparedTransactions.values().iterator();iter.hasNext();){
+                Tx tx=(Tx)iter.next();
+                RecordLocation location=tx.location;
+                if(rc==null||rc.compareTo(location)<0){
+                    rc=location;
+                }
             }
+            return rc;
         }
-        return rc;
     }
 
     public boolean isDoingRecover() {