You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/05/10 21:42:42 UTC

[1/2] qpid-jms git commit: QPIDJMS-175 Add a test around drain timeout on transaction rollback with an initial change to allow the test to pass.

Repository: qpid-jms
Updated Branches:
  refs/heads/master 590b3c65d -> 0fe6b0471


QPIDJMS-175 Add a test around drain timeout on transaction rollback with
an initial change to allow the test to pass.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/1b05e92c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/1b05e92c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/1b05e92c

Branch: refs/heads/master
Commit: 1b05e92c2465620af07273c83cb0e15ee8833b8f
Parents: 590b3c6
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue May 10 17:29:07 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue May 10 17:29:07 2016 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsSession.java    | 12 ++--
 .../TransactionsIntegrationTest.java            | 64 ++++++++++++++++++++
 2 files changed, 72 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1b05e92c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 1c8f2d8..ea57503 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -192,12 +192,16 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         }
 
         // Stop processing any new messages that arrive
-        for (JmsMessageConsumer c : consumers.values()) {
-            c.suspendForRollback();
+        try {
+            for (JmsMessageConsumer c : consumers.values()) {
+                c.suspendForRollback();
+            }
+        } finally {
+            transactionContext.rollback();
         }
 
-        transactionContext.rollback();
-
+        // Currently some consumers won't get suspended and some won't restart
+        // after a failed rollback.
         for (JmsMessageConsumer c : consumers.values()) {
             c.resumeAfterRollback();
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1b05e92c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index dfd2f1c..2ce34fd 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -1211,4 +1211,68 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
+
+    @Test(timeout = 20000)
+    public void testRollbackWithNoResponseForSuspendConsumer() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?amqp.drainTimeout=1000");
+            connection.start();
+
+            testPeer.expectBegin();
+            testPeer.expectCoordinatorAttach();
+
+            // First expect an unsettled 'declare' transfer to the txn coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+            testPeer.expectDeclare(txnId);
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 2);
+
+            // Then expect a *settled* TransactionalState disposition for the message once received by the consumer
+            TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
+            stateMatcher.withTxnId(equalTo(txnId));
+            stateMatcher.withOutcome(new AcceptedMatcher());
+
+            testPeer.expectDisposition(true, stateMatcher);
+
+            // Read one so we try to suspend on rollback
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            Message receivedMessage = messageConsumer.receive(3000);
+
+            assertNotNull(receivedMessage);
+            assertTrue(receivedMessage instanceof TextMessage);
+
+            // Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain'
+            testPeer.expectLinkFlow(true, false, greaterThan(UnsignedInteger.ZERO));
+
+            // Expect the consumer to be closed after drain timeout
+            testPeer.expectDetach(true, true, true);
+
+            // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+            // and reply with accepted and settled disposition to indicate the rollback succeeded
+            testPeer.expectDischarge(txnId, true);
+
+            // Then expect an unsettled 'declare' transfer to the txn coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+            testPeer.expectDeclare(txnId);
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
+
+            try {
+                session.rollback();
+                //fail("Consumer should have failed to stop and caused an error on rollback.");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-jms git commit: QPIDJMS-175 Use a more appropriate exception on the drain timeout.

Posted by ta...@apache.org.
QPIDJMS-175 Use a more appropriate exception on the drain timeout.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/0fe6b047
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/0fe6b047
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/0fe6b047

Branch: refs/heads/master
Commit: 0fe6b0471495821432f5e047c69287980edf6dac
Parents: 1b05e92
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue May 10 17:41:59 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue May 10 17:41:59 2016 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java  | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0fe6b047/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index a8ef32d..f7a7200 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.jms.JMSException;
 
 import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.JmsOperationTimedOutException;
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.message.JmsMessage;
 import org.apache.qpid.jms.meta.JmsConsumerId;
@@ -120,9 +121,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
                     @Override
                     public void run() {
                         LOG.trace("Consumer {} drain request timed out", getConsumerId());
-                        IOException error = new IOException("Remote did not respond to a drain request in time");
-                        locallyClosed(session.getProvider(), error);
-                        stopRequest.onFailure(error);
+                        Exception cause = new JmsOperationTimedOutException("Remote did not respond to a drain request in time");
+                        locallyClosed(session.getProvider(), cause);
+                        stopRequest.onFailure(cause);
                         session.getProvider().pumpToProtonTransport(stopRequest);
                     }
                 }, getDrainTimeout());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org