You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2021/03/25 11:03:01 UTC
[activemq] branch master updated: AMQ-8201 Forward commit commands
for local transactions as well as XA transactions during ACK compaction
This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/master by this push:
new edd0515 AMQ-8201 Forward commit commands for local transactions as well as XA transactions during ACK compaction
edd0515 is described below
commit edd0515d90d8bdb01e3455bf2c4c132f9a6add29
Author: Jonathan Gallimore <jo...@jrg.me.uk>
AuthorDate: Tue Mar 23 10:38:55 2021 +0000
AMQ-8201 Forward commit commands for local transactions as well as XA transactions during ACK compaction
---
.../activemq/store/kahadb/MessageDatabase.java | 15 ++---
.../java/org/apache/activemq/bugs/AMQ7067Test.java | 76 ++++++++++++++++++++++
2 files changed, 80 insertions(+), 11 deletions(-)
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 6e0688b..886695b 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -2230,18 +2230,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
private boolean shouldForward(JournalCommand<?> command) {
- boolean result = false;
- if (command != null) {
- if (command instanceof KahaRemoveMessageCommand) {
- result = true;
- } else if (command instanceof KahaCommitCommand) {
- KahaCommitCommand kahaCommitCommand = (KahaCommitCommand) command;
- if (kahaCommitCommand.hasTransactionInfo() && kahaCommitCommand.getTransactionInfo().hasXaTransactionId()) {
- result = true;
- }
- }
+ if (command == null) {
+ return false;
}
- return result;
+
+ return (command instanceof KahaRemoveMessageCommand || command instanceof KahaCommitCommand);
}
private Location getNextLocationForAckForward(final Location nextLocation, final Location limit) {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
index 1b2219e..5da378e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
@@ -526,6 +526,82 @@ public class AMQ7067Test {
}
}
+ @Test
+ public void testForwardAcksAndCommitsWithLocalTransaction() throws Exception {
+ ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setCompactAcksAfterNoGC(2);
+ final Connection connection = ACTIVE_MQ_NON_XA_CONNECTION_FACTORY.createConnection();
+ connection.start();
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue holdKahaDb = session.createQueue("holdKahaDb");
+ MessageProducer holdKahaDbProducer = session.createProducer(holdKahaDb);
+ TextMessage helloMessage = session.createTextMessage(StringUtils.repeat("a", 10));
+ holdKahaDbProducer.send(helloMessage);
+ session.commit();
+ Queue queue = session.createQueue("test");
+
+ for (int i = 0; i < 5; i++) {
+ produce(connection, queue, 60, 512 * 1024);
+ consume(connection, queue, 30, true);
+ }
+
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return 150 == getQueueSize(queue.getQueueName());
+ }
+ });
+
+ // force gc, n data files requires n cycles
+ int limit = ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).getCompactAcksAfterNoGC() + 1;
+ for (int dataFilesToMove = 0; dataFilesToMove < 10; dataFilesToMove++) {
+ for (int i = 0; i < limit; i++) {
+ broker.getPersistenceAdapter().checkpoint(true);
+ }
+ // ack compaction task operates in the background
+ TimeUnit.SECONDS.sleep(2);
+ }
+
+ session.commit();
+
+ connection.close();
+ curruptIndexFile(getDataDirectory());
+
+ broker.stop();
+ broker.waitUntilStopped();
+ createBroker();
+ broker.waitUntilStarted();
+
+ while(true) {
+ try {
+ TimeUnit.SECONDS.sleep(1);
+ System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName())));
+ break;
+ } catch (Exception ex) {
+ System.out.println(ex.getMessage());
+ break;
+ }
+ }
+
+ assertEquals(1, getQueueSize(holdKahaDb.getQueueName()));
+ assertEquals(150, getQueueSize(queue.getQueueName()));
+ }
+
+ private static void consume(Connection connection, Queue queue, int messageCount, boolean transacted) throws JMSException {
+ final Session session = connection.createSession(transacted, transacted ? 0 : Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ int messagesConsumed = 0;
+
+ while (consumer.receive(1000) != null && messagesConsumed < messageCount) {
+ messagesConsumed++;
+ session.commit();
+ }
+
+ System.out.println(messagesConsumed + " messages consumed from " + queue.getQueueName());
+ session.close();
+ }
+
protected static void createDanglingTransaction(XAResource xaRes, XASession xaSession, Queue queue) throws JMSException, IOException, XAException {
MessageProducer producer = xaSession.createProducer(queue);
XATransactionId txId = createXATransaction();