You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2021/03/25 11:03:01 UTC

[activemq] branch master updated: AMQ-8201 Forward commit commands for local transactions as well as XA transactions during ACK compaction

This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/master by this push:
     new edd0515  AMQ-8201 Forward commit commands for local transactions as well as XA transactions during ACK compaction
edd0515 is described below

commit edd0515d90d8bdb01e3455bf2c4c132f9a6add29
Author: Jonathan Gallimore <jo...@jrg.me.uk>
AuthorDate: Tue Mar 23 10:38:55 2021 +0000

    AMQ-8201 Forward commit commands for local transactions as well as XA transactions during ACK compaction
---
 .../activemq/store/kahadb/MessageDatabase.java     | 15 ++---
 .../java/org/apache/activemq/bugs/AMQ7067Test.java | 76 ++++++++++++++++++++++
 2 files changed, 80 insertions(+), 11 deletions(-)

diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 6e0688b..886695b 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -2230,18 +2230,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     }
 
     private boolean shouldForward(JournalCommand<?> command) {
-        boolean result = false;
-        if (command != null) {
-            if (command instanceof KahaRemoveMessageCommand) {
-                result = true;
-            } else if (command instanceof KahaCommitCommand) {
-                KahaCommitCommand kahaCommitCommand = (KahaCommitCommand) command;
-                if (kahaCommitCommand.hasTransactionInfo() && kahaCommitCommand.getTransactionInfo().hasXaTransactionId()) {
-                    result = true;
-                }
-            }
+        if (command == null) {
+            return false;
         }
-        return result;
+
+        return (command instanceof KahaRemoveMessageCommand || command instanceof KahaCommitCommand);
     }
 
     private Location getNextLocationForAckForward(final Location nextLocation, final Location limit) {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
index 1b2219e..5da378e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
@@ -526,6 +526,82 @@ public class AMQ7067Test {
         }
     }
 
+    @Test
+    public void testForwardAcksAndCommitsWithLocalTransaction() throws Exception {
+        ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setCompactAcksAfterNoGC(2);
+        final Connection connection = ACTIVE_MQ_NON_XA_CONNECTION_FACTORY.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Queue holdKahaDb = session.createQueue("holdKahaDb");
+        MessageProducer holdKahaDbProducer = session.createProducer(holdKahaDb);
+        TextMessage helloMessage = session.createTextMessage(StringUtils.repeat("a", 10));
+        holdKahaDbProducer.send(helloMessage);
+        session.commit();
+        Queue queue = session.createQueue("test");
+
+        for (int i = 0; i < 5; i++) {
+            produce(connection, queue, 60, 512 * 1024);
+            consume(connection, queue, 30, true);
+        }
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 150 == getQueueSize(queue.getQueueName());
+            }
+        });
+
+        // force gc, n data files requires n cycles
+        int limit = ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).getCompactAcksAfterNoGC() + 1;
+        for (int dataFilesToMove = 0; dataFilesToMove < 10; dataFilesToMove++) {
+            for (int i = 0; i < limit; i++) {
+                broker.getPersistenceAdapter().checkpoint(true);
+            }
+            // ack compaction task operates in the background
+            TimeUnit.SECONDS.sleep(2);
+        }
+
+        session.commit();
+
+        connection.close();
+        curruptIndexFile(getDataDirectory());
+
+        broker.stop();
+        broker.waitUntilStopped();
+        createBroker();
+        broker.waitUntilStarted();
+
+        while(true) {
+            try {
+                TimeUnit.SECONDS.sleep(1);
+                System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName())));
+                break;
+            } catch (Exception ex) {
+                System.out.println(ex.getMessage());
+                break;
+            }
+        }
+
+        assertEquals(1, getQueueSize(holdKahaDb.getQueueName()));
+        assertEquals(150, getQueueSize(queue.getQueueName()));
+    }
+
+    private static void consume(Connection connection, Queue queue, int messageCount, boolean transacted) throws JMSException {
+        final Session session = connection.createSession(transacted, transacted ? 0 : Session.AUTO_ACKNOWLEDGE);
+        final MessageConsumer consumer = session.createConsumer(queue);
+
+        int messagesConsumed = 0;
+
+        while (consumer.receive(1000) != null && messagesConsumed < messageCount) {
+            messagesConsumed++;
+            session.commit();
+        }
+
+        System.out.println(messagesConsumed + " messages consumed from " + queue.getQueueName());
+        session.close();
+    }
+
     protected static void createDanglingTransaction(XAResource xaRes, XASession xaSession, Queue queue) throws JMSException, IOException, XAException {
         MessageProducer producer = xaSession.createProducer(queue);
         XATransactionId txId = createXATransaction();