You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2019/09/24 16:34:52 UTC

[activemq] branch master updated: AMQ-7185 - rework to leave tx-inflight messages pending in the cursor to avoid duplicates on completion, fix and test

This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 644b529  AMQ-7185 - rework to leave tx-inflight messages pending in the cursor to avoid duplicates on completion, fix and test
644b529 is described below

commit 644b529ef64800ee162ddbd7c46551f2508c4c92
Author: gtully <ga...@gmail.com>
AuthorDate: Tue Sep 24 17:32:54 2019 +0100

    AMQ-7185 - rework to leave tx-inflight messages pending in the cursor to avoid duplicates on completion, fix and test
---
 .../broker/region/DurableTopicSubscription.java    |  11 +-
 .../broker/region/PrefetchSubscription.java        |   8 +
 .../apache/activemq/broker/BrokerTestSupport.java  |   5 +
 .../activemq/broker/XARecoveryBrokerTest.java      |  97 +++++++++++-
 .../broker/mLevelDBXARecoveryBrokerTest.java       |   2 +
 .../java/org/apache/activemq/bugs/AMQ7185Test.java | 170 +++++++++++++++++++++
 6 files changed, 288 insertions(+), 5 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index 22df07a..6a5c599 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -322,13 +322,15 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
 
     @Override
     protected boolean canDispatch(MessageReference node) {
-        if (!ackedAndPrepared.isEmpty() && ackedAndPrepared.contains(node.getMessageId())) {
-            return false; // prepared ack
-        }
         return true;  // let them go, our dispatchPending gates the active / inactive state.
     }
 
     @Override
+    protected boolean trackedInPendingTransaction(MessageReference node) {
+        return !ackedAndPrepared.isEmpty() && ackedAndPrepared.contains(node.getMessageId());
+    }
+
+    @Override
     protected void acknowledge(ConnectionContext context, MessageAck ack, final MessageReference node) throws IOException {
         this.setTimeOfLastMessageAck(System.currentTimeMillis());
         Destination regionDestination = (Destination) node.getRegionDestination();
@@ -349,6 +351,8 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
                 @Override
                 public void afterCommit() throws Exception {
                     synchronized (pendingLock) {
+                        // may be in the cursor post activate/load from the store
+                        pending.remove(node);
                         ackedAndPrepared.remove(node.getMessageId());
                     }
                 }
@@ -357,7 +361,6 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
                 public void afterRollback() throws Exception {
                     synchronized (pendingLock) {
                         ackedAndPrepared.remove(node.getMessageId());
-                        pending.addMessageFirst(node);
                     }
                     dispatchPending();
                 }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index a3b9a4d..f479923 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -641,6 +641,10 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                         if (node == null) {
                             break;
                         }
+                        if (trackedInPendingTransaction(node)) {
+                            node.decrementReferenceCount();
+                            continue;
+                        }
 
                         // Synchronize between dispatched list and remove of message from pending list
                         // related to remove subscription action
@@ -685,6 +689,10 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
         }
     }
 
+    protected boolean trackedInPendingTransaction(MessageReference node) {
+        return false;
+    }
+
     protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
         pending.setMaxBatchSize(numberToDispatch);
     }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java
index d84ff27..44f325b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java
@@ -217,6 +217,11 @@ public class BrokerTestSupport extends CombinationTestSupport {
         return info;
     }
 
+    protected TransactionInfo createEndTransaction(ConnectionInfo connectionInfo, TransactionId txid) {
+        TransactionInfo info = new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.END);
+        return info;
+    }
+
     protected TransactionInfo createPrepareTransaction(ConnectionInfo connectionInfo, TransactionId txid) {
         TransactionInfo info = new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.PREPARE);
         return info;
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
index 89745a9..8415b93 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
@@ -1372,6 +1372,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
         ack.setTransactionId(txid);
         connection.send(ack);
 
+        connection.request(createEndTransaction(connectionInfo, txid));
         connection.request(createPrepareTransaction(connectionInfo, txid));
 
         // reconnect, verify perpared acks unavailable
@@ -1455,6 +1456,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
         ack.setTransactionId(txid);
         connection.send(ack);
 
+        connection.request(createEndTransaction(connectionInfo, txid));
         connection.request(createPrepareTransaction(connectionInfo, txid));
 
         // reconnect, verify perpared acks unavailable
@@ -1479,8 +1481,101 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
         // commit original tx
         connection.request(createCommitTransaction2Phase(connectionInfo, txid));
 
+        // verify still unavailable
         message = receiveMessage(connection, 2000);
-        assertNull("unexpected non null", message);
+        assertNull("unexpected non null: " + message, message);
+
+        // unsubscribe
+        connection.request(consumerInfo.createRemoveCommand());
+        RemoveSubscriptionInfo removeSubscriptionInfo = new RemoveSubscriptionInfo();
+        removeSubscriptionInfo.setClientId(connectionInfo.getClientId());
+        removeSubscriptionInfo.setSubscriptionName(consumerInfo.getSubscriptionName());
+        connection.request(removeSubscriptionInfo);
+    }
+
+    public void initCombosForTestNoDupOnRollbackRedelivery() {
+        addCombinationValues("keepDurableSubsActive", new Boolean[]{Boolean.FALSE, Boolean.TRUE});
+    }
+
+    public void testNoDupOnRollbackRedelivery() throws Exception {
+
+        ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        connectionInfo.setClientId("durable");
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        // setup durable subs
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        consumerInfo.setSubscriptionName("durable");
+        connection.send(consumerInfo);
+
+        int numMessages = 1;
+        for (int i = 0; i < numMessages; i++) {
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            connection.send(message);
+        }
+
+        // Begin the transaction.
+        XATransactionId txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        Message message = null;
+        for (int i = 0; i < numMessages; i++) {
+            message = receiveMessage(connection);
+            assertNotNull(message);
+        }
+
+        // one ack with last received, mimic a beforeEnd synchronization
+        MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
+        ack.setTransactionId(txid);
+        connection.send(ack);
+
+        connection.request(createEndTransaction(connectionInfo, txid));
+        connection.request(createRollbackTransaction(connectionInfo, txid));
+
+        connection.send(consumerInfo.createRemoveCommand());
+        connection.send(sessionInfo.createRemoveCommand());
+        connection.send(connectionInfo.createRemoveCommand());
+
+
+        LOG.info("new connection/consumer for redelivery");
+
+        connection.request(closeConnectionInfo(connectionInfo));
+
+        connectionInfo = createConnectionInfo();
+        connectionInfo.setClientId("durable");
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+
+        // setup durable subs
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        consumerInfo.setSubscriptionName("durable");
+        connection.send(consumerInfo);
+
+        message = receiveMessage(connection);
+        assertNotNull(message);
+
+        Message dup = receiveMessage(connection);
+        assertNull("no duplicate send: " + dup, dup);
+
+        txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
+        ack.setTransactionId(txid);
+        connection.send(ack);
+
+        connection.request(createEndTransaction(connectionInfo, txid));
+        connection.request(createCommitTransaction1Phase(connectionInfo, txid));
 
         // unsubscribe
         connection.request(consumerInfo.createRemoveCommand());
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java
index da17f92..7adb983 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java
@@ -72,4 +72,6 @@ public class mLevelDBXARecoveryBrokerTest extends XARecoveryBrokerTest {
     public void testQueuePersistentPreparedAcksAvailableAfterRollback() throws Exception {
         // pending acks are not tracked in leveldb
     }
+    public void testTopicPersistentPreparedAcksUnavailableTillRollback() throws Exception {
+    }
 }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7185Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7185Test.java
new file mode 100644
index 0000000..2acf962
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7185Test.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQXAConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.*;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class AMQ7185Test
+{
+    private final String xaDestinationName = "DestinationXA";
+    private BrokerService broker;
+    private String connectionUri;
+    private long txGenerator = System.currentTimeMillis();
+
+    private XAConnectionFactory xaConnectionFactory;
+    private ConnectionFactory connectionFactory;
+
+    final Topic dest = new ActiveMQTopic(xaDestinationName);
+
+    @Before
+    public void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(false);
+        broker.setAdvisorySupport(false);
+        broker.addConnector("tcp://0.0.0.0:0?trace=true");
+        broker.start();
+        broker.waitUntilStarted();
+
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
+        connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+        ((ActiveMQConnectionFactory) connectionFactory).setWatchTopicAdvisories(false);
+        // failover ensure audit is in play
+        xaConnectionFactory = new ActiveMQXAConnectionFactory("failover://" + connectionUri);
+        ((ActiveMQXAConnectionFactory) xaConnectionFactory).setWatchTopicAdvisories(false);
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    @Test
+    public void testRollbackRedeliveryNoDup() throws Exception {
+
+        XAConnection xaConnection = xaConnectionFactory.createXAConnection();
+        xaConnection.setClientID("cid0");
+        xaConnection.start();
+        XASession session = xaConnection.createXASession();
+        TopicSubscriber consumer = session.createDurableSubscriber(dest, "sub");
+        consumer.close();
+        session.close();
+        xaConnection.close();
+
+        publish(dest);
+
+        Xid tid;
+        TextMessage receivedMessage;
+        xaConnection = xaConnectionFactory.createXAConnection();
+        xaConnection.setClientID("cid0");
+        xaConnection.start();
+        session = xaConnection.createXASession();
+        consumer = session.createDurableSubscriber(dest, "sub");
+
+        tid = createXid();
+        XAResource resource = session.getXAResource();
+        resource.start(tid, XAResource.TMNOFLAGS);
+
+        receivedMessage = (TextMessage) consumer.receive(4000);
+        assertNotNull(receivedMessage);
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.rollback(tid);
+        consumer.close();
+        session.close();
+        xaConnection.close();
+
+
+        // redelivery
+        xaConnection = xaConnectionFactory.createXAConnection();
+        xaConnection.setClientID("cid0");
+        xaConnection.start();
+        session = xaConnection.createXASession();
+        consumer = session.createDurableSubscriber(dest, "sub");
+
+        tid = createXid();
+        resource = session.getXAResource();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        receivedMessage = (TextMessage) consumer.receive(1000);
+        assertNotNull(receivedMessage);
+
+        // verify only one
+        receivedMessage = (TextMessage) consumer.receiveNoWait();
+        assertNull(receivedMessage);
+
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.commit(tid, true);
+
+        consumer.close();
+        session.close();
+        xaConnection.close();
+
+        // assertNoMessageInDLQ
+        assertEquals("Only one enqueue", 1, broker.getAdminView().getTotalEnqueueCount());
+    }
+
+    private void publish(Topic dest) throws JMSException {
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createProducer(dest).send(new ActiveMQTextMessage());
+        connection.close();
+    }
+
+
+    public Xid createXid() throws IOException {
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream os = new DataOutputStream(baos);
+        os.writeLong(++txGenerator);
+        os.close();
+        final byte[] bs = baos.toByteArray();
+
+        return new Xid() {
+            public int getFormatId() {
+                return 86;
+            }
+
+            public byte[] getGlobalTransactionId() {
+                return bs;
+            }
+
+            public byte[] getBranchQualifier() {
+                return bs;
+            }
+        };
+    }
+}