You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2019/09/19 14:08:52 UTC

[qpid-jms] branch master updated: QPIDJMS-474: better handle connection failure mid-creation on transacted session

This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a9aab8  QPIDJMS-474: better handle connection failure mid-creation on transacted session
4a9aab8 is described below

commit 4a9aab83bca11f7346215a827266e5039df1393d
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Thu Sep 19 15:03:59 2019 +0100

    QPIDJMS-474: better handle connection failure mid-creation on transacted session
---
 .../java/org/apache/qpid/jms/JmsConnection.java    |  6 +++-
 .../qpid/jms/JmsLocalTransactionContext.java       |  8 ++++-
 .../jms/integration/ConnectionIntegrationTest.java | 37 ++++++++++++++++++++++
 .../provider/failover/FailoverIntegrationTest.java | 20 +++++++++---
 4 files changed, 65 insertions(+), 6 deletions(-)

diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 5afcb3f..6df1d6e 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -1358,7 +1358,11 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         // Signal that connection dropped we need to mark transactions as
         // failed, deliver failure events to asynchronous send completions etc.
         for (JmsSession session : sessions.values()) {
-            session.onConnectionInterrupted();
+            try {
+                session.onConnectionInterrupted();
+            } catch (Throwable t) {
+                LOG.warn("Exception while marking session interrupted", t);
+            }
         }
 
         onProviderException(ex);
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
index 1452478..0b0b370 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
@@ -251,6 +251,10 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
     private void doRollback(boolean startNewTx) throws JMSException {
         lock.writeLock().lock();
         try {
+            if(transactionInfo == null) {
+                return;
+            }
+
             LOG.debug("Rollback: {}", transactionInfo.getId());
             JmsTransactionId oldTransactionId = transactionInfo.getId();
             final JmsTransactionInfo nextTx;
@@ -331,7 +335,9 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
     public void onConnectionInterrupted() {
         lock.writeLock().tryLock();
         try {
-            transactionInfo.setInDoubt(true);
+            if(transactionInfo != null) {
+                transactionInfo.setInDoubt(true);
+            }
         } finally {
             if (lock.writeLock().isHeldByCurrentThread()) {
                 lock.writeLock().unlock();
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index 9a272e9..4d1a9b2 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -75,6 +75,8 @@ import org.apache.qpid.jms.test.testpeer.basictypes.ConnectionError;
 import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
 import org.apache.qpid.jms.util.MetaDataSupport;
+import org.apache.qpid.jms.util.QpidJMSTestRunner;
+import org.apache.qpid.jms.util.Repeat;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
@@ -83,7 +85,9 @@ import org.apache.qpid.proton.engine.impl.AmqpHeader;
 import org.hamcrest.Matcher;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
+@RunWith(QpidJMSTestRunner.class)
 public class ConnectionIntegrationTest extends QpidJmsTestCase {
     private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
 
@@ -329,6 +333,39 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase {
         }
     }
 
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void testRemotelyDropConnectionDuringSessionCreationTransacted() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
+            testPeer.expectBegin();
+
+            ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + "?jms.clientID=foo");
+            Connection connection = factory.createConnection();
+
+            CountDownLatch exceptionListenerFired = new CountDownLatch(1);
+            connection.setExceptionListener(ex -> exceptionListenerFired.countDown());
+
+            // Expect the begin, then drop connection without a close frame before the tx-coordinator setup.
+            testPeer.expectBegin();
+            testPeer.dropAfterLastHandler();
+
+            try {
+                connection.createSession(true, Session.SESSION_TRANSACTED);
+                fail("Expected exception to be thrown");
+            } catch (JMSException jmse) {
+                // Expected
+            }
+
+            assertTrue("Exception listener did not fire", exceptionListenerFired.await(5, TimeUnit.SECONDS));
+
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            connection.close();
+        }
+    }
+
     @Test(timeout = 20000)
     public void testConnectionPropertiesContainExpectedMetaData() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 39a9a94..5e572b3 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -1642,6 +1642,15 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
 
     @Test(timeout=20000)
     public void testTxRecreatedAfterConnectionFailsOver() throws Exception {
+        doTxRecreatedAfterConnectionFailsOver(true);
+    }
+
+    @Test(timeout=20000)
+    public void testTxRecreatedAfterConnectionFailsOver2() throws Exception {
+        doTxRecreatedAfterConnectionFailsOver(false);
+    }
+
+    private void doTxRecreatedAfterConnectionFailsOver(boolean dropAfterCoordinator) throws Exception {
         try (TestAmqpPeer originalPeer = new TestAmqpPeer();
              TestAmqpPeer finalPeer = new TestAmqpPeer();) {
 
@@ -1682,12 +1691,15 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
 
             originalPeer.expectBegin();
-            originalPeer.expectCoordinatorAttach();
 
-            // First expect an unsettled 'declare' transfer to the txn coordinator, and
-            // reply with a Declared disposition state containing the txnId.
             Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
-            originalPeer.expectDeclare(txnId);
+            if(dropAfterCoordinator) {
+                originalPeer.expectCoordinatorAttach();
+
+                // First expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a Declared disposition state containing the txnId.
+                originalPeer.expectDeclare(txnId);
+            }
 
             originalPeer.dropAfterLastHandler();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org