You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/07/25 00:28:54 UTC
activemq-artemis git commit: ARTEMIS-1304 Message loss on Commmit
timeout during failover
Repository: activemq-artemis
Updated Branches:
refs/heads/1.x 44d3be5a7 -> 2a2f25664
ARTEMIS-1304 Message loss on Commmit timeout during failover
(cherry picked from commit 50a900c04b497c470eeb5c69985944cd6666ffa0)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2a2f2566
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2a2f2566
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2a2f2566
Branch: refs/heads/1.x
Commit: 2a2f25664ca8b1808cc47bf6c42d99585370ea2b
Parents: 44d3be5
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Jul 24 20:11:18 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jul 24 20:27:56 2017 -0400
----------------------------------------------------------------------
.../core/client/impl/ClientSessionImpl.java | 8 +-
.../cluster/failover/FailoverTest.java | 80 ++++++++++++++++++++
2 files changed, 84 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a2f2566/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index fd6355a..d5a867d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -504,7 +504,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
try {
sessionContext.simpleCommit();
} catch (ActiveMQException e) {
- if (e.getType() == ActiveMQExceptionType.UNBLOCKED || rollbackOnly) {
+ if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT || rollbackOnly) {
// The call to commit was unlocked on failover, we therefore rollback the tx,
// and throw a transaction rolled back exception instead
//or
@@ -1292,7 +1292,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
} catch (XAException xae) {
throw xae;
} catch (ActiveMQException e) {
- if (e.getType() == ActiveMQExceptionType.UNBLOCKED) {
+ if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT) {
// Unblocked on failover
try {
// will retry once after failover & unblock
@@ -1385,7 +1385,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
} catch (XAException xae) {
throw xae;
} catch (ActiveMQException e) {
- if (e.getType() == ActiveMQExceptionType.UNBLOCKED) {
+ if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT) {
// Unblocked on failover
throw new XAException(XAException.XA_RETRY);
}
@@ -1418,7 +1418,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
throw xae;
} catch (ActiveMQException e) {
// we can retry this only because we know for sure that no work would have been done
- if (e.getType() == ActiveMQExceptionType.UNBLOCKED) {
+ if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT) {
try {
sessionContext.xaStart(xid, flags);
} catch (XAException xae) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a2f2566/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
index 4c3c29e..8b97558 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
@@ -388,6 +388,86 @@ public class FailoverTest extends FailoverTestBase {
}
}
+ /**
+ * This test would fail one in three or five times,
+ * where the commit would leave the session dirty after a timeout.
+ */
+ @Test(timeout = 120000)
+ public void testTimeoutOnFailoverTransactionCommitTimeoutCommunication() throws Exception {
+ locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(500);
+
+ ((InVMNodeManager) nodeManager).failoverPause = 6000L;
+
+ ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
+ final ClientSession session = createSession(sf1, false, false, false);
+
+
+ session.createQueue(FailoverTestBase.ADDRESS, RoutingType.MULTICAST, FailoverTestBase.ADDRESS, null, true);
+
+ final CountDownLatch connectionFailed = new CountDownLatch(1);
+
+ session.addFailureListener(new SessionFailureListener() {
+ @Override
+ public void beforeReconnect(ActiveMQException exception) {
+ }
+
+ @Override
+ public void connectionFailed(ActiveMQException exception, boolean failedOver) {
+ }
+
+ @Override
+ public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
+ connectionFailed.countDown();
+ }
+ });
+
+ final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ for (int i = 0; i < 500; i++) {
+ ClientMessage message = session.createMessage(true);
+ message.putIntProperty("counter", i);
+
+ producer.send(message);
+
+ }
+
+ session.commit();
+
+
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ session.start();
+ ClientMessage m = null;
+ for (int i = 0; i < 500; i++) {
+ m = consumer.receive(1000);
+ Assert.assertNotNull(m);
+ Assert.assertEquals(i, m.getIntProperty("counter").intValue());
+ }
+
+ m.acknowledge();
+
+ crash(false, session);
+ try {
+ session.commit();
+ fail("Exception expected");
+ } catch (Exception expected) {
+ expected.printStackTrace();
+ }
+
+ Thread.sleep(2000);
+
+ m = null;
+ for (int i = 0; i < 500; i++) {
+ m = consumer.receive(1000);
+ Assert.assertNotNull(m);
+ Assert.assertEquals(i, m.getIntProperty("counter").intValue());
+ }
+
+ m.acknowledge();
+
+ session.commit();
+
+ }
+
// https://issues.jboss.org/browse/HORNETQ-685
@Test
public void testTimeoutOnFailoverTransactionRollback() throws Exception {