You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/04/16 14:22:38 UTC

[activemq-artemis] branch main updated: ARTEMIS-3240 - ensure pending transactions are rolled back on connection failure. Fix and test

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f3d877  ARTEMIS-3240 - ensure pending transactions are rolled back on connection failure. Fix and test
0f3d877 is described below

commit 0f3d87799a6be4faea768537594fc7fd7fb3c3df
Author: gtully <ga...@gmail.com>
AuthorDate: Tue Apr 13 14:35:40 2021 +0100

    ARTEMIS-3240 - ensure pending transactions are rolled back on connection failure. Fix and test
---
 .../core/protocol/openwire/OpenWireConnection.java |  4 ++
 .../integration/openwire/amq/JMSConsumer2Test.java | 60 ++++++++++++++++++++++
 2 files changed, 64 insertions(+)

diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index b66f477..8af0731 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -682,6 +682,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
    @Override
    public void fail(ActiveMQException me, String message) {
 
+      for (Transaction tx : txMap.values()) {
+         tx.rollbackIfPossible();
+      }
+
       if (me != null) {
          //filter it like the other protocols
          if (!(me instanceof ActiveMQRemoteDisconnectException)) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSConsumer2Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSConsumer2Test.java
index 31ff706..60bc068 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSConsumer2Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSConsumer2Test.java
@@ -21,7 +21,9 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.Session;
+import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
+import java.net.Socket;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -31,10 +33,15 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.apache.activemq.artemis.utils.Wait;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
 import org.junit.Test;
 
 /**
@@ -211,4 +218,57 @@ public class JMSConsumer2Test extends BasicOpenWireTest {
       redispatchSession.close();
    }
 
+   @Test
+   public void testRedeliveryOnServerConnectionFailWithPendingAckInLocalTx() throws Exception {
+      // Send a message to the broker.
+      connection.start();
+      sendMessages(connection, new ActiveMQQueue(queueName), 1);
+      connection.close();
+
+      factory.setWatchTopicAdvisories(false);
+      factory.setNonBlockingRedelivery(true);
+      connection = (ActiveMQConnection) factory.createConnection();
+      connection.start();
+
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+      MessageConsumer consumer = session.createConsumer(destination);
+
+      final CountDownLatch gotMessage = new CountDownLatch(1);
+      consumer.setMessageListener(message -> gotMessage.countDown());
+
+      assertTrue(gotMessage.await(1, TimeUnit.SECONDS));
+
+      // want to ensure the ack has had a chance to get back to the broker
+      final Queue queueInstance = server.locateQueue(new SimpleString(queueName));
+      Wait.waitFor(new Wait.Condition() {
+         @Override
+         public boolean isSatisfied() throws Exception {
+            return queueInstance.getAcknowledgeAttempts() > 0;
+         }
+      });
+
+      // whack the connection so there is no transaction outcome
+      try {
+         connection.getTransport().narrow(Socket.class).close();
+      } catch (IOException e) {
+         e.printStackTrace();
+      }
+      try {
+         connection.close();
+      } catch (Exception expected) {
+      }
+
+      // expect rollback and redelivery on new consumer
+      connection = (ActiveMQConnection) factory.createConnection();
+      connection.start();
+
+      session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      consumer = session.createConsumer(destination);
+
+      assertNotNull(consumer.receive(2000));
+      session.commit();
+
+      connection.close();
+   }
 }