You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/12/06 21:36:22 UTC

[activemq-artemis] branch main updated: ARTEMIS-4108 Improving flush on large message fix

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 33567fca8d ARTEMIS-4108 Improving flush on large message fix
33567fca8d is described below

commit 33567fca8d6f9a0c8ca918f9393e0ca63c8ccf9a
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Tue Dec 6 16:07:09 2022 -0500

    ARTEMIS-4108 Improving flush on large message fix
---
 .../amqp/proton/ProtonServerSenderContext.java     | 37 ++++++++++++----------
 1 file changed, 21 insertions(+), 16 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index e8654b06e4..2d85abaf84 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -191,28 +191,33 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          // If the draining is already running, then don't do anything
          if (draining.compareAndSet(false, true)) {
             final ProtonServerSenderContext plugSender = (ProtonServerSenderContext) serverConsumer.getProtocolContext();
-            serverConsumer.forceDelivery(1, new Runnable() {
-               @Override
-               public void run() {
-                  try {
-                     connection.runNow(() -> {
-                        if (pendingLargeMessage != null) {
-                           afterLargeMessage = () -> drained(plugSender);
-                        } else {
-                           drained(plugSender);
-                        }
-                     });
-                  } finally {
-                     draining.set(false);
-                  }
-               }
-            });
+            flushDrain(serverConsumer, plugSender);
          }
       } else {
          serverConsumer.receiveCredits(-1);
       }
    }
 
+   private void flushDrain(ServerConsumerImpl serverConsumer, ProtonServerSenderContext plugSender) {
+      serverConsumer.forceDelivery(1, new Runnable() {
+         @Override
+         public void run() {
+            try {
+               connection.runNow(() -> {
+                  if (pendingLargeMessage != null) {
+                     // retry the flush after the large message is done
+                     afterLargeMessage = () -> flushDrain(serverConsumer, plugSender);
+                  } else {
+                     drained(plugSender);
+                  }
+               });
+            } finally {
+               draining.set(false);
+            }
+         }
+      });
+   }
+
    private void drained(ProtonServerSenderContext sender) {
       sender.reportDrained();
       setupCredit();