You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/01/29 14:27:12 UTC

[04/11] qpid-broker-j git commit: QPID-8062: [Broker-J][AMQP 1.0] Fix handling of routing errors when target destination cannot route received message

QPID-8062: [Broker-J][AMQP 1.0] Fix handling of routing errors when target destination cannot route received message

* close the link on receipt of unroutable message when
    the source of the link does not support the rejected outcome or
    non-transactional message has already been settled by the sender
* mark the publishing transaction as "rollback only" on receipt of
    unroutable pre-settled message

(cherry picked from commit 06b4213286305f6d05ce496ca278b73b566f24be)


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/b4e746a4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/b4e746a4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/b4e746a4

Branch: refs/heads/7.0.x
Commit: b4e746a4b76d7baac75c4bc04fdd93198f9fad20
Parents: ae36d90
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Jan 17 16:50:43 2018 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Mon Jan 29 13:57:14 2018 +0000

----------------------------------------------------------------------
 .../v1_0/AnonymousRelayDestination.java         |  52 ++-
 .../protocol/v1_0/NodeReceivingDestination.java |  44 +-
 .../protocol/v1_0/ReceivingDestination.java     |  11 +-
 .../v1_0/StandardReceivingLinkEndpoint.java     |  70 ++-
 .../tests/protocol/v1_0/MessageEncoder.java     |  12 +
 .../anonymousrelay/AnonymousRelayTest.java      | 440 +++++++++++++++++++
 .../protocol/v1_0/messaging/TransferTest.java   |   8 +-
 7 files changed, 571 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b4e746a4/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
index de7991c..806469b 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0;/*
 import static org.apache.qpid.server.protocol.v1_0.Session_1_0.DELAYED_DELIVERY;
 
 import java.util.Arrays;
+import java.util.Collections;
 
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
@@ -29,6 +30,8 @@ import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.DestinationAddress;
 import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
@@ -37,6 +40,7 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.security.SecurityToken;
+import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 
 public class AnonymousRelayDestination implements ReceivingDestination
@@ -70,7 +74,12 @@ public class AnonymousRelayDestination implements ReceivingDestination
     }
 
     @Override
-    public Outcome send(final ServerMessage<?> message, final ServerTransaction txn, final SecurityToken securityToken)
+    public Outcome send(final ServerMessage<?> message,
+                        final ServerTransaction txn,
+                        final SecurityToken securityToken,
+                        final boolean rejectedOutcomeSupportedBySource,
+                        final boolean deliverySettled,
+                        final Binary deliveryTag) throws AmqpErrorException
     {
         final ReceivingDestination destination;
         final String routingAddress = message.getTo();
@@ -99,12 +108,41 @@ public class AnonymousRelayDestination implements ReceivingDestination
             }
             else
             {
-                outcome = createdRejectedOutcome(AmqpError.NOT_FOUND, "Unknown destination '" + routingAddress + "'");
+                final Error notFoundError = new Error(AmqpError.NOT_FOUND,
+                                                      String.format("Unknown destination '%s'", routingAddress));
+                notFoundError.setInfo(Collections.singletonMap(DELIVERY_TAG, deliveryTag));
+
+                // If the source of the link does not support the rejected outcome,
+                // or the message has already been settled by the sender,
+                // then the routing node MUST detach the link with an error.
+                // AMQP-140: When pre-settled messages are being sent within a transaction,
+                // then the behaviour defined for transactions should take precedence
+                // (essentially marking the transaction as rollback only).
+                if (!rejectedOutcomeSupportedBySource || (deliverySettled && !(txn instanceof LocalTransaction)))
+                {
+                    throw new AmqpErrorException(notFoundError);
+                }
+                else
+                {
+                    if (deliverySettled && txn instanceof LocalTransaction)
+                    {
+                        ((LocalTransaction) txn).setRollbackOnly();
+                    }
+
+                    Rejected rejected = new Rejected();
+                    rejected.setError(notFoundError);
+                    outcome = rejected;
+                }
             }
         }
         else
         {
-            outcome = destination.send(message, txn, securityToken);
+            outcome = destination.send(message,
+                                       txn,
+                                       securityToken,
+                                       rejectedOutcomeSupportedBySource,
+                                       deliverySettled,
+                                       deliveryTag);
         }
         return outcome;
     }
@@ -127,12 +165,4 @@ public class AnonymousRelayDestination implements ReceivingDestination
     {
         return null;
     }
-
-    private Outcome createdRejectedOutcome(AmqpError errorCode, String errorMessage)
-    {
-        Rejected rejected = new Rejected();
-        final Error notFoundError = new Error(errorCode, errorMessage);
-        rejected.setError(notFoundError);
-        return rejected;
-    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b4e746a4/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
index 57ad962..6d5859f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
@@ -34,6 +34,8 @@ import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.DestinationAddress;
 import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
@@ -43,12 +45,13 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.security.SecurityToken;
+import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 
 public class NodeReceivingDestination implements ReceivingDestination
 {
     private static final Accepted ACCEPTED = new Accepted();
-    public static final Rejected REJECTED = new Rejected();
+    private static final Rejected REJECTED = new Rejected();
     private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED};
     private final boolean _discardUnroutable;
     private final EventLogger _eventLogger;
@@ -93,7 +96,12 @@ public class NodeReceivingDestination implements ReceivingDestination
     }
 
     @Override
-    public Outcome send(final ServerMessage<?> message, final ServerTransaction txn, final SecurityToken securityToken)
+    public Outcome send(final ServerMessage<?> message,
+                        final ServerTransaction txn,
+                        final SecurityToken securityToken,
+                        final boolean rejectedOutcomeSupportedBySource,
+                        final boolean deliverySettled,
+                        final Binary deliveryTag) throws AmqpErrorException
     {
         final String routingAddress = "".equals(_routingAddress) ? getRoutingAddress(message) : _routingAddress;
         _destination.authorisePublish(securityToken, Collections.singletonMap("routingKey", routingAddress));
@@ -128,27 +136,45 @@ public class NodeReceivingDestination implements ReceivingDestination
         {
             if (!_discardUnroutable)
             {
+                final Error error;
                 if (result.isRejected())
                 {
-                    AmqpError error;
+                    AmqpError errorCode;
                     if (result.containsReject(RejectType.LIMIT_EXCEEDED))
                     {
-                        error = AmqpError.RESOURCE_LIMIT_EXCEEDED;
+                        errorCode = AmqpError.RESOURCE_LIMIT_EXCEEDED;
                     }
                     else if (result.containsReject(RejectType.PRECONDITION_FAILED))
                     {
-                        error = AmqpError.PRECONDITION_FAILED;
+                        errorCode = AmqpError.PRECONDITION_FAILED;
                     }
                     else
                     {
-                        error = AmqpError.ILLEGAL_STATE;
+                        errorCode = AmqpError.ILLEGAL_STATE;
                     }
-                    return createdRejectedOutcome(error, result.getRejectReason());
+                    error = new Error(errorCode, result.getRejectReason());
                 }
                 else
                 {
-                    return createdRejectedOutcome(AmqpError.NOT_FOUND,
-                                                  String.format("Unknown destination '%s'", routingAddress));
+                    error = new Error(AmqpError.NOT_FOUND,
+                                      String.format("Unknown destination '%s'", routingAddress));
+                }
+                error.setInfo(Collections.singletonMap(DELIVERY_TAG, deliveryTag));
+
+                if (!rejectedOutcomeSupportedBySource || (deliverySettled && !(txn instanceof LocalTransaction)))
+                {
+                    throw new AmqpErrorException(error);
+                }
+                else
+                {
+                    if (deliverySettled && txn instanceof LocalTransaction)
+                    {
+                        ((LocalTransaction) txn).setRollbackOnly();
+                    }
+
+                    Rejected rejected = new Rejected();
+                    rejected.setError(error);
+                    return rejected;
                 }
             }
             else

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b4e746a4/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
index a6a910f..5abd825 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.protocol.v1_0;
 
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.security.SecurityToken;
@@ -29,7 +31,7 @@ import org.apache.qpid.server.txn.ServerTransaction;
 
 public interface ReceivingDestination
 {
-
+    Symbol DELIVERY_TAG = Symbol.valueOf("delivery-tag");
     Symbol REJECT_UNROUTABLE = Symbol.valueOf("REJECT_UNROUTABLE");
     Symbol DISCARD_UNROUTABLE = Symbol.valueOf("DISCARD_UNROUTABLE");
 
@@ -37,7 +39,12 @@ public interface ReceivingDestination
 
     Outcome[] getOutcomes();
 
-    Outcome send(final ServerMessage<?> message, final ServerTransaction txn, final SecurityToken securityToken);
+    Outcome send(final ServerMessage<?> message,
+                 final ServerTransaction txn,
+                 final SecurityToken securityToken,
+                 final boolean rejectedOutcomeSupportedBySource,
+                 final boolean deliverySettled,
+                 final Binary deliveryTag) throws AmqpErrorException;
 
     int getCredit();
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b4e746a4/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index bc9dd57..a565f7f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -54,7 +54,6 @@ import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
@@ -80,7 +79,8 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
     private static final Logger LOGGER = LoggerFactory.getLogger(StandardReceivingLinkEndpoint.class);
     private static final String LINK = "link";
 
-    private ReceivingDestination _receivingDestination;
+    private volatile ReceivingDestination _receivingDestination;
+    private volatile boolean _rejectedOutcomeSupportedBySource;
 
     private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<>();
 
@@ -247,13 +247,11 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
                            .checkAuthorizedMessagePrincipal(serverMessage.getMessageHeader().getUserId());
 
                     Outcome outcome;
-                    Source source = getSource();
                     if (serverMessage.isPersistent() && !getAddressSpace().getMessageStore().isPersistent())
                     {
                         final Error preconditionFailedError = new Error(AmqpError.PRECONDITION_FAILED,
                                                                         "Non-durable message store cannot accept durable message.");
-                        if (source.getOutcomes() != null && Arrays.asList(source.getOutcomes())
-                                                                  .contains(Rejected.REJECTED_SYMBOL))
+                        if (_rejectedOutcomeSupportedBySource)
                         {
                             final Rejected rejected = new Rejected();
                             rejected.setError(preconditionFailedError);
@@ -267,52 +265,44 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
                     }
                     else
                     {
-                        outcome = getReceivingDestination().send(serverMessage, transaction,
-                                                                 session.getSecurityToken());
-                    }
-
-                    DeliveryState resultantState;
-
-                    final List<Symbol> sourceSupportedOutcomes = new ArrayList<>();
-                    if (source.getOutcomes() != null)
-                    {
-                        sourceSupportedOutcomes.addAll(Arrays.asList(source.getOutcomes()));
-                    }
-                    else if (source.getDefaultOutcome() == null)
-                    {
-                        sourceSupportedOutcomes.add(Accepted.ACCEPTED_SYMBOL);
-                    }
-
-                    boolean transacted = transactionId != null && transaction instanceof LocalTransaction;
-                    if (sourceSupportedOutcomes.contains(outcome.getSymbol()))
-                    {
-                        if (transactionId == null)
+                        try
                         {
-                            resultantState = outcome;
+                            outcome = getReceivingDestination().send(serverMessage,
+                                                                     transaction,
+                                                                     session.getSecurityToken(),
+                                                                     _rejectedOutcomeSupportedBySource,
+                                                                     delivery.isSettled(),
+                                                                     delivery.getDeliveryTag());
                         }
-                        else
+                        catch (AmqpErrorException e)
                         {
-                            TransactionalState transactionalState = new TransactionalState();
-                            transactionalState.setOutcome(outcome);
-                            transactionalState.setTxnId(transactionId);
-                            resultantState = transactionalState;
+                            return e.getError();
                         }
                     }
+
+                    Outcome sourceDefaultOutcome = getSource().getDefaultOutcome();
+                    boolean defaultOutcome = sourceDefaultOutcome != null &&
+                                             sourceDefaultOutcome.getSymbol().equals(outcome.getSymbol());
+                    DeliveryState resultantState;
+                    if (transactionId == null)
+                    {
+                        resultantState = defaultOutcome ? null : outcome;
+                    }
                     else
                     {
-                        if(transacted && source.getDefaultOutcome() != null
-                           && outcome.getSymbol() != source.getDefaultOutcome().getSymbol())
-                        {
-                            ((LocalTransaction) transaction).setRollbackOnly();
-                        }
-                        resultantState = null;
+                        TransactionalState transactionalState = new TransactionalState();
+                        transactionalState.setOutcome(defaultOutcome ? null : outcome);
+                        transactionalState.setTxnId(transactionId);
+                        resultantState = transactionalState;
                     }
 
                     boolean settled = shouldReceiverSettleFirst(transferReceiverSettleMode);
 
                     if (transaction instanceof AsyncAutoCommitTransaction)
                     {
-                        _pendingDispositions.add(new PendingDispositionHolder(delivery.getDeliveryTag(), resultantState, settled));
+                        _pendingDispositions.add(new PendingDispositionHolder(delivery.getDeliveryTag(),
+                                                                              resultantState,
+                                                                              settled));
                     }
                     else
                     {
@@ -321,7 +311,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
                     }
 
                     getSession().getAMQPConnection().registerMessageReceived(serverMessage.getSize());
-                    if (transacted)
+                    if (transactionId != null)
                     {
                         getSession().getAMQPConnection().registerTransactedMessageReceived();
                     }
@@ -456,6 +446,8 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
             }
         }
         getLink().setTermini(source, target);
+        _rejectedOutcomeSupportedBySource =
+                source.getOutcomes() != null && Arrays.asList(source.getOutcomes()).contains(Rejected.REJECTED_SYMBOL);
     }
 
     public ReceivingDestination getReceivingDestination()

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b4e746a4/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
index 37e327e..0849376 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
@@ -28,9 +28,11 @@ import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
 
 public class MessageEncoder
 {
+    private Properties _properties;
     private Header _header;
     private List<String> _data = new LinkedList<>();
 
@@ -44,6 +46,11 @@ public class MessageEncoder
         _header = header;
     }
 
+    public void setProperties(final Properties properties)
+    {
+        _properties = properties;
+    }
+
     public QpidByteBuffer getPayload()
     {
         List<QpidByteBuffer> payload = new ArrayList<>();
@@ -52,6 +59,11 @@ public class MessageEncoder
             payload.add(_header.createEncodingRetainingSection().getEncodedForm());
         }
 
+        if (_properties != null)
+        {
+            payload.add(_properties.createEncodingRetainingSection().getEncodedForm());
+        }
+
         if (_data.isEmpty())
         {
             throw new IllegalStateException("Message should have at least one data section");

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b4e746a4/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousrelay/AnonymousRelayTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousrelay/AnonymousRelayTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousrelay/AnonymousRelayTest.java
new file mode 100644
index 0000000..c3c69d1
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousrelay/AnonymousRelayTest.java
@@ -0,0 +1,440 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.extensions.anonymousrelay;
+
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assume.assumeThat;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
+import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
+import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class AnonymousRelayTest extends BrokerAdminUsingTestBase
+{
+    private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
+    private static final String TEST_MESSAGE_CONTENT = "test";
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp()
+    {
+        final BrokerAdmin brokerAdmin = getBrokerAdmin();
+        brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+        _brokerAddress = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    @SpecificationTest(section = "Using the Anonymous Terminus for Message Routing. 2.2. Sending A Message",
+            description = "Messages sent over links into a routing node will be"
+                          + " forwarded to the node referenced in the to field of properties of the message"
+                          + " just as if a direct link has been established to that node.")
+    @Test
+    public void transferPreSettledToExistingDestination() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport);
+
+            interaction.begin()
+                       .consumeResponse(Begin.class)
+
+                       .attachRole(Role.SENDER)
+                       .attach().consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class)
+
+                       .transferPayload(generateMessagePayloadToDestination(BrokerAdmin.TEST_QUEUE_NAME))
+                       .transferSettled(Boolean.TRUE)
+                       .transfer()
+                       .sync();
+
+            Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
+            assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
+        }
+    }
+
+    @SpecificationTest(section = "Using the Anonymous Terminus for Message Routing. 2.2.2 Routing Errors",
+            description = "It is possible that a message sent to a routing node has an address in the to field"
+                          + " of properties which, if used in the address field of target of an attach,"
+                          + " would result in an unsuccessful link establishment (for example,"
+                          + " if the address cannot be resolved to a node). In this case the routing node"
+                          + " MUST communicate the error back to the sender of the message."
+                          + " [...] the message has already been settled by the sender,"
+                          + " then the routing node MUST detach the link with an error.")
+    @Test
+    public void transferPreSettledToUnknownDestination() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport);
+
+            interaction.begin()
+                       .consumeResponse(Begin.class)
+
+                       .attachRole(Role.SENDER)
+                       .attach().consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class)
+
+                       .transferPayload(generateMessagePayloadToDestination("Unknown"))
+                       .transferSettled(Boolean.TRUE)
+                       .transfer();
+
+            Detach detach = interaction.consumeResponse().getLatestResponse(Detach.class);
+            Error error = detach.getError();
+            assertThat(error, is(notNullValue()));
+            assertThat(error.getCondition(), is(equalTo(AmqpError.NOT_FOUND)));
+        }
+    }
+
+    @SpecificationTest(section = "Using the Anonymous Terminus for Message Routing. 2.2.2 Routing Errors",
+            description = "It is possible that a message sent to a routing node has an address in the to field"
+                          + " of properties which, if used in the address field of target of an attach,"
+                          + " would result in an unsuccessful link establishment (for example,"
+                          + " if the address cannot be resolved to a node). In this case the routing node"
+                          + " MUST communicate the error back to the sender of the message."
+                          + " If the source of the link supports the rejected outcome,"
+                          + " and the message has not already been settled by the sender, then the routing node"
+                          + " MUST reject the message.")
+    @Test
+    public void transferUnsettledToUnknownDestinationWhenRejectedOutcomeSupportedBySource() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport);
+
+            interaction.begin()
+                       .consumeResponse(Begin.class)
+
+                       .attachRole(Role.SENDER)
+                       .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL)
+                       .attach().consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class)
+
+                       .transferPayload(generateMessagePayloadToDestination("Unknown"))
+                       .transfer()
+                       .consumeResponse();
+
+            Disposition disposition = interaction.getLatestResponse(Disposition.class);
+
+            assertThat(disposition.getSettled(), is(true));
+
+            DeliveryState dispositionState = disposition.getState();
+            assertThat(dispositionState, is(instanceOf(Rejected.class)));
+
+            Rejected rejected = (Rejected)dispositionState;
+            Error error = rejected.getError();
+            assertThat(error, is(notNullValue()));
+            assertThat(error.getCondition(), is(equalTo(AmqpError.NOT_FOUND)));
+        }
+    }
+
+    @SpecificationTest(section = "Using the Anonymous Terminus for Message Routing. 2.2.2 Routing Errors",
+            description = "It is possible that a message sent to a routing node has an address in the to field"
+                          + " of properties which, if used in the address field of target of an attach,"
+                          + " would result in an unsuccessful link establishment (for example,"
+                          + " if the address cannot be resolved to a node). In this case the routing node"
+                          + " MUST communicate the error back to the sender of the message."
+                          + " [...]"
+                          + " If the source of the link does not support the rejected outcome,"
+                          + " [...] then the routing node MUST detach the link with an error.")
+    @Test
+    public void transferUnsettledToUnknownDestinationWhenRejectedOutcomeNotSupportedBySource() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport);
+
+            interaction.begin()
+                       .consumeResponse(Begin.class)
+
+                       .attachRole(Role.SENDER)
+                       .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
+                       .attach().consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class)
+
+                       .transferPayload(generateMessagePayloadToDestination("Unknown"))
+                       .transfer();
+
+            Detach detach = interaction.consumeResponse().getLatestResponse(Detach.class);
+            Error error = detach.getError();
+            assertThat(error, is(notNullValue()));
+            assertThat(error.getCondition(), is(equalTo(AmqpError.NOT_FOUND)));
+        }
+    }
+
+    @SpecificationTest(section = "Using the Anonymous Terminus for Message Routing. 2.2. Sending A Message",
+            description = "Messages sent over links into a routing node will be"
+                          + " forwarded to the node referenced in the to field of properties of the message"
+                          + " just as if a direct link has been established to that node.")
+    @Test
+    public void transferPreSettledInTransactionToExistingDestination() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport);
+            final UnsignedInteger linkHandle = UnsignedInteger.ONE;
+            final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+            interaction.begin()
+                       .consumeResponse(Begin.class)
+
+                       .txnAttachCoordinatorLink(txnState)
+                       .txnDeclare(txnState)
+
+                       .attachRole(Role.SENDER)
+                       .attachHandle(linkHandle)
+                       .attach().consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class)
+
+                       .transferHandle(linkHandle)
+                       .transferPayload(generateMessagePayloadToDestination(BrokerAdmin.TEST_QUEUE_NAME))
+                       .transferTransactionalState(txnState.getCurrentTransactionId())
+                       .transferSettled(Boolean.TRUE)
+                       .transfer()
+
+                       .txnDischarge(txnState, false);
+
+            Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
+            assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
+        }
+    }
+
+    @SpecificationTest(section = "Using the Anonymous Terminus for Message Routing. 2.2.2 Routing Errors",
+            description = "It is possible that a message sent to a routing node has an address in the to field"
+                          + " of properties which, if used in the address field of target of an attach,"
+                          + " would result in an unsuccessful link establishment (for example,"
+                          + " if the address cannot be resolved to a node). In this case the routing node"
+                          + " MUST communicate the error back to the sender of the message."
+                          + " [...]"
+                          + " <Not in spec yet>"
+                          + " AMQP-140"
+                          + " In this case the behaviour defined for transactions (of essentially marking"
+                          + " the transaction as rollback only) should take precedence. "
+                          + ""
+                          + " AMQP spec 4.3 Discharging a Transaction"
+                          + " If the coordinator is unable to complete the discharge, the coordinator MUST convey"
+                          + " the error to the controller as a transaction-error. If the source for the link to"
+                          + " the coordinator supports the rejected outcome, then the message MUST be rejected"
+                          + " with this outcome carrying the transaction-error.")
+    @Test
+    public void transferPreSettledInTransactionToUnknownDestinationWhenRejectOutcomeSupportedByTxController()
+            throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final UnsignedInteger linkHandle = UnsignedInteger.ONE;
+            final Interaction interaction =
+                    openInteractionWithAnonymousRelayCapability(transport);
+
+            final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+            interaction.begin()
+                       .consumeResponse(Begin.class)
+
+                       // attaching coordinator link with supported outcomes Accept and Reject
+                       .txnAttachCoordinatorLink(txnState)
+                       .txnDeclare(txnState)
+
+                       .attachRole(Role.SENDER)
+                       .attachHandle(linkHandle)
+                       .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL)
+                       .attach().consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class)
+
+                       .transferHandle(linkHandle)
+                       .transferPayload(generateMessagePayloadToDestination("Unknown"))
+                       .transferTransactionalState(txnState.getCurrentTransactionId())
+                       .transferSettled(Boolean.TRUE)
+                       .transferDeliveryId(UnsignedInteger.valueOf(1))
+                       .transfer();
+
+            final Discharge discharge = new Discharge();
+            discharge.setTxnId(txnState.getCurrentTransactionId());
+            discharge.setFail(false);
+            interaction.transferHandle(txnState.getHandle())
+                       .transferDeliveryId(UnsignedInteger.valueOf(2))
+                       .transferSettled(Boolean.FALSE)
+                       .transferDeliveryTag(new Binary(("transaction-" + 2).getBytes(StandardCharsets.UTF_8)))
+                       .transferPayloadData(discharge).transfer();
+
+            Disposition dischargeTransactionDisposition = null;
+            Flow coordinatorFlow = null;
+            do
+            {
+                interaction.consumeResponse(Disposition.class, Flow.class);
+                Response<?> response = interaction.getLatestResponse();
+                if (response.getBody() instanceof Disposition)
+                {
+                    dischargeTransactionDisposition = (Disposition) response.getBody();
+                }
+                if (response.getBody() instanceof Flow)
+                {
+                    final Flow flowResponse = (Flow) response.getBody();
+                    if (flowResponse.getHandle().equals(txnState.getHandle()))
+                    {
+                        coordinatorFlow = flowResponse;
+                    }
+                }
+            } while (dischargeTransactionDisposition == null || coordinatorFlow == null);
+
+            assertThat(dischargeTransactionDisposition.getSettled(), is(equalTo(true)));
+            assertThat(dischargeTransactionDisposition.getState(), is(instanceOf(Rejected.class)));
+
+            Rejected rejected = (Rejected) dischargeTransactionDisposition.getState();
+            Error error = rejected.getError();
+
+            assertThat(error, is(notNullValue()));
+            assertThat(error.getCondition(), is(equalTo(TransactionError.TRANSACTION_ROLLBACK)));
+        }
+    }
+
+    @SpecificationTest(section = "Using the Anonymous Terminus for Message Routing. 2.2.2 Routing Errors",
+            description = "It is possible that a message sent to a routing node has an address in the to field"
+                          + " of properties which, if used in the address field of target of an attach,"
+                          + " would result in an unsuccessful link establishment (for example,"
+                          + " if the address cannot be resolved to a node). In this case the routing node"
+                          + " MUST communicate the error back to the sender of the message."
+                          + " [...]"
+                          + " <Not in spec yet>"
+                          + " AMQP-140"
+                          + ""
+                          + " AMQP spec 4.3 Discharging a Transaction"
+                          + " If the coordinator is unable to complete the discharge, the coordinator MUST convey"
+                          + " the error to the controller as a transaction-error."
+                          + " [...]"
+                          + " If the source does not support the rejected outcome, the transactional resource MUST"
+                          + " detach the link to the coordinator, with the detach performative carrying"
+                          + " the transaction-error")
+    @Test
+    public void transferPreSettledInTransactionToUnknownDestinationWhenRejectOutcomeNotSupportedByTxController()
+            throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final UnsignedInteger linkHandle = UnsignedInteger.ONE;
+            final Interaction interaction =
+                    openInteractionWithAnonymousRelayCapability(transport);
+
+            final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+
+            interaction.begin()
+                       .consumeResponse(Begin.class)
+
+                       .attachRole(Role.SENDER)
+                       .attachName("testTransactionCoordinator-" + txnState.getHandle())
+                       .attachHandle(txnState.getHandle())
+                       .attachInitialDeliveryCount(UnsignedInteger.ZERO)
+                       .attachTarget(new Coordinator())
+                       .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
+                       .attach().consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class)
+                       .txnDeclare(txnState)
+
+                       .attachRole(Role.SENDER)
+                       .attachHandle(linkHandle)
+                       .attachTarget(new Target())
+                       .attachName("link-" + linkHandle)
+                       .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL)
+                       .attach().consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class)
+
+                       .transferHandle(linkHandle)
+                       .transferPayload(generateMessagePayloadToDestination("Unknown"))
+                       .transferTransactionalState(txnState.getCurrentTransactionId())
+                       .transferSettled(Boolean.TRUE)
+                       .transfer();
+
+            final Discharge discharge = new Discharge();
+            discharge.setTxnId(txnState.getCurrentTransactionId());
+            discharge.setFail(false);
+
+            interaction.transferHandle(txnState.getHandle())
+                       .transferSettled(Boolean.FALSE)
+                       .transferDeliveryId(UnsignedInteger.valueOf(4))
+                       .transferDeliveryTag(new Binary(("transaction-" + 4).getBytes(StandardCharsets.UTF_8)))
+                       .transferPayloadData(discharge).transfer();
+
+            Detach transactionCoordinatorDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
+            Error transactionCoordinatorDetachError = transactionCoordinatorDetach.getError();
+            assertThat(transactionCoordinatorDetachError, is(notNullValue()));
+            assertThat(transactionCoordinatorDetachError.getCondition(), is(equalTo(TransactionError.TRANSACTION_ROLLBACK)));
+        }
+    }
+
+
+    private Interaction openInteractionWithAnonymousRelayCapability(final FrameTransport transport) throws Exception
+    {
+        final Interaction interaction = transport.newInteraction();
+        interaction.negotiateProtocol().consumeResponse()
+                   .openDesiredCapabilities(ANONYMOUS_RELAY)
+                   .open().consumeResponse(Open.class);
+
+        Open open = interaction.getLatestResponse(Open.class);
+        assumeThat(Arrays.asList(open.getOfferedCapabilities()), hasItem(ANONYMOUS_RELAY));
+        return interaction;
+    }
+
+    private QpidByteBuffer generateMessagePayloadToDestination(final String destinationName)
+    {
+        MessageEncoder messageEncoder = new MessageEncoder();
+        final Properties properties = new Properties();
+        properties.setTo(destinationName);
+        messageEncoder.setProperties(properties);
+        messageEncoder.addData(TEST_MESSAGE_CONTENT);
+        return messageEncoder.getPayload();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b4e746a4/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 930bf17..2574910 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -31,7 +31,6 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isOneOf;
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeThat;
 
@@ -39,7 +38,6 @@ import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
@@ -80,13 +78,13 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
 import org.apache.qpid.tests.protocol.v1_0.MessageDecoder;
 import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
-import org.apache.qpid.tests.protocol.SpecificationTest;
-import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
@@ -259,6 +257,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
                        .open()
                        .begin()
                        .attachRole(Role.SENDER)
+                       .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                        .attach()
                        .transferPayloadData("testData")
                        .transferSettled(true)
@@ -273,7 +272,6 @@ public class TransferTest extends BrokerAdminUsingTestBase
             interaction.consumeResponse().getLatestResponse(Begin.class);
             interaction.consumeResponse().getLatestResponse(Attach.class);
             interaction.consumeResponse().getLatestResponse(Flow.class);
-            //interaction.consumeResponse(null, Disposition.class, Detach.class, End.class);
             interaction.consumeResponse().getLatestResponse(Close.class);
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org