You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Ulf Lilleengen (Jira)" <ji...@apache.org> on 2019/09/17 09:42:00 UTC

[jira] [Commented] (ARTEMIS-2494) Artemis should allow redelivery on address full for AMQP

    [ https://issues.apache.org/jira/browse/ARTEMIS-2494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16931242#comment-16931242 ] 

Ulf Lilleengen commented on ARTEMIS-2494:
-----------------------------------------

Patch without the configuration wired in:

 
{code:java}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 14463730a..b4d2a1eed 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -20,6 +20,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -40,12 +41,11 @@ import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
 import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Modified;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
 import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
-import org.apache.qpid.proton.amqp.transport.AmqpError;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.*;
 import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
@@ -304,20 +304,22 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
          sessionSPI.serverSend(this, tx, receiver, delivery, address, delivery.getMessageFormat(), data, routingContext);
       } catch (Exception e) {
          log.warn(e.getMessage(), e);
-         Rejected rejected = new Rejected();
          ErrorCondition condition = new ErrorCondition();
 
+         // Set condition
          if (e instanceof ActiveMQSecurityException) {
             condition.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
          } else {
             condition.setCondition(Symbol.valueOf("failed"));
          }
+         condition.setDescription(e.getMessage());
+
+         // Determine delivery state
+         DeliveryState deliveryState = determineDeliveryState(e, condition);
          connection.runLater(() -> {
 
-            condition.setDescription(e.getMessage());
-            rejected.setError(condition);
 
-            delivery.disposition(rejected);
+            delivery.disposition(deliveryState);
             delivery.settle();
             flow();
             connection.flush();
@@ -326,6 +328,19 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
       }
    }
 
+   private DeliveryState determineDeliveryState(Exception e, ErrorCondition condition) {
+      if (e instanceof ActiveMQAddressFullException) {
+         Modified modified = new Modified();
+         modified.setDeliveryFailed(true);
+         modified.setUndeliverableHere(false);
+         return modified;
+      } else {
+         Rejected rejected = new Rejected();
+         rejected.setError(condition);
+         return rejected;
+      }
+   }
+
    @Override
    public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
       protonSession.removeReceiver(receiver);
 {code}

> Artemis should allow redelivery on address full for AMQP
> --------------------------------------------------------
>
>                 Key: ARTEMIS-2494
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-2494
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>          Components: AMQP
>    Affects Versions: 2.10.0
>            Reporter: Ulf Lilleengen
>            Priority: Major
>
> When a queue is full, Artemis will respond with disposition=Rejected. In certain situations, it is desirable to receive a Modified in this case, so that a client may re-queue the message for re-delivery (this is what Artemis does if consumers respond with released or modified for instance).
>  
> The current behavior is determined by ProtonServerReceiverContext#actualDelivery . It would be great if this method could respond with Modified instead in the event of the exception being an ActiveMQAddressFullException.
>  
> This behavior should probably be exposed as a configuration option to avoid breaking existing behavior.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)