You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2021/12/20 14:38:00 UTC

[activemq] branch main updated: [AMQ-8400] Add synchronization handling for Transaction to prevent CME (#720)

This is an automated email from the ASF dual-hosted git repository.

mattrpav pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new b196e9a  [AMQ-8400] Add synchronization handling for Transaction to prevent CME (#720)
b196e9a is described below

commit b196e9a88a5e221c96346c5ec23c0029bde0fc91
Author: Matt Pavlovich <ma...@hyte.io>
AuthorDate: Mon Dec 20 08:37:56 2021 -0600

    [AMQ-8400] Add synchronization handling for Transaction to prevent CME (#720)
---
 .../apache/activemq/transaction/Transaction.java   | 58 +++++++++++++---------
 1 file changed, 34 insertions(+), 24 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java b/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
index 13ec353..0933480 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
@@ -52,15 +52,15 @@ public abstract class Transaction {
         public Object call() throws Exception {
             doPreCommit();
             return null;
-        }   
+        }
     });
     protected FutureTask<?> postCommitTask = new FutureTask<Object>(new Callable<Object>() {
         public Object call() throws Exception {
             doPostCommit();
             return null;
-        }   
+        }
     });
-    
+
     public byte getState() {
         return state;
     }
@@ -87,15 +87,19 @@ public abstract class Transaction {
     }
 
     public Synchronization findMatching(Synchronization r) {
-        int existing = synchronizations.indexOf(r);
-        if (existing != -1) {
-            return synchronizations.get(existing);
-        }
+    	synchronized(synchronizations) {
+            int existing = synchronizations.indexOf(r);
+            if (existing != -1) {
+                return synchronizations.get(existing);
+            }
+    	}
         return null;
     }
 
     public void removeSynchronization(Synchronization r) {
-        synchronizations.remove(r);
+        synchronized(synchronizations) {
+            synchronizations.remove(r);
+        }
     }
 
     public void prePrepare() throws Exception {
@@ -119,26 +123,32 @@ public abstract class Transaction {
             throw xae;
         }
     }
-    
+
     protected void fireBeforeCommit() throws Exception {
-        for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
-            Synchronization s = iter.next();
-            s.beforeCommit();
+        synchronized(synchronizations) {
+            for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
+                Synchronization s = iter.next();
+                s.beforeCommit();
+            }
         }
     }
 
     protected void fireAfterCommit() throws Exception {
-        for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
-            Synchronization s = iter.next();
-            s.afterCommit();
+        synchronized(synchronizations) {
+            for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
+                Synchronization s = iter.next();
+                s.afterCommit();
+            }
         }
     }
 
     public void fireAfterRollback() throws Exception {
-    	Collections.reverse(synchronizations);
-        for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
-            Synchronization s = iter.next();
-            s.afterRollback();
+    	synchronized(synchronizations) {
+            Collections.reverse(synchronizations);
+            for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
+                Synchronization s = iter.next();
+               s.afterRollback();
+            }
         }
     }
 
@@ -156,15 +166,15 @@ public abstract class Transaction {
     public abstract TransactionId getTransactionId();
 
     public abstract Logger getLog();
-    
+
     public boolean isPrepared() {
         return getState() == PREPARED_STATE;
     }
-    
+
     public int size() {
         return synchronizations.size();
     }
-    
+
     protected void waitPostCommitDone(FutureTask<?> postCommitTask) throws XAException, IOException {
         try {
             postCommitTask.get();
@@ -179,9 +189,9 @@ public abstract class Transaction {
             } else {
                 throw new XAException(e.toString());
             }
-        }    
+        }
     }
-    
+
     protected void doPreCommit() throws XAException {
         try {
             fireBeforeCommit();