You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/07/25 00:28:54 UTC

activemq-artemis git commit: ARTEMIS-1304 Message loss on Commit timeout during failover

Repository: activemq-artemis
Updated Branches:
  refs/heads/1.x 44d3be5a7 -> 2a2f25664


ARTEMIS-1304 Message loss on Commit timeout during failover

(cherry picked from commit 50a900c04b497c470eeb5c69985944cd6666ffa0)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2a2f2566
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2a2f2566
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2a2f2566

Branch: refs/heads/1.x
Commit: 2a2f25664ca8b1808cc47bf6c42d99585370ea2b
Parents: 44d3be5
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Jul 24 20:11:18 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jul 24 20:27:56 2017 -0400

----------------------------------------------------------------------
 .../core/client/impl/ClientSessionImpl.java     |  8 +-
 .../cluster/failover/FailoverTest.java          | 80 ++++++++++++++++++++
 2 files changed, 84 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a2f2566/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index fd6355a..d5a867d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -504,7 +504,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
       try {
          sessionContext.simpleCommit();
       } catch (ActiveMQException e) {
-         if (e.getType() == ActiveMQExceptionType.UNBLOCKED || rollbackOnly) {
+         if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT || rollbackOnly) {
             // The call to commit was unlocked on failover, we therefore rollback the tx,
             // and throw a transaction rolled back exception instead
             //or
@@ -1292,7 +1292,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
       } catch (XAException xae) {
          throw xae;
       } catch (ActiveMQException e) {
-         if (e.getType() == ActiveMQExceptionType.UNBLOCKED) {
+         if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT) {
             // Unblocked on failover
             try {
                // will retry once after failover & unblock
@@ -1385,7 +1385,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
       } catch (XAException xae) {
          throw xae;
       } catch (ActiveMQException e) {
-         if (e.getType() == ActiveMQExceptionType.UNBLOCKED) {
+         if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT) {
             // Unblocked on failover
             throw new XAException(XAException.XA_RETRY);
          }
@@ -1418,7 +1418,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
          throw xae;
       } catch (ActiveMQException e) {
          // we can retry this only because we know for sure that no work would have been done
-         if (e.getType() == ActiveMQExceptionType.UNBLOCKED) {
+         if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT) {
             try {
                sessionContext.xaStart(xid, flags);
             } catch (XAException xae) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a2f2566/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
index 4c3c29e..8b97558 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
@@ -388,6 +388,86 @@ public class FailoverTest extends FailoverTestBase {
       }
    }
 
+   /**
+    * This test would fail one in three or five times,
+    * where the commit would leave the session dirty after a timeout.
+    * <p>
+    * Scenario exercised by the body below:
+    * <ol>
+    * <li>send 500 messages carrying an increasing {@code counter} property and commit;</li>
+    * <li>receive all 500 in order and acknowledge the last one received;</li>
+    * <li>call {@code crash(false, session)} and then attempt a commit, which is
+    * required to throw;</li>
+    * <li>after a fixed pause, receive the same 500 messages again, in the same order,
+    * acknowledge the last one and commit, this time without an exception.</li>
+    * </ol>
+    * The second receive loop is the actual regression check: it only passes if the
+    * failed commit did not consume the messages.
+    */
+   @Test(timeout = 120000)
+   public void testTimeoutOnFailoverTransactionCommitTimeoutCommunication() throws Exception {
+      locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(500); // call timeout (1000) is deliberately smaller than the failover pause set below
+
+      ((InVMNodeManager) nodeManager).failoverPause = 6000L; // NOTE(review): presumably delays backup activation so the commit times out instead of being unblocked - confirm against InVMNodeManager
+
+      ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
+      final ClientSession session = createSession(sf1, false, false, false); // NOTE(review): the three 'false' flags presumably make the session transacted (no auto-commit) - confirm against createSession
+
+
+      session.createQueue(FailoverTestBase.ADDRESS, RoutingType.MULTICAST, FailoverTestBase.ADDRESS, null, true);
+
+      final CountDownLatch connectionFailed = new CountDownLatch(1); // NOTE(review): counted down by the listener below but never awaited in this method
+
+      session.addFailureListener(new SessionFailureListener() {
+         @Override
+         public void beforeReconnect(ActiveMQException exception) {
+         }
+
+         @Override
+         public void connectionFailed(ActiveMQException exception, boolean failedOver) {
+         }
+
+         @Override
+         public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
+            connectionFailed.countDown(); // only the three-argument callback signals the latch
+         }
+      });
+
+      final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+      for (int i = 0; i < 500; i++) {
+         ClientMessage message = session.createMessage(true);
+         message.putIntProperty("counter", i); // sequence number checked on receipt
+
+         producer.send(message);
+
+      }
+
+      session.commit(); // commit the sends before any failure is injected
+
+
+      ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+      session.start();
+      ClientMessage m = null;
+      for (int i = 0; i < 500; i++) {
+         m = consumer.receive(1000);
+         Assert.assertNotNull(m);
+         Assert.assertEquals(i, m.getIntProperty("counter").intValue()); // messages must arrive in send order
+      }
+
+      m.acknowledge(); // acknowledges via the last message received; the ack is still uncommitted here
+
+      crash(false, session); // inject the failure before committing the acknowledgement
+      try {
+         session.commit();
+         fail("Exception expected");
+      } catch (Exception expected) {
+         expected.printStackTrace(); // any Exception type is accepted; the trace is only printed for diagnosis
+      }
+
+      Thread.sleep(2000); // fixed pause before attempting to consume again
+
+      m = null;
+      for (int i = 0; i < 500; i++) {
+         m = consumer.receive(1000);
+         Assert.assertNotNull(m); // fails here if the failed commit had consumed the messages
+         Assert.assertEquals(i, m.getIntProperty("counter").intValue());
+      }
+
+      m.acknowledge();
+
+      session.commit(); // this commit is expected to complete without throwing
+
+   }
+
    // https://issues.jboss.org/browse/HORNETQ-685
    @Test
    public void testTimeoutOnFailoverTransactionRollback() throws Exception {