You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2017/08/03 17:22:40 UTC

qpid-jms git commit: QPIDJMS-307: avoid NPE during declare rejection, ensure the AMQP session is closed before JMS createSession throws, correct exception message.

Repository: qpid-jms
Updated Branches:
  refs/heads/master 53d96e8a5 -> cc00816c4


QPIDJMS-307: avoid NPE during declare rejection, ensure the AMQP session is closed before JMS createSession throws, correct exception message.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/cc00816c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/cc00816c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/cc00816c

Branch: refs/heads/master
Commit: cc00816c4d4cfca2bcc4f3ac71b86a0e1d53a36d
Parents: 53d96e8
Author: Robert Gemmell <ro...@apache.org>
Authored: Thu Aug 3 18:18:15 2017 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Thu Aug 3 18:18:15 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsSession.java    | 15 ++++-
 .../amqp/AmqpTransactionCoordinator.java        |  4 +-
 .../integration/ConnectionIntegrationTest.java  |  2 +
 .../TransactionsIntegrationTest.java            | 63 ++++++++++++++++++++
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    |  9 +++
 5 files changed, 89 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/cc00816c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 700d36f..a821c18 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -142,8 +142,19 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
 
         connection.createResource(sessionInfo);
 
-        // We always keep an open TX so start now.
-        getTransactionContext().begin();
+        // We always keep an open TX if transacted so start now.
+        try {
+            getTransactionContext().begin();
+        } catch (Exception e) {
+            // failed, close the AMQP session before we throw
+            try {
+                connection.destroyResource(sessionInfo);
+            } catch (Exception ex) {
+                // Ignore, throw original error
+            }
+
+            throw e;
+        }
 
         // Start the completion executor now as it's needed throughout the
         // lifetime of the Session.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/cc00816c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
index 567e43d..caf6921 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
@@ -90,7 +90,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI
                     Exception cause = AmqpSupport.convertToException(
                         getParent().getProvider(), getEndpoint(), rejected.getError());
                     JMSException failureCause = null;
-                    if (txId.getProviderContext().equals(COMMIT_MARKER)) {
+                    if (COMMIT_MARKER.equals(txId.getProviderContext())){
                         failureCause = new TransactionRolledBackException(cause.getMessage());
                     } else {
                         failureCause = new JMSException(cause.getMessage());
@@ -133,7 +133,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI
         Declare declare = new Declare();
         message.setBody(new AmqpValue(declare));
 
-        ScheduledFuture<?> timeout = scheduleTimeoutIfNeeded("Timed out waiting for discharge of TX.", request);
+        ScheduledFuture<?> timeout = scheduleTimeoutIfNeeded("Timed out waiting for declare of TX.", request);
         OperationContext context = new OperationContext(txId, request, timeout);
 
         Delivery delivery = getEndpoint().delivery(tagGenerator.getNextTag());

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/cc00816c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index d8c713f..3e12fd9 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -258,6 +258,8 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase {
             txCoordinatorMatcher.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN));
             testPeer.expectSenderAttach(notNullValue(), txCoordinatorMatcher, true, true, false, 0, null, null);
             testPeer.expectDetach(true, false, false);
+            // Expect the AMQP session to be closed due to the JMS session creation failure.
+            testPeer.expectEnd();
 
             try {
                 connection.createSession(true, Session.SESSION_TRANSACTED);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/cc00816c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index 47af07f..d026b2c 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -52,6 +52,7 @@ import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Declare;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Declared;
@@ -1251,6 +1252,8 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             testPeer.expectBegin();
             testPeer.expectCoordinatorAttach();
             testPeer.expectDeclareButDoNotRespond();
+            // Expect the AMQP session to be closed due to the JMS session creation failure.
+            testPeer.expectEnd();
 
             try {
                 connection.createSession(true, Session.SESSION_TRANSACTED);
@@ -1269,6 +1272,66 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout=20000)
+    public void testSessionCreateFailsOnDeclareRejection() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+            testPeer.expectCoordinatorAttach();
+
+            // First expect an unsettled 'declare' transfer to the txn coordinator, and
+            // reply with a Rejected disposition state to indicate failure.
+            testPeer.expectDeclareAndReject();
+            // Expect the AMQP session to be closed due to the JMS session creation failure.
+            testPeer.expectEnd();
+
+            try {
+                connection.createSession(true, Session.SESSION_TRANSACTED);
+                fail("should have thrown");
+            } catch (JMSException jmse) {
+                // Expected
+            }
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testSessionCreateFailsOnCoordinatorLinkRefusal() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            // Expect coordinator link, refuse it, expect detach reply
+            String errorMessage = "CoordinatorLinkRefusal-breadcrumb";
+            testPeer.expectCoordinatorAttach(true, false, AmqpError.NOT_IMPLEMENTED, errorMessage);
+            testPeer.expectDetach(true, false, false);
+
+            // Expect the AMQP session to be closed due to the JMS session creation failure.
+            testPeer.expectEnd();
+
+            try {
+                connection.createSession(true, Session.SESSION_TRANSACTED);
+                fail("should have thrown");
+            } catch (JMSException jmse) {
+                assertNotNull(jmse.getMessage());
+                assertTrue("Expected exception message to contain breadcrumb", jmse.getMessage().contains(errorMessage));
+            }
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout=20000)
     public void testTransactionRolledBackOnSessionCloseTimesOut() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/cc00816c/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index ffbafa7..2e2c9af 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -78,6 +78,7 @@ import org.apache.qpid.jms.test.testpeer.describedtypes.EndFrame;
 import org.apache.qpid.jms.test.testpeer.describedtypes.FlowFrame;
 import org.apache.qpid.jms.test.testpeer.describedtypes.FrameDescriptorMapping;
 import org.apache.qpid.jms.test.testpeer.describedtypes.OpenFrame;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Released;
 import org.apache.qpid.jms.test.testpeer.describedtypes.SaslChallengeFrame;
 import org.apache.qpid.jms.test.testpeer.describedtypes.SaslMechanismsFrame;
@@ -2082,6 +2083,14 @@ public class TestAmqpPeer implements AutoCloseable
         expectTransfer(declareMatcher, nullValue(), false, false, null, false);
     }
 
+    public void expectDeclareAndReject()
+    {
+        TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
+        declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+
+        expectTransfer(declareMatcher, nullValue(), false, new Rejected(), true);
+    }
+
     public void expectDischarge(Binary txnId, boolean dischargeState) {
         expectDischarge(txnId, dischargeState, new Accepted());
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org