You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/01/29 14:27:12 UTC
[04/11] qpid-broker-j git commit: QPID-8062: [Broker-J][AMQP 1.0] Fix
handling of routing errors when target destination cannot route received
message
QPID-8062: [Broker-J][AMQP 1.0] Fix handling of routing errors when target destination cannot route received message
* close the link on receipt of an unroutable message when
the source of the link does not support the rejected outcome or
a non-transactional message has already been settled by the sender
* mark the publishing transaction as "rollback only" on receipt of
an unroutable pre-settled message
(cherry picked from commit 06b4213286305f6d05ce496ca278b73b566f24be)
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/b4e746a4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/b4e746a4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/b4e746a4
Branch: refs/heads/7.0.x
Commit: b4e746a4b76d7baac75c4bc04fdd93198f9fad20
Parents: ae36d90
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Jan 17 16:50:43 2018 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Mon Jan 29 13:57:14 2018 +0000
----------------------------------------------------------------------
.../v1_0/AnonymousRelayDestination.java | 52 ++-
.../protocol/v1_0/NodeReceivingDestination.java | 44 +-
.../protocol/v1_0/ReceivingDestination.java | 11 +-
.../v1_0/StandardReceivingLinkEndpoint.java | 70 ++-
.../tests/protocol/v1_0/MessageEncoder.java | 12 +
.../anonymousrelay/AnonymousRelayTest.java | 440 +++++++++++++++++++
.../protocol/v1_0/messaging/TransferTest.java | 8 +-
7 files changed, 571 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b4e746a4/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
index de7991c..806469b 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0;/*
import static org.apache.qpid.server.protocol.v1_0.Session_1_0.DELAYED_DELIVERY;
import java.util.Arrays;
+import java.util.Collections;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
@@ -29,6 +30,8 @@ import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.DestinationAddress;
import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
@@ -37,6 +40,7 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.security.SecurityToken;
+import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
public class AnonymousRelayDestination implements ReceivingDestination
@@ -70,7 +74,12 @@ public class AnonymousRelayDestination implements ReceivingDestination
}
@Override
- public Outcome send(final ServerMessage<?> message, final ServerTransaction txn, final SecurityToken securityToken)
+ public Outcome send(final ServerMessage<?> message,
+ final ServerTransaction txn,
+ final SecurityToken securityToken,
+ final boolean rejectedOutcomeSupportedBySource,
+ final boolean deliverySettled,
+ final Binary deliveryTag) throws AmqpErrorException
{
final ReceivingDestination destination;
final String routingAddress = message.getTo();
@@ -99,12 +108,41 @@ public class AnonymousRelayDestination implements ReceivingDestination
}
else
{
- outcome = createdRejectedOutcome(AmqpError.NOT_FOUND, "Unknown destination '" + routingAddress + "'");
+ final Error notFoundError = new Error(AmqpError.NOT_FOUND,
+ String.format("Unknown destination '%s'", routingAddress));
+ notFoundError.setInfo(Collections.singletonMap(DELIVERY_TAG, deliveryTag));
+
+ // If the source of the link does not support the rejected outcome,
+ // or the message has already been settled by the sender,
+ // then the routing node MUST detach the link with an error.
+ // AMQP-140: When pre-settled messages are being sent within a transaction,
+ // then the behaviour defined for transactions should take precedence
+ // (essentially marking the transaction as rollback only).
+ if (!rejectedOutcomeSupportedBySource || (deliverySettled && !(txn instanceof LocalTransaction)))
+ {
+ throw new AmqpErrorException(notFoundError);
+ }
+ else
+ {
+ if (deliverySettled && txn instanceof LocalTransaction)
+ {
+ ((LocalTransaction) txn).setRollbackOnly();
+ }
+
+ Rejected rejected = new Rejected();
+ rejected.setError(notFoundError);
+ outcome = rejected;
+ }
}
}
else
{
- outcome = destination.send(message, txn, securityToken);
+ outcome = destination.send(message,
+ txn,
+ securityToken,
+ rejectedOutcomeSupportedBySource,
+ deliverySettled,
+ deliveryTag);
}
return outcome;
}
@@ -127,12 +165,4 @@ public class AnonymousRelayDestination implements ReceivingDestination
{
return null;
}
-
- private Outcome createdRejectedOutcome(AmqpError errorCode, String errorMessage)
- {
- Rejected rejected = new Rejected();
- final Error notFoundError = new Error(errorCode, errorMessage);
- rejected.setError(notFoundError);
- return rejected;
- }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b4e746a4/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
index 57ad962..6d5859f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
@@ -34,6 +34,8 @@ import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.DestinationAddress;
import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
@@ -43,12 +45,13 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.security.SecurityToken;
+import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
public class NodeReceivingDestination implements ReceivingDestination
{
private static final Accepted ACCEPTED = new Accepted();
- public static final Rejected REJECTED = new Rejected();
+ private static final Rejected REJECTED = new Rejected();
private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED};
private final boolean _discardUnroutable;
private final EventLogger _eventLogger;
@@ -93,7 +96,12 @@ public class NodeReceivingDestination implements ReceivingDestination
}
@Override
- public Outcome send(final ServerMessage<?> message, final ServerTransaction txn, final SecurityToken securityToken)
+ public Outcome send(final ServerMessage<?> message,
+ final ServerTransaction txn,
+ final SecurityToken securityToken,
+ final boolean rejectedOutcomeSupportedBySource,
+ final boolean deliverySettled,
+ final Binary deliveryTag) throws AmqpErrorException
{
final String routingAddress = "".equals(_routingAddress) ? getRoutingAddress(message) : _routingAddress;
_destination.authorisePublish(securityToken, Collections.singletonMap("routingKey", routingAddress));
@@ -128,27 +136,45 @@ public class NodeReceivingDestination implements ReceivingDestination
{
if (!_discardUnroutable)
{
+ final Error error;
if (result.isRejected())
{
- AmqpError error;
+ AmqpError errorCode;
if (result.containsReject(RejectType.LIMIT_EXCEEDED))
{
- error = AmqpError.RESOURCE_LIMIT_EXCEEDED;
+ errorCode = AmqpError.RESOURCE_LIMIT_EXCEEDED;
}
else if (result.containsReject(RejectType.PRECONDITION_FAILED))
{
- error = AmqpError.PRECONDITION_FAILED;
+ errorCode = AmqpError.PRECONDITION_FAILED;
}
else
{
- error = AmqpError.ILLEGAL_STATE;
+ errorCode = AmqpError.ILLEGAL_STATE;
}
- return createdRejectedOutcome(error, result.getRejectReason());
+ error = new Error(errorCode, result.getRejectReason());
}
else
{
- return createdRejectedOutcome(AmqpError.NOT_FOUND,
- String.format("Unknown destination '%s'", routingAddress));
+ error = new Error(AmqpError.NOT_FOUND,
+ String.format("Unknown destination '%s'", routingAddress));
+ }
+ error.setInfo(Collections.singletonMap(DELIVERY_TAG, deliveryTag));
+
+ if (!rejectedOutcomeSupportedBySource || (deliverySettled && !(txn instanceof LocalTransaction)))
+ {
+ throw new AmqpErrorException(error);
+ }
+ else
+ {
+ if (deliverySettled && txn instanceof LocalTransaction)
+ {
+ ((LocalTransaction) txn).setRollbackOnly();
+ }
+
+ Rejected rejected = new Rejected();
+ rejected.setError(error);
+ return rejected;
}
}
else
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b4e746a4/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
index a6a910f..5abd825 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.protocol.v1_0;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.security.SecurityToken;
@@ -29,7 +31,7 @@ import org.apache.qpid.server.txn.ServerTransaction;
public interface ReceivingDestination
{
-
+ Symbol DELIVERY_TAG = Symbol.valueOf("delivery-tag");
Symbol REJECT_UNROUTABLE = Symbol.valueOf("REJECT_UNROUTABLE");
Symbol DISCARD_UNROUTABLE = Symbol.valueOf("DISCARD_UNROUTABLE");
@@ -37,7 +39,12 @@ public interface ReceivingDestination
Outcome[] getOutcomes();
- Outcome send(final ServerMessage<?> message, final ServerTransaction txn, final SecurityToken securityToken);
+ Outcome send(final ServerMessage<?> message,
+ final ServerTransaction txn,
+ final SecurityToken securityToken,
+ final boolean rejectedOutcomeSupportedBySource,
+ final boolean deliverySettled,
+ final Binary deliveryTag) throws AmqpErrorException;
int getCredit();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b4e746a4/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index bc9dd57..a565f7f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -54,7 +54,6 @@ import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
@@ -80,7 +79,8 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
private static final Logger LOGGER = LoggerFactory.getLogger(StandardReceivingLinkEndpoint.class);
private static final String LINK = "link";
- private ReceivingDestination _receivingDestination;
+ private volatile ReceivingDestination _receivingDestination;
+ private volatile boolean _rejectedOutcomeSupportedBySource;
private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<>();
@@ -247,13 +247,11 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
.checkAuthorizedMessagePrincipal(serverMessage.getMessageHeader().getUserId());
Outcome outcome;
- Source source = getSource();
if (serverMessage.isPersistent() && !getAddressSpace().getMessageStore().isPersistent())
{
final Error preconditionFailedError = new Error(AmqpError.PRECONDITION_FAILED,
"Non-durable message store cannot accept durable message.");
- if (source.getOutcomes() != null && Arrays.asList(source.getOutcomes())
- .contains(Rejected.REJECTED_SYMBOL))
+ if (_rejectedOutcomeSupportedBySource)
{
final Rejected rejected = new Rejected();
rejected.setError(preconditionFailedError);
@@ -267,52 +265,44 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
}
else
{
- outcome = getReceivingDestination().send(serverMessage, transaction,
- session.getSecurityToken());
- }
-
- DeliveryState resultantState;
-
- final List<Symbol> sourceSupportedOutcomes = new ArrayList<>();
- if (source.getOutcomes() != null)
- {
- sourceSupportedOutcomes.addAll(Arrays.asList(source.getOutcomes()));
- }
- else if (source.getDefaultOutcome() == null)
- {
- sourceSupportedOutcomes.add(Accepted.ACCEPTED_SYMBOL);
- }
-
- boolean transacted = transactionId != null && transaction instanceof LocalTransaction;
- if (sourceSupportedOutcomes.contains(outcome.getSymbol()))
- {
- if (transactionId == null)
+ try
{
- resultantState = outcome;
+ outcome = getReceivingDestination().send(serverMessage,
+ transaction,
+ session.getSecurityToken(),
+ _rejectedOutcomeSupportedBySource,
+ delivery.isSettled(),
+ delivery.getDeliveryTag());
}
- else
+ catch (AmqpErrorException e)
{
- TransactionalState transactionalState = new TransactionalState();
- transactionalState.setOutcome(outcome);
- transactionalState.setTxnId(transactionId);
- resultantState = transactionalState;
+ return e.getError();
}
}
+
+ Outcome sourceDefaultOutcome = getSource().getDefaultOutcome();
+ boolean defaultOutcome = sourceDefaultOutcome != null &&
+ sourceDefaultOutcome.getSymbol().equals(outcome.getSymbol());
+ DeliveryState resultantState;
+ if (transactionId == null)
+ {
+ resultantState = defaultOutcome ? null : outcome;
+ }
else
{
- if(transacted && source.getDefaultOutcome() != null
- && outcome.getSymbol() != source.getDefaultOutcome().getSymbol())
- {
- ((LocalTransaction) transaction).setRollbackOnly();
- }
- resultantState = null;
+ TransactionalState transactionalState = new TransactionalState();
+ transactionalState.setOutcome(defaultOutcome ? null : outcome);
+ transactionalState.setTxnId(transactionId);
+ resultantState = transactionalState;
}
boolean settled = shouldReceiverSettleFirst(transferReceiverSettleMode);
if (transaction instanceof AsyncAutoCommitTransaction)
{
- _pendingDispositions.add(new PendingDispositionHolder(delivery.getDeliveryTag(), resultantState, settled));
+ _pendingDispositions.add(new PendingDispositionHolder(delivery.getDeliveryTag(),
+ resultantState,
+ settled));
}
else
{
@@ -321,7 +311,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
}
getSession().getAMQPConnection().registerMessageReceived(serverMessage.getSize());
- if (transacted)
+ if (transactionId != null)
{
getSession().getAMQPConnection().registerTransactedMessageReceived();
}
@@ -456,6 +446,8 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
}
}
getLink().setTermini(source, target);
+ _rejectedOutcomeSupportedBySource =
+ source.getOutcomes() != null && Arrays.asList(source.getOutcomes()).contains(Rejected.REJECTED_SYMBOL);
}
public ReceivingDestination getReceivingDestination()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b4e746a4/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
index 37e327e..0849376 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
@@ -28,9 +28,11 @@ import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
public class MessageEncoder
{
+ private Properties _properties;
private Header _header;
private List<String> _data = new LinkedList<>();
@@ -44,6 +46,11 @@ public class MessageEncoder
_header = header;
}
+ public void setProperties(final Properties properties)
+ {
+ _properties = properties;
+ }
+
public QpidByteBuffer getPayload()
{
List<QpidByteBuffer> payload = new ArrayList<>();
@@ -52,6 +59,11 @@ public class MessageEncoder
payload.add(_header.createEncodingRetainingSection().getEncodedForm());
}
+ if (_properties != null)
+ {
+ payload.add(_properties.createEncodingRetainingSection().getEncodedForm());
+ }
+
if (_data.isEmpty())
{
throw new IllegalStateException("Message should have at least one data section");
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b4e746a4/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousrelay/AnonymousRelayTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousrelay/AnonymousRelayTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousrelay/AnonymousRelayTest.java
new file mode 100644
index 0000000..c3c69d1
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousrelay/AnonymousRelayTest.java
@@ -0,0 +1,440 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.extensions.anonymousrelay;
+
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assume.assumeThat;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
+import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
+import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class AnonymousRelayTest extends BrokerAdminUsingTestBase
+{
+ private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
+ private static final String TEST_MESSAGE_CONTENT = "test";
+ private InetSocketAddress _brokerAddress;
+
+ @Before
+ public void setUp()
+ {
+ final BrokerAdmin brokerAdmin = getBrokerAdmin();
+ brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ _brokerAddress = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ }
+
+ @SpecificationTest(section = "Using the Anonymous Terminus for Message Routing. 2.2. Sending A Message",
+ description = "Messages sent over links into a routing node will be"
+ + " forwarded to the node referenced in the to field of properties of the message"
+ + " just as if a direct link has been established to that node.")
+ @Test
+ public void transferPreSettledToExistingDestination() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport);
+
+ interaction.begin()
+ .consumeResponse(Begin.class)
+
+ .attachRole(Role.SENDER)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+
+ .transferPayload(generateMessagePayloadToDestination(BrokerAdmin.TEST_QUEUE_NAME))
+ .transferSettled(Boolean.TRUE)
+ .transfer()
+ .sync();
+
+ Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
+ assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
+ }
+ }
+
+ @SpecificationTest(section = "Using the Anonymous Terminus for Message Routing. 2.2.2 Routing Errors",
+ description = "It is possible that a message sent to a routing node has an address in the to field"
+ + " of properties which, if used in the address field of target of an attach,"
+ + " would result in an unsuccessful link establishment (for example,"
+ + " if the address cannot be resolved to a node). In this case the routing node"
+ + " MUST communicate the error back to the sender of the message."
+ + " [...] the message has already been settled by the sender,"
+ + " then the routing node MUST detach the link with an error.")
+ @Test
+ public void transferPreSettledToUnknownDestination() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport);
+
+ interaction.begin()
+ .consumeResponse(Begin.class)
+
+ .attachRole(Role.SENDER)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+
+ .transferPayload(generateMessagePayloadToDestination("Unknown"))
+ .transferSettled(Boolean.TRUE)
+ .transfer();
+
+ Detach detach = interaction.consumeResponse().getLatestResponse(Detach.class);
+ Error error = detach.getError();
+ assertThat(error, is(notNullValue()));
+ assertThat(error.getCondition(), is(equalTo(AmqpError.NOT_FOUND)));
+ }
+ }
+
+ @SpecificationTest(section = "Using the Anonymous Terminus for Message Routing. 2.2.2 Routing Errors",
+ description = "It is possible that a message sent to a routing node has an address in the to field"
+ + " of properties which, if used in the address field of target of an attach,"
+ + " would result in an unsuccessful link establishment (for example,"
+ + " if the address cannot be resolved to a node). In this case the routing node"
+ + " MUST communicate the error back to the sender of the message."
+ + " If the source of the link supports the rejected outcome,"
+ + " and the message has not already been settled by the sender, then the routing node"
+ + " MUST reject the message.")
+ @Test
+ public void transferUnsettledToUnknownDestinationWhenRejectedOutcomeSupportedBySource() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport);
+
+ interaction.begin()
+ .consumeResponse(Begin.class)
+
+ .attachRole(Role.SENDER)
+ .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+
+ .transferPayload(generateMessagePayloadToDestination("Unknown"))
+ .transfer()
+ .consumeResponse();
+
+ Disposition disposition = interaction.getLatestResponse(Disposition.class);
+
+ assertThat(disposition.getSettled(), is(true));
+
+ DeliveryState dispositionState = disposition.getState();
+ assertThat(dispositionState, is(instanceOf(Rejected.class)));
+
+ Rejected rejected = (Rejected)dispositionState;
+ Error error = rejected.getError();
+ assertThat(error, is(notNullValue()));
+ assertThat(error.getCondition(), is(equalTo(AmqpError.NOT_FOUND)));
+ }
+ }
+
+ @SpecificationTest(section = "Using the Anonymous Terminus for Message Routing. 2.2.2 Routing Errors",
+ description = "It is possible that a message sent to a routing node has an address in the to field"
+ + " of properties which, if used in the address field of target of an attach,"
+ + " would result in an unsuccessful link establishment (for example,"
+ + " if the address cannot be resolved to a node). In this case the routing node"
+ + " MUST communicate the error back to the sender of the message."
+ + " [...]"
+ + " If the source of the link does not support the rejected outcome,"
+ + " [...] then the routing node MUST detach the link with an error.")
+ @Test
+ public void transferUnsettledToUnknownDestinationWhenRejectedOutcomeNotSupportedBySource() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport);
+
+ interaction.begin()
+ .consumeResponse(Begin.class)
+
+ .attachRole(Role.SENDER)
+ .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+
+ .transferPayload(generateMessagePayloadToDestination("Unknown"))
+ .transfer();
+
+ Detach detach = interaction.consumeResponse().getLatestResponse(Detach.class);
+ Error error = detach.getError();
+ assertThat(error, is(notNullValue()));
+ assertThat(error.getCondition(), is(equalTo(AmqpError.NOT_FOUND)));
+ }
+ }
+
+ @SpecificationTest(section = "Using the Anonymous Terminus for Message Routing. 2.2. Sending A Message",
+ description = "Messages sent over links into a routing node will be"
+ + " forwarded to the node referenced in the to field of properties of the message"
+ + " just as if a direct link has been established to that node.")
+ @Test
+ public void transferPreSettledInTransactionToExistingDestination() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport);
+ final UnsignedInteger linkHandle = UnsignedInteger.ONE;
+ final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+ interaction.begin()
+ .consumeResponse(Begin.class)
+
+ .txnAttachCoordinatorLink(txnState)
+ .txnDeclare(txnState)
+
+ .attachRole(Role.SENDER)
+ .attachHandle(linkHandle)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+
+ .transferHandle(linkHandle)
+ .transferPayload(generateMessagePayloadToDestination(BrokerAdmin.TEST_QUEUE_NAME))
+ .transferTransactionalState(txnState.getCurrentTransactionId())
+ .transferSettled(Boolean.TRUE)
+ .transfer()
+
+ .txnDischarge(txnState, false);
+
+ Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
+ assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
+ }
+ }
+
+ /**
+  * Sends a pre-settled transactional transfer addressed to the destination name "Unknown",
+  * then sends a {@code Discharge} (fail=false) for the same transaction on the coordinator link.
+  * Responses are consumed until both a {@code Disposition} and a {@code Flow} for the
+  * coordinator link handle have been seen. The disposition is expected to be settled and to
+  * carry a {@code Rejected} state whose error condition is
+  * {@code TransactionError.TRANSACTION_ROLLBACK}.
+  */
+ @SpecificationTest(section = "Using the Anonymous Terminus for Message Routing. 2.2.2 Routing Errors",
+ description = "It is possible that a message sent to a routing node has an address in the to field"
+ + " of properties which, if used in the address field of target of an attach,"
+ + " would result in an unsuccessful link establishment (for example,"
+ + " if the address cannot be resolved to a node). In this case the routing node"
+ + " MUST communicate the error back to the sender of the message."
+ + " [...]"
+ + " <Not in spec yet>"
+ + " AMQP-140"
+ + " In this case the behaviour defined for transactions (of essentially marking"
+ + " the transaction as rollback only) should take precedence. "
+ + ""
+ + " AMQP spec 4.3 Discharging a Transaction"
+ + " If the coordinator is unable to complete the discharge, the coordinator MUST convey"
+ + " the error to the controller as a transaction-error. If the source for the link to"
+ + " the coordinator supports the rejected outcome, then the message MUST be rejected"
+ + " with this outcome carrying the transaction-error.")
+ @Test
+ public void transferPreSettledInTransactionToUnknownDestinationWhenRejectOutcomeSupportedByTxController()
+ throws Exception
+ {
+     try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+     {
+         final UnsignedInteger senderLinkHandle = UnsignedInteger.ONE;
+         final int dischargeDeliveryId = 2;
+         final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport);
+         final InteractionTransactionalState txnState =
+                 interaction.createTransactionalState(UnsignedInteger.ZERO);
+
+         // session
+         interaction.begin().consumeResponse(Begin.class);
+
+         // attaching coordinator link with supported outcomes Accept and Reject
+         interaction.txnAttachCoordinatorLink(txnState).txnDeclare(txnState);
+
+         // sending link
+         interaction.attachRole(Role.SENDER)
+                    .attachHandle(senderLinkHandle)
+                    .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL)
+                    .attach()
+                    .consumeResponse(Attach.class)
+                    .consumeResponse(Flow.class);
+
+         // pre-settled transactional transfer to a destination name that is expected not to resolve
+         interaction.transferHandle(senderLinkHandle)
+                    .transferPayload(generateMessagePayloadToDestination("Unknown"))
+                    .transferTransactionalState(txnState.getCurrentTransactionId())
+                    .transferSettled(Boolean.TRUE)
+                    .transferDeliveryId(UnsignedInteger.valueOf(1))
+                    .transfer();
+
+         // unsettled discharge request sent on the coordinator link
+         final Discharge dischargeRequest = new Discharge();
+         dischargeRequest.setTxnId(txnState.getCurrentTransactionId());
+         dischargeRequest.setFail(false);
+         final Binary dischargeTag =
+                 new Binary(("transaction-" + dischargeDeliveryId).getBytes(StandardCharsets.UTF_8));
+         interaction.transferHandle(txnState.getHandle())
+                    .transferDeliveryId(UnsignedInteger.valueOf(dischargeDeliveryId))
+                    .transferSettled(Boolean.FALSE)
+                    .transferDeliveryTag(dischargeTag)
+                    .transferPayloadData(dischargeRequest)
+                    .transfer();
+
+         // keep reading until a disposition and a flow for the coordinator link have both arrived
+         Disposition dischargeDisposition = null;
+         Flow coordinatorFlow = null;
+         while (dischargeDisposition == null || coordinatorFlow == null)
+         {
+             final Object body = interaction.consumeResponse(Disposition.class, Flow.class)
+                                            .getLatestResponse()
+                                            .getBody();
+             if (body instanceof Disposition)
+             {
+                 dischargeDisposition = (Disposition) body;
+             }
+             if (body instanceof Flow && ((Flow) body).getHandle().equals(txnState.getHandle()))
+             {
+                 coordinatorFlow = (Flow) body;
+             }
+         }
+
+         assertThat(dischargeDisposition.getSettled(), is(equalTo(true)));
+         assertThat(dischargeDisposition.getState(), is(instanceOf(Rejected.class)));
+
+         final Error rollbackError = ((Rejected) dischargeDisposition.getState()).getError();
+         assertThat(rollbackError, is(notNullValue()));
+         assertThat(rollbackError.getCondition(), is(equalTo(TransactionError.TRANSACTION_ROLLBACK)));
+     }
+ }
+
+ @SpecificationTest(section = "Using the Anonymous Terminus for Message Routing. 2.2.2 Routing Errors",
+ description = "It is possible that a message sent to a routing node has an address in the to field"
+ + " of properties which, if used in the address field of target of an attach,"
+ + " would result in an unsuccessful link establishment (for example,"
+ + " if the address cannot be resolved to a node). In this case the routing node"
+ + " MUST communicate the error back to the sender of the message."
+ + " [...]"
+ + " <Not in spec yet>"
+ + " AMQP-140"
+ + ""
+ + " AMQP spec 4.3 Discharging a Transaction"
+ + " If the coordinator is unable to complete the discharge, the coordinator MUST convey"
+ + " the error to the controller as a transaction-error."
+ + " [...]"
+ + " If the source does not support the rejected outcome, the transactional resource MUST"
+ + " detach the link to the coordinator, with the detach performative carrying"
+ + " the transaction-error")
+ @Test
+ public void transferPreSettledInTransactionToUnknownDestinationWhenRejectOutcomeNotSupportedByTxController()
+ throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final UnsignedInteger linkHandle = UnsignedInteger.ONE;
+ final Interaction interaction =
+ openInteractionWithAnonymousRelayCapability(transport);
+
+ final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+
+ interaction.begin()
+ .consumeResponse(Begin.class)
+
+ .attachRole(Role.SENDER)
+ .attachName("testTransactionCoordinator-" + txnState.getHandle())
+ .attachHandle(txnState.getHandle())
+ .attachInitialDeliveryCount(UnsignedInteger.ZERO)
+ .attachTarget(new Coordinator())
+ .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+ .txnDeclare(txnState)
+
+ .attachRole(Role.SENDER)
+ .attachHandle(linkHandle)
+ .attachTarget(new Target())
+ .attachName("link-" + linkHandle)
+ .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+
+ .transferHandle(linkHandle)
+ .transferPayload(generateMessagePayloadToDestination("Unknown"))
+ .transferTransactionalState(txnState.getCurrentTransactionId())
+ .transferSettled(Boolean.TRUE)
+ .transfer();
+
+ final Discharge discharge = new Discharge();
+ discharge.setTxnId(txnState.getCurrentTransactionId());
+ discharge.setFail(false);
+
+ interaction.transferHandle(txnState.getHandle())
+ .transferSettled(Boolean.FALSE)
+ .transferDeliveryId(UnsignedInteger.valueOf(4))
+ .transferDeliveryTag(new Binary(("transaction-" + 4).getBytes(StandardCharsets.UTF_8)))
+ .transferPayloadData(discharge).transfer();
+
+ Detach transactionCoordinatorDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
+ Error transactionCoordinatorDetachError = transactionCoordinatorDetach.getError();
+ assertThat(transactionCoordinatorDetachError, is(notNullValue()));
+ assertThat(transactionCoordinatorDetachError.getCondition(), is(equalTo(TransactionError.TRANSACTION_ROLLBACK)));
+ }
+ }
+
+
+ /**
+  * Creates a new interaction on the given transport, negotiates the protocol and opens the
+  * connection with {@code ANONYMOUS_RELAY} as a desired capability.
+  * <p>
+  * The calling test is skipped (JUnit assumption failure) when the peer's {@code Open} does not
+  * list {@code ANONYMOUS_RELAY} among its offered capabilities, including the case where it
+  * reports no offered capabilities at all.
+  *
+  * @param transport connected frame transport to create the interaction on
+  * @return the interaction, positioned after the peer's {@code Open} has been consumed
+  * @throws Exception if the protocol exchange fails
+  */
+ private Interaction openInteractionWithAnonymousRelayCapability(final FrameTransport transport) throws Exception
+ {
+     final Interaction interaction = transport.newInteraction();
+     interaction.negotiateProtocol().consumeResponse()
+                .openDesiredCapabilities(ANONYMOUS_RELAY)
+                .open().consumeResponse(Open.class);
+
+     final Open open = interaction.getLatestResponse(Open.class);
+     // Arrays.asList(null) throws NullPointerException; a peer that offers no capabilities
+     // should cause the test to be skipped rather than to error out.
+     assumeThat(open.getOfferedCapabilities(), is(notNullValue()));
+     assumeThat(Arrays.asList(open.getOfferedCapabilities()), hasItem(ANONYMOUS_RELAY));
+     return interaction;
+ }
+
+ /**
+  * Encodes a message that carries {@code TEST_MESSAGE_CONTENT} as data and whose properties
+  * section has its "to" field set to the given destination name.
+  *
+  * @param destinationName value to place in the "to" field of the message properties
+  * @return the payload produced by the {@code MessageEncoder}
+  */
+ private QpidByteBuffer generateMessagePayloadToDestination(final String destinationName)
+ {
+     final Properties routingProperties = new Properties();
+     routingProperties.setTo(destinationName);
+
+     final MessageEncoder encoder = new MessageEncoder();
+     encoder.setProperties(routingProperties);
+     encoder.addData(TEST_MESSAGE_CONTENT);
+     return encoder.getPayload();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b4e746a4/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 930bf17..2574910 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -31,7 +31,6 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isOneOf;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeThat;
@@ -39,7 +38,6 @@ import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
-import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -80,13 +78,13 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
import org.apache.qpid.tests.protocol.v1_0.MessageDecoder;
import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
-import org.apache.qpid.tests.protocol.SpecificationTest;
-import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -259,6 +257,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
.open()
.begin()
.attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
.attach()
.transferPayloadData("testData")
.transferSettled(true)
@@ -273,7 +272,6 @@ public class TransferTest extends BrokerAdminUsingTestBase
interaction.consumeResponse().getLatestResponse(Begin.class);
interaction.consumeResponse().getLatestResponse(Attach.class);
interaction.consumeResponse().getLatestResponse(Flow.class);
- //interaction.consumeResponse(null, Disposition.class, Detach.class, End.class);
interaction.consumeResponse().getLatestResponse(Close.class);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org