You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/05/10 21:42:42 UTC
[1/2] qpid-jms git commit: QPIDJMS-175 Add a test around drain
timeout on transaction rollback with an initial change to allow the test to
pass.
Repository: qpid-jms
Updated Branches:
refs/heads/master 590b3c65d -> 0fe6b0471
QPIDJMS-175 Add a test around drain timeout on transaction rollback with
an initial change to allow the test to pass.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/1b05e92c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/1b05e92c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/1b05e92c
Branch: refs/heads/master
Commit: 1b05e92c2465620af07273c83cb0e15ee8833b8f
Parents: 590b3c6
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue May 10 17:29:07 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue May 10 17:29:07 2016 -0400
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/JmsSession.java | 12 ++--
.../TransactionsIntegrationTest.java | 64 ++++++++++++++++++++
2 files changed, 72 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1b05e92c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 1c8f2d8..ea57503 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -192,12 +192,16 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
}
// Stop processing any new messages that arrive
- for (JmsMessageConsumer c : consumers.values()) {
- c.suspendForRollback();
+ try {
+ for (JmsMessageConsumer c : consumers.values()) {
+ c.suspendForRollback();
+ }
+ } finally {
+ transactionContext.rollback();
}
- transactionContext.rollback();
-
+ // Currently some consumers won't get suspended and some won't restart
+ // after a failed rollback.
for (JmsMessageConsumer c : consumers.values()) {
c.resumeAfterRollback();
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1b05e92c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index dfd2f1c..2ce34fd 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -1211,4 +1211,68 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(1000);
}
}
+
+ @Test(timeout = 20000)
+ public void testRollbackWithNoResponseForSuspendConsumer() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer, "?amqp.drainTimeout=1000");
+ connection.start();
+
+ testPeer.expectBegin();
+ testPeer.expectCoordinatorAttach();
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+ testPeer.expectDeclare(txnId);
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue("myQueue");
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 2);
+
+ // Then expect a *settled* TransactionalState disposition for the message once received by the consumer
+ TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
+ stateMatcher.withTxnId(equalTo(txnId));
+ stateMatcher.withOutcome(new AcceptedMatcher());
+
+ testPeer.expectDisposition(true, stateMatcher);
+
+ // Read one so we try to suspend on rollback
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ Message receivedMessage = messageConsumer.receive(3000);
+
+ assertNotNull(receivedMessage);
+ assertTrue(receivedMessage instanceof TextMessage);
+
+ // Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain'
+ testPeer.expectLinkFlow(true, false, greaterThan(UnsignedInteger.ZERO));
+
+ // Expect the consumer to be closed after drain timeout
+ testPeer.expectDetach(true, true, true);
+
+ // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+ // and reply with accepted and settled disposition to indicate the rollback succeeded
+ testPeer.expectDischarge(txnId, true);
+
+ // Then expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+ testPeer.expectDeclare(txnId);
+ testPeer.expectDischarge(txnId, true);
+ testPeer.expectClose();
+
+ try {
+ session.rollback();
+ //fail("Consumer should have failed to stop and caused an error on rollback.");
+ } catch (JMSException ex) {
+ // Expected
+ }
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-jms git commit: QPIDJMS-175 Use a more appropriate
exception on the drain timeout.
Posted by ta...@apache.org.
QPIDJMS-175 Use a more appropriate exception on the drain timeout.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/0fe6b047
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/0fe6b047
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/0fe6b047
Branch: refs/heads/master
Commit: 0fe6b0471495821432f5e047c69287980edf6dac
Parents: 1b05e92
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue May 10 17:41:59 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue May 10 17:41:59 2016 -0400
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0fe6b047/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index a8ef32d..f7a7200 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;
import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.JmsOperationTimedOutException;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.meta.JmsConsumerId;
@@ -120,9 +121,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
@Override
public void run() {
LOG.trace("Consumer {} drain request timed out", getConsumerId());
- IOException error = new IOException("Remote did not respond to a drain request in time");
- locallyClosed(session.getProvider(), error);
- stopRequest.onFailure(error);
+ Exception cause = new JmsOperationTimedOutException("Remote did not respond to a drain request in time");
+ locallyClosed(session.getProvider(), cause);
+ stopRequest.onFailure(cause);
session.getProvider().pumpToProtonTransport(stopRequest);
}
}, getDrainTimeout());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org