You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/11/24 18:37:00 UTC

[4/4] qpid-jms git commit: update consumers on transacted sessions to settle deliveries when transactionally accepting them

update consumers on transacted sessions to settle deliveries when transactionally accepting them


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/a6d03075
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/a6d03075
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/a6d03075

Branch: refs/heads/master
Commit: a6d03075362ab58ea125c5d454af6a3dc31015aa
Parents: 1ec620a
Author: Robert Gemmell <ro...@apache.org>
Authored: Mon Nov 24 17:09:37 2014 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Mon Nov 24 17:09:37 2014 +0000

----------------------------------------------------------------------
 .../org/apache/qpid/jms/JmsMessageConsumer.java |  2 +-
 .../qpid/jms/provider/amqp/AmqpConsumer.java    | 28 +++++++++++---------
 .../jms/integration/SessionIntegrationTest.java | 27 +++++--------------
 3 files changed, 23 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a6d03075/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 2d9e0ec..632742d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -246,7 +246,7 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
     JmsInboundMessageDispatch ack(final JmsInboundMessageDispatch envelope) throws JMSException {
         if (envelope != null && envelope.getMessage() != null) {
             JmsMessage message = envelope.getMessage();
-            if (message.getAcknowledgeCallback() != null || session.isTransacted()) {
+            if (message.getAcknowledgeCallback() != null) {
                 // Message has been received by the app.. expand the credit
                 // window so that we receive more messages.
                 doAckDelivered(envelope);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a6d03075/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index e33fddf..c41f5fc 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -219,16 +219,6 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
 
         if (ackType.equals(ACK_TYPE.DELIVERED)) {
             LOG.debug("Delivered Ack of message: {}", envelope);
-            if (session.isTransacted()) {
-                Binary txnId = session.getTransactionContext().getAmqpTransactionId();
-                if (txnId != null) {
-                    TransactionalState txState = new TransactionalState();
-                    txState.setOutcome(Accepted.getInstance());
-                    txState.setTxnId(txnId);
-                    delivery.disposition(txState);
-                    session.getTransactionContext().registerTxConsumer(this);
-                }
-            }
             if (!isPresettle()) {
                 delivered.put(envelope, delivery);
             }
@@ -241,10 +231,24 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
             }
             LOG.debug("Consumed Ack of message: {}", envelope);
             if (!delivery.isSettled()) {
-                delivery.disposition(Accepted.getInstance());
-                delivery.settle();
+                if (session.isTransacted()) {
+                    Binary txnId = session.getTransactionContext().getAmqpTransactionId();
+                    if (txnId != null) {
+                        TransactionalState txState = new TransactionalState();
+                        txState.setOutcome(Accepted.getInstance());
+                        txState.setTxnId(txnId);
+                        delivery.disposition(txState);
+                        delivery.settle();
+                        session.getTransactionContext().registerTxConsumer(this);
+                    }
+                } else {
+                    delivery.disposition(Accepted.getInstance());
+                    delivery.settle();
+                }
             }
         } else if (ackType.equals(ACK_TYPE.REDELIVERED)) {
+            //TODO: Trace usage of this.
+            //TODO: Don't bother doing anything if it is [remotely]Settled already.
             Modified disposition = new Modified();
             disposition.setUndeliverableHere(false);
             disposition.setDeliveryFailed(true);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a6d03075/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 39bfaf7..a0b0d71 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -434,11 +434,12 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             testPeer.expectTransfer(declareMatcher, false, new Declared().setTxnId(txnId), true);
 
             for (int i = 1; i <= consumeCount; i++) {
-                // Then expect an *unsettled* TransactionalState disposition for each message once received by the consumer
+                // Then expect an *settled* TransactionalState disposition for each message once received by the consumer
                 TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
                 stateMatcher.withTxnId(equalTo(txnId));
                 stateMatcher.withOutcome(new AcceptedMatcher());
-                testPeer.expectDisposition(false, stateMatcher);
+
+                testPeer.expectDisposition(true, stateMatcher);
             }
 
             MessageConsumer messageConsumer = session.createConsumer(queue);
@@ -459,14 +460,6 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
             testPeer.expectTransfer(dischargeMatcher, false, new Accepted(), true);
 
-            for (int i = 1; i <= consumeCount; i++) {
-                // Then expect a *settled* TransactionalState disposition for each message received by the consumer
-                TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
-                stateMatcher.withTxnId(equalTo(txnId));
-                stateMatcher.withOutcome(new AcceptedMatcher());
-                testPeer.expectDisposition(true, stateMatcher);
-            }
-
             session.commit();
 
             testPeer.waitForAllHandlersToComplete(1000);
@@ -506,11 +499,12 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             testPeer.expectTransfer(declareMatcher, false, new Declared().setTxnId(txnId), true);
 
             for (int i = 1; i <= consumeCount; i++) {
-                // Then expect a TransactionalState disposition for each message once received by the consumer
+                // Then expect a *settled* TransactionalState disposition for each message once received by the consumer
                 TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
                 stateMatcher.withTxnId(equalTo(txnId));
                 stateMatcher.withOutcome(new AcceptedMatcher());
-                testPeer.expectDisposition(false, stateMatcher);
+
+                testPeer.expectDisposition(true, stateMatcher);
             }
 
             MessageConsumer messageConsumer = session.createConsumer(queue);
@@ -531,15 +525,6 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
             testPeer.expectTransfer(dischargeMatcher, false, new Accepted(), true);
 
-            for (int i = 1; i <= consumeCount; i++) {
-                // Then expect a 'Modified' disposition for each message received by the consumer
-                // indicating that delivery failed but it can still be consumed.
-                ModifiedMatcher modifiedMatcher = new ModifiedMatcher()
-                                                      .withDeliveryFailed(equalTo(true))
-                                                      .withUndeliverableHere(equalTo(false));
-                testPeer.expectDisposition(true, modifiedMatcher);
-            }
-
             session.rollback();
 
             //TODO: what about messages not received by the consumer?


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org