You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2019/09/19 14:08:52 UTC
[qpid-jms] branch master updated: QPIDJMS-474: better handle
connection failure mid-creation on transacted session
This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
The following commit(s) were added to refs/heads/master by this push:
new 4a9aab8 QPIDJMS-474: better handle connection failure mid-creation on transacted session
4a9aab8 is described below
commit 4a9aab83bca11f7346215a827266e5039df1393d
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Thu Sep 19 15:03:59 2019 +0100
QPIDJMS-474: better handle connection failure mid-creation on transacted session
---
.../java/org/apache/qpid/jms/JmsConnection.java | 6 +++-
.../qpid/jms/JmsLocalTransactionContext.java | 8 ++++-
.../jms/integration/ConnectionIntegrationTest.java | 37 ++++++++++++++++++++++
.../provider/failover/FailoverIntegrationTest.java | 20 +++++++++---
4 files changed, 65 insertions(+), 6 deletions(-)
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 5afcb3f..6df1d6e 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -1358,7 +1358,11 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
// Signal that connection dropped we need to mark transactions as
// failed, deliver failure events to asynchronous send completions etc.
for (JmsSession session : sessions.values()) {
- session.onConnectionInterrupted();
+ try {
+ session.onConnectionInterrupted();
+ } catch (Throwable t) {
+ LOG.warn("Exception while marking session interrupted", t);
+ }
}
onProviderException(ex);
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
index 1452478..0b0b370 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
@@ -251,6 +251,10 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
private void doRollback(boolean startNewTx) throws JMSException {
lock.writeLock().lock();
try {
+ if(transactionInfo == null) {
+ return;
+ }
+
LOG.debug("Rollback: {}", transactionInfo.getId());
JmsTransactionId oldTransactionId = transactionInfo.getId();
final JmsTransactionInfo nextTx;
@@ -331,7 +335,9 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
public void onConnectionInterrupted() {
lock.writeLock().tryLock();
try {
- transactionInfo.setInDoubt(true);
+ if(transactionInfo != null) {
+ transactionInfo.setInDoubt(true);
+ }
} finally {
if (lock.writeLock().isHeldByCurrentThread()) {
lock.writeLock().unlock();
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index 9a272e9..4d1a9b2 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -75,6 +75,8 @@ import org.apache.qpid.jms.test.testpeer.basictypes.ConnectionError;
import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
import org.apache.qpid.jms.util.MetaDataSupport;
+import org.apache.qpid.jms.util.QpidJMSTestRunner;
+import org.apache.qpid.jms.util.Repeat;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
@@ -83,7 +85,9 @@ import org.apache.qpid.proton.engine.impl.AmqpHeader;
import org.hamcrest.Matcher;
import org.junit.Ignore;
import org.junit.Test;
+import org.junit.runner.RunWith;
+@RunWith(QpidJMSTestRunner.class)
public class ConnectionIntegrationTest extends QpidJmsTestCase {
private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
@@ -329,6 +333,39 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase {
}
}
+ @Repeat(repetitions = 1)
+ @Test(timeout = 20000)
+ public void testRemotelyDropConnectionDuringSessionCreationTransacted() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ testPeer.expectSaslAnonymous();
+ testPeer.expectOpen();
+ testPeer.expectBegin();
+
+ ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + "?jms.clientID=foo");
+ Connection connection = factory.createConnection();
+
+ CountDownLatch exceptionListenerFired = new CountDownLatch(1);
+ connection.setExceptionListener(ex -> exceptionListenerFired.countDown());
+
+ // Expect the begin, then drop connection without without a close frame before the tx-coordinator setup.
+ testPeer.expectBegin();
+ testPeer.dropAfterLastHandler();
+
+ try {
+ connection.createSession(true, Session.SESSION_TRANSACTED);
+ fail("Expected exception to be thrown");
+ } catch (JMSException jmse) {
+ // Expected
+ }
+
+ assertTrue("Exception listener did not fire", exceptionListenerFired.await(5, TimeUnit.SECONDS));
+
+ testPeer.waitForAllHandlersToComplete(3000);
+
+ connection.close();
+ }
+ }
+
@Test(timeout = 20000)
public void testConnectionPropertiesContainExpectedMetaData() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 39a9a94..5e572b3 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -1642,6 +1642,15 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
@Test(timeout=20000)
public void testTxRecreatedAfterConnectionFailsOver() throws Exception {
+ doTxRecreatedAfterConnectionFailsOver(true);
+ }
+
+ @Test(timeout=20000)
+ public void testTxRecreatedAfterConnectionFailsOver2() throws Exception {
+ doTxRecreatedAfterConnectionFailsOver(false);
+ }
+
+ private void doTxRecreatedAfterConnectionFailsOver(boolean dropAfterCoordinator) throws Exception {
try (TestAmqpPeer originalPeer = new TestAmqpPeer();
TestAmqpPeer finalPeer = new TestAmqpPeer();) {
@@ -1682,12 +1691,15 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
originalPeer.expectBegin();
- originalPeer.expectCoordinatorAttach();
- // First expect an unsettled 'declare' transfer to the txn coordinator, and
- // reply with a Declared disposition state containing the txnId.
Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
- originalPeer.expectDeclare(txnId);
+ if(dropAfterCoordinator) {
+ originalPeer.expectCoordinatorAttach();
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a Declared disposition state containing the txnId.
+ originalPeer.expectDeclare(txnId);
+ }
originalPeer.dropAfterLastHandler();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org