You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/06/07 19:00:45 UTC

[activemq-artemis] branch main updated: ARTEMIS-3330 JMS session stopped on failed rollback

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 26b8398  ARTEMIS-3330 JMS session stopped on failed rollback
26b8398 is described below

commit 26b83985cfb53a7ba21a7812b6559961c0216a7a
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Thu Jun 3 12:54:01 2021 -0500

    ARTEMIS-3330 JMS session stopped on failed rollback
---
 .../core/client/impl/ClientSessionImpl.java        |  15 +--
 .../tests/integration/jms/client/RollbackTest.java | 116 +++++++++++++++++++++
 2 files changed, 125 insertions(+), 6 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index 53c9c97..d3a66a5 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -1019,13 +1019,16 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
          consumer.clear(waitConsumers);
       }
 
-      // Acks must be flushed here *after connection is stopped and all onmessages finished executing
-      flushAcks();
-
-      sessionContext.simpleRollback(isLastMessageAsDelivered);
+      try {
+         // Acks must be flushed here *after connection is stopped and all onmessages finished executing
+         flushAcks();
 
-      if (wasStarted) {
-         start();
+         sessionContext.simpleRollback(isLastMessageAsDelivered);
+      } finally {
+         if (wasStarted) {
+            // restore the original session state even if something fails
+            start();
+         }
       }
 
       rollbackOnly = false;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/RollbackTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/RollbackTest.java
new file mode 100644
index 0000000..fe072c2
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/RollbackTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <br>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <br>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.jms.client;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.jboss.logging.Logger;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RollbackTest extends ActiveMQTestBase {
+   // Integration test for ARTEMIS-3330: a JMS session must remain usable after one of its rollbacks fails.
+   protected ActiveMQServer server; // server under test; created and started in setUp()
+
+   private static Logger log = Logger.getLogger(RollbackTest.class); // NOTE(review): never referenced in this class
+   /** Creates a server from createDefaultInVMConfig(), registers {@link MyInterceptor} as an incoming interceptor, and starts it. */
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      server = createServer(false, createDefaultInVMConfig()); // NOTE(review): 'false' presumably disables persistence — confirm against ActiveMQTestBase
+      server.getConfiguration().getIncomingInterceptorClassNames().add(MyInterceptor.class.getName()); // registered by class name; presumably the server instantiates it itself
+      server.start();
+   }
+   /** Checks that after a first rollback fails, a second rollback succeeds and the message can be received again and committed. */
+   @Test
+   public void testFailedRollback() throws Exception {
+      final String TOPIC = "myTopic";
+      final String SUBSCRIPTION = "mySub";
+
+      try (ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://0")) {
+         connectionFactory.setCallTimeout(1000); // fail fast
+         connectionFactory.setReconnectAttempts(-1); // NOTE(review): -1 presumably means retry indefinitely — confirm
+         connectionFactory.setConfirmationWindowSize(1024 * 1024);
+
+         try (Connection consumerConnection = connectionFactory.createConnection()) {
+            consumerConnection.start();
+            final Session session = consumerConnection.createSession(Session.SESSION_TRANSACTED); // transacted: work is completed via commit()/rollback()
+            Topic topic = session.createTopic(TOPIC);
+            final MessageConsumer messageConsumer = session.createSharedDurableConsumer(topic, SUBSCRIPTION); // subscription is created before the message is sent
+            MessageProducer p = session.createProducer(topic);
+            p.send(session.createMessage());
+            p.close();
+            session.commit(); // commits the send of the single test message
+
+            try {
+               Message m = messageConsumer.receive(2000);
+               assertNotNull(m);
+               // the interceptor will block this first rollback and trigger a failure, the failure will cause the client to re-attach its session
+               session.rollback();
+               fail(); // reached only if the first rollback unexpectedly succeeded
+            } catch (JMSException jmsException) {
+               // expected
+            }
+
+            try {
+               session.rollback(); // MyInterceptor blocks only one rollback packet, so this retry is expected to go through
+            } catch (JMSException e) {
+               fail("Rollback failed again! Giving up. " + e.getMessage());
+            }
+
+            Message m = messageConsumer.receive(2000); // the rolled-back message is expected to be delivered again
+            assertNotNull(m);
+            try {
+               session.commit();
+            } catch (JMSException e) {
+               fail("Commit failed. " + e.getMessage());
+            }
+         }
+      }
+
+      assertEquals(0L, server.locateQueue(SUBSCRIPTION).getMessageCount()); // after the commit, the queue located by the subscription name must hold no messages
+   }
+   /** Interceptor that blocks the first SESS_ROLLBACK packet it sees and lets every other packet through. */
+   public static class MyInterceptor implements Interceptor {
+      private boolean intercepted = false; // set once a rollback packet has been blocked; NOTE(review): plain field, neither volatile nor synchronized
+
+      @Override
+      public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
+         if (!intercepted && packet.getType() == PacketImpl.SESS_ROLLBACK) {
+            intercepted = true;
+            return false; // block this one rollback packet
+         }
+         return true; // all other packets (and any later rollback) pass through
+      }
+   }
+}