You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Ulf Lilleengen (Jira)" <ji...@apache.org> on 2019/09/17 09:42:00 UTC
[jira] [Commented] (ARTEMIS-2494) Artemis should allow redelivery
on address full for AMQP
[ https://issues.apache.org/jira/browse/ARTEMIS-2494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16931242#comment-16931242 ]
Ulf Lilleengen commented on ARTEMIS-2494:
-----------------------------------------
Patch without the configuration wired in:
{code:java}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 14463730a..b4d2a1eed 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -20,6 +20,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -40,12 +41,11 @@ import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
-import org.apache.qpid.proton.amqp.transport.AmqpError;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.*;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
@@ -304,20 +304,22 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
sessionSPI.serverSend(this, tx, receiver, delivery, address, delivery.getMessageFormat(), data, routingContext);
} catch (Exception e) {
log.warn(e.getMessage(), e);
- Rejected rejected = new Rejected();
ErrorCondition condition = new ErrorCondition();
+ // Set condition
if (e instanceof ActiveMQSecurityException) {
condition.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
} else {
condition.setCondition(Symbol.valueOf("failed"));
}
+ condition.setDescription(e.getMessage());
+
+ // Determine delivery state
+ DeliveryState deliveryState = determineDeliveryState(e, condition);
connection.runLater(() -> {
- condition.setDescription(e.getMessage());
- rejected.setError(condition);
- delivery.disposition(rejected);
+ delivery.disposition(deliveryState);
delivery.settle();
flow();
connection.flush();
@@ -326,6 +328,19 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
}
}
+ private DeliveryState determineDeliveryState(Exception e, ErrorCondition condition) {
+ if (e instanceof ActiveMQAddressFullException) {
+ Modified modified = new Modified();
+ modified.setDeliveryFailed(true);
+ modified.setUndeliverableHere(false);
+ return modified;
+ } else {
+ Rejected rejected = new Rejected();
+ rejected.setError(condition);
+ return rejected;
+ }
+ }
+
@Override
public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
protonSession.removeReceiver(receiver);
{code}
> Artemis should allow redelivery on address full for AMQP
> --------------------------------------------------------
>
> Key: ARTEMIS-2494
> URL: https://issues.apache.org/jira/browse/ARTEMIS-2494
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Components: AMQP
> Affects Versions: 2.10.0
> Reporter: Ulf Lilleengen
> Priority: Major
>
> When a queue is full, Artemis will respond with disposition=Rejected. In certain situations, it is desirable to receive a Modified in this case, so that a client may re-queue the message for re-delivery (this is what Artemis does if consumers respond with released or modified for instance).
>
> The current behavior is determined by ProtonServerReceiverContext#actualDelivery . It would be great if this method could respond with Modified instead in the event of the exception being an ActiveMQAddressFullException.
>
> This behavior should probably be exposed as a configuration option to avoid breaking existing behavior.
--
This message was sent by Atlassian Jira
(v8.3.2#803003)