You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/11/24 18:37:00 UTC
[4/4] qpid-jms git commit: update consumers on transacted sessions to
settle deliveries when transactionally accepting them
update consumers on transacted sessions to settle deliveries when transactionally accepting them
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/a6d03075
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/a6d03075
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/a6d03075
Branch: refs/heads/master
Commit: a6d03075362ab58ea125c5d454af6a3dc31015aa
Parents: 1ec620a
Author: Robert Gemmell <ro...@apache.org>
Authored: Mon Nov 24 17:09:37 2014 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Mon Nov 24 17:09:37 2014 +0000
----------------------------------------------------------------------
.../org/apache/qpid/jms/JmsMessageConsumer.java | 2 +-
.../qpid/jms/provider/amqp/AmqpConsumer.java | 28 +++++++++++---------
.../jms/integration/SessionIntegrationTest.java | 27 +++++--------------
3 files changed, 23 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a6d03075/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 2d9e0ec..632742d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -246,7 +246,7 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
JmsInboundMessageDispatch ack(final JmsInboundMessageDispatch envelope) throws JMSException {
if (envelope != null && envelope.getMessage() != null) {
JmsMessage message = envelope.getMessage();
- if (message.getAcknowledgeCallback() != null || session.isTransacted()) {
+ if (message.getAcknowledgeCallback() != null) {
// Message has been received by the app.. expand the credit
// window so that we receive more messages.
doAckDelivered(envelope);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a6d03075/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index e33fddf..c41f5fc 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -219,16 +219,6 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
if (ackType.equals(ACK_TYPE.DELIVERED)) {
LOG.debug("Delivered Ack of message: {}", envelope);
- if (session.isTransacted()) {
- Binary txnId = session.getTransactionContext().getAmqpTransactionId();
- if (txnId != null) {
- TransactionalState txState = new TransactionalState();
- txState.setOutcome(Accepted.getInstance());
- txState.setTxnId(txnId);
- delivery.disposition(txState);
- session.getTransactionContext().registerTxConsumer(this);
- }
- }
if (!isPresettle()) {
delivered.put(envelope, delivery);
}
@@ -241,10 +231,24 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
}
LOG.debug("Consumed Ack of message: {}", envelope);
if (!delivery.isSettled()) {
- delivery.disposition(Accepted.getInstance());
- delivery.settle();
+ if (session.isTransacted()) {
+ Binary txnId = session.getTransactionContext().getAmqpTransactionId();
+ if (txnId != null) {
+ TransactionalState txState = new TransactionalState();
+ txState.setOutcome(Accepted.getInstance());
+ txState.setTxnId(txnId);
+ delivery.disposition(txState);
+ delivery.settle();
+ session.getTransactionContext().registerTxConsumer(this);
+ }
+ } else {
+ delivery.disposition(Accepted.getInstance());
+ delivery.settle();
+ }
}
} else if (ackType.equals(ACK_TYPE.REDELIVERED)) {
+ //TODO: Trace usage of this.
+ //TODO: Don't bother doing anything if it is [remotely]Settled already.
Modified disposition = new Modified();
disposition.setUndeliverableHere(false);
disposition.setDeliveryFailed(true);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a6d03075/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 39bfaf7..a0b0d71 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -434,11 +434,12 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
testPeer.expectTransfer(declareMatcher, false, new Declared().setTxnId(txnId), true);
for (int i = 1; i <= consumeCount; i++) {
- // Then expect an *unsettled* TransactionalState disposition for each message once received by the consumer
+ // Then expect an *settled* TransactionalState disposition for each message once received by the consumer
TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
stateMatcher.withTxnId(equalTo(txnId));
stateMatcher.withOutcome(new AcceptedMatcher());
- testPeer.expectDisposition(false, stateMatcher);
+
+ testPeer.expectDisposition(true, stateMatcher);
}
MessageConsumer messageConsumer = session.createConsumer(queue);
@@ -459,14 +460,6 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
testPeer.expectTransfer(dischargeMatcher, false, new Accepted(), true);
- for (int i = 1; i <= consumeCount; i++) {
- // Then expect a *settled* TransactionalState disposition for each message received by the consumer
- TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
- stateMatcher.withTxnId(equalTo(txnId));
- stateMatcher.withOutcome(new AcceptedMatcher());
- testPeer.expectDisposition(true, stateMatcher);
- }
-
session.commit();
testPeer.waitForAllHandlersToComplete(1000);
@@ -506,11 +499,12 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
testPeer.expectTransfer(declareMatcher, false, new Declared().setTxnId(txnId), true);
for (int i = 1; i <= consumeCount; i++) {
- // Then expect a TransactionalState disposition for each message once received by the consumer
+ // Then expect a *settled* TransactionalState disposition for each message once received by the consumer
TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
stateMatcher.withTxnId(equalTo(txnId));
stateMatcher.withOutcome(new AcceptedMatcher());
- testPeer.expectDisposition(false, stateMatcher);
+
+ testPeer.expectDisposition(true, stateMatcher);
}
MessageConsumer messageConsumer = session.createConsumer(queue);
@@ -531,15 +525,6 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
testPeer.expectTransfer(dischargeMatcher, false, new Accepted(), true);
- for (int i = 1; i <= consumeCount; i++) {
- // Then expect a 'Modified' disposition for each message received by the consumer
- // indicating that delivery failed but it can still be consumed.
- ModifiedMatcher modifiedMatcher = new ModifiedMatcher()
- .withDeliveryFailed(equalTo(true))
- .withUndeliverableHere(equalTo(false));
- testPeer.expectDisposition(true, modifiedMatcher);
- }
-
session.rollback();
//TODO: what about messages not received by the consumer?
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org