You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/03/13 20:48:26 UTC
[3/3] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5602
https://issues.apache.org/jira/browse/AMQ-5602
Functional client with added tests to start to cover various
expectations of an AMQP broker and some tests for expectations of a JMS
mapping compliant broker.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/72839b78
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/72839b78
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/72839b78
Branch: refs/heads/master
Commit: 72839b78a727bdec96dbd0a824a9f39a745b4d87
Parents: 10c47d6
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Mar 13 15:47:30 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Mar 13 15:47:30 2015 -0400
----------------------------------------------------------------------
.../transport/amqp/AmqpProtocolConverter.java | 92 +--
.../activemq/transport/amqp/AmqpSupport.java | 130 +++-
.../transport/amqp/AmqpTestSupport.java | 48 +-
.../amqp/client/AmqpAbstractResource.java | 314 ++++++++++
.../transport/amqp/client/AmqpClient.java | 240 ++++++++
.../amqp/client/AmqpClientListener.java | 32 +
.../amqp/client/AmqpClientTestSupport.java | 77 +++
.../transport/amqp/client/AmqpConnection.java | 532 ++++++++++++++++
.../amqp/client/AmqpDefaultClientListener.java | 28 +
.../amqp/client/AmqpJmsSelectorType.java | 47 ++
.../transport/amqp/client/AmqpMessage.java | 179 ++++++
.../transport/amqp/client/AmqpNoLocalType.java | 44 ++
.../transport/amqp/client/AmqpReceiver.java | 599 +++++++++++++++++++
.../transport/amqp/client/AmqpResource.java | 163 +++++
.../transport/amqp/client/AmqpSender.java | 382 ++++++++++++
.../transport/amqp/client/AmqpSession.java | 168 ++++++
.../amqp/client/AmqpStateInspector.java | 88 +++
.../amqp/client/AmqpTransferTagGenerator.java | 103 ++++
.../amqp/client/sasl/AbstractMechanism.java | 80 +++
.../amqp/client/sasl/AnonymousMechanism.java | 43 ++
.../amqp/client/sasl/CramMD5Mechanism.java | 86 +++
.../transport/amqp/client/sasl/Mechanism.java | 125 ++++
.../amqp/client/sasl/PlainMechanism.java | 62 ++
.../amqp/client/sasl/SaslAuthenticator.java | 163 +++++
.../transport/amqp/client/util/AsyncResult.java | 47 ++
.../amqp/client/util/ClientFuture.java | 102 ++++
.../amqp/client/util/ClientTcpTransport.java | 384 ++++++++++++
.../client/util/UnmodifiableConnection.java | 179 ++++++
.../amqp/client/util/UnmodifiableDelivery.java | 147 +++++
.../amqp/client/util/UnmodifiableLink.java | 248 ++++++++
.../amqp/client/util/UnmodifiableReceiver.java | 59 ++
.../amqp/client/util/UnmodifiableSender.java | 45 ++
.../amqp/client/util/UnmodifiableSession.java | 134 +++++
.../amqp/client/util/WrappedAsyncResult.java | 59 ++
.../amqp/interop/AmqpConnectionsTest.java | 170 ++++++
.../amqp/interop/AmqpReceiverTest.java | 285 +++++++++
.../transport/amqp/interop/AmqpSenderTest.java | 94 +++
.../transport/amqp/interop/AmqpSessionTest.java | 40 ++
38 files changed, 5730 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index b9b2ff2..39c8c2b 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -16,6 +16,20 @@
*/
package org.apache.activemq.transport.amqp;
+import static org.apache.activemq.transport.amqp.AmqpSupport.ANONYMOUS_RELAY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
+import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
+import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
+import static org.apache.activemq.transport.amqp.AmqpSupport.QUEUE_PREFIX;
+import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILITY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.TOPIC_PREFIX;
+import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
+import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
+import static org.apache.activemq.transport.amqp.AmqpSupport.toBytes;
+import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
+
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
@@ -90,7 +104,6 @@ import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Modified;
@@ -136,19 +149,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class);
private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
private static final int CHANNEL_MAX = 32767;
- private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
- private static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
- private static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
- private static final Symbol COPY = Symbol.getSymbol("copy");
- private static final UnsignedLong JMS_SELECTOR_CODE = UnsignedLong.valueOf(0x0000468C00000004L);
- private static final Symbol JMS_SELECTOR_NAME = Symbol.valueOf("apache.org:selector-filter:string");
- private static final Object[] JMS_SELECTOR_FILTER_IDS = new Object[] { JMS_SELECTOR_CODE, JMS_SELECTOR_NAME };
- private static final UnsignedLong NO_LOCAL_CODE = UnsignedLong.valueOf(0x0000468C00000003L);
- private static final Symbol NO_LOCAL_NAME = Symbol.valueOf("apache.org:selector-filter:string");
- private static final Object[] NO_LOCAL_FILTER_IDS = new Object[] { NO_LOCAL_CODE, NO_LOCAL_NAME };
- private static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue");
- private static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic");
- private static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
private final AmqpTransport amqpTransport;
private final AmqpWireFormat amqpWireFormat;
@@ -874,17 +874,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
private final AtomicLong nextTransactionId = new AtomicLong();
- public byte[] toBytes(long value) {
- Buffer buffer = new Buffer(8);
- buffer.bigEndianEditor().writeLong(value);
- return buffer.data;
- }
-
- private long toLong(Binary value) {
- Buffer buffer = new Buffer(value.getArray(), value.getArrayOffset(), value.getLength());
- return buffer.bigEndianEditor().readLong();
- }
-
AmqpDeliveryListener coordinatorContext = new BaseProducerContext() {
@Override
@@ -946,7 +935,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse) response;
Rejected rejected = new Rejected();
- rejected.setError(createErrorCondition("failed", er.getException().getMessage()));
+ rejected.setError(new ErrorCondition(Symbol.valueOf("failed"), er.getException().getMessage()));
delivery.disposition(rejected);
} else {
delivery.disposition(Accepted.getInstance());
@@ -1639,46 +1628,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
});
}
- private boolean contains(Symbol[] symbols, Symbol key) {
- if (symbols == null || symbols.length == 0) {
- return false;
- }
-
- for (Symbol symbol : symbols) {
- if (symbol.equals(key)) {
- return true;
- }
- }
-
- return false;
- }
-
- private DescribedType findFilter(Map<Symbol, Object> filters, Object[] filterIds) {
-
- if (filterIds == null || filterIds.length == 0) {
- throw new IllegalArgumentException("Invalid Filter Ids array passed: " + filterIds);
- }
-
- if (filters == null || filters.isEmpty()) {
- return null;
- }
-
- for (Object value : filters.values()) {
- if (value instanceof DescribedType) {
- DescribedType describedType = ((DescribedType) value);
- Object descriptor = describedType.getDescriptor();
-
- for (Object filterId : filterIds) {
- if (descriptor.equals(filterId)) {
- return describedType;
- }
- }
- }
- }
-
- return null;
- }
-
// //////////////////////////////////////////////////////////////////////////
//
// Implementation methods
@@ -1707,17 +1656,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
}
- ErrorCondition createErrorCondition(String name) {
- return createErrorCondition(name, "");
- }
-
- ErrorCondition createErrorCondition(String name, String description) {
- ErrorCondition condition = new ErrorCondition();
- condition.setCondition(Symbol.valueOf(name));
- condition.setDescription(description);
- return condition;
- }
-
@Override
public void setPrefetch(int prefetch) {
this.prefetch = prefetch;
http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
index 9a01f7b..c0cfb94 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
@@ -17,19 +17,116 @@
package org.apache.activemq.transport.amqp;
import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedLong;
import org.fusesource.hawtbuf.Buffer;
/**
- *
+ * Set of useful methods and definitions used in the AMQP protocol handling
*/
public class AmqpSupport {
- static public Buffer toBuffer(ByteBuffer data) {
+    // Descriptor code and symbolic name pairs used to locate the JMS selector and no-local filters; each filter has its own symbol.
+    public static final UnsignedLong JMS_SELECTOR_CODE = UnsignedLong.valueOf(0x0000468C00000004L);
+    public static final Symbol JMS_SELECTOR_NAME = Symbol.valueOf("apache.org:selector-filter:string");
+    public static final Object[] JMS_SELECTOR_FILTER_IDS = new Object[] { JMS_SELECTOR_CODE, JMS_SELECTOR_NAME };
+    public static final UnsignedLong NO_LOCAL_CODE = UnsignedLong.valueOf(0x0000468C00000003L);
+    public static final Symbol NO_LOCAL_NAME = Symbol.valueOf("apache.org:no-local-filter:list");
+    public static final Object[] NO_LOCAL_FILTER_IDS = new Object[] { NO_LOCAL_CODE, NO_LOCAL_NAME };
+
+ // Capabilities used to identify destination type in some requests.
+ public static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue");
+ public static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic");
+
+ // Symbols used to announce connection information to remote peer.
+ public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
+ public static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
+ public static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
+ public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
+
+ // Symbols used in configuration of newly opened links.
+ public static final Symbol COPY = Symbol.getSymbol("copy");
+
+ /**
+ * Search for a given Symbol in a given array of Symbol object.
+ *
+ * @param symbols
+ * the set of Symbols to search.
+ * @param key
+ * the value to try and find in the Symbol array.
+ *
+ * @return true if the key is found in the given Symbol array.
+ */
+ public static boolean contains(Symbol[] symbols, Symbol key) {
+ if (symbols == null || symbols.length == 0) {
+ return false;
+ }
+
+ for (Symbol symbol : symbols) {
+ if (symbol.equals(key)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+    /**
+     * Search for a particular filter using a set of known identification values
+     * in the Map of filters.
+     *
+     * @param filters
+     *        The filters map that should be searched, may be null or empty.
+     * @param filterIds
+     *        The aliases for the target filter to be located.
+     *
+     * @return the filter if found in the mapping or null if not found.
+     * @throws IllegalArgumentException if filterIds is null or empty.
+     */
+    public static DescribedType findFilter(Map<Symbol, Object> filters, Object[] filterIds) {
+        if (filterIds == null || filterIds.length == 0) {
+            throw new IllegalArgumentException("Invalid Filter Ids array passed: " + java.util.Arrays.toString(filterIds));
+        }
+
+        if (filters == null || filters.isEmpty()) {
+            return null;
+        }
+
+        for (Object value : filters.values()) {
+            if (value instanceof DescribedType) {
+                DescribedType describedType = ((DescribedType) value);
+                Object descriptor = describedType.getDescriptor();
+                for (Object filterId : filterIds) {
+                    // Compare from the known id so a null descriptor cannot cause an NPE.
+                    if (filterId != null && filterId.equals(descriptor)) {
+                        return describedType;
+                    }
+                }
+            }
+        }
+
+        return null;
+    }
+
+ /**
+ * Conversion from Java ByteBuffer to a HawtBuf buffer.
+ *
+ * @param data
+ * the ByteBuffer instance to convert.
+ *
+ * @return a new HawtBuf buffer converted from the given ByteBuffer.
+ */
+ public static Buffer toBuffer(ByteBuffer data) {
if (data == null) {
return null;
}
+
Buffer rc;
+
if (data.isDirect()) {
rc = new Buffer(data.remaining());
data.get(rc.data);
@@ -37,6 +134,35 @@ public class AmqpSupport {
rc = new Buffer(data);
data.position(data.position() + data.remaining());
}
+
return rc;
}
+
+ /**
+ * Given a long value, convert it to a byte array for marshalling.
+ *
+ * @param value
+ * the value to convert.
+ *
+ * @return a new byte array that holds the big endian value of the long.
+ */
+ public static byte[] toBytes(long value) {
+ Buffer buffer = new Buffer(8);
+ buffer.bigEndianEditor().writeLong(value);
+ return buffer.data;
+ }
+
+ /**
+ * Converts a Binary value to a long assuming that the contained value is
+ * stored in Big Endian encoding.
+ *
+ * @param value
+ * the Binary object whose payload is converted to a long.
+ *
+ * @return a long value constructed from the bytes of the Binary instance.
+ */
+ public static long toLong(Binary value) {
+ Buffer buffer = new Buffer(value.getArray(), value.getArrayOffset(), value.getLength());
+ return buffer.bigEndianEditor().readLong();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
index e20168c..33ae799 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
@@ -36,6 +36,7 @@ import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
+import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
@@ -242,18 +243,47 @@ public class AmqpTestSupport {
LOG.info("========== tearDown " + getTestName() + " ==========");
}
- public void sendMessages(Connection connection, Destination destination, int count) throws Exception {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer p = session.createProducer(destination);
+ public Connection createJMSConnection() throws JMSException {
+ if (!isUseOpenWireConnector()) {
+ throw new javax.jms.IllegalStateException("OpenWire TransportConnector was not configured.");
+ }
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(openwireURI);
+
+ return factory.createConnection();
+ }
- for (int i = 1; i <= count; i++) {
- TextMessage message = session.createTextMessage();
- message.setText("TextMessage: " + i);
- message.setIntProperty(MESSAGE_NUMBER, i);
- p.send(message);
+ public void sendMessages(String destinationName, int count, boolean topic) throws Exception {
+ Connection connection = createJMSConnection();
+ try {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = null;
+ if (topic) {
+ destination = session.createTopic(destinationName);
+ } else {
+ destination = session.createQueue(destinationName);
+ }
+
+ sendMessages(connection, destination, count);
+ } finally {
+ connection.close();
}
+ }
- session.close();
+ public void sendMessages(Connection connection, Destination destination, int count) throws Exception {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ try {
+ MessageProducer p = session.createProducer(destination);
+
+ for (int i = 1; i <= count; i++) {
+ TextMessage message = session.createTextMessage();
+ message.setText("TextMessage: " + i);
+ message.setIntProperty(MESSAGE_NUMBER, i);
+ p.send(message);
+ }
+ } finally {
+ session.close();
+ }
}
public String getTestName() {
http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
new file mode 100644
index 0000000..8a5a587
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.io.IOException;
+
+import org.apache.activemq.transport.amqp.client.util.AsyncResult;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base for all AmqpResource implementations to extend.
+ *
+ * This abstract class wraps up the basic state management bits so that the concrete
+ * object don't have to reproduce it. Provides hooks for the subclasses to initialize
+ * and shutdown.
+ */
+public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpResource {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AmqpAbstractResource.class);
+
+ protected AsyncResult openRequest;
+ protected AsyncResult closeRequest;
+
+ private AmqpStateInspector amqpStateInspector = new AmqpStateInspector();
+
+ private E endpoint;
+
+ @Override
+ public void open(AsyncResult request) {
+ this.openRequest = request;
+ doOpen();
+ getEndpoint().setContext(this);
+ }
+
+ @Override
+ public boolean isOpen() {
+ return getEndpoint().getRemoteState() == EndpointState.ACTIVE;
+ }
+
+ @Override
+ public void opened() {
+ if (this.openRequest != null) {
+ this.openRequest.onSuccess();
+ this.openRequest = null;
+ }
+ }
+
+    @Override
+    public void close(AsyncResult request) {
+        // If already closed signal success or else the caller might never get notified.
+        if (getEndpoint().getLocalState() == EndpointState.CLOSED ||
+            getEndpoint().getRemoteState() == EndpointState.CLOSED) {
+
+            // Only the remote side has closed this resource: complete the close
+            // locally and free the endpoint before reporting success.
+            if (getEndpoint().getLocalState() != EndpointState.CLOSED) {
+                doClose();
+                getEndpoint().free();
+            }
+
+            request.onSuccess();
+            return;
+        }
+
+        // Normal path: remember the request so closed() / failed() can complete it.
+        this.closeRequest = request;
+        doClose();
+    }
+
+ @Override
+ public boolean isClosed() {
+ return getEndpoint().getLocalState() == EndpointState.CLOSED;
+ }
+
+ @Override
+ public void closed() {
+ getEndpoint().close();
+ getEndpoint().free();
+
+ if (this.closeRequest != null) {
+ this.closeRequest.onSuccess();
+ this.closeRequest = null;
+ }
+ }
+
+ @Override
+ public void failed() {
+ failed(new Exception("Remote request failed."));
+ }
+
+ @Override
+ public void failed(Exception cause) {
+ if (openRequest != null) {
+ if (endpoint != null) {
+ // TODO: if this is a producer/consumer link then we may only be detached,
+ // rather than fully closed, and should respond appropriately.
+ endpoint.close();
+ }
+ openRequest.onFailure(cause);
+ openRequest = null;
+ }
+
+ if (closeRequest != null) {
+ closeRequest.onFailure(cause);
+ closeRequest = null;
+ }
+ }
+
+ @Override
+ public void remotelyClosed(AmqpConnection connection) {
+ Exception error = getRemoteError();
+ if (error == null) {
+ error = new IOException("Remote has closed without error information");
+ }
+
+ if (endpoint != null) {
+ // TODO: if this is a producer/consumer link then we may only be detached,
+ // rather than fully closed, and should respond appropriately.
+ endpoint.close();
+ }
+
+ LOG.info("Resource {} was remotely closed", this);
+
+ connection.fireClientException(error);
+ }
+
+ public E getEndpoint() {
+ return this.endpoint;
+ }
+
+ public void setEndpoint(E endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ public AmqpStateInspector getStateInspector() {
+ return amqpStateInspector;
+ }
+
+ public void setStateInspector(AmqpStateInspector stateInspector) {
+ if (stateInspector == null) {
+ stateInspector = new AmqpStateInspector();
+ }
+
+ this.amqpStateInspector = stateInspector;
+ }
+
+ public EndpointState getLocalState() {
+ if (getEndpoint() == null) {
+ return EndpointState.UNINITIALIZED;
+ }
+ return getEndpoint().getLocalState();
+ }
+
+ public EndpointState getRemoteState() {
+ if (getEndpoint() == null) {
+ return EndpointState.UNINITIALIZED;
+ }
+ return getEndpoint().getRemoteState();
+ }
+
+ @Override
+ public boolean hasRemoteError() {
+ return getEndpoint().getRemoteCondition().getCondition() != null;
+ }
+
+ @Override
+ public Exception getRemoteError() {
+ String message = getRemoteErrorMessage();
+ Exception remoteError = null;
+ Symbol error = getEndpoint().getRemoteCondition().getCondition();
+ if (error != null) {
+ if (error.equals(AmqpError.UNAUTHORIZED_ACCESS)) {
+ remoteError = new SecurityException(message);
+ } else {
+ remoteError = new Exception(message);
+ }
+ }
+
+ return remoteError;
+ }
+
+    @Override
+    public String getRemoteErrorMessage() {
+        // Fallback text, used when the peer supplied no error description.
+        String message = "Received unknown error from remote peer";
+
+        ErrorCondition error = getEndpoint().getRemoteCondition();
+        if (error != null && error.getDescription() != null && !error.getDescription().isEmpty()) {
+            message = error.getDescription();
+        }
+
+        return message;
+    }
+
+ @Override
+ public void processRemoteOpen(AmqpConnection connection) throws IOException {
+ doOpenInspection();
+ doOpenCompletion();
+ }
+
+ @Override
+ public void processRemoteDetach(AmqpConnection connection) throws IOException {
+ doDetachedInspection();
+ if (isAwaitingClose()) {
+ LOG.debug("{} is now closed: ", this);
+ closed();
+ } else {
+ remotelyClosed(connection);
+ }
+ }
+
+ @Override
+ public void processRemoteClose(AmqpConnection connection) throws IOException {
+ doClosedInspection();
+ if (isAwaitingClose()) {
+ LOG.debug("{} is now closed: ", this);
+ closed();
+ } else if (isAwaitingOpen()) {
+ // Error on Open, create exception and signal failure.
+ LOG.warn("Open of {} failed: ", this);
+ Exception openError;
+ if (hasRemoteError()) {
+ openError = getRemoteError();
+ } else {
+ openError = getOpenAbortException();
+ }
+
+ failed(openError);
+ } else {
+ remotelyClosed(connection);
+ }
+ }
+
+ @Override
+ public void processDeliveryUpdates(AmqpConnection connection) throws IOException {
+ }
+
+ @Override
+ public void processFlowUpdates(AmqpConnection connection) throws IOException {
+ }
+
+ /**
+ * Perform the open operation on the managed endpoint. A subclass may
+ * override this method to provide additional open actions or configuration
+ * updates.
+ */
+ protected void doOpen() {
+ getEndpoint().open();
+ }
+
+ /**
+ * Perform the close operation on the managed endpoint. A subclass may
+ * override this method to provide additional close actions or alter the
+ * standard close path such as endpoint detach etc.
+ */
+ protected void doClose() {
+ getEndpoint().close();
+ }
+
+ /**
+ * Complete the open operation on the managed endpoint. A subclass may
+ * override this method to provide additional verification actions or configuration
+ * updates.
+ */
+ protected void doOpenCompletion() {
+ LOG.debug("{} is now open: ", this);
+ opened();
+ }
+
+    /**
+     * When aborting the open operation, and there isn't an error condition
+     * provided by the peer, the returned exception will be used instead.
+     * A subclass may override this method to provide alternative behaviour.
+     */
+    protected Exception getOpenAbortException() {
+        return new IOException("Open failed unexpectedly.");
+    }
+
+    // TODO - Find a more generic way to do this.
+ protected abstract void doOpenInspection();
+ protected abstract void doClosedInspection();
+
+ protected void doDetachedInspection() {}
+
+ //----- Private implementation utility methods ---------------------------//
+
+ private boolean isAwaitingOpen() {
+ return this.openRequest != null;
+ }
+
+ private boolean isAwaitingClose() {
+ return this.closeRequest != null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
new file mode 100644
index 0000000..0b299e4
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.transport.amqp.client.util.ClientTcpTransport;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Connection instance used to connect to the Broker using Proton as
+ * the AMQP protocol handler.
+ */
+public class AmqpClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AmqpClient.class);
+
+ private final String username;
+ private final String password;
+ private final URI remoteURI;
+
+ private AmqpStateInspector stateInspector = new AmqpStateInspector();
+ private List<Symbol> offeredCapabilities = Collections.emptyList();
+ private Map<Symbol, Object> offeredProperties = Collections.emptyMap();
+
+ /**
+ * Creates an AmqpClient instance which can be used as a factory for connections.
+ *
+ * @param remoteURI
+ * The address of the remote peer to connect to.
+ * @param username
+ * The user name to use when authenticating the client.
+ * @param password
+ * The password to use when authenticating the client.
+ */
+ public AmqpClient(URI remoteURI, String username, String password) {
+ this.remoteURI = remoteURI;
+ this.password = password;
+ this.username = username;
+ }
+
+    /**
+     * Creates a connection with the broker at the configured location. This method initiates a
+     * connect attempt immediately and will fail if the remote peer cannot be reached.
+     *
+     * @return a new connection object used to interact with the connected peer.
+     *
+     * @throws Exception if an error occurs attempting to connect to the Broker.
+     */
+    public AmqpConnection connect() throws Exception {
+
+        AmqpConnection connection = createConnection();
+
+        LOG.debug("Attempting to create new connection to peer: {}", remoteURI);
+        connection.connect();
+
+        return connection;
+    }
+
+    /**
+     * Creates a connection object using the configured values for user, password, remote URI
+     * etc. This method does not immediately initiate a connection to the remote leaving that
+     * to the caller which provides a connection object that can have additional configuration
+     * changes applied before the <code>connect</code> method is invoked.
+     *
+     * @return a new, not yet connected, connection object for the configured peer.
+     *
+     * @throws Exception if the credentials or URI scheme are invalid or the connection cannot be created.
+     */
+    public AmqpConnection createConnection() throws Exception {
+        if (username == null && password != null) {
+            throw new IllegalArgumentException("Password must be null if user name value is null");
+        }
+
+        ClientTcpTransport transport = null;
+        // Constant-first comparison: a URI without a scheme reports a null scheme.
+        if ("tcp".equals(remoteURI.getScheme())) {
+            transport = new ClientTcpTransport(remoteURI);
+        } else {
+            throw new IllegalArgumentException("Client only support TCP currently.");
+        }
+
+        AmqpConnection connection = new AmqpConnection(transport, username, password);
+
+        connection.setOfferedCapabilities(getOfferedCapabilities());
+        connection.setOfferedProperties(getOfferedProperties());
+        connection.setStateInspector(getStateInspector());
+
+        return connection;
+    }
+
+    /**
+     * @return the user name value given when this client was created, may be null.
+     */
+    public String getUsername() {
+        return username;
+    }
+
+    /**
+     * @return the password value given when this client was created, may be null.
+     */
+    public String getPassword() {
+        return password;
+    }
+
+ /**
+ * @return the currently set address to use to connect to the AMQP peer.
+ */
+ public URI getRemoteURI() {
+ return remoteURI;
+ }
+
+    /**
+     * Sets the offered capabilities that should be used when a new connection attempt
+     * is made.
+     *
+     * @param offeredCapabilities
+     *        the list of capabilities to offer when connecting, null is treated as an empty list.
+     */
+    public void setOfferedCapabilities(List<Symbol> offeredCapabilities) {
+        if (offeredCapabilities == null) {
+            offeredCapabilities = Collections.emptyList();
+        }
+        // Never store null: getOfferedCapabilities() wraps this value in an unmodifiable view.
+        this.offeredCapabilities = offeredCapabilities;
+    }
+
+ /**
+ * @return an unmodifiable view of the currently set offered capabilities
+ */
+ public List<Symbol> getOfferedCapabilities() {
+ return Collections.unmodifiableList(offeredCapabilities);
+ }
+
+    /**
+     * Sets the offered connection properties that should be used when a new connection
+     * attempt is made.
+     *
+     * @param offeredProperties
+     *        the map of properties to offer when connecting, null is treated as an empty map.
+     */
+    public void setOfferedProperties(Map<Symbol, Object> offeredProperties) {
+        if (offeredProperties == null) {
+            offeredProperties = Collections.emptyMap();
+        }
+        // Never store null: getOfferedProperties() wraps this value in an unmodifiable view.
+        this.offeredProperties = offeredProperties;
+    }
+
+ /**
+ * @return an unmodifiable view of the currently set connection properties.
+ */
+ public Map<Symbol, Object> getOfferedProperties() {
+ return Collections.unmodifiableMap(offeredProperties);
+ }
+
+ /**
+ * @return the currently set state inspector used to check state after various events.
+ */
+ public AmqpStateInspector getStateInspector() {
+ return stateInspector;
+ }
+
+ /**
+ * Sets the state inspector used to check that the AMQP resource is valid after
+ * specific lifecycle events such as open and close.
+ *
+ * @param stateInspector
+ * the new state inspector to use.
+ */
+ public void setStateInspector(AmqpStateInspector stateInspector) {
+ if (stateInspector == null) {
+ stateInspector = new AmqpStateInspector();
+ }
+
+ this.stateInspector = stateInspector;
+ }
+
+ @Override
+ public String toString() {
+ return "AmqpClient: " + getRemoteURI().getHost() + ":" + getRemoteURI().getPort();
+ }
+
+    /**
+     * Creates an anonymous connection with the broker at the given location.
+     *
+     * @param broker
+     *        the address of the remote broker instance.
+     *
+     * @return a new connection object used to interact with the connected peer.
+     *
+     * @throws Exception if an error occurs attempting to connect to the Broker.
+     */
+    public static AmqpConnection connect(URI broker) throws Exception {
+        return connect(broker, null, null);
+    }
+
+    /**
+     * Creates a connection with the broker at the given location.
+     *
+     * @param broker
+     *        the address of the remote broker instance.
+     * @param username
+     *        the user name to use to connect to the broker or null for anonymous.
+     * @param password
+     *        the password to use to connect to the broker, must be null if user name is null.
+     *
+     * @return a new connection object used to interact with the connected peer.
+     *
+     * @throws Exception if an error occurs attempting to connect to the Broker.
+     */
+    public static AmqpConnection connect(URI broker, String username, String password) throws Exception {
+        if (username == null && password != null) {
+            throw new IllegalArgumentException("Password must be null if user name value is null");
+        }
+
+        AmqpClient client = new AmqpClient(broker, username, password);
+
+        return client.connect();
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientListener.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientListener.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientListener.java
new file mode 100644
index 0000000..3df7cf4
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientListener.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+/**
+ * Event points exposed by the AmqpClient object.
+ */
+public interface AmqpClientListener {
+
+ /**
+ * Indicates some error has occurred during client operations.
+ *
+ * @param ex
+ * The error that triggered this event.
+ */
+ void onClientException(Throwable ex);
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java
new file mode 100644
index 0000000..4d3f571
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.net.URI;
+
+import org.apache.activemq.transport.amqp.AmqpTestSupport;
+
+/**
+ * Test support class for tests that will be using the AMQP Proton wrapper client.
+ */
+public class AmqpClientTestSupport extends AmqpTestSupport {
+
+ /**
+ * @return the query string (without the leading '?') appended to the broker
+ * connection URI; empty by default so no options are added.
+ */
+ public String getAmqpConnectionURIOptions() {
+ return "";
+ }
+
+ /**
+ * Builds the URI used to reach the broker's AMQP port on the loopback address,
+ * appending the value of {@link #getAmqpConnectionURIOptions()} when it is not empty.
+ *
+ * @return the URI of the broker under test.
+ *
+ * @throws RuntimeException if the URI cannot be built; the original error is kept as the cause.
+ */
+ public URI getBrokerAmqpConnectionURI() {
+ String uri = "tcp://127.0.0.1:" + amqpPort;
+ try {
+ String options = getAmqpConnectionURIOptions();
+ if (!options.isEmpty()) {
+ uri = uri + "?" + options;
+ }
+
+ return new URI(uri);
+ } catch (Exception e) {
+ // Keep the cause and the offending value so a bad options string can be diagnosed.
+ throw new RuntimeException("Failed to create the broker AMQP connection URI from: " + uri, e);
+ }
+ }
+
+ /**
+ * @return an anonymous connection to the broker under test.
+ *
+ * @throws Exception if the connection attempt fails.
+ */
+ public AmqpConnection createAmqpConnection() throws Exception {
+ return createAmqpConnection(getBrokerAmqpConnectionURI());
+ }
+
+ /**
+ * @return a connection to the broker under test using the given credentials.
+ *
+ * @throws Exception if the connection attempt fails.
+ */
+ public AmqpConnection createAmqpConnection(String username, String password) throws Exception {
+ return createAmqpConnection(getBrokerAmqpConnectionURI(), username, password);
+ }
+
+ /**
+ * @return an anonymous connection to the broker at the given URI.
+ *
+ * @throws Exception if the connection attempt fails.
+ */
+ public AmqpConnection createAmqpConnection(URI brokerURI) throws Exception {
+ return createAmqpConnection(brokerURI, null, null);
+ }
+
+ /**
+ * @return a connection to the broker at the given URI using the given credentials.
+ *
+ * @throws Exception if the connection attempt fails.
+ */
+ public AmqpConnection createAmqpConnection(URI brokerURI, String username, String password) throws Exception {
+ return createAmqpClient(brokerURI, username, password).connect();
+ }
+
+ /**
+ * @return an unconnected anonymous client for the broker under test.
+ */
+ public AmqpClient createAmqpClient() throws Exception {
+ return createAmqpClient(getBrokerAmqpConnectionURI(), null, null);
+ }
+
+ /**
+ * @return an unconnected anonymous client for the broker at the given URI.
+ */
+ public AmqpClient createAmqpClient(URI brokerURI) throws Exception {
+ return createAmqpClient(brokerURI, null, null);
+ }
+
+ /**
+ * @return an unconnected client for the broker under test using the given credentials.
+ */
+ public AmqpClient createAmqpClient(String username, String password) throws Exception {
+ return createAmqpClient(getBrokerAmqpConnectionURI(), username, password);
+ }
+
+ /**
+ * @return an unconnected client for the broker at the given URI using the given credentials.
+ */
+ public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception {
+ return new AmqpClient(brokerURI, username, password);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
new file mode 100644
index 0000000..a98f711
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -0,0 +1,532 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator;
+import org.apache.activemq.transport.amqp.client.util.ClientFuture;
+import org.apache.activemq.transport.amqp.client.util.ClientTcpTransport;
+import org.apache.activemq.transport.amqp.client.util.UnmodifiableConnection;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Event.Type;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.engine.impl.CollectorImpl;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AmqpConnection extends AmqpAbstractResource<Connection> implements ClientTcpTransport.TransportListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
+
+ private static final int DEFAULT_MAX_FRAME_SIZE = 1024 * 1024 * 1;
+ // NOTE: Limit default channel max to signed short range to deal with
+ // brokers that don't currently handle the unsigned range well.
+ private static final int DEFAULT_CHANNEL_MAX = 32767;
+ private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
+
+ public static final long DEFAULT_CONNECT_TIMEOUT = 15000;
+ public static final long DEFAULT_CLOSE_TIMEOUT = 30000;
+
+ private final ScheduledExecutorService serializer;
+ private final AtomicBoolean closed = new AtomicBoolean();
+ private final AtomicBoolean connected = new AtomicBoolean();
+ private final AtomicLong sessionIdGenerator = new AtomicLong();
+ private final Collector protonCollector = new CollectorImpl();
+ private final ClientTcpTransport transport;
+ private final Transport protonTransport = Transport.Factory.create();
+
+ private final String username;
+ private final String password;
+ private final URI remoteURI;
+ private final String connectionId;
+ private List<Symbol> offeredCapabilities = Collections.emptyList();
+ private Map<Symbol, Object> offeredProperties = Collections.emptyMap();
+
+ private AmqpClientListener listener;
+ private SaslAuthenticator authenticator;
+
+ private String containerId;
+ private boolean authenticated;
+ private int channelMax = DEFAULT_CHANNEL_MAX;
+ private long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
+ private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
+
+ /**
+ * Creates a connection that will communicate over the given transport.
+ * <p>
+ * A new Proton connection endpoint is created and attached to the event collector,
+ * a connection id is generated, a single daemon thread executor is created to
+ * serialize all work, and this object is registered as the transport's listener.
+ *
+ * @param transport
+ * the transport used to exchange bytes with the remote peer.
+ * @param username
+ * the user name handed to the SASL authenticator, may be null.
+ * @param password
+ * the password handed to the SASL authenticator, may be null.
+ */
+ public AmqpConnection(ClientTcpTransport transport, String username, String password) {
+ setEndpoint(Connection.Factory.create());
+ getEndpoint().collect(protonCollector);
+
+ this.transport = transport;
+ this.username = username;
+ this.password = password;
+ this.connectionId = CONNECTION_ID_GENERATOR.generateId();
+ this.remoteURI = transport.getRemoteURI();
+
+ this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+
+ @Override
+ public Thread newThread(Runnable runner) {
+ Thread serial = new Thread(runner);
+ serial.setDaemon(true);
+ // An unqualified toString() here resolves to the anonymous ThreadFactory,
+ // so qualify it to name the thread after the owning connection.
+ serial.setName(AmqpConnection.this.toString());
+ return serial;
+ }
+ });
+
+ this.transport.setTransportListener(this);
+ }
+
+ /**
+ * Connects the transport and opens the AMQP connection with the remote peer.
+ * <p>
+ * Only the first call does any work, later calls return immediately. After the
+ * transport connects, the Proton endpoint and transport are configured and bound
+ * on the serializer thread, the SASL authenticator is created and the open is
+ * requested. The caller then waits for the open request to complete, for at most
+ * the connect timeout in milliseconds, or without a time limit when that timeout
+ * is zero or negative.
+ * <p>
+ * NOTE(review): the connected flag is set before the transport connects and is not
+ * reset when connecting or opening fails, so a failed attempt cannot be retried on
+ * this instance; confirm that this is intended.
+ *
+ * @throws Exception if connecting the transport or waiting for the open fails.
+ */
+ public void connect() throws Exception {
+ if (connected.compareAndSet(false, true)) {
+ transport.connect();
+
+ final ClientFuture future = new ClientFuture();
+ serializer.execute(new Runnable() {
+ @Override
+ public void run() {
+ getEndpoint().setContainer(safeGetContainerId());
+ getEndpoint().setHostname(remoteURI.getHost());
+ if (!getOfferedCapabilities().isEmpty()) {
+ getEndpoint().setOfferedCapabilities(getOfferedCapabilities().toArray(new Symbol[0]));
+ }
+ if (!getOfferedProperties().isEmpty()) {
+ getEndpoint().setProperties(getOfferedProperties());
+ }
+
+ protonTransport.setMaxFrameSize(getMaxFrameSize());
+ protonTransport.setChannelMax(getChannelMax());
+ protonTransport.bind(getEndpoint());
+ Sasl sasl = protonTransport.sasl();
+ if (sasl != null) {
+ sasl.client();
+ }
+ authenticator = new SaslAuthenticator(sasl, username, password);
+ open(future);
+
+ pumpToProtonTransport();
+ }
+ });
+
+ if (connectTimeout <= 0) {
+ future.sync();
+ } else {
+ future.sync(connectTimeout, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
+ public boolean isConnected() {
+ return transport.isConnected() && connected.get();
+ }
+
+ /**
+ * Closes this connection and releases the transport and the serializer.
+ * <p>
+ * Only the first call does any work. The close of the Proton endpoint is requested
+ * on the serializer thread and the caller waits for it to complete, for at most the
+ * close timeout in milliseconds, or without a time limit when that timeout is zero
+ * or negative. The transport is closed and the serializer shut down whether or not
+ * the wait succeeds; failures are logged and never thrown.
+ */
+ public void close() {
+ if (closed.compareAndSet(false, true)) {
+ final ClientFuture request = new ClientFuture();
+ serializer.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+
+ // If we are not connected then there is nothing we can do now
+ // just signal success. Returning here avoids completing the
+ // request a second time and pumping to a dead transport.
+ if (!transport.isConnected()) {
+ request.onSuccess();
+ return;
+ }
+
+ if (getEndpoint() != null) {
+ close(request);
+ } else {
+ request.onSuccess();
+ }
+
+ pumpToProtonTransport();
+ } catch (Exception e) {
+ LOG.debug("Caught exception while closing proton connection", e);
+ }
+ }
+ });
+
+ try {
+ if (closeTimeout <= 0) {
+ request.sync();
+ } else {
+ request.sync(closeTimeout, TimeUnit.MILLISECONDS);
+ }
+ } catch (IOException e) {
+ // The placeholder is required, without it the message argument is dropped.
+ LOG.warn("Error caught while closing Provider: {}", e.getMessage());
+ } finally {
+ if (transport != null) {
+ try {
+ transport.close();
+ } catch (Exception e) {
+ LOG.debug("Caught exception while closing down Transport: {}", e.getMessage());
+ }
+ }
+
+ serializer.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Creates a new Session instance used to create AMQP resources like
+ * senders and receivers.
+ * <p>
+ * The session endpoint is created and opened on the serializer thread and the
+ * caller waits, without a time limit, for the open request to complete.
+ * <p>
+ * NOTE(review): if the closed check inside the serializer task throws, the request
+ * is never completed by this method and the wait may not return; confirm whether
+ * the request should be failed in that case.
+ *
+ * @return a new AmqpSession that can be used to create links.
+ *
+ * @throws Exception if an error occurs during creation.
+ */
+ public AmqpSession createSession() throws Exception {
+ checkClosed();
+
+ final AmqpSession session = new AmqpSession(AmqpConnection.this, getNextSessionId());
+ final ClientFuture request = new ClientFuture();
+
+ serializer.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ checkClosed();
+
+ session.setEndpoint(getEndpoint().session());
+ session.open(request);
+
+ pumpToProtonTransport();
+ }
+ });
+
+ request.sync();
+
+ return session;
+ }
+
+ //----- Configuration accessors ------------------------------------------//
+
+ /**
+ * @return the user name that was used to authenticate this connection.
+ */
+ public String getUsername() {
+ return username;
+ }
+
+ /**
+ * @return the password that was used to authenticate this connection.
+ */
+ public String getPassword() {
+ return password;
+ }
+
+ /**
+ * @return the URI of the remote peer this connection attached to.
+ */
+ public URI getRemoteURI() {
+ return remoteURI;
+ }
+
+ /**
+ * @return the container ID that will be set as the container Id.
+ */
+ public String getContainerId() {
+ return this.containerId;
+ }
+
+ /**
+ * Sets the container Id that will be configured on the connection prior to
+ * connecting to the remote peer. Calling this after connect has no effect.
+ *
+ * @param containerId
+ * the container Id to use on the connection.
+ */
+ public void setContainerId(String containerId) {
+ this.containerId = containerId;
+ }
+
+ /**
+ * @return the Max Frame Size value applied to the Proton transport on connect;
+ * this is always the fixed default (1024 * 1024) as no setter is provided.
+ */
+ public int getMaxFrameSize() {
+ return DEFAULT_MAX_FRAME_SIZE;
+ }
+
+ public int getChannelMax() {
+ return channelMax;
+ }
+
+ public void setChannelMax(int channelMax) {
+ this.channelMax = channelMax;
+ }
+
+ public long getConnectTimeout() {
+ return connectTimeout;
+ }
+
+ /**
+ * Sets how long connect waits for the connection open to complete.
+ *
+ * @param connectTimeout
+ * the wait time in milliseconds; zero or a negative value waits without a time limit.
+ */
+ public void setConnectTimeout(long connectTimeout) {
+ this.connectTimeout = connectTimeout;
+ }
+
+ public long getCloseTimeout() {
+ return closeTimeout;
+ }
+
+ /**
+ * Sets how long close waits for the connection close to complete.
+ *
+ * @param closeTimeout
+ * the wait time in milliseconds; zero or a negative value waits without a time limit.
+ */
+ public void setCloseTimeout(long closeTimeout) {
+ this.closeTimeout = closeTimeout;
+ }
+
+ public List<Symbol> getOfferedCapabilities() {
+ return offeredCapabilities;
+ }
+
+ /**
+ * Sets the capabilities that are offered to the remote peer when the connection
+ * is opened. Calling this after connect has no effect on the open that was sent.
+ *
+ * @param offeredCapabilities
+ * the capabilities to offer; null is treated as an empty list.
+ */
+ public void setOfferedCapabilities(List<Symbol> offeredCapabilities) {
+ // Only a null argument is replaced by the empty default. connect() reads this
+ // value without a null check, so it must never be stored as null.
+ if (offeredCapabilities == null) {
+ offeredCapabilities = Collections.emptyList();
+ }
+
+ this.offeredCapabilities = offeredCapabilities;
+ }
+
+ public Map<Symbol, Object> getOfferedProperties() {
+ return offeredProperties;
+ }
+
+ /**
+ * Sets the properties that are sent to the remote peer when the connection is
+ * opened. Calling this after connect has no effect on the open that was sent.
+ *
+ * @param offeredProperties
+ * the properties to send; null is treated as an empty map.
+ */
+ public void setOfferedProperties(Map<Symbol, Object> offeredProperties) {
+ // Only a null argument is replaced by the empty default. connect() reads this
+ // value without a null check, so it must never be stored as null.
+ if (offeredProperties == null) {
+ offeredProperties = Collections.emptyMap();
+ }
+
+ this.offeredProperties = offeredProperties;
+ }
+
+ /**
+ * @return the Proton connection wrapped in a new UnmodifiableConnection on each call.
+ */
+ public Connection getConnection() {
+ return new UnmodifiableConnection(getEndpoint());
+ }
+
+ //----- Internal getters used from the child AmqpResource classes --------//
+
+ ScheduledExecutorService getScheduler() {
+ return this.serializer;
+ }
+
+ Connection getProtonConnection() {
+ return getEndpoint();
+ }
+
+ /**
+ * Drains the Proton transport's pending output and writes it to the TCP
+ * transport until Proton reports nothing more to send. An I/O failure while
+ * writing stops the drain and is reported through the client exception hook.
+ */
+ void pumpToProtonTransport() {
+ try {
+ ByteBuffer pending = protonTransport.getOutputBuffer();
+ while (pending != null && pending.hasRemaining()) {
+ transport.send(pending);
+ protonTransport.outputConsumed();
+ pending = protonTransport.getOutputBuffer();
+ }
+ } catch (IOException e) {
+ fireClientException(e);
+ }
+ }
+
+ //----- Transport listener event hooks -----------------------------------//
+
+ /**
+ * Handles bytes read from the transport by handing them to Proton on the
+ * serializer thread.
+ * <p>
+ * The incoming data is copied into Proton's input buffer in pieces no larger than
+ * the space that buffer reports, with Proton processing each piece before the next
+ * is copied. Once all of it is consumed the resulting events are processed and any
+ * output Proton produced is written back to the transport.
+ *
+ * @param input
+ * the bytes that were read from the remote peer.
+ */
+ @Override
+ public void onData(final Buffer input) {
+ serializer.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ ByteBuffer source = input.toByteBuffer();
+ LOG.trace("Received from Broker {} bytes:", source.remaining());
+
+ do {
+ ByteBuffer buffer = protonTransport.getInputBuffer();
+ int limit = Math.min(buffer.remaining(), source.remaining());
+ ByteBuffer duplicate = source.duplicate();
+ duplicate.limit(source.position() + limit);
+ buffer.put(duplicate);
+ protonTransport.processInput();
+ source.position(source.position() + limit);
+ } while (source.hasRemaining());
+
+ // Process the state changes from the latest data and then answer back
+ // any pending updates to the Broker.
+ processUpdates();
+ pumpToProtonTransport();
+ }
+ });
+ }
+
+ @Override
+ public void onTransportClosed() {
+ LOG.debug("The transport has unexpectedly closed");
+ }
+
+ @Override
+ public void onTransportError(Throwable cause) {
+ fireClientException(cause);
+ }
+
+ //----- Internal implementation ------------------------------------------//
+
+ /**
+ * Completes the pending open request unless the remote peer flagged the open as
+ * failed through the CONNECTION_OPEN_FAILED connection property.
+ */
+ @Override
+ protected void doOpenCompletion() {
+ // The remote properties can be null when the peer sent none, and that case
+ // must be treated the same as the failure marker being absent.
+ Map<Symbol, Object> remoteProperties = getEndpoint().getRemoteProperties();
+
+ // If the remote indicates that a close is pending, don't open.
+ if (remoteProperties == null || !remoteProperties.containsKey(CONNECTION_OPEN_FAILED)) {
+ super.doOpenCompletion();
+ }
+ }
+
+ @Override
+ protected void doOpenInspection() {
+ getStateInspector().inspectOpenedResource(getConnection());
+ }
+
+ @Override
+ protected void doClosedInspection() {
+ getStateInspector().inspectClosedResource(getConnection());
+ }
+
+ protected void fireClientException(Throwable ex) {
+ AmqpClientListener listener = this.listener;
+ if (listener != null) {
+ listener.onClientException(ex);
+ }
+ }
+
+ protected void checkClosed() throws IllegalStateException {
+ if (closed.get()) {
+ throw new IllegalStateException("The Connection is already closed");
+ }
+ }
+
+ /**
+ * Drains the Proton event collector and dispatches each event to the client
+ * resource stored as the context of the connection, session or link it refers to.
+ * Event types without a case are discarded. After the events are drained the SASL
+ * authentication is advanced while it has not yet completed.
+ * <p>
+ * Any exception is logged and reported through the client exception hook.
+ */
+ private void processUpdates() {
+ try {
+ Event protonEvent = null;
+ while ((protonEvent = protonCollector.peek()) != null) {
+ if (!protonEvent.getType().equals(Type.TRANSPORT)) {
+ LOG.trace("New Proton Event: {}", protonEvent.getType());
+ }
+
+ AmqpResource amqpResource = null;
+ switch (protonEvent.getType()) {
+ case CONNECTION_REMOTE_CLOSE:
+ amqpResource = (AmqpConnection) protonEvent.getConnection().getContext();
+ amqpResource.processRemoteClose(this);
+ break;
+ case CONNECTION_REMOTE_OPEN:
+ amqpResource = (AmqpConnection) protonEvent.getConnection().getContext();
+ amqpResource.processRemoteOpen(this);
+ break;
+ case SESSION_REMOTE_CLOSE:
+ amqpResource = (AmqpSession) protonEvent.getSession().getContext();
+ amqpResource.processRemoteClose(this);
+ break;
+ case SESSION_REMOTE_OPEN:
+ amqpResource = (AmqpSession) protonEvent.getSession().getContext();
+ amqpResource.processRemoteOpen(this);
+ break;
+ case LINK_REMOTE_CLOSE:
+ amqpResource = (AmqpResource) protonEvent.getLink().getContext();
+ amqpResource.processRemoteClose(this);
+ break;
+ case LINK_REMOTE_DETACH:
+ amqpResource = (AmqpResource) protonEvent.getLink().getContext();
+ amqpResource.processRemoteDetach(this);
+ break;
+ case LINK_REMOTE_OPEN:
+ amqpResource = (AmqpResource) protonEvent.getLink().getContext();
+ amqpResource.processRemoteOpen(this);
+ break;
+ case LINK_FLOW:
+ amqpResource = (AmqpResource) protonEvent.getLink().getContext();
+ amqpResource.processFlowUpdates(this);
+ break;
+ case DELIVERY:
+ amqpResource = (AmqpResource) protonEvent.getLink().getContext();
+ amqpResource.processDeliveryUpdates(this);
+ break;
+ default:
+ break;
+ }
+
+ protonCollector.pop(); // NOTE(review): not reached when a handler above throws, so that event presumably stays queued - confirm
+ }
+
+ // We have to do this to pump SASL bytes in as SASL is not event driven yet.
+ if (!authenticated) {
+ processSaslAuthentication();
+ }
+ } catch (Exception ex) {
+ LOG.warn("Caught Exception during update processing: {}", ex.getMessage(), ex);
+ fireClientException(ex);
+ }
+ }
+
+ private void processSaslAuthentication() {
+ if (authenticated || authenticator == null) {
+ return;
+ }
+
+ try {
+ if (authenticator.authenticate()) {
+ authenticator = null;
+ authenticated = true;
+ }
+ } catch (SecurityException ex) {
+ failed(ex);
+ }
+ }
+
+ private String getNextSessionId() {
+ return connectionId + ":" + sessionIdGenerator.incrementAndGet();
+ }
+
+ private String safeGetContainerId() {
+ String containerId = getContainerId();
+ if (containerId == null || containerId.isEmpty()) {
+ containerId = UUID.randomUUID().toString();
+ }
+
+ return containerId;
+ }
+
+ @Override
+ public String toString() {
+ return "AmqpConnection { " + connectionId + " }";
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpDefaultClientListener.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpDefaultClientListener.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpDefaultClientListener.java
new file mode 100644
index 0000000..9b2394c
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpDefaultClientListener.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+/**
+ * Default listener implementation that stubs out all the event methods, so that
+ * a subclass only needs to override the events it is interested in.
+ */
+public class AmqpDefaultClientListener implements AmqpClientListener {
+
+ @Override
+ public void onClientException(Throwable ex) {
+ // Intentionally empty: the default listener ignores the event.
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java
new file mode 100644
index 0000000..d93e052
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+
+/**
+ * A Described Type wrapper for JMS selector values.
+ * <p>
+ * The described value is the selector string given at construction and the
+ * descriptor is the unsigned long code 0x0000468C00000004.
+ * NOTE(review): presumably the registered code of the Apache JMS selector filter;
+ * confirm against the AMQP filter registry.
+ */
+public class AmqpJmsSelectorType implements DescribedType {
+
+ private final String selector;
+
+ public AmqpJmsSelectorType(String selector) {
+ this.selector = selector;
+ }
+
+ @Override
+ public Object getDescriptor() {
+ return UnsignedLong.valueOf(0x0000468C00000004L);
+ }
+
+ @Override
+ public Object getDescribed() {
+ return this.selector;
+ }
+
+ @Override
+ public String toString() {
+ return "AmqpJmsSelectorType{" + selector + "}";
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
new file mode 100644
index 0000000..52e5eaf
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.activemq.transport.amqp.client.util.UnmodifiableDelivery;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
+
+public class AmqpMessage {
+
+ private final AmqpReceiver receiver;
+ private final Message message;
+ private final Delivery delivery;
+
+ /**
+ * Creates a new AmqpMessage that wraps the information necessary to handle
+ * an outgoing message.
+ */
+ public AmqpMessage() {
+ receiver = null;
+ delivery = null;
+
+ message = Proton.message();
+ message.setDurable(true);
+ }
+
+ /**
+ * Creates a new AmqpMessage that wraps the information necessary to handle
+ * an outgoing message.
+ *
+ * @param message
+ * the Proton message that is to be sent.
+ */
+ public AmqpMessage(Message message) {
+ this(null, message, null);
+ }
+
+ /**
+ * Creates a new AmqpMessage that wraps the information necessary to handle
+ * an incoming delivery.
+ *
+ * @param receiver
+ * the AmqpReceiver that received this message.
+ * @param message
+ * the Proton message that was received.
+ * @param delivery
+ * the Delivery instance that produced this message.
+ */
+ public AmqpMessage(AmqpReceiver receiver, Message message, Delivery delivery) {
+ this.receiver = receiver;
+ this.message = message;
+ this.delivery = delivery;
+ }
+
+ /**
+ * Accepts the message marking it as consumed on the remote peer.
+ *
+ * @throws Exception if an error occurs during the accept.
+ */
+ public void accept() throws Exception {
+ if (receiver == null) {
+ throw new IllegalStateException("Can't accept non-received message.");
+ }
+
+ receiver.accept(delivery);
+ }
+
+ /**
+ * Rejects the message, marking it as not deliverable here and failed to deliver.
+ *
+ * @throws Exception if an error occurs during the reject.
+ */
+ public void reject() throws Exception {
+ reject(true, true);
+ }
+
+ /**
+ * Rejects the message, marking it as failed to deliver and applying the given value
+ * to the undeliverable here tag.
+ *
+ * @param undeliverableHere
+ * marks the delivery as not being able to be processed by the link it was sent to.
+ *
+ * @throws Exception if an error occurs during the reject.
+ */
+ public void reject(boolean undeliverableHere) throws Exception {
+ reject(undeliverableHere, true);
+ }
+
+ /**
+ * Rejects the message, applying the given values to the undeliverable here and
+ * delivery failed tags.
+ *
+ * @param undeliverableHere
+ * marks the delivery as not being able to be processed by the link it was sent to.
+ * @param deliveryFailed
+ * indicates that the delivery failed for some reason.
+ *
+ * @throws IllegalStateException if this message was not received from a receiver.
+ * @throws Exception if an error occurs during the reject.
+ */
+ public void reject(boolean undeliverableHere, boolean deliveryFailed) throws Exception {
+ if (receiver == null) {
+ throw new IllegalStateException("Can't reject non-received message.");
+ }
+
+ receiver.reject(delivery, undeliverableHere, deliveryFailed);
+ }
+
+ /**
+ * Release the message, remote can redeliver it elsewhere.
+ *
+ * @throws IllegalStateException if this message was not received from a receiver.
+ * @throws Exception if an error occurs during the release.
+ */
+ public void release() throws Exception {
+ if (receiver == null) {
+ throw new IllegalStateException("Can't release non-received message.");
+ }
+
+ receiver.release(delivery);
+ }
+
+ /**
+ * @return the AMQP Delivery object linked to a received message.
+ */
+ public Delivery getWrappedDelivery() {
+ if (delivery != null) {
+ return new UnmodifiableDelivery(delivery);
+ }
+
+ return null;
+ }
+
+ /**
+ * @return the AMQP Message that is wrapped by this object.
+ */
+ public Message getWrappedMessage() {
+ return message;
+ }
+
+ /**
+ * @return the AmqpReceiver that consumed this message.
+ */
+ public AmqpReceiver getAmqpReceiver() {
+ return receiver;
+ }
+
+ /**
+ * Sets a String value into the body of an outgoing Message, throws
+ * an exception if this is an incoming message instance.
+ *
+ * @param value
+ * the String value to store in the Message body.
+ *
+ * @throws IllegalStateException if the message is read only.
+ */
+ public void setText(String value) throws IllegalStateException {
+ if (delivery != null) {
+ throw new IllegalStateException("Message is read only.");
+ }
+
+ AmqpValue body = new AmqpValue(value);
+ getWrappedMessage().setBody(body);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java
new file mode 100644
index 0000000..2d61b83
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+
+/**
+ * A Described Type wrapper for JMS no local option for MessageConsumer.
+ * <p>
+ * The described value is the fixed string "NoLocalFilter{}" and the descriptor is
+ * the unsigned long code 0x0000468C00000003. A shared instance is available as
+ * {@link #NO_LOCAL}.
+ * NOTE(review): presumably the registered code of the Apache no-local filter;
+ * confirm against the AMQP filter registry.
+ */
+public class AmqpNoLocalType implements DescribedType {
+
+ public static final AmqpNoLocalType NO_LOCAL = new AmqpNoLocalType();
+
+ private final String noLocal;
+
+ public AmqpNoLocalType() {
+ this.noLocal = "NoLocalFilter{}";
+ }
+
+ @Override
+ public Object getDescriptor() {
+ return UnsignedLong.valueOf(0x0000468C00000003L);
+ }
+
+ @Override
+ public Object getDescribed() {
+ return this.noLocal;
+ }
+}