You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2021/12/23 13:49:58 UTC

[activemq] 02/03: [AMQ-8400] Add synchronization handling for Transaction to prevent ConcurrentModificationException (CME) (#720)

This is an automated email from the ASF dual-hosted git repository.

mattrpav pushed a commit to branch activemq-5.16.x
in repository https://gitbox.apache.org/repos/asf/activemq.git

commit f2dbc92743cfacf3b5397f0aae82a24dcfa4b7c1
Author: Matt Pavlovich <ma...@hyte.io>
AuthorDate: Mon Dec 20 08:37:56 2021 -0600

    [AMQ-8400] Add synchronization handling for Transaction to prevent CME (#720)
---
 .../apache/activemq/transaction/Transaction.java   | 58 +++++++++++++---------
 1 file changed, 34 insertions(+), 24 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java b/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
index 13ec353..0933480 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
@@ -52,15 +52,15 @@ public abstract class Transaction {
         public Object call() throws Exception {
             doPreCommit();
             return null;
-        }   
+        }
     });
     protected FutureTask<?> postCommitTask = new FutureTask<Object>(new Callable<Object>() {
         public Object call() throws Exception {
             doPostCommit();
             return null;
-        }   
+        }
     });
-    
+
     public byte getState() {
         return state;
     }
@@ -87,15 +87,19 @@ public abstract class Transaction {
     }
 
     public Synchronization findMatching(Synchronization r) {
-        int existing = synchronizations.indexOf(r);
-        if (existing != -1) {
-            return synchronizations.get(existing);
-        }
+    	synchronized(synchronizations) {
+            int existing = synchronizations.indexOf(r);
+            if (existing != -1) {
+                return synchronizations.get(existing);
+            }
+    	}
         return null;
     }
 
     public void removeSynchronization(Synchronization r) {
-        synchronizations.remove(r);
+        synchronized(synchronizations) {
+            synchronizations.remove(r);
+        }
     }
 
     public void prePrepare() throws Exception {
@@ -119,26 +123,32 @@ public abstract class Transaction {
             throw xae;
         }
     }
-    
+
     protected void fireBeforeCommit() throws Exception {
-        for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
-            Synchronization s = iter.next();
-            s.beforeCommit();
+        synchronized(synchronizations) {
+            for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
+                Synchronization s = iter.next();
+                s.beforeCommit();
+            }
         }
     }
 
     protected void fireAfterCommit() throws Exception {
-        for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
-            Synchronization s = iter.next();
-            s.afterCommit();
+        synchronized(synchronizations) {
+            for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
+                Synchronization s = iter.next();
+                s.afterCommit();
+            }
         }
     }
 
     public void fireAfterRollback() throws Exception {
-    	Collections.reverse(synchronizations);
-        for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
-            Synchronization s = iter.next();
-            s.afterRollback();
+    	synchronized(synchronizations) {
+            Collections.reverse(synchronizations);
+            for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
+                Synchronization s = iter.next();
+               s.afterRollback();
+            }
         }
     }
 
@@ -156,15 +166,15 @@ public abstract class Transaction {
     public abstract TransactionId getTransactionId();
 
     public abstract Logger getLog();
-    
+
     public boolean isPrepared() {
         return getState() == PREPARED_STATE;
     }
-    
+
     public int size() {
         return synchronizations.size();
     }
-    
+
     protected void waitPostCommitDone(FutureTask<?> postCommitTask) throws XAException, IOException {
         try {
             postCommitTask.get();
@@ -179,9 +189,9 @@ public abstract class Transaction {
             } else {
                 throw new XAException(e.toString());
             }
-        }    
+        }
     }
-    
+
     protected void doPreCommit() throws XAException {
         try {
             fireBeforeCommit();