You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2021/12/20 14:38:00 UTC
[activemq] branch main updated: [AMQ-8400] Add synchronization handling for Transaction to prevent CME (#720)
This is an automated email from the ASF dual-hosted git repository.
mattrpav pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new b196e9a [AMQ-8400] Add synchronization handling for Transaction to prevent CME (#720)
b196e9a is described below
commit b196e9a88a5e221c96346c5ec23c0029bde0fc91
Author: Matt Pavlovich <ma...@hyte.io>
AuthorDate: Mon Dec 20 08:37:56 2021 -0600
[AMQ-8400] Add synchronization handling for Transaction to prevent CME (#720)
---
.../apache/activemq/transaction/Transaction.java | 58 +++++++++++++---------
1 file changed, 34 insertions(+), 24 deletions(-)
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java b/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
index 13ec353..0933480 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
@@ -52,15 +52,15 @@ public abstract class Transaction {
public Object call() throws Exception {
doPreCommit();
return null;
- }
+ }
});
protected FutureTask<?> postCommitTask = new FutureTask<Object>(new Callable<Object>() {
public Object call() throws Exception {
doPostCommit();
return null;
- }
+ }
});
-
+
public byte getState() {
return state;
}
@@ -87,15 +87,19 @@ public abstract class Transaction {
}
public Synchronization findMatching(Synchronization r) {
- int existing = synchronizations.indexOf(r);
- if (existing != -1) {
- return synchronizations.get(existing);
- }
+ synchronized(synchronizations) {
+ int existing = synchronizations.indexOf(r);
+ if (existing != -1) {
+ return synchronizations.get(existing);
+ }
+ }
return null;
}
public void removeSynchronization(Synchronization r) {
- synchronizations.remove(r);
+ synchronized(synchronizations) {
+ synchronizations.remove(r);
+ }
}
public void prePrepare() throws Exception {
@@ -119,26 +123,32 @@ public abstract class Transaction {
throw xae;
}
}
-
+
protected void fireBeforeCommit() throws Exception {
- for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
- Synchronization s = iter.next();
- s.beforeCommit();
+ synchronized(synchronizations) {
+ for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
+ Synchronization s = iter.next();
+ s.beforeCommit();
+ }
}
}
protected void fireAfterCommit() throws Exception {
- for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
- Synchronization s = iter.next();
- s.afterCommit();
+ synchronized(synchronizations) {
+ for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
+ Synchronization s = iter.next();
+ s.afterCommit();
+ }
}
}
public void fireAfterRollback() throws Exception {
- Collections.reverse(synchronizations);
- for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
- Synchronization s = iter.next();
- s.afterRollback();
+ synchronized(synchronizations) {
+ Collections.reverse(synchronizations);
+ for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
+ Synchronization s = iter.next();
+ s.afterRollback();
+ }
}
}
@@ -156,15 +166,15 @@ public abstract class Transaction {
public abstract TransactionId getTransactionId();
public abstract Logger getLog();
-
+
public boolean isPrepared() {
return getState() == PREPARED_STATE;
}
-
+
public int size() {
return synchronizations.size();
}
-
+
protected void waitPostCommitDone(FutureTask<?> postCommitTask) throws XAException, IOException {
try {
postCommitTask.get();
@@ -179,9 +189,9 @@ public abstract class Transaction {
} else {
throw new XAException(e.toString());
}
- }
+ }
}
-
+
protected void doPreCommit() throws XAException {
try {
fireBeforeCommit();