You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/07/29 10:08:37 UTC
[qpid-broker-j] 01/04: QPID-8345: [Broker-J][AMQP 1.0] Dequeue
messages sent non-transactionally as pre-settled
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit 9b67c9a1a465d722443c6ad272b6c1b3528f29d7
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Wed Jul 24 11:31:42 2019 +0100
QPID-8345: [Broker-J][AMQP 1.0] Dequeue messages sent non-transactionally as pre-settled
(cherry picked from commit ca0d0d7ec22c31364c1adc32f5dffee0392ff230)
---
.../server/protocol/v1_0/ConsumerTarget_1_0.java | 41 +++++++++++++++++++++-
.../protocol/v1_0/messaging/TransferTest.java | 8 ++++-
2 files changed, 47 insertions(+), 2 deletions(-)
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index bc87141..5f9283e 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -240,9 +240,14 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
if (_linkEndpoint.isAttached())
{
- if (SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode()))
+ boolean sendPreSettled = SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode());
+ if (sendPreSettled)
{
transfer.setSettled(true);
+ if (_acquires && _transactionId == null)
+ {
+ transfer.setState(new Accepted());
+ }
}
else
{
@@ -302,6 +307,11 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
}
getSession().getAMQPConnection().registerMessageDelivered(message.getSize());
getEndpoint().transfer(transfer, false);
+
+ if (sendPreSettled && _acquires && _transactionId == null)
+ {
+ handleAcquiredEntrySentPareSettledNonTransactional(entry, consumer);
+ }
}
else
{
@@ -319,6 +329,35 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
}
}
+ private void handleAcquiredEntrySentPareSettledNonTransactional(final MessageInstance entry,
+ final MessageInstanceConsumer consumer)
+ {
+ if (entry.makeAcquisitionUnstealable(consumer))
+ {
+ final ServerTransaction txn = _linkEndpoint.getAsyncAutoCommitTransaction();
+ txn.dequeue(entry.getEnqueueRecord(),
+ new ServerTransaction.Action()
+ {
+ @Override
+ public void postCommit()
+ {
+ entry.delete();
+ }
+
+ @Override
+ public void onRollback()
+ {
+ entry.release(consumer);
+ }
+ });
+ txn.commit();
+ }
+ else
+ {
+ entry.release(consumer);
+ }
+ }
+
@Override
public void flushBatched()
{
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 53a8af6..94a1249 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -737,7 +737,6 @@ public class TransferTest extends BrokerAdminUsingTestBase
.begin().consumeResponse()
.attachRole(Role.RECEIVER)
.attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
- .attachRcvSettleMode(ReceiverSettleMode.FIRST)
.attachSndSettleMode(SenderSettleMode.SETTLED)
.attach().consumeResponse(Attach.class);
Attach attach = interaction.getLatestResponse(Attach.class);
@@ -760,6 +759,13 @@ public class TransferTest extends BrokerAdminUsingTestBase
// verify no unexpected performative received by closing the connection
interaction.doCloseConnection();
}
+
+ if (getBrokerAdmin().isQueueDepthSupported())
+ {
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+ }
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "test");
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo("test")));
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org