You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2016/07/20 09:35:40 UTC
[8/9] activemq-artemis git commit: ARTEMIS-637 Port 5.x AMQP test
client
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
new file mode 100644
index 0000000..320d174
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import org.apache.activemq.transport.amqp.client.util.UnmodifiableDelivery;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
+
+public class AmqpMessage {
+
+ private final AmqpReceiver receiver;
+ private final Message message;
+ private final Delivery delivery;
+
+ private Map<Symbol, Object> deliveryAnnotationsMap;
+ private Map<Symbol, Object> messageAnnotationsMap;
+ private Map<String, Object> applicationPropertiesMap;
+
+ /**
+ * Creates a new AmqpMessage that wraps the information necessary to handle
+ * an outgoing message.
+ */
+ public AmqpMessage() {
+ receiver = null;
+ delivery = null;
+
+ message = Proton.message();
+ }
+
+ /**
+ * Creates a new AmqpMessage that wraps the information necessary to handle
+ * an outgoing message.
+ *
+ * @param message the Proton message that is to be sent.
+ */
+ public AmqpMessage(Message message) {
+ this(null, message, null);
+ }
+
+ /**
+ * Creates a new AmqpMessage that wraps the information necessary to handle
+ * an incoming delivery.
+ *
+ * @param receiver the AmqpReceiver that received this message.
+ * @param message the Proton message that was received.
+ * @param delivery the Delivery instance that produced this message.
+ */
+ @SuppressWarnings("unchecked")
+ public AmqpMessage(AmqpReceiver receiver, Message message, Delivery delivery) {
+ this.receiver = receiver;
+ this.message = message;
+ this.delivery = delivery;
+
+ if (message.getMessageAnnotations() != null) {
+ messageAnnotationsMap = message.getMessageAnnotations().getValue();
+ }
+
+ if (message.getApplicationProperties() != null) {
+ applicationPropertiesMap = message.getApplicationProperties().getValue();
+ }
+
+ if (message.getDeliveryAnnotations() != null) {
+ deliveryAnnotationsMap = message.getDeliveryAnnotations().getValue();
+ }
+ }
+
+ //----- Access to internal client resources ------------------------------//
+
+ /**
+ * @return the Delivery of a received message wrapped in an UnmodifiableDelivery, or null when no delivery is set.
+ */
+ public Delivery getWrappedDelivery() {
+ if (delivery == null) {
+ return null;
+ }
+
+ return new UnmodifiableDelivery(delivery);
+ }
+
+ /**
+ * @return the AMQP Message that is wrapped by this object.
+ */
+ public Message getWrappedMessage() {
+ return message;
+ }
+
+ /**
+ * @return the AmqpReceiver that consumed this message.
+ */
+ public AmqpReceiver getAmqpReceiver() {
+ return receiver;
+ }
+
+ //----- Message disposition control --------------------------------------//
+
+ /**
+ * Accepts the message marking it as consumed on the remote peer.
+ *
+ * @throws Exception if an error occurs during the accept.
+ */
+ public void accept() throws Exception {
+ if (receiver == null) {
+ throw new IllegalStateException("Can't accept non-received message.");
+ }
+
+ receiver.accept(delivery);
+ }
+
+ /**
+ * Marks the message as Modified, indicating whether delivery failed and whether it is undeliverable here.
+ *
+ * @param deliveryFailed indicates that the delivery failed for some reason.
+ * @param undeliverableHere marks the delivery as unable to be processed by the link it was sent to.
+ * @throws Exception if an error occurs, or IllegalStateException if this message has no receiver.
+ */
+ public void modified(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception {
+ if (receiver == null) {
+ throw new IllegalStateException("Can't modify non-received message.");
+ }
+
+ receiver.modified(delivery, deliveryFailed, undeliverableHere);
+ }
+
+ /**
+ * Release the message, remote can redeliver it elsewhere.
+ *
+ * @throws Exception if an error occurs during the release, or IllegalStateException if there is no receiver.
+ */
+ public void release() throws Exception {
+ if (receiver == null) {
+ throw new IllegalStateException("Can't release non-received message.");
+ }
+
+ receiver.release(delivery);
+ }
+
+ //----- Convenience methods for constructing outbound messages -----------//
+
+ /**
+ * Sets the MessageId property on an outbound message using the provided String
+ *
+ * @param messageId the String message ID value to set.
+ */
+ public void setMessageId(String messageId) {
+ checkReadOnly();
+ lazyCreateProperties();
+ getWrappedMessage().setMessageId(messageId);
+ }
+
+ /**
+ * Return the set MessageId value in String form, if there are no properties
+ * in the given message or no MessageId has been set on them then return null.
+ *
+ * @return the set message ID in String form or null if not set.
+ */
+ public String getMessageId() {
+ if (message.getProperties() == null || message.getProperties().getMessageId() == null) {
+ return null;
+ }
+
+ return message.getProperties().getMessageId().toString();
+ }
+
+ /**
+ * Return the set MessageId value in the original form, if there are no properties
+ * in the given message return null.
+ *
+ * @return the set message ID in its original form or null if not set.
+ */
+ public Object getRawMessageId() {
+ if (message.getProperties() == null) {
+ return null;
+ }
+
+ return message.getProperties().getMessageId();
+ }
+
+ /**
+ * Sets the MessageId property on an outbound message using the provided value
+ *
+ * @param messageId the message ID value to set.
+ */
+ public void setRawMessageId(Object messageId) {
+ checkReadOnly();
+ lazyCreateProperties();
+ getWrappedMessage().setMessageId(messageId);
+ }
+
+ /**
+ * Sets the CorrelationId property on an outbound message using the provided String
+ *
+ * @param correlationId the String Correlation ID value to set.
+ */
+ public void setCorrelationId(String correlationId) {
+ checkReadOnly();
+ lazyCreateProperties();
+ getWrappedMessage().setCorrelationId(correlationId);
+ }
+
+ /**
+ * Return the set CorrelationId value in String form, if there are no properties
+ * in the given message or no CorrelationId has been set on them then return null.
+ *
+ * @return the set correlation ID in String form or null if not set.
+ */
+ public String getCorrelationId() {
+ if (message.getProperties() == null || message.getProperties().getCorrelationId() == null) {
+ return null;
+ }
+
+ return message.getProperties().getCorrelationId().toString();
+ }
+
+ /**
+ * Return the set CorrelationId value in the original form, if there are no properties
+ * in the given message return null.
+ *
+ * @return the set correlation ID in its original form or null if not set.
+ */
+ public Object getRawCorrelationId() {
+ if (message.getProperties() == null) {
+ return null;
+ }
+
+ return message.getProperties().getCorrelationId();
+ }
+
+ /**
+ * Sets the CorrelationId property on an outbound message using the provided value
+ *
+ * @param correlationId the correlation ID value to set.
+ */
+ public void setRawCorrelationId(Object correlationId) {
+ checkReadOnly();
+ lazyCreateProperties();
+ getWrappedMessage().setCorrelationId(correlationId);
+ }
+
+ /**
+ * Sets the GroupId property on an outbound message using the provided String
+ *
+ * @param groupId the String Group ID value to set.
+ */
+ public void setGroupId(String groupId) {
+ checkReadOnly();
+ lazyCreateProperties();
+ getWrappedMessage().setGroupId(groupId);
+ }
+
+ /**
+ * Look up the GroupId carried in the message properties, a message that
+ * has no properties section yields null.
+ *
+ * @return the GroupId value in String form or null if not set.
+ */
+ public String getGroupId() {
+ final Properties properties = message.getProperties();
+ if (properties == null) {
+ return null;
+ }
+ return properties.getGroupId();
+ }
+
+ /**
+ * Sets the durable header on the outgoing message.
+ *
+ * @param durable the boolean durable value to set.
+ */
+ public void setDurable(boolean durable) {
+ checkReadOnly();
+ lazyCreateHeader();
+ getWrappedMessage().setDurable(durable);
+ }
+
+ /**
+ * Checks the durable value in the Message Headers to determine if
+ * the message was sent as a durable Message.
+ *
+ * @return true if the message is marked as being durable, false if no header or durable value is set.
+ */
+ public boolean isDurable() {
+ if (message.getHeader() == null || message.getHeader().getDurable() == null) {
+ return false;
+ }
+
+ return message.getHeader().getDurable();
+ }
+
+ /**
+ * Sets a given application property on an outbound message.
+ *
+ * @param key the name to assign the new property.
+ * @param value the value to set for the named property.
+ */
+ public void setApplicationProperty(String key, Object value) {
+ checkReadOnly();
+ lazyCreateApplicationProperties();
+ applicationPropertiesMap.put(key, value);
+ }
+
+ /**
+ * Looks up the application property mapped to the given name, returning null
+ * when no application properties exist or none is stored under that name.
+ *
+ * @param key the name used to lookup the property in the application properties.
+ * @return the property value or null if not set.
+ */
+ public Object getApplicationProperty(String key) {
+ if (applicationPropertiesMap != null) {
+ return applicationPropertiesMap.get(key);
+ }
+
+ return null;
+ }
+
+ /**
+ * Perform a proper annotation set on the AMQP Message based on a Symbol key and
+ * the target value to append to the current annotations.
+ *
+ * @param key The name of the Symbol whose value is being set.
+ * @param value The new value to set in the annotations of this message.
+ */
+ public void setMessageAnnotation(String key, Object value) {
+ checkReadOnly();
+ lazyCreateMessageAnnotations();
+ messageAnnotationsMap.put(Symbol.valueOf(key), value);
+ }
+
+ /**
+ * Given a message annotation name, lookup and return the value associated with
+ * that annotation name. If the message annotations have not been created yet
+ * then this method will always return null.
+ *
+ * @param key the Symbol name that should be looked up in the message annotations.
+ * @return the value of the annotation if it exists, or null if not set or not accessible.
+ */
+ public Object getMessageAnnotation(String key) {
+ if (messageAnnotationsMap == null) {
+ return null;
+ }
+
+ return messageAnnotationsMap.get(Symbol.valueOf(key));
+ }
+
+ /**
+ * Perform a proper delivery annotation set on the AMQP Message based on a Symbol
+ * key and the target value to append to the current delivery annotations.
+ *
+ * @param key The name of the Symbol whose value is being set.
+ * @param value The new value to set in the delivery annotations of this message.
+ */
+ public void setDeliveryAnnotation(String key, Object value) {
+ checkReadOnly();
+ lazyCreateDeliveryAnnotations();
+ deliveryAnnotationsMap.put(Symbol.valueOf(key), value);
+ }
+
+ /**
+ * Given a delivery annotation name, lookup and return the value associated with
+ * that annotation name. If the delivery annotations have not been created yet
+ * then this method will always return null.
+ *
+ * @param key the Symbol name that should be looked up in the delivery annotations.
+ * @return the value of the annotation if it exists, or null if not set or not accessible.
+ */
+ public Object getDeliveryAnnotation(String key) {
+ if (deliveryAnnotationsMap != null) {
+ return deliveryAnnotationsMap.get(Symbol.valueOf(key));
+ }
+
+ return null;
+ }
+
+ //----- Methods for manipulating the Message body ------------------------//
+
+ /**
+ * Sets a String value into the body of an outgoing Message, throws
+ * an exception if this is an incoming message instance.
+ *
+ * @param value the String value to store in the Message body.
+ * @throws IllegalStateException if the message is read only.
+ */
+ public void setText(String value) throws IllegalStateException {
+ checkReadOnly();
+ AmqpValue body = new AmqpValue(value);
+ getWrappedMessage().setBody(body);
+ }
+
+ /**
+ * Sets a byte array value into the body of an outgoing Message, throws
+ * an exception if this is an incoming message instance.
+ *
+ * @param bytes the byte array value to store in the Message body.
+ * @throws IllegalStateException if the message is read only.
+ */
+ public void setBytes(byte[] bytes) throws IllegalStateException {
+ checkReadOnly();
+ Data body = new Data(new Binary(bytes));
+ getWrappedMessage().setBody(body);
+ }
+
+ /**
+ * Sets a DescribedType value, wrapped in an AmqpValue, into the body of an outgoing
+ * Message, throws an exception if this is an incoming message instance.
+ *
+ * @param described the DescribedType value to store in the Message body.
+ * @throws IllegalStateException if the message is read only.
+ */
+ public void setDescribedType(DescribedType described) throws IllegalStateException {
+ checkReadOnly();
+ AmqpValue body = new AmqpValue(described);
+ getWrappedMessage().setBody(body);
+ }
+
+ /**
+ * Attempts to retrieve the message body as a DescribedType instance.
+ *
+ * @return the DescribedType held in an AmqpValue body, or null if the body is missing, is not an AmqpValue, or wraps null.
+ * @throws NoSuchElementException if the body is an AmqpValue whose value is not a DescribedType.
+ */
+ public DescribedType getDescribedType() throws NoSuchElementException {
+ DescribedType result = null;
+
+ if (getWrappedMessage().getBody() == null) {
+ return null;
+ }
+ else {
+ if (getWrappedMessage().getBody() instanceof AmqpValue) {
+ AmqpValue value = (AmqpValue) getWrappedMessage().getBody();
+
+ if (value.getValue() == null) {
+ result = null;
+ }
+ else if (value.getValue() instanceof DescribedType) {
+ result = (DescribedType) value.getValue();
+ }
+ else {
+ throw new NoSuchElementException("Message does not contain a DescribedType body");
+ }
+ }
+ }
+
+ return result;
+ }
+
+ //----- Internal implementation ------------------------------------------//
+
+ private void checkReadOnly() throws IllegalStateException {
+ if (delivery != null) {
+ throw new IllegalStateException("Message is read only.");
+ }
+ }
+
+ private void lazyCreateMessageAnnotations() {
+ if (messageAnnotationsMap == null) {
+ messageAnnotationsMap = new HashMap<>();
+ message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap));
+ }
+ }
+
+ private void lazyCreateDeliveryAnnotations() {
+ if (deliveryAnnotationsMap == null) {
+ deliveryAnnotationsMap = new HashMap<>();
+ message.setDeliveryAnnotations(new DeliveryAnnotations(deliveryAnnotationsMap));
+ }
+ }
+
+ private void lazyCreateApplicationProperties() {
+ if (applicationPropertiesMap == null) {
+ applicationPropertiesMap = new HashMap<>();
+ message.setApplicationProperties(new ApplicationProperties(applicationPropertiesMap));
+ }
+ }
+
+ private void lazyCreateHeader() {
+ if (message.getHeader() == null) {
+ message.setHeader(new Header());
+ }
+ }
+
+ private void lazyCreateProperties() {
+ if (message.getProperties() == null) {
+ message.setProperties(new Properties());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java
new file mode 100644
index 0000000..2e36e84
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_CODE;
+
+/**
+ * A Described Type wrapper for JMS no local option for MessageConsumer.
+ */
+public class AmqpNoLocalFilter implements DescribedType {
+
+ public static final AmqpNoLocalFilter NO_LOCAL = new AmqpNoLocalFilter();
+
+ private final String noLocal = "NoLocalFilter{}";
+
+ public AmqpNoLocalFilter() {
+ // The described value is fixed, there is nothing to configure here.
+ }
+
+ @Override
+ public Object getDescriptor() {
+ return NO_LOCAL_CODE;
+ }
+
+ @Override
+ public Object getDescribed() {
+ return noLocal;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
new file mode 100644
index 0000000..9f3bff2
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -0,0 +1,946 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import javax.jms.InvalidDestinationException;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.transport.amqp.client.util.AsyncResult;
+import org.apache.activemq.transport.amqp.client.util.ClientFuture;
+import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
+import org.apache.activemq.transport.amqp.client.util.UnmodifiableReceiver;
+import org.apache.qpid.jms.JmsOperationTimedOutException;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.message.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME;
+import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME;
+
+/**
+ * Receiver class that manages a Proton receiver endpoint.
+ */
+public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AmqpReceiver.class);
+
+ private final AtomicBoolean closed = new AtomicBoolean();
+ private final BlockingQueue<AmqpMessage> prefetch = new LinkedBlockingDeque<>();
+
+ private final AmqpSession session;
+ private final String address;
+ private final String receiverId;
+ private final Source userSpecifiedSource;
+
+ private String subscriptionName;
+ private String selector;
+ private boolean presettle;
+ private boolean noLocal;
+
+ private AsyncResult pullRequest;
+ private AsyncResult stopRequest;
+
+ /**
+ * Create a new receiver instance.
+ *
+ * @param session The parent session that created the receiver.
+ * @param address The address that this receiver should listen on.
+ * @param receiverId The unique ID assigned to this receiver.
+ */
+ public AmqpReceiver(AmqpSession session, String address, String receiverId) {
+
+ if (address != null && address.isEmpty()) {
+ throw new IllegalArgumentException("Address cannot be empty.");
+ }
+
+ this.userSpecifiedSource = null;
+ this.session = session;
+ this.address = address;
+ this.receiverId = receiverId;
+ }
+
+ /**
+ * Create a new receiver instance.
+ *
+ * @param session The parent session that created the receiver.
+ * @param source The Source instance to use instead of creating and configuring one.
+ * @param receiverId The unique ID assigned to this receiver.
+ */
+ public AmqpReceiver(AmqpSession session, Source source, String receiverId) {
+
+ if (source == null) {
+ throw new IllegalArgumentException("User specified Source cannot be null");
+ }
+
+ this.session = session;
+ this.userSpecifiedSource = source;
+ this.address = source.getAddress();
+ this.receiverId = receiverId;
+ }
+
+ /**
+ * Close the receiver and sync on the close request. Only the first close or detach
+ * call does any work; further receive calls are expected to fail once closed.
+ *
+ * @throws IOException if an error occurs while closing the receiver.
+ */
+ public void close() throws IOException {
+ if (closed.compareAndSet(false, true)) {
+ final ClientFuture request = new ClientFuture();
+ session.getScheduler().execute(new Runnable() {
+
+ @Override
+ public void run() {
+ checkClosed();
+ close(request);
+ session.pumpToProtonTransport(request);
+ }
+ });
+
+ request.sync();
+ }
+ }
+
+ /**
+ * Detach the receiver and sync on the detach request. Only the first close or detach
+ * call does any work; further receive calls are expected to fail once detached.
+ *
+ * @throws IOException if an error occurs while detaching the receiver.
+ */
+ public void detach() throws IOException {
+ if (closed.compareAndSet(false, true)) {
+ final ClientFuture request = new ClientFuture();
+ session.getScheduler().execute(new Runnable() {
+
+ @Override
+ public void run() {
+ checkClosed();
+ detach(request);
+ session.pumpToProtonTransport(request);
+ }
+ });
+
+ request.sync();
+ }
+ }
+
+ /**
+ * @return this receiver's parent AmqpSession.
+ */
+ public AmqpSession getSession() {
+ return session;
+ }
+
+ /**
+ * @return the address that this receiver has been configured to listen on.
+ */
+ public String getAddress() {
+ return address;
+ }
+
+ /**
+ * Attempts to wait on a message to be delivered to this receiver. The receive
+ * call will wait indefinitely for a message to be delivered.
+ *
+ * @return a newly received message sent to this receiver.
+ * @throws Exception if an error occurs during the receive attempt.
+ */
+ public AmqpMessage receive() throws Exception {
+ checkClosed();
+ return prefetch.take();
+ }
+
+ /**
+ * Attempts to receive a message sent to this receiver, waiting for the given
+ * timeout value before giving up and returning null.
+ *
+ * @param timeout the time to wait for a new message to arrive.
+ * @param unit the unit of time that the timeout value represents.
+ * @return a newly received message or null if the time to wait period expires.
+ * @throws Exception if an error occurs during the receive attempt.
+ */
+ public AmqpMessage receive(long timeout, TimeUnit unit) throws Exception {
+ checkClosed();
+ return prefetch.poll(timeout, unit);
+ }
+
+ /**
+ * If a message is already available in this receiver's prefetch buffer then
+ * it is returned immediately otherwise this methods return null without waiting.
+ *
+ * @return a newly received message or null if there is no currently available message.
+ * @throws Exception if an error occurs during the receive attempt.
+ */
+ public AmqpMessage receiveNoWait() throws Exception {
+ checkClosed();
+ return prefetch.poll();
+ }
+
+ /**
+ * Request a remote peer send a Message to this client waiting until one arrives.
+ *
+ * @return the pulled AmqpMessage or null if none was pulled from the remote.
+ * @throws IOException if an error occurs
+ */
+ public AmqpMessage pull() throws IOException {
+ return pull(-1, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Request a remote peer send a Message to this client using an immediate drain request.
+ *
+ * @return the pulled AmqpMessage or null if none was pulled from the remote.
+ * @throws IOException if an error occurs
+ */
+ public AmqpMessage pullImmediate() throws IOException {
+ return pull(0, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Request a remote peer send a Message to this client.
+ *
+ * {@literal timeout < 0} then it should remain open until a message is received.
+ * {@literal timeout = 0} then it returns a message or null if none available
+ * {@literal timeout > 0} then it should remain open for timeout amount of time.
+ *
+ * The timeout value is converted to milliseconds using the supplied unit.
+ *
+ * @param timeout the amount of time to tell the remote peer to keep this pull request valid.
+ * @param unit the unit of measure that the timeout represents.
+ * @return the pulled AmqpMessage or null if none was pulled from the remote.
+ * @throws IOException if an error occurs
+ */
+ public AmqpMessage pull(final long timeout, final TimeUnit unit) throws IOException {
+ checkClosed();
+ final ClientFuture request = new ClientFuture();
+ session.getScheduler().execute(new Runnable() {
+
+ @Override
+ public void run() {
+ checkClosed();
+
+ long timeoutMills = unit.toMillis(timeout);
+
+ try {
+ LOG.trace("Pull on Receiver {} with timeout = {}", getSubscriptionName(), timeoutMills);
+ if (timeoutMills < 0) {
+ // Wait until message arrives. Just give credit if needed.
+ if (getEndpoint().getCredit() == 0) {
+ LOG.trace("Receiver {} granting 1 additional credit for pull.", getSubscriptionName());
+ getEndpoint().flow(1);
+ }
+
+ // Await the message arrival
+ pullRequest = request;
+ }
+ else if (timeoutMills == 0) {
+ // If we have no credit then we need to issue some so that we can
+ // try to fulfill the request, then drain down what is there to
+ // ensure we consume what is available and remove all credit.
+ if (getEndpoint().getCredit() == 0) {
+ LOG.trace("Receiver {} granting 1 additional credit for pull.", getSubscriptionName());
+ getEndpoint().flow(1);
+ }
+
+ // Drain immediately and wait for the message(s) to arrive,
+ // or a flow indicating removal of the remaining credit.
+ stop(request);
+ }
+ else if (timeoutMills > 0) {
+ // If we have no credit then we need to issue some so that we can
+ // try to fulfill the request, then drain down what is there to
+ // ensure we consume what is available and remove all credit.
+ if (getEndpoint().getCredit() == 0) {
+ LOG.trace("Receiver {} granting 1 additional credit for pull.", getSubscriptionName());
+ getEndpoint().flow(1);
+ }
+
+ // Wait for the timeout for the message(s) to arrive, then drain if required
+ // and wait for remaining message(s) to arrive or a flow indicating
+ // removal of the remaining credit.
+ stopOnSchedule(timeoutMills, request);
+ }
+
+ session.pumpToProtonTransport(request);
+ }
+ catch (Exception e) {
+ request.onFailure(e);
+ }
+ }
+ });
+
+ request.sync();
+
+ return prefetch.poll();
+ }
+
+ /**
+ * Controls the amount of credit given to the receiver link.
+ *
+ * @param credit the amount of credit to grant.
+ * @throws IOException if an error occurs while sending the flow.
+ */
+ public void flow(final int credit) throws IOException {
+ checkClosed();
+ final ClientFuture request = new ClientFuture();
+ session.getScheduler().execute(new Runnable() {
+
+ @Override
+ public void run() {
+ checkClosed();
+ try {
+ getEndpoint().flow(credit);
+ session.pumpToProtonTransport(request);
+ request.onSuccess();
+ }
+ catch (Exception e) {
+ request.onFailure(e);
+ }
+ }
+ });
+
+ request.sync();
+ }
+
+ /**
+ * Attempts to drain a given amount of credit from the link.
+ *
+ * @param credit the amount of credit to drain.
+ * @throws IOException if an error occurs while sending the drain.
+ */
+ public void drain(final int credit) throws IOException {
+ checkClosed();
+ final ClientFuture request = new ClientFuture();
+ session.getScheduler().execute(new Runnable() {
+
+ @Override
+ public void run() {
+ checkClosed();
+ try {
+ getEndpoint().drain(credit);
+ session.pumpToProtonTransport(request);
+ request.onSuccess();
+ }
+ catch (Exception e) {
+ request.onFailure(e);
+ }
+ }
+ });
+
+ request.sync();
+ }
+
+ /**
+ * Stops the receiver, using all link credit and waiting for in-flight messages to arrive.
+ *
+ * @throws IOException if an error occurs while sending the drain.
+ */
+ public void stop() throws IOException {
+ checkClosed();
+ final ClientFuture request = new ClientFuture();
+ session.getScheduler().execute(new Runnable() {
+
+ @Override
+ public void run() {
+ checkClosed();
+ try {
+ stop(request);
+ session.pumpToProtonTransport(request);
+ }
+ catch (Exception e) {
+ request.onFailure(e);
+ }
+ }
+ });
+
+ request.sync();
+ }
+
+ /**
+ * Accepts a message that was dispatched under the given Delivery instance.
+ * <p>
+ * An already settled delivery is left untouched. When the session is in a transaction
+ * and a remote transaction id is available, the delivery is settled with a
+ * TransactionalState carrying the Accepted outcome and this receiver is registered
+ * with the transaction context; without a remote transaction id nothing is applied.
+ *
+ * @param delivery the Delivery instance to accept.
+ * @throws IOException if an error occurs while sending the accept.
+ * @throws IllegalArgumentException if the given delivery is null.
+ */
+ public void accept(final Delivery delivery) throws IOException {
+ checkClosed();
+
+ if (delivery == null) {
+ throw new IllegalArgumentException("Delivery to accept cannot be null");
+ }
+
+ final ClientFuture request = new ClientFuture();
+ session.getScheduler().execute(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ // Checked inside the try block: a receiver that closed after the caller-side
+ // check must fail the request, otherwise sync() below would never return.
+ checkClosed();
+ if (!delivery.isSettled()) {
+ if (session.isInTransaction()) {
+ Binary txnId = session.getTransactionId().getRemoteTxId();
+ if (txnId != null) {
+ TransactionalState txState = new TransactionalState();
+ txState.setOutcome(Accepted.getInstance());
+ txState.setTxnId(txnId);
+ delivery.disposition(txState);
+ delivery.settle();
+ session.getTransactionContext().registerTxConsumer(AmqpReceiver.this);
+ }
+ }
+ else {
+ delivery.disposition(Accepted.getInstance());
+ delivery.settle();
+ }
+ }
+ session.pumpToProtonTransport(request);
+ request.onSuccess();
+ }
+ catch (Exception e) {
+ request.onFailure(e);
+ }
+ }
+ });
+
+ request.sync();
+ }
+
+ /**
+ * Mark a message that was dispatched under the given Delivery instance as Modified.
+ * An already settled delivery is left untouched.
+ *
+ * @param delivery the Delivery instance to mark modified.
+ * @param deliveryFailed indicates that the delivery failed for some reason.
+ * @param undeliverableHere marks the delivery as not able to be processed by the link it was sent to.
+ * @throws IOException if an error occurs while sending the modified disposition.
+ * @throws IllegalArgumentException if the given delivery is null.
+ */
+ public void modified(final Delivery delivery,
+ final Boolean deliveryFailed,
+ final Boolean undeliverableHere) throws IOException {
+ checkClosed();
+
+ if (delivery == null) {
+ // The message previously said "reject", copied from another operation.
+ throw new IllegalArgumentException("Delivery to mark as modified cannot be null");
+ }
+
+ final ClientFuture request = new ClientFuture();
+ session.getScheduler().execute(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ // Checked inside the try block: a receiver that closed after the caller-side
+ // check must fail the request, otherwise sync() below would never return.
+ checkClosed();
+ if (!delivery.isSettled()) {
+ Modified disposition = new Modified();
+ disposition.setUndeliverableHere(undeliverableHere);
+ disposition.setDeliveryFailed(deliveryFailed);
+ delivery.disposition(disposition);
+ delivery.settle();
+ session.pumpToProtonTransport(request);
+ }
+ request.onSuccess();
+ }
+ catch (Exception e) {
+ request.onFailure(e);
+ }
+ }
+ });
+
+ request.sync();
+ }
+
+ /**
+ * Release a message that was dispatched under the given Delivery instance.
+ * An already settled delivery is left untouched.
+ *
+ * @param delivery the Delivery instance to release.
+ * @throws IOException if an error occurs while sending the release.
+ * @throws IllegalArgumentException if the given delivery is null.
+ */
+ public void release(final Delivery delivery) throws IOException {
+ checkClosed();
+
+ if (delivery == null) {
+ throw new IllegalArgumentException("Delivery to release cannot be null");
+ }
+
+ final ClientFuture request = new ClientFuture();
+ session.getScheduler().execute(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ // Checked inside the try block: a receiver that closed after the caller-side
+ // check must fail the request, otherwise sync() below would never return.
+ checkClosed();
+ if (!delivery.isSettled()) {
+ delivery.disposition(Released.getInstance());
+ delivery.settle();
+ session.pumpToProtonTransport(request);
+ }
+ request.onSuccess();
+ }
+ catch (Exception e) {
+ request.onFailure(e);
+ }
+ }
+ });
+
+ request.sync();
+ }
+
+ /**
+ * Wraps the current endpoint in a new UnmodifiableReceiver on every call.
+ *
+ * @return an unmodifiable view of the underlying Receiver instance.
+ */
+ public Receiver getReceiver() {
+ final Receiver endpoint = getEndpoint();
+ return new UnmodifiableReceiver(endpoint);
+ }
+
+ //----- Receiver configuration properties --------------------------------//
+
+ /**
+ * @return true if the link is opened with the SETTLED sender settle mode.
+ */
+ public boolean isPresettle() {
+ return this.presettle;
+ }
+
+ /**
+ * Configures whether the link requests the SETTLED sender settle mode; the value
+ * is read when the receiver link is opened.
+ *
+ * @param presettle true to request presettled deliveries.
+ */
+ public void setPresettle(boolean presettle) {
+ final boolean requested = presettle;
+ this.presettle = requested;
+ }
+
+ /**
+ * @return true if a subscription name has been assigned (any non-null value).
+ */
+ public boolean isDurable() {
+ final boolean hasSubscription = this.subscriptionName != null;
+ return hasSubscription;
+ }
+
+ /**
+ * @return the configured subscription name, or null if none was set.
+ */
+ public String getSubscriptionName() {
+ return this.subscriptionName;
+ }
+
+ /**
+ * Sets the subscription name. A non-empty name is used as the receiver link name
+ * and makes the configured Source durable when the link is opened.
+ *
+ * @param subscriptionName the subscription name to use, or null for none.
+ */
+ public void setSubscriptionName(String subscriptionName) {
+ final String name = subscriptionName;
+ this.subscriptionName = name;
+ }
+
+ /**
+ * @return the configured selector expression, or null if none was set.
+ */
+ public String getSelector() {
+ return this.selector;
+ }
+
+ /**
+ * Sets the selector expression. A non-blank value is installed as a JMS selector
+ * filter on the Source when it is configured.
+ *
+ * @param selector the selector expression, or null for none.
+ */
+ public void setSelector(String selector) {
+ final String expression = selector;
+ this.selector = expression;
+ }
+
+ /**
+ * @return true if a no-local filter is added to the Source when it is configured.
+ */
+ public boolean isNoLocal() {
+ return this.noLocal;
+ }
+
+ /**
+ * Configures whether a no-local filter is added to the Source when it is configured.
+ *
+ * @param noLocal true to request the no-local filter.
+ */
+ public void setNoLocal(boolean noLocal) {
+ final boolean requested = noLocal;
+ this.noLocal = requested;
+ }
+
+ /**
+ * @return the drain timeout configured on the owning connection; the stop logic
+ * schedules a timeout task with this value in milliseconds when it is positive.
+ */
+ public long getDrainTimeout() {
+ final AmqpConnection connection = session.getConnection();
+ return connection.getDrainTimeout();
+ }
+
+ //----- Internal implementation ------------------------------------------//
+
+ @Override
+ protected void doOpen() {
+
+ Source source = userSpecifiedSource;
+ Target target = new Target();
+
+ if (source == null && address != null) {
+ source = new Source();
+ source.setAddress(address);
+ configureSource(source);
+ }
+
+ String receiverName = receiverId + ":" + address;
+
+ if (getSubscriptionName() != null && !getSubscriptionName().isEmpty()) {
+ // In the case of Durable Topic Subscriptions the client must use the same
+ // receiver name which is derived from the subscription name property.
+ receiverName = getSubscriptionName();
+ }
+
+ Receiver receiver = session.getEndpoint().receiver(receiverName);
+ receiver.setSource(source);
+ receiver.setTarget(target);
+ if (isPresettle()) {
+ receiver.setSenderSettleMode(SenderSettleMode.SETTLED);
+ }
+ else {
+ receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ }
+ receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+ setEndpoint(receiver);
+
+ super.doOpen();
+ }
+
+ @Override
+ protected void doOpenCompletion() {
+ // Verify the attach response contained a non-null Source
+ org.apache.qpid.proton.amqp.transport.Source s = getEndpoint().getRemoteSource();
+ if (s != null) {
+ super.doOpenCompletion();
+ }
+ else {
+ // No link terminus was created, the peer will now detach/close us.
+ }
+ }
+
+ @Override
+ protected void doClose() {
+ getEndpoint().close();
+ }
+
+ @Override
+ protected void doDetach() {
+ getEndpoint().detach();
+ }
+
+ @Override
+ protected Exception getOpenAbortException() {
+ // Verify the attach response contained a non-null Source
+ org.apache.qpid.proton.amqp.transport.Source s = getEndpoint().getRemoteSource();
+ if (s != null) {
+ return super.getOpenAbortException();
+ }
+ else {
+ // No link terminus was created, the peer has detach/closed us, create IDE.
+ return new InvalidDestinationException("Link creation was refused");
+ }
+ }
+
+ @Override
+ protected void doOpenInspection() {
+ try {
+ getStateInspector().inspectOpenedResource(getReceiver());
+ }
+ catch (Throwable error) {
+ getStateInspector().markAsInvalid(error.getMessage());
+ }
+ }
+
+ @Override
+ protected void doClosedInspection() {
+ try {
+ getStateInspector().inspectClosedResource(getReceiver());
+ }
+ catch (Throwable error) {
+ getStateInspector().markAsInvalid(error.getMessage());
+ }
+ }
+
+ @Override
+ protected void doDetachedInspection() {
+ try {
+ getStateInspector().inspectDetachedResource(getReceiver());
+ }
+ catch (Throwable error) {
+ getStateInspector().markAsInvalid(error.getMessage());
+ }
+ }
+
+ protected void configureSource(Source source) {
+ Map<Symbol, DescribedType> filters = new HashMap<>();
+ Symbol[] outcomes = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL};
+
+ if (getSubscriptionName() != null && !getSubscriptionName().isEmpty()) {
+ source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+ source.setDurable(TerminusDurability.UNSETTLED_STATE);
+ source.setDistributionMode(COPY);
+ }
+ else {
+ source.setDurable(TerminusDurability.NONE);
+ source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+ }
+
+ source.setOutcomes(outcomes);
+
+ Modified modified = new Modified();
+ modified.setDeliveryFailed(true);
+ modified.setUndeliverableHere(false);
+
+ source.setDefaultOutcome(modified);
+
+ if (isNoLocal()) {
+ filters.put(NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL);
+ }
+
+ if (getSelector() != null && !getSelector().trim().equals("")) {
+ filters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(getSelector()));
+ }
+
+ if (!filters.isEmpty()) {
+ source.setFilter(filters);
+ }
+ }
+
+ /**
+ * Consumes every complete delivery currently queued on the link, then delegates to
+ * the parent implementation. Processing stops at the first partial or unreadable
+ * delivery, or when no current delivery remains.
+ *
+ * @param connection the connection that owns this resource.
+ * @throws IOException if processing a delivery fails.
+ */
+ @Override
+ public void processDeliveryUpdates(AmqpConnection connection) throws IOException {
+ while (true) {
+ final Delivery incoming = getEndpoint().current();
+
+ if (incoming == null) {
+ // We have exhausted the locally queued messages on this link.
+ // Check if we tried to stop and have now run out of credit.
+ if (getEndpoint().getRemoteCredit() <= 0 && stopRequest != null) {
+ stopRequest.onSuccess();
+ stopRequest = null;
+ }
+ break;
+ }
+
+ if (!incoming.isReadable() || incoming.isPartial()) {
+ LOG.trace("{} has a partial incoming Message(s), deferring.", this);
+ break;
+ }
+
+ LOG.trace("{} has incoming Message(s).", this);
+ try {
+ processDelivery(incoming);
+ }
+ catch (Exception e) {
+ throw IOExceptionSupport.create(e);
+ }
+ getEndpoint().advance();
+ }
+
+ super.processDeliveryUpdates(connection);
+ }
+
+ private void processDelivery(Delivery incoming) throws Exception {
+ Message message = null;
+ try {
+ message = decodeIncomingMessage(incoming);
+ }
+ catch (Exception e) {
+ LOG.warn("Error on transform: {}", e.getMessage());
+ deliveryFailed(incoming, true);
+ return;
+ }
+
+ AmqpMessage amqpMessage = new AmqpMessage(this, message, incoming);
+ // Store reference to envelope in delivery context for recovery
+ incoming.setContext(amqpMessage);
+ prefetch.add(amqpMessage);
+
+ // We processed a message, signal completion
+ // of a message pull request if there is one.
+ if (pullRequest != null) {
+ pullRequest.onSuccess();
+ pullRequest = null;
+ }
+ }
+
+ /**
+ * Completes any pending pull or stop request once the link has neither remote
+ * credit nor queued deliveries, then delegates to the parent implementation.
+ *
+ * @param connection the connection that owns this resource.
+ * @throws IOException if the parent implementation fails.
+ */
+ @Override
+ public void processFlowUpdates(AmqpConnection connection) throws IOException {
+ final boolean requestPending = pullRequest != null || stopRequest != null;
+ if (requestPending) {
+ final Receiver link = getEndpoint();
+ final boolean exhausted = link.getRemoteCredit() <= 0 && link.getQueued() == 0;
+ if (exhausted) {
+ if (pullRequest != null) {
+ pullRequest.onSuccess();
+ pullRequest = null;
+ }
+
+ if (stopRequest != null) {
+ stopRequest.onSuccess();
+ stopRequest = null;
+ }
+ }
+ }
+
+ LOG.trace("Consumer {} flow updated, remote credit = {}", getSubscriptionName(), getEndpoint().getRemoteCredit());
+
+ super.processFlowUpdates(connection);
+ }
+
+ protected Message decodeIncomingMessage(Delivery incoming) {
+ int count;
+
+ byte[] chunk = new byte[2048];
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+
+ while ((count = getEndpoint().recv(chunk, 0, chunk.length)) > 0) {
+ stream.write(chunk, 0, count);
+ }
+
+ byte[] messageBytes = stream.toByteArray();
+
+ try {
+ Message protonMessage = Message.Factory.create();
+ protonMessage.decode(messageBytes, 0, messageBytes.length);
+ return protonMessage;
+ }
+ finally {
+ try {
+ stream.close();
+ }
+ catch (IOException e) {
+ }
+ }
+ }
+
+ protected void deliveryFailed(Delivery incoming, boolean expandCredit) {
+ Modified disposition = new Modified();
+ disposition.setUndeliverableHere(true);
+ disposition.setDeliveryFailed(true);
+ incoming.disposition(disposition);
+ incoming.settle();
+ if (expandCredit) {
+ getEndpoint().flow(1);
+ }
+ }
+
+ /**
+ * Starts stopping the link on behalf of the given request.
+ * <p>
+ * With no remote credit left the request either completes immediately (nothing
+ * queued) or is parked as the pending stop request. Otherwise the link is drained
+ * and, when a positive drain timeout is configured, a timeout task is scheduled
+ * that locally closes the receiver and fails the pending stop request.
+ *
+ * @param request the request to complete once the link has stopped.
+ */
+ private void stop(final AsyncResult request) {
+ Receiver receiver = getEndpoint();
+ if (receiver.getRemoteCredit() <= 0) {
+ if (receiver.getQueued() == 0) {
+ // We have no remote credit and all the deliveries have been processed.
+ request.onSuccess();
+ }
+ else {
+ // There are still deliveries to process, wait for them to be.
+ stopRequest = request;
+ }
+ }
+ else {
+ // TODO: We don't actually want the additional messages that could be sent while
+ // draining. We could explicitly reduce credit first, or possibly use 'echo' instead
+ // of drain if it was supported. We would first need to understand what happens
+ // if we reduce credit below the number of messages already in-flight before
+ // the peer sees the update.
+ stopRequest = request;
+ receiver.drain(0);
+
+ if (getDrainTimeout() > 0) {
+ // If the remote doesn't respond we will close the consumer and break any
+ // blocked receive or stop calls that are waiting.
+ final ScheduledFuture<?> future = getSession().getScheduler().schedule(new Runnable() {
+ @Override
+ public void run() {
+ // Log the receiver itself; a bare 'this' here would be the anonymous Runnable.
+ LOG.trace("Consumer {} drain request timed out", AmqpReceiver.this);
+ Exception cause = new JmsOperationTimedOutException("Remote did not respond to a drain request in time");
+ locallyClosed(session.getConnection(), cause);
+ stopRequest.onFailure(cause);
+ session.pumpToProtonTransport(stopRequest);
+ }
+ }, getDrainTimeout(), TimeUnit.MILLISECONDS);
+
+ // Wrap the pending request so that completing it cancels the timeout task.
+ stopRequest = new ScheduledRequest(future, stopRequest);
+ }
+ }
+ }
+
+ /**
+ * Schedules a stop of the link after the given delay and records the request,
+ * wrapped together with the scheduled task, as the pending stop request.
+ * The scheduled task only stops the link if remote credit is still outstanding.
+ *
+ * @param timeout delay in milliseconds before the stop is attempted.
+ * @param request the request to hand to the stop logic.
+ */
+ private void stopOnSchedule(long timeout, final AsyncResult request) {
+ LOG.trace("Receiver {} scheduling stop", this);
+ // We need to drain the credit if no message(s) arrive to use it.
+ final ScheduledFuture<?> future = getSession().getScheduler().schedule(new Runnable() {
+ @Override
+ public void run() {
+ // Log the receiver itself; a bare 'this' here would be the anonymous Runnable.
+ LOG.trace("Receiver {} running scheduled stop", AmqpReceiver.this);
+ if (getEndpoint().getRemoteCredit() != 0) {
+ stop(request);
+ session.pumpToProtonTransport(request);
+ }
+ }
+ }, timeout, TimeUnit.MILLISECONDS);
+
+ stopRequest = new ScheduledRequest(future, request);
+ }
+
+ /**
+ * @return the simple class name followed by the receiver's address.
+ */
+ @Override
+ public String toString() {
+ final StringBuilder text = new StringBuilder(getClass().getSimpleName());
+ text.append("{ address = ").append(address).append("}");
+ return text.toString();
+ }
+
+ /**
+ * Guards operations that require a receiver that has not been closed.
+ *
+ * @throws IllegalStateException if this receiver reports itself as closed.
+ */
+ private void checkClosed() {
+ if (!isClosed()) {
+ return;
+ }
+ throw new IllegalStateException("Receiver is already closed");
+ }
+
+ //----- Internal Transaction state callbacks -----------------------------//
+
+ /**
+ * Transaction state callback; this implementation intentionally does nothing.
+ * NOTE(review): presumably invoked before a transaction commit - confirm against the caller.
+ */
+ void preCommit() {
+ }
+
+ /**
+ * Transaction state callback; this implementation intentionally does nothing.
+ * NOTE(review): presumably invoked before a transaction rollback - confirm against the caller.
+ */
+ void preRollback() {
+ }
+
+ /**
+ * Transaction state callback; this implementation intentionally does nothing.
+ * NOTE(review): presumably invoked after a transaction commit - confirm against the caller.
+ */
+ void postCommit() {
+ }
+
+ /**
+ * Transaction state callback; this implementation intentionally does nothing.
+ * NOTE(review): presumably invoked after a transaction rollback - confirm against the caller.
+ */
+ void postRollback() {
+ }
+
+ //----- Inner classes used in message pull operations --------------------//
+
+ /**
+ * AsyncResult wrapper that pairs an original request with a scheduled task and
+ * attempts to cancel that task whenever the request is completed.
+ */
+ protected static final class ScheduledRequest implements AsyncResult {
+
+ private final ScheduledFuture<?> scheduledTask;
+ private final AsyncResult delegate;
+
+ /**
+ * @param completionTask the scheduled task to cancel when this request completes.
+ * @param origRequest the request that receives the final outcome.
+ */
+ public ScheduledRequest(ScheduledFuture<?> completionTask, AsyncResult origRequest) {
+ this.delegate = origRequest;
+ this.scheduledTask = completionTask;
+ }
+
+ /**
+ * Cancels the scheduled task (without interrupting it) and fails the original request.
+ */
+ @Override
+ public void onFailure(Throwable cause) {
+ scheduledTask.cancel(false);
+ delegate.onFailure(cause);
+ }
+
+ /**
+ * Signals success on the original request only when the scheduled task could
+ * still be cancelled.
+ */
+ @Override
+ public void onSuccess() {
+ if (scheduledTask.cancel(false)) {
+ // Signal completion. Otherwise wait for the scheduled task to do it.
+ delegate.onSuccess();
+ }
+ }
+
+ /**
+ * @return whether the original request has completed.
+ */
+ @Override
+ public boolean isComplete() {
+ return delegate.isComplete();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpRedirectedException.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpRedirectedException.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpRedirectedException.java
new file mode 100644
index 0000000..0c9bb81
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpRedirectedException.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.io.IOException;
+
+/**
+ * {@link IOException} derivative that defines that the remote peer has requested that this
+ * connection be redirected to some alternative peer.
+ */
+public class AmqpRedirectedException extends IOException {
+
+ private static final long serialVersionUID = 5872211116061710369L;
+
+ private final String hostname;
+ private final String networkHost;
+ private final int port;
+
+ public AmqpRedirectedException(String reason, String hostname, String networkHost, int port) {
+ super(reason);
+
+ this.hostname = hostname;
+ this.networkHost = networkHost;
+ this.port = port;
+ }
+
+ /**
+ * @return the host name of the container being redirected to.
+ */
+ public String getHostname() {
+ return hostname;
+ }
+
+ /**
+ * @return the DNS host name or IP address of the peer this connection is being redirected to.
+ */
+ public String getNetworkHost() {
+ return networkHost;
+ }
+
+ /**
+ * @return the port number on the peer this connection is being redirected to.
+ */
+ public int getPort() {
+ return port;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpResource.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
new file mode 100644
index 0000000..bd66659
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.activemq.transport.amqp.client.util.AsyncResult;
+
+/**
+ * AmqpResource specification.
+ *
+ * All AMQP types should implement this interface to allow for control of state
+ * and configuration details.
+ */
+public interface AmqpResource extends AmqpEventSink {
+
+ /**
+ * Perform all the work needed to open this resource and store the request
+ * until such time as the remote peer indicates the resource has become active.
+ *
+ * @param request The initiating request that triggered this open call.
+ */
+ void open(AsyncResult request);
+
+ /**
+ * @return if the resource has moved to the opened state on the remote.
+ */
+ boolean isOpen();
+
+ /**
+ * Called to indicate that this resource is now remotely opened. Once opened a
+ * resource can start accepting incoming requests.
+ */
+ void opened();
+
+ /**
+ * Perform all work needed to close this resource and store the request
+ * until such time as the remote peer indicates the resource has been closed.
+ *
+ * @param request The initiating request that triggered this close call.
+ */
+ void close(AsyncResult request);
+
+ /**
+ * Perform all work needed to detach this resource and store the request
+ * until such time as the remote peer indicates the resource has been detached.
+ *
+ * @param request The initiating request that triggered this detach call.
+ */
+ void detach(AsyncResult request);
+
+ /**
+ * @return if the resource has moved to the closed state on the remote.
+ */
+ boolean isClosed();
+
+ /**
+ * Called to indicate that this resource is now remotely closed. Once closed a
+ * resource can not accept any incoming requests.
+ */
+ void closed();
+
+ /**
+ * Sets the failed state for this Resource and triggers a failure signal for
+ * any pending ProduverRequest.
+ */
+ void failed();
+
+ /**
+ * Called to indicate that the remote end has become closed but the resource
+ * was not awaiting a close. This could happen during an open request where
+ * the remote does not set an error condition or during normal operation.
+ *
+ * @param connection The connection that owns this resource.
+ */
+ void remotelyClosed(AmqpConnection connection);
+
+ /**
+ * Called to indicate that the local end has become closed but the resource
+ * was not awaiting a close. This could happen during an open request where
+ * the remote does not set an error condition or during normal operation.
+ *
+ * @param connection The connection that owns this resource.
+ * @param error The error that triggered the local close of this resource.
+ */
+ void locallyClosed(AmqpConnection connection, Exception error);
+
+ /**
+ * Sets the failed state for this Resource and triggers a failure signal for
+ * any pending ProduverRequest.
+ *
+ * @param cause The Exception that triggered the failure.
+ */
+ void failed(Exception cause);
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
new file mode 100644
index 0000000..404b943
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -0,0 +1,452 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import javax.jms.InvalidDestinationException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.transport.amqp.client.util.AsyncResult;
+import org.apache.activemq.transport.amqp.client.util.ClientFuture;
+import org.apache.activemq.transport.amqp.client.util.UnmodifiableSender;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Outcome;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.message.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sender class that manages a Proton sender endpoint.
+ */
+public class AmqpSender extends AmqpAbstractResource<Sender> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AmqpSender.class);
+ // Delivery tag used for presettled sends.
+ private static final byte[] EMPTY_BYTE_ARRAY = new byte[]{};
+
+ // Default send timeout in milliseconds.
+ public static final long DEFAULT_SEND_TIMEOUT = 15000;
+
+ // Supplies delivery tags for sends that are not presettled.
+ private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator(true);
+ // Ensures the close work in close() is scheduled at most once.
+ private final AtomicBoolean closed = new AtomicBoolean();
+
+ private final AmqpSession session;
+ private final String address;
+ private final String senderId;
+ // Target supplied by the user, or null when the Target is built from the address.
+ private final Target userSpecifiedTarget;
+
+ private boolean presettle;
+ // Time in milliseconds a send waits for completion; values <= 0 wait without a timeout.
+ private long sendTimeout = DEFAULT_SEND_TIMEOUT;
+
+ // Unsettled deliveries that have been sent and await a remote delivery state.
+ private final Set<Delivery> pending = new LinkedHashSet<>();
+ // Reusable encoding buffer; replaced by one twice the size when a message does not fit.
+ private byte[] encodeBuffer = new byte[1024 * 8];
+
+ /**
+ * Create a new sender instance.
+ *
+ * @param session The parent session that created the session.
+ * @param address The address that this sender produces to.
+ * @param senderId The unique ID assigned to this sender.
+ */
+ public AmqpSender(AmqpSession session, String address, String senderId) {
+
+ if (address != null && address.isEmpty()) {
+ throw new IllegalArgumentException("Address cannot be empty.");
+ }
+
+ this.session = session;
+ this.address = address;
+ this.senderId = senderId;
+ this.userSpecifiedTarget = null;
+ }
+
+ /**
+ * Create a new sender instance using the given Target when creating the link.
+ * The sender's address is taken from the Target.
+ *
+ * @param session The parent session that created this sender.
+ * @param target The user specified Target to attach the link with; must not be null.
+ * @param senderId The unique ID assigned to this sender.
+ * @throws IllegalArgumentException if the target is null.
+ */
+ public AmqpSender(AmqpSession session, Target target, String senderId) {
+
+ if (null == target) {
+ throw new IllegalArgumentException("User specified Target cannot be null");
+ }
+
+ this.senderId = senderId;
+ this.address = target.getAddress();
+ this.userSpecifiedTarget = target;
+ this.session = session;
+ }
+
+ /**
+ * Sends the given message to this sender's assigned address and waits for the send
+ * request to complete. With a positive send timeout the wait is bounded by that
+ * timeout in milliseconds, otherwise it is unbounded.
+ *
+ * @param message the message to send.
+ * @throws IOException if an error occurs during the send.
+ */
+ public void send(final AmqpMessage message) throws IOException {
+ checkClosed();
+ final ClientFuture pendingSend = new ClientFuture();
+
+ session.getScheduler().execute(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ doSend(message, pendingSend);
+ session.pumpToProtonTransport(pendingSend);
+ }
+ catch (Exception failure) {
+ // Fail the waiting caller and also report the error on the connection.
+ pendingSend.onFailure(failure);
+ session.getConnection().fireClientException(failure);
+ }
+ }
+ });
+
+ if (sendTimeout > 0) {
+ pendingSend.sync(sendTimeout, TimeUnit.MILLISECONDS);
+ }
+ else {
+ pendingSend.sync();
+ }
+ }
+
+ /**
+ * Close the sender, a closed sender will throw exceptions if any further send
+ * calls are made.
+ *
+ * @throws IOException if an error occurs while closing the sender.
+ */
+ public void close() throws IOException {
+ if (closed.compareAndSet(false, true)) {
+ final ClientFuture request = new ClientFuture();
+ session.getScheduler().execute(new Runnable() {
+
+ @Override
+ public void run() {
+ checkClosed();
+ close(request);
+ session.pumpToProtonTransport(request);
+ }
+ });
+
+ request.sync();
+ }
+ }
+
+ /**
+ * @return this sender's parent AmqpSession.
+ */
+ public AmqpSession getSession() {
+ return this.session;
+ }
+
+ /**
+ * Wraps the current endpoint in a new UnmodifiableSender on every call.
+ *
+ * @return an unmodifiable view of the underlying Sender instance.
+ */
+ public Sender getSender() {
+ final Sender endpoint = getEndpoint();
+ return new UnmodifiableSender(endpoint);
+ }
+
+ /**
+ * @return the assigned address of this sender, which may be null.
+ */
+ public String getAddress() {
+ return this.address;
+ }
+
+ //----- Sender configuration ---------------------------------------------//
+
+ /**
+ * @return true if messages are settled on send.
+ */
+ public boolean isPresettle() {
+ return this.presettle;
+ }
+
+ /**
+ * Configures whether sent messages are marked as settled on send, defaults to false.
+ *
+ * @param presettle configure if this sender will presettle all sent messages.
+ */
+ public void setPresettle(boolean presettle) {
+ final boolean requested = presettle;
+ this.presettle = requested;
+ }
+
+ /**
+ * @return the currently configured send timeout in milliseconds.
+ */
+ public long getSendTimeout() {
+ return this.sendTimeout;
+ }
+
+ /**
+ * Sets the amount of time the sender will block on a send before failing.
+ * A value of zero or less makes a send wait without a timeout.
+ *
+ * @param sendTimeout time in milliseconds to wait.
+ */
+ public void setSendTimeout(long sendTimeout) {
+ final long millis = sendTimeout;
+ this.sendTimeout = millis;
+ }
+
+ //----- Private Sender implementation ------------------------------------//
+
+ /**
+ * Guards operations that require a sender that has not been closed.
+ *
+ * @throws IllegalStateException if this sender reports itself as closed.
+ */
+ private void checkClosed() {
+ if (!isClosed()) {
+ return;
+ }
+ throw new IllegalStateException("Sender is already closed");
+ }
+
+ @Override
+ protected void doOpen() {
+
+ Symbol[] outcomes = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL};
+ Source source = new Source();
+ source.setAddress(senderId);
+ source.setOutcomes(outcomes);
+
+ Target target = userSpecifiedTarget;
+ if (target == null) {
+ target = new Target();
+ target.setAddress(address);
+ }
+
+ String senderName = senderId + ":" + address;
+
+ Sender sender = session.getEndpoint().sender(senderName);
+ sender.setSource(source);
+ sender.setTarget(target);
+ if (presettle) {
+ sender.setSenderSettleMode(SenderSettleMode.SETTLED);
+ }
+ else {
+ sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ }
+ sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+ setEndpoint(sender);
+
+ super.doOpen();
+ }
+
+ @Override
+ protected void doOpenCompletion() {
+ // Verify the attach response contained a non-null target
+ org.apache.qpid.proton.amqp.transport.Target t = getEndpoint().getRemoteTarget();
+ if (t != null) {
+ super.doOpenCompletion();
+ }
+ else {
+ // No link terminus was created, the peer will now detach/close us.
+ }
+ }
+
+ @Override
+ protected void doOpenInspection() {
+ try {
+ getStateInspector().inspectOpenedResource(getSender());
+ }
+ catch (Throwable error) {
+ getStateInspector().markAsInvalid(error.getMessage());
+ }
+ }
+
+ @Override
+ protected void doClosedInspection() {
+ try {
+ getStateInspector().inspectClosedResource(getSender());
+ }
+ catch (Throwable error) {
+ getStateInspector().markAsInvalid(error.getMessage());
+ }
+ }
+
+ @Override
+ protected void doDetachedInspection() {
+ try {
+ getStateInspector().inspectDetachedResource(getSender());
+ }
+ catch (Throwable error) {
+ getStateInspector().markAsInvalid(error.getMessage());
+ }
+ }
+
+ @Override
+ protected Exception getOpenAbortException() {
+ // Verify the attach response contained a non-null target
+ org.apache.qpid.proton.amqp.transport.Target t = getEndpoint().getRemoteTarget();
+ if (t != null) {
+ return super.getOpenAbortException();
+ }
+ else {
+ // No link terminus was created, the peer has detach/closed us, create IDE.
+ return new InvalidDestinationException("Link creation was refused");
+ }
+ }
+
+ private void doSend(AmqpMessage message, AsyncResult request) throws Exception {
+ LOG.trace("Producer sending message: {}", message);
+
+ Delivery delivery = null;
+ if (presettle) {
+ delivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0);
+ }
+ else {
+ byte[] tag = tagGenerator.getNextTag();
+ delivery = getEndpoint().delivery(tag, 0, tag.length);
+ }
+
+ delivery.setContext(request);
+
+ if (session.isInTransaction()) {
+ Binary amqpTxId = session.getTransactionId().getRemoteTxId();
+ TransactionalState state = new TransactionalState();
+ state.setTxnId(amqpTxId);
+ delivery.disposition(state);
+ }
+
+ encodeAndSend(message.getWrappedMessage(), delivery);
+
+ if (presettle) {
+ delivery.settle();
+ request.onSuccess();
+ }
+ else {
+ pending.add(delivery);
+ getEndpoint().advance();
+ }
+ }
+
+ /**
+ * Encodes the message into the reusable buffer, growing the buffer as needed, and
+ * writes the encoded bytes to the sender endpoint.
+ *
+ * @param message the Proton message to encode.
+ * @param delivery the delivery the message is being sent under.
+ * @throws IOException if the endpoint accepts no data while bytes remain to be sent.
+ */
+ private void encodeAndSend(Message message, Delivery delivery) throws IOException {
+
+ int encodedSize;
+ while (true) {
+ try {
+ encodedSize = message.encode(encodeBuffer, 0, encodeBuffer.length);
+ break;
+ }
+ catch (java.nio.BufferOverflowException e) {
+ // The message did not fit: retry with a buffer twice the size.
+ encodeBuffer = new byte[encodeBuffer.length * 2];
+ }
+ }
+
+ int sentSoFar = 0;
+
+ // Bounded by the encoded size, so a zero-length encoding sends nothing
+ // instead of looping on a send that can never report progress.
+ while (sentSoFar < encodedSize) {
+ int sent = getEndpoint().send(encodeBuffer, sentSoFar, encodedSize - sentSoFar);
+ if (sent <= 0) {
+ // Retrying here would spin forever, so fail the send instead.
+ LOG.warn("{} failed to send any data from current Message.", this);
+ throw new IOException("Failed to send any data from current Message");
+ }
+ sentSoFar += sent;
+ }
+ }
+
+ @Override
+ public void processDeliveryUpdates(AmqpConnection connection) throws IOException {
+ List<Delivery> toRemove = new ArrayList<>();
+
+ for (Delivery delivery : pending) {
+ DeliveryState state = delivery.getRemoteState();
+ if (state == null) {
+ continue;
+ }
+
+ Outcome outcome = null;
+ if (state instanceof TransactionalState) {
+ LOG.trace("State of delivery is Transactional, retrieving outcome: {}", state);
+ outcome = ((TransactionalState) state).getOutcome();
+ }
+ else if (state instanceof Outcome) {
+ outcome = (Outcome) state;
+ }
+ else {
+ LOG.warn("Message send updated with unsupported state: {}", state);
+ outcome = null;
+ }
+
+ AsyncResult request = (AsyncResult) delivery.getContext();
+ Exception deliveryError = null;
+
+ if (outcome instanceof Accepted) {
+ LOG.trace("Outcome of delivery was accepted: {}", delivery);
+ if (request != null && !request.isComplete()) {
+ request.onSuccess();
+ }
+ }
+ else if (outcome instanceof Rejected) {
+ LOG.trace("Outcome of delivery was rejected: {}", delivery);
+ ErrorCondition remoteError = ((Rejected) outcome).getError();
+ if (remoteError == null) {
+ remoteError = getEndpoint().getRemoteCondition();
+ }
+
+ deliveryError = AmqpSupport.convertToException(remoteError);
+ }
+ else if (outcome instanceof Released) {
+ LOG.trace("Outcome of delivery was released: {}", delivery);
+ deliveryError = new IOException("Delivery failed: released by receiver");
+ }
+ else if (outcome instanceof Modified) {
+ LOG.trace("Outcome of delivery was modified: {}", delivery);
+ deliveryError = new IOException("Delivery failed: failure at remote");
+ }
+
+ if (deliveryError != null) {
+ if (request != null && !request.isComplete()) {
+ request.onFailure(deliveryError);
+ }
+ else {
+ connection.fireClientException(deliveryError);
+ }
+ }
+
+ tagGenerator.returnTag(delivery.getTag());
+ delivery.settle();
+ toRemove.add(delivery);
+ }
+
+ pending.removeAll(toRemove);
+ }
+
+ /**
+  * Describes this sender by its simple class name followed by the address it uses.
+  *
+  * @return a short description of the form {@code SimpleName{ address = value}}
+  */
+ @Override
+ public String toString() {
+    final String typeName = getClass().getSimpleName();
+    return typeName + "{ address = " + address + "}";
+ }
+}