You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2017/08/03 17:22:40 UTC
qpid-jms git commit: QPIDJMS-307: avoid NPE during declare rejection,
ensure the AMQP session is closed before JMS createSession throws,
correct exception message.
Repository: qpid-jms
Updated Branches:
refs/heads/master 53d96e8a5 -> cc00816c4
QPIDJMS-307: avoid NPE during declare rejection, ensure the AMQP session is closed before JMS createSession throws, correct exception message.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/cc00816c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/cc00816c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/cc00816c
Branch: refs/heads/master
Commit: cc00816c4d4cfca2bcc4f3ac71b86a0e1d53a36d
Parents: 53d96e8
Author: Robert Gemmell <ro...@apache.org>
Authored: Thu Aug 3 18:18:15 2017 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Thu Aug 3 18:18:15 2017 +0100
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/JmsSession.java | 15 ++++-
.../amqp/AmqpTransactionCoordinator.java | 4 +-
.../integration/ConnectionIntegrationTest.java | 2 +
.../TransactionsIntegrationTest.java | 63 ++++++++++++++++++++
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 9 +++
5 files changed, 89 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/cc00816c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 700d36f..a821c18 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -142,8 +142,19 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
connection.createResource(sessionInfo);
- // We always keep an open TX so start now.
- getTransactionContext().begin();
+ // We always keep an open TX if transacted so start now.
+ try {
+ getTransactionContext().begin();
+ } catch (Exception e) {
+ // failed, close the AMQP session before we throw
+ try {
+ connection.destroyResource(sessionInfo);
+ } catch (Exception ex) {
+ // Ignore, throw original error
+ }
+
+ throw e;
+ }
// Start the completion executor now as it's needed throughout the
// lifetime of the Session.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/cc00816c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
index 567e43d..caf6921 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
@@ -90,7 +90,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI
Exception cause = AmqpSupport.convertToException(
getParent().getProvider(), getEndpoint(), rejected.getError());
JMSException failureCause = null;
- if (txId.getProviderContext().equals(COMMIT_MARKER)) {
+ if (COMMIT_MARKER.equals(txId.getProviderContext())){
failureCause = new TransactionRolledBackException(cause.getMessage());
} else {
failureCause = new JMSException(cause.getMessage());
@@ -133,7 +133,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI
Declare declare = new Declare();
message.setBody(new AmqpValue(declare));
- ScheduledFuture<?> timeout = scheduleTimeoutIfNeeded("Timed out waiting for discharge of TX.", request);
+ ScheduledFuture<?> timeout = scheduleTimeoutIfNeeded("Timed out waiting for declare of TX.", request);
OperationContext context = new OperationContext(txId, request, timeout);
Delivery delivery = getEndpoint().delivery(tagGenerator.getNextTag());
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/cc00816c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index d8c713f..3e12fd9 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -258,6 +258,8 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase {
txCoordinatorMatcher.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN));
testPeer.expectSenderAttach(notNullValue(), txCoordinatorMatcher, true, true, false, 0, null, null);
testPeer.expectDetach(true, false, false);
+ // Expect the AMQP session to be closed due to the JMS session creation failure.
+ testPeer.expectEnd();
try {
connection.createSession(true, Session.SESSION_TRANSACTED);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/cc00816c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index 47af07f..d026b2c 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -52,6 +52,7 @@ import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
import org.apache.qpid.jms.test.testpeer.describedtypes.Declare;
import org.apache.qpid.jms.test.testpeer.describedtypes.Declared;
@@ -1251,6 +1252,8 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
testPeer.expectBegin();
testPeer.expectCoordinatorAttach();
testPeer.expectDeclareButDoNotRespond();
+ // Expect the AMQP session to be closed due to the JMS session creation failure.
+ testPeer.expectEnd();
try {
connection.createSession(true, Session.SESSION_TRANSACTED);
@@ -1269,6 +1272,66 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout=20000)
+ public void testSessionCreateFailsOnDeclareRejection() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+ testPeer.expectCoordinatorAttach();
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a Rejected disposition state to indicate failure.
+ testPeer.expectDeclareAndReject();
+ // Expect the AMQP session to be closed due to the JMS session creation failure.
+ testPeer.expectEnd();
+
+ try {
+ connection.createSession(true, Session.SESSION_TRANSACTED);
+ fail("should have thrown");
+ } catch (JMSException jmse) {
+ // Expected
+ }
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testSessionCreateFailsOnCoordinatorLinkRefusal() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ // Expect coordinator link, refuse it, expect detach reply
+ String errorMessage = "CoordinatorLinkRefusal-breadcrumb";
+ testPeer.expectCoordinatorAttach(true, false, AmqpError.NOT_IMPLEMENTED, errorMessage);
+ testPeer.expectDetach(true, false, false);
+
+ // Expect the AMQP session to be closed due to the JMS session creation failure.
+ testPeer.expectEnd();
+
+ try {
+ connection.createSession(true, Session.SESSION_TRANSACTED);
+ fail("should have thrown");
+ } catch (JMSException jmse) {
+ assertNotNull(jmse.getMessage());
+ assertTrue("Expected exception message to contain breadcrumb", jmse.getMessage().contains(errorMessage));
+ }
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout=20000)
public void testTransactionRolledBackOnSessionCloseTimesOut() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/cc00816c/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index ffbafa7..2e2c9af 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -78,6 +78,7 @@ import org.apache.qpid.jms.test.testpeer.describedtypes.EndFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.FlowFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.FrameDescriptorMapping;
import org.apache.qpid.jms.test.testpeer.describedtypes.OpenFrame;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
import org.apache.qpid.jms.test.testpeer.describedtypes.Released;
import org.apache.qpid.jms.test.testpeer.describedtypes.SaslChallengeFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.SaslMechanismsFrame;
@@ -2082,6 +2083,14 @@ public class TestAmqpPeer implements AutoCloseable
expectTransfer(declareMatcher, nullValue(), false, false, null, false);
}
+ public void expectDeclareAndReject()
+ {
+ TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
+ declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+
+ expectTransfer(declareMatcher, nullValue(), false, new Rejected(), true);
+ }
+
public void expectDischarge(Binary txnId, boolean dischargeState) {
expectDischarge(txnId, dischargeState, new Accepted());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org