You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/12/20 21:07:59 UTC

[2/2] activemq-artemis git commit: ARTEMIS-1554 JMS bridge with transactions cannot be stopped on session failover

ARTEMIS-1554 JMS bridge with transactions cannot be stopped on session failover


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/cfb82066
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/cfb82066
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/cfb82066

Branch: refs/heads/master
Commit: cfb8206650fbee3bb9059da2ef83de84c7add0e5
Parents: 43b7275
Author: xstefank <xs...@gmail.com>
Authored: Wed Dec 13 09:03:30 2017 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Dec 20 16:07:51 2017 -0500

----------------------------------------------------------------------
 .../artemis/jms/bridge/impl/JMSBridgeImpl.java  | 11 +++++
 .../tests/extras/jms/bridge/JMSBridgeTest.java  | 51 ++++++++++++++++++++
 2 files changed, 62 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cfb82066/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java
index 19cac25..b67b0b3 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java
@@ -55,6 +55,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
 import org.apache.activemq.artemis.api.core.client.FailoverEventType;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
 import org.apache.activemq.artemis.jms.bridge.ActiveMQJMSBridgeLogger;
 import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory;
 import org.apache.activemq.artemis.jms.bridge.DestinationFactory;
@@ -497,6 +498,8 @@ public final class JMSBridgeImpl implements JMSBridge {
                ActiveMQJMSBridgeLogger.LOGGER.trace("Rolling back remaining tx");
             }
 
+            stopSessionFailover();
+
             try {
                tx.rollback();
                abortedMessageCount += messages.size();
@@ -535,6 +538,14 @@ public final class JMSBridgeImpl implements JMSBridge {
       }
    }
 
+   private void stopSessionFailover() { // Releases communications on the source and target session contexts; invoked just before the remaining tx is rolled back.
+      XASession xaSource = (XASession) sourceSession;
+      XASession xaTarget = (XASession) targetSession;
+      // NOTE(review): the casts below are unchecked; an XAResource that is not a ClientSessionInternal would throw ClassCastException here - confirm this is only reached with Artemis core sessions.
+      ((ClientSessionInternal) xaSource.getXAResource()).getSessionContext().releaseCommunications();
+      ((ClientSessionInternal) xaTarget.getXAResource()).getSessionContext().releaseCommunications();
+   }
+
    @Override
    public synchronized boolean isStarted() {
       return started;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cfb82066/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java
index df45f68..fed218b 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java
@@ -34,22 +34,31 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
+import org.apache.activemq.artemis.api.jms.JMSFactoryType;
 import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory;
 import org.apache.activemq.artemis.jms.bridge.QualityOfServiceMode;
 import org.apache.activemq.artemis.jms.bridge.impl.JMSBridgeImpl;
 import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
+import org.apache.activemq.artemis.jms.client.ActiveMQXAConnectionFactory;
 import org.apache.activemq.artemis.service.extensions.ServiceUtils;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.integration.ra.DummyTransactionManager;
 import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 
 public class JMSBridgeTest extends BridgeTestBase {
 
    private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
 
+   @Rule
+   public Timeout timeout = new Timeout(120000);
+
    // MaxBatchSize but no MaxBatchTime
 
    @Test
@@ -1289,6 +1298,48 @@ public class JMSBridgeTest extends BridgeTestBase {
       }
    }
 
+   @Test
+   public void testCrashDestStopBridge() throws Exception { // Stops the target server, then stops an ONCE_AND_ONLY_ONCE bridge; the test contains no assertions of its own.
+      cff1xa = new ConnectionFactoryFactory() {
+         @Override
+         public Object createConnectionFactory() throws Exception {
+            ActiveMQXAConnectionFactory cf = (ActiveMQXAConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY, params1));
+            // NOTE(review): -1 for reconnect attempts / call failover timeout presumably means "unlimited" - confirm against the client API.
+            cf.setReconnectAttempts(-1);
+            cf.setCallFailoverTimeout(-1);
+            cf.setCallTimeout(10000);
+            cf.setBlockOnNonDurableSend(true);
+            cf.setBlockOnDurableSend(true);
+            cf.setCacheLargeMessagesClient(true);
+
+            return cf;
+         }
+
+      };
+
+      JMSBridgeImpl bridge = new JMSBridgeImpl(cff0xa, cff1xa, sourceQueueFactory, targetQueueFactory, null, null, null, null, null, 1000, -1, QualityOfServiceMode.ONCE_AND_ONLY_ONCE, 10, 5000, null, null, false).setBridgeName("test-bridge");
+      addActiveMQComponent(bridge);
+      bridge.setTransactionManager(newTransactionManager());
+
+      bridge.start();
+
+      // Now crash the dest server
+
+      JMSBridgeTest.log.info("About to crash server");
+
+      jmsServer1.stop();
+
+      // Now stop the bridge while the failover is happening
+
+      JMSBridgeTest.log.info("About to stop the bridge");
+
+      bridge.stop();
+
+      // Shutdown the source server
+
+      jmsServer0.stop();
+   }
+
    // Private -------------------------------------------------------------------------------
 
    private void testStress(final QualityOfServiceMode qosMode,