You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/10/09 11:55:48 UTC
activemq git commit: AMQ-7067 - ensure updates to ackMessageFileMap are protected by the index lock
Repository: activemq
Updated Branches:
refs/heads/master 57c793953 -> a311139bf
AMQ-7067 - ensure updates to ackMessageFileMap are protected by the index lock
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a311139b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a311139b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a311139b
Branch: refs/heads/master
Commit: a311139bfe2f2b3ffc0c84cfb1e9cec0c11830c7
Parents: 57c7939
Author: gtully <ga...@gmail.com>
Authored: Tue Oct 9 12:55:11 2018 +0100
Committer: gtully <ga...@gmail.com>
Committed: Tue Oct 9 12:55:11 2018 +0100
----------------------------------------------------------------------
.../activemq/store/kahadb/MessageDatabase.java | 27 ++++++++++++++------
1 file changed, 19 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/a311139b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 46696f4..d231a86 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1401,6 +1401,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
public void execute(Transaction tx) throws IOException {
for (Operation op : messagingTx) {
op.execute(tx);
+ recordAckMessageReferenceLocation(location, op.getLocation());
}
}
});
@@ -1408,21 +1409,26 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
} finally {
indexLock.writeLock().unlock();
}
- for (Operation op: inflightTx) {
- recordAckMessageReferenceLocation(location, op.getLocation());
- }
}
@SuppressWarnings("rawtypes")
protected void process(KahaPrepareCommand command, Location location) {
TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
+ List<Operation> tx = null;
synchronized (inflightTransactions) {
- List<Operation> tx = inflightTransactions.remove(key);
+ tx = inflightTransactions.remove(key);
if (tx != null) {
preparedTransactions.put(key, tx);
- for (Operation op: tx) {
+ }
+ }
+ if (tx != null && !tx.isEmpty()) {
+ indexLock.writeLock().lock();
+ try {
+ for (Operation op : tx) {
recordAckMessageReferenceLocation(location, op.getLocation());
}
+ } finally {
+ indexLock.writeLock().unlock();
}
}
}
@@ -1437,9 +1443,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
updates = preparedTransactions.remove(key);
}
}
- if (key.isXATransaction() && updates != null) {
- for(Operation op : updates) {
- recordAckMessageReferenceLocation(location, op.getLocation());
+ if (key.isXATransaction() && updates != null && !updates.isEmpty()) {
+ indexLock.writeLock().lock();
+ try {
+ for (Operation op : updates) {
+ recordAckMessageReferenceLocation(location, op.getLocation());
+ }
+ } finally {
+ indexLock.writeLock().unlock();
}
}
}