You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2019/09/24 16:34:52 UTC
[activemq] branch master updated: AMQ-7185 - rework to leave
tx-inflight messages pending in the cursor to avoid duplicates on
completion, fix and test
This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/master by this push:
new 644b529 AMQ-7185 - rework to leave tx-inflight messages pending in the cursor to avoid duplicates on completion, fix and test
644b529 is described below
commit 644b529ef64800ee162ddbd7c46551f2508c4c92
Author: gtully <ga...@gmail.com>
AuthorDate: Tue Sep 24 17:32:54 2019 +0100
AMQ-7185 - rework to leave tx-inflight messages pending in the cursor to avoid duplicates on completion, fix and test
---
.../broker/region/DurableTopicSubscription.java | 11 +-
.../broker/region/PrefetchSubscription.java | 8 +
.../apache/activemq/broker/BrokerTestSupport.java | 5 +
.../activemq/broker/XARecoveryBrokerTest.java | 97 +++++++++++-
.../broker/mLevelDBXARecoveryBrokerTest.java | 2 +
.../java/org/apache/activemq/bugs/AMQ7185Test.java | 170 +++++++++++++++++++++
6 files changed, 288 insertions(+), 5 deletions(-)
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index 22df07a..6a5c599 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -322,13 +322,15 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
@Override
protected boolean canDispatch(MessageReference node) {
- if (!ackedAndPrepared.isEmpty() && ackedAndPrepared.contains(node.getMessageId())) {
- return false; // prepared ack
- }
return true; // let them go, our dispatchPending gates the active / inactive state.
}
@Override
+    protected boolean trackedInPendingTransaction(MessageReference node) {
+        // nothing acked-and-prepared: the node cannot be part of a pending transaction
+        if (ackedAndPrepared.isEmpty()) {
+            return false;
+        }
+        // tracked when this node's message id is recorded as acked and prepared
+        return ackedAndPrepared.contains(node.getMessageId());
+    }
+
+ @Override
protected void acknowledge(ConnectionContext context, MessageAck ack, final MessageReference node) throws IOException {
this.setTimeOfLastMessageAck(System.currentTimeMillis());
Destination regionDestination = (Destination) node.getRegionDestination();
@@ -349,6 +351,8 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
@Override
public void afterCommit() throws Exception {
synchronized (pendingLock) {
+ // may be in the cursor post activate/load from the store
+ pending.remove(node);
ackedAndPrepared.remove(node.getMessageId());
}
}
@@ -357,7 +361,6 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
public void afterRollback() throws Exception {
synchronized (pendingLock) {
ackedAndPrepared.remove(node.getMessageId());
- pending.addMessageFirst(node);
}
dispatchPending();
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index a3b9a4d..f479923 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -641,6 +641,10 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (node == null) {
break;
}
+ if (trackedInPendingTransaction(node)) {
+ node.decrementReferenceCount();
+ continue;
+ }
// Synchronize between dispatched list and remove of message from pending list
// related to remove subscription action
@@ -685,6 +689,10 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
}
}
+ /**
+ * Hook consulted by the dispatch loop in this class: when it returns true for a
+ * node, the loop decrements the node's reference count and moves on to the next
+ * node instead of dispatching it.
+ * This default implementation never reports a node as tracked; subclasses such as
+ * DurableTopicSubscription override it.
+ *
+ * @param node the message reference about to be dispatched
+ * @return always false in this base implementation
+ */
+ protected boolean trackedInPendingTransaction(MessageReference node) {
+ return false;
+ }
+
protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
pending.setMaxBatchSize(numberToDispatch);
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java
index d84ff27..44f325b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java
@@ -217,6 +217,11 @@ public class BrokerTestSupport extends CombinationTestSupport {
return info;
}
+    /**
+     * Builds a {@link TransactionInfo} of type {@code TransactionInfo.END} for the
+     * given transaction, addressed with the connection id of {@code connectionInfo}.
+     */
+    protected TransactionInfo createEndTransaction(ConnectionInfo connectionInfo, TransactionId txid) {
+        return new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.END);
+    }
+
protected TransactionInfo createPrepareTransaction(ConnectionInfo connectionInfo, TransactionId txid) {
TransactionInfo info = new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.PREPARE);
return info;
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
index 89745a9..8415b93 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
@@ -1372,6 +1372,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
ack.setTransactionId(txid);
connection.send(ack);
+ connection.request(createEndTransaction(connectionInfo, txid));
connection.request(createPrepareTransaction(connectionInfo, txid));
// reconnect, verify prepared acks unavailable
@@ -1455,6 +1456,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
ack.setTransactionId(txid);
connection.send(ack);
+ connection.request(createEndTransaction(connectionInfo, txid));
connection.request(createPrepareTransaction(connectionInfo, txid));
// reconnect, verify prepared acks unavailable
@@ -1479,8 +1481,101 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
// commit original tx
connection.request(createCommitTransaction2Phase(connectionInfo, txid));
+ // verify still unavailable
message = receiveMessage(connection, 2000);
- assertNull("unexpected non null", message);
+ assertNull("unexpected non null: " + message, message);
+
+ // unsubscribe
+ connection.request(consumerInfo.createRemoveCommand());
+ RemoveSubscriptionInfo removeSubscriptionInfo = new RemoveSubscriptionInfo();
+ removeSubscriptionInfo.setClientId(connectionInfo.getClientId());
+ removeSubscriptionInfo.setSubscriptionName(consumerInfo.getSubscriptionName());
+ connection.request(removeSubscriptionInfo);
+ }
+
+    // Registers FALSE and TRUE as the combination values for "keepDurableSubsActive".
+    public void initCombosForTestNoDupOnRollbackRedelivery() {
+        final Boolean[] keepActiveChoices = {Boolean.FALSE, Boolean.TRUE};
+        addCombinationValues("keepDurableSubsActive", keepActiveChoices);
+    }
+
+ public void testNoDupOnRollbackRedelivery() throws Exception {
+
+ ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
+
+ // Setup the producer and send the message.
+ StubConnection connection = createConnection();
+ ConnectionInfo connectionInfo = createConnectionInfo();
+ connectionInfo.setClientId("durable");
+ SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ connection.send(producerInfo);
+
+ // setup durable subs
+ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+ consumerInfo.setSubscriptionName("durable");
+ connection.send(consumerInfo);
+
+ int numMessages = 1;
+ for (int i = 0; i < numMessages; i++) {
+ Message message = createMessage(producerInfo, destination);
+ message.setPersistent(true);
+ connection.send(message);
+ }
+
+ // Begin the transaction.
+ XATransactionId txid = createXATransaction(sessionInfo);
+ connection.send(createBeginTransaction(connectionInfo, txid));
+
+ Message message = null;
+ for (int i = 0; i < numMessages; i++) {
+ message = receiveMessage(connection);
+ assertNotNull(message);
+ }
+
+ // one ack with last received, mimic a beforeEnd synchronization
+ MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
+ ack.setTransactionId(txid);
+ connection.send(ack);
+
+ connection.request(createEndTransaction(connectionInfo, txid));
+ connection.request(createRollbackTransaction(connectionInfo, txid));
+
+ connection.send(consumerInfo.createRemoveCommand());
+ connection.send(sessionInfo.createRemoveCommand());
+ connection.send(connectionInfo.createRemoveCommand());
+
+
+ LOG.info("new connection/consumer for redelivery");
+
+ connection.request(closeConnectionInfo(connectionInfo));
+
+ connectionInfo = createConnectionInfo();
+ connectionInfo.setClientId("durable");
+ sessionInfo = createSessionInfo(connectionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+
+ // setup durable subs
+ consumerInfo = createConsumerInfo(sessionInfo, destination);
+ consumerInfo.setSubscriptionName("durable");
+ connection.send(consumerInfo);
+
+ message = receiveMessage(connection);
+ assertNotNull(message);
+
+ Message dup = receiveMessage(connection);
+ assertNull("no duplicate send: " + dup, dup);
+
+ txid = createXATransaction(sessionInfo);
+ connection.send(createBeginTransaction(connectionInfo, txid));
+
+ ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
+ ack.setTransactionId(txid);
+ connection.send(ack);
+
+ connection.request(createEndTransaction(connectionInfo, txid));
+ connection.request(createCommitTransaction1Phase(connectionInfo, txid));
// unsubscribe
connection.request(consumerInfo.createRemoveCommand());
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java
index da17f92..7adb983 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java
@@ -72,4 +72,6 @@ public class mLevelDBXARecoveryBrokerTest extends XARecoveryBrokerTest {
public void testQueuePersistentPreparedAcksAvailableAfterRollback() throws Exception {
// pending acks are not tracked in leveldb
}
+ public void testTopicPersistentPreparedAcksUnavailableTillRollback() throws Exception {
+ // intentionally empty, so this test does nothing in this subclass
+ // NOTE(review): presumably overrides the XARecoveryBrokerTest test of the same name for the
+ // same reason as the override above (pending acks are not tracked in leveldb) - confirm
+ }
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7185Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7185Test.java
new file mode 100644
index 0000000..2acf962
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7185Test.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQXAConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.*;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class AMQ7185Test
+{
+ private final String xaDestinationName = "DestinationXA";
+ private BrokerService broker;
+ private String connectionUri;
+ private long txGenerator = System.currentTimeMillis();
+
+ private XAConnectionFactory xaConnectionFactory;
+ private ConnectionFactory connectionFactory;
+
+ final Topic dest = new ActiveMQTopic(xaDestinationName);
+
+ @Before
+ public void startBroker() throws Exception {
+ broker = new BrokerService();
+ broker.setDeleteAllMessagesOnStartup(true);
+ broker.setPersistent(false);
+ broker.setAdvisorySupport(false);
+ broker.addConnector("tcp://0.0.0.0:0?trace=true");
+ broker.start();
+ broker.waitUntilStarted();
+
+ connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
+ connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+ ((ActiveMQConnectionFactory) connectionFactory).setWatchTopicAdvisories(false);
+ // failover ensure audit is in play
+ xaConnectionFactory = new ActiveMQXAConnectionFactory("failover://" + connectionUri);
+ ((ActiveMQXAConnectionFactory) xaConnectionFactory).setWatchTopicAdvisories(false);
+ }
+
+ @After
+ public void stopBroker() throws Exception {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+
+ @Test
+ public void testRollbackRedeliveryNoDup() throws Exception {
+
+ XAConnection xaConnection = xaConnectionFactory.createXAConnection();
+ xaConnection.setClientID("cid0");
+ xaConnection.start();
+ XASession session = xaConnection.createXASession();
+ TopicSubscriber consumer = session.createDurableSubscriber(dest, "sub");
+ consumer.close();
+ session.close();
+ xaConnection.close();
+
+ publish(dest);
+
+ Xid tid;
+ TextMessage receivedMessage;
+ xaConnection = xaConnectionFactory.createXAConnection();
+ xaConnection.setClientID("cid0");
+ xaConnection.start();
+ session = xaConnection.createXASession();
+ consumer = session.createDurableSubscriber(dest, "sub");
+
+ tid = createXid();
+ XAResource resource = session.getXAResource();
+ resource.start(tid, XAResource.TMNOFLAGS);
+
+ receivedMessage = (TextMessage) consumer.receive(4000);
+ assertNotNull(receivedMessage);
+ resource.end(tid, XAResource.TMSUCCESS);
+ resource.rollback(tid);
+ consumer.close();
+ session.close();
+ xaConnection.close();
+
+
+ // redelivery
+ xaConnection = xaConnectionFactory.createXAConnection();
+ xaConnection.setClientID("cid0");
+ xaConnection.start();
+ session = xaConnection.createXASession();
+ consumer = session.createDurableSubscriber(dest, "sub");
+
+ tid = createXid();
+ resource = session.getXAResource();
+ resource.start(tid, XAResource.TMNOFLAGS);
+ receivedMessage = (TextMessage) consumer.receive(1000);
+ assertNotNull(receivedMessage);
+
+ // verify only one
+ receivedMessage = (TextMessage) consumer.receiveNoWait();
+ assertNull(receivedMessage);
+
+ resource.end(tid, XAResource.TMSUCCESS);
+ resource.commit(tid, true);
+
+ consumer.close();
+ session.close();
+ xaConnection.close();
+
+ // assertNoMessageInDLQ
+ assertEquals("Only one enqueue", 1, broker.getAdminView().getTotalEnqueueCount());
+ }
+
+ private void publish(Topic dest) throws JMSException {
+ Connection connection = connectionFactory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createProducer(dest).send(new ActiveMQTextMessage());
+ connection.close();
+ }
+
+
+ public Xid createXid() throws IOException {
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream os = new DataOutputStream(baos);
+ os.writeLong(++txGenerator);
+ os.close();
+ final byte[] bs = baos.toByteArray();
+
+ return new Xid() {
+ public int getFormatId() {
+ return 86;
+ }
+
+ public byte[] getGlobalTransactionId() {
+ return bs;
+ }
+
+ public byte[] getBranchQualifier() {
+ return bs;
+ }
+ };
+ }
+}