You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/12/06 21:36:22 UTC
[activemq-artemis] branch main updated: ARTEMIS-4108 Improving flush on large message fix
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 33567fca8d ARTEMIS-4108 Improving flush on large message fix
33567fca8d is described below
commit 33567fca8d6f9a0c8ca918f9393e0ca63c8ccf9a
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Tue Dec 6 16:07:09 2022 -0500
ARTEMIS-4108 Improving flush on large message fix
---
.../amqp/proton/ProtonServerSenderContext.java | 37 ++++++++++++----------
1 file changed, 21 insertions(+), 16 deletions(-)
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index e8654b06e4..2d85abaf84 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -191,28 +191,33 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// If the draining is already running, then don't do anything
if (draining.compareAndSet(false, true)) {
final ProtonServerSenderContext plugSender = (ProtonServerSenderContext) serverConsumer.getProtocolContext();
- serverConsumer.forceDelivery(1, new Runnable() {
- @Override
- public void run() {
- try {
- connection.runNow(() -> {
- if (pendingLargeMessage != null) {
- afterLargeMessage = () -> drained(plugSender);
- } else {
- drained(plugSender);
- }
- });
- } finally {
- draining.set(false);
- }
- }
- });
+ flushDrain(serverConsumer, plugSender);
}
} else {
serverConsumer.receiveCredits(-1);
}
}
+ private void flushDrain(ServerConsumerImpl serverConsumer, ProtonServerSenderContext plugSender) {
+ serverConsumer.forceDelivery(1, new Runnable() {
+ @Override
+ public void run() {
+ try {
+ connection.runNow(() -> {
+ if (pendingLargeMessage != null) {
+ // retry the flush after the large message is done
+ afterLargeMessage = () -> flushDrain(serverConsumer, plugSender);
+ } else {
+ drained(plugSender);
+ }
+ });
+ } finally {
+ draining.set(false);
+ }
+ }
+ });
+ }
+
private void drained(ProtonServerSenderContext sender) {
sender.reportDrained();
setupCredit();