You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/03/13 20:48:24 UTC

[1/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5602

Repository: activemq
Updated Branches:
  refs/heads/master 10c47d69d -> 72839b78a


http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientTcpTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientTcpTransport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientTcpTransport.java
new file mode 100644
index 0000000..7aa8c62
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientTcpTransport.java
@@ -0,0 +1,384 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.SocketFactory;
+
+import org.apache.activemq.transport.tcp.TcpBufferedInputStream;
+import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.InetAddressUtil;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple TCP based transport used by the client.
+ */
+public class ClientTcpTransport implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ClientTcpTransport.class);
+
+    public interface TransportListener {
+
+        /**
+         * Called when new incoming data has become available.
+         *
+         * @param incoming
+         *        the next incoming packet of data.
+         */
+        void onData(Buffer incoming);
+
+        /**
+         * Called if the connection state becomes closed.
+         */
+        void onTransportClosed();
+
+        /**
+         * Called when an error occurs during normal Transport operations.
+         *
+         * @param cause
+         *        the error that triggered this event.
+         */
+        void onTransportError(Throwable cause);
+
+    }
+
+    private final URI remoteLocation;
+    private final AtomicBoolean connected = new AtomicBoolean();
+    private final AtomicBoolean closed = new AtomicBoolean();
+    private final AtomicReference<Throwable> connectionError = new AtomicReference<Throwable>();
+
+    private final Socket socket;
+    private DataOutputStream dataOut;
+    private DataInputStream dataIn;
+    private Thread runner;
+    private TransportListener listener;
+
+    private int socketBufferSize = 64 * 1024;
+    private int soTimeout = 0;
+    private int soLinger = Integer.MIN_VALUE;
+    private Boolean keepAlive;
+    private Boolean tcpNoDelay = true;
+    private boolean useLocalHost = false;
+    private int ioBufferSize = 8 * 1024;
+
+    /**
+     * Create a new instance of the transport.
+     *
+     * @param listener
+     *        The TransportListener that will receive data from this Transport instance.
+     * @param remoteLocation
+     *        The remote location where this transport should connection to.
+     */
+    public ClientTcpTransport(URI remoteLocation) {
+        this.remoteLocation = remoteLocation;
+
+        Socket temp = null;
+        try {
+            temp = createSocketFactory().createSocket();
+        } catch (IOException e) {
+            connectionError.set(e);
+        }
+
+        this.socket = temp;
+    }
+
+    public void connect() throws IOException {
+        if (connectionError.get() != null) {
+            throw IOExceptionSupport.create(connectionError.get());
+        }
+
+        if (listener == null) {
+            throw new IllegalStateException("Cannot connect until a listener has been set.");
+        }
+
+        if (socket == null) {
+            throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
+        }
+
+        InetSocketAddress remoteAddress = null;
+
+        if (remoteLocation != null) {
+            String host = resolveHostName(remoteLocation.getHost());
+            remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
+        }
+
+        socket.connect(remoteAddress);
+
+        connected.set(true);
+
+        initialiseSocket(socket);
+        initializeStreams();
+
+        runner = new Thread(null, this, "ClientTcpTransport: " + toString());
+        runner.setDaemon(false);
+        runner.start();
+    }
+
+    public void close() {
+        if (closed.compareAndSet(false, true)) {
+            if (socket == null) {
+                return;
+            }
+
+            // Closing the streams flush the sockets before closing.. if the socket
+            // is hung.. then this hangs the close so we perform an asynchronous close
+            // by default which will timeout if the close doesn't happen after a delay.
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            final ExecutorService closer = Executors.newSingleThreadExecutor();
+            closer.execute(new Runnable() {
+                @Override
+                public void run() {
+                    LOG.trace("Closing socket {}", socket);
+                    try {
+                        socket.close();
+                        LOG.debug("Closed socket {}", socket);
+                    } catch (IOException e) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e);
+                        }
+                    } finally {
+                        latch.countDown();
+                    }
+                }
+            });
+
+            try {
+                latch.await(5, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            } finally {
+                closer.shutdownNow();
+            }
+        }
+    }
+
+    public void send(ByteBuffer output) throws IOException {
+        checkConnected();
+        LOG.trace("Client Transport sending packet of size: {}", output.remaining());
+        WritableByteChannel channel = Channels.newChannel(dataOut);
+        channel.write(output);
+        dataOut.flush();
+    }
+
+    public void send(Buffer output) throws IOException {
+        checkConnected();
+        send(output.toByteBuffer());
+    }
+
+    public URI getRemoteURI() {
+        return this.remoteLocation;
+    }
+
+    public boolean isConnected() {
+        return this.connected.get();
+    }
+
+    public TransportListener getTransportListener() {
+        return this.listener;
+    }
+
+    public void setTransportListener(TransportListener listener) {
+        if (listener == null) {
+            throw new IllegalArgumentException("Listener cannot be set to null");
+        }
+
+        this.listener = listener;
+    }
+
+    public int getSocketBufferSize() {
+        return socketBufferSize;
+    }
+
+    public void setSocketBufferSize(int socketBufferSize) {
+        this.socketBufferSize = socketBufferSize;
+    }
+
+    public int getSoTimeout() {
+        return soTimeout;
+    }
+
+    public void setSoTimeout(int soTimeout) {
+        this.soTimeout = soTimeout;
+    }
+
+    public boolean isTcpNoDelay() {
+        return tcpNoDelay;
+    }
+
+    public void setTcpNoDelay(Boolean tcpNoDelay) {
+        this.tcpNoDelay = tcpNoDelay;
+    }
+
+    public int getSoLinger() {
+        return soLinger;
+    }
+
+    public void setSoLinger(int soLinger) {
+        this.soLinger = soLinger;
+    }
+
+    public boolean isKeepAlive() {
+        return keepAlive;
+    }
+
+    public void setKeepAlive(Boolean keepAlive) {
+        this.keepAlive = keepAlive;
+    }
+
+    public boolean isUseLocalHost() {
+        return useLocalHost;
+    }
+
+    public void setUseLocalHost(boolean useLocalHost) {
+        this.useLocalHost = useLocalHost;
+    }
+
+    public int getIoBufferSize() {
+        return ioBufferSize;
+    }
+
+    public void setIoBufferSize(int ioBufferSize) {
+        this.ioBufferSize = ioBufferSize;
+    }
+
+    //---------- Transport internal implementation ---------------------------//
+
+    @Override
+    public void run() {
+        LOG.trace("TCP consumer thread for {} starting", this);
+        try {
+            while (isConnected()) {
+                doRun();
+            }
+        } catch (IOException e) {
+            connectionError.set(e);
+            onException(e);
+        } catch (Throwable e) {
+            IOException ioe = new IOException("Unexpected error occured: " + e);
+            connectionError.set(ioe);
+            ioe.initCause(e);
+            onException(ioe);
+        }
+    }
+
+    protected void doRun() throws IOException {
+        int size = dataIn.available();
+        if (size <= 0) {
+            try {
+                TimeUnit.NANOSECONDS.sleep(1);
+            } catch (InterruptedException e) {
+            }
+            return;
+        }
+
+        byte[] buffer = new byte[size];
+        dataIn.readFully(buffer);
+        Buffer incoming = new Buffer(buffer);
+        listener.onData(incoming);
+    }
+
+    /**
+     * Passes any IO exceptions into the transport listener
+     */
+    public void onException(IOException e) {
+        if (listener != null) {
+            try {
+                listener.onTransportError(e);
+            } catch (RuntimeException e2) {
+                LOG.debug("Unexpected runtime exception: {}", e2.getMessage(), e2);
+            }
+        }
+    }
+
+    protected SocketFactory createSocketFactory() throws IOException {
+        return SocketFactory.getDefault();
+    }
+
+    protected void initialiseSocket(Socket sock) throws SocketException, IllegalArgumentException {
+        try {
+            sock.setReceiveBufferSize(socketBufferSize);
+            sock.setSendBufferSize(socketBufferSize);
+        } catch (SocketException se) {
+            LOG.warn("Cannot set socket buffer size = {}", socketBufferSize);
+            LOG.debug("Cannot set socket buffer size. Reason: {}. This exception is ignored.", se.getMessage(), se);
+        }
+
+        sock.setSoTimeout(soTimeout);
+
+        if (keepAlive != null) {
+            sock.setKeepAlive(keepAlive.booleanValue());
+        }
+
+        if (soLinger > -1) {
+            sock.setSoLinger(true, soLinger);
+        } else if (soLinger == -1) {
+            sock.setSoLinger(false, 0);
+        }
+
+        if (tcpNoDelay != null) {
+            sock.setTcpNoDelay(tcpNoDelay.booleanValue());
+        }
+    }
+
+    protected void initializeStreams() throws IOException {
+        try {
+            TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
+            this.dataIn = new DataInputStream(buffIn);
+            TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
+            this.dataOut = new DataOutputStream(outputStream);
+        } catch (Throwable e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    protected String resolveHostName(String host) throws UnknownHostException {
+        if (isUseLocalHost()) {
+            String localName = InetAddressUtil.getLocalHostName();
+            if (localName != null && localName.equals(host)) {
+                return "localhost";
+            }
+        }
+        return host;
+    }
+
+    private void checkConnected() throws IOException {
+        if (!connected.get()) {
+            throw new IOException("Cannot send to a non-connected transport.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java
new file mode 100644
index 0000000..b67b305
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+import java.util.EnumSet;
+import java.util.Map;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Session;
+
+/**
+ * Unmodifiable Connection wrapper used to prevent test code from accidentally
+ * modifying Connection state.
+ */
+public class UnmodifiableConnection implements Connection {
+
+    private final Connection connection;
+
+    public UnmodifiableConnection(Connection connection) {
+        this.connection = connection;
+    }
+
+    @Override
+    public EndpointState getLocalState() {
+        return connection.getLocalState();
+    }
+
+    @Override
+    public EndpointState getRemoteState() {
+        return connection.getRemoteState();
+    }
+
+    @Override
+    public ErrorCondition getCondition() {
+        return connection.getCondition();
+    }
+
+    @Override
+    public void setCondition(ErrorCondition condition) {
+        throw new UnsupportedOperationException("Cannot alter the Connection");
+    }
+
+    @Override
+    public ErrorCondition getRemoteCondition() {
+        return connection.getRemoteCondition();
+    }
+
+    @Override
+    public void free() {
+        throw new UnsupportedOperationException("Cannot alter the Connection");
+    }
+
+    @Override
+    public void open() {
+        throw new UnsupportedOperationException("Cannot alter the Connection");
+    }
+
+    @Override
+    public void close() {
+        throw new UnsupportedOperationException("Cannot alter the Connection");
+    }
+
+    @Override
+    public Session session() {
+        throw new UnsupportedOperationException("Cannot alter the Connection");
+    }
+
+    @Override
+    public Session sessionHead(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
+        Session head = connection.sessionHead(local, remote);
+        if (head != null) {
+            head = new UnmodifiableSession(head);
+        }
+
+        return head;
+    }
+
+    @Override
+    public Link linkHead(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
+        // TODO - If implemented this method should return an unmodifiable link isntance.
+        return null;
+    }
+
+    @Override
+    public Delivery getWorkHead() {
+        // TODO - If implemented this method should return an unmodifiable delivery isntance.
+        return null;
+    }
+
+    @Override
+    public void setContainer(String container) {
+        throw new UnsupportedOperationException("Cannot alter the Connection");
+    }
+
+    @Override
+    public void setHostname(String hostname) {
+        throw new UnsupportedOperationException("Cannot alter the Connection");
+    }
+
+    @Override
+    public String getHostname() {
+        return connection.getHostname();
+    }
+
+    @Override
+    public String getRemoteContainer() {
+        return connection.getRemoteContainer();
+    }
+
+    @Override
+    public String getRemoteHostname() {
+        return connection.getRemoteHostname();
+    }
+
+    @Override
+    public void setOfferedCapabilities(Symbol[] capabilities) {
+        throw new UnsupportedOperationException("Cannot alter the Connection");
+    }
+
+    @Override
+    public void setDesiredCapabilities(Symbol[] capabilities) {
+        throw new UnsupportedOperationException("Cannot alter the Connection");
+    }
+
+    @Override
+    public Symbol[] getRemoteOfferedCapabilities() {
+        return connection.getRemoteOfferedCapabilities();
+    }
+
+    @Override
+    public Symbol[] getRemoteDesiredCapabilities() {
+        return connection.getRemoteDesiredCapabilities();
+    }
+
+    @Override
+    public Map<Symbol, Object> getRemoteProperties() {
+        return connection.getRemoteProperties();
+    }
+
+    @Override
+    public void setProperties(Map<Symbol, Object> properties) {
+        throw new UnsupportedOperationException("Cannot alter the Connection");
+    }
+
+    @Override
+    public Object getContext() {
+        return connection.getContext();
+    }
+
+    @Override
+    public void setContext(Object context) {
+        throw new UnsupportedOperationException("Cannot alter the Connection");
+    }
+
+    @Override
+    public void collect(Collector collector) {
+        throw new UnsupportedOperationException("Cannot alter the Connection");
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java
new file mode 100644
index 0000000..fd99665
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+
+/**
+ * Unmodifiable Delivery wrapper used to prevent test code from accidentally
+ * modifying Delivery state.
+ */
+public class UnmodifiableDelivery implements Delivery {
+
+    private final Delivery delivery;
+
+    public UnmodifiableDelivery(Delivery delivery) {
+        this.delivery = delivery;
+    }
+
+    @Override
+    public byte[] getTag() {
+        return delivery.getTag();
+    }
+
+    @Override
+    public Link getLink() {
+        if (delivery.getLink() instanceof Sender) {
+            return new UnmodifiableSender((Sender) delivery.getLink());
+        } else if (delivery.getLink() instanceof Receiver) {
+            return new UnmodifiableReceiver((Receiver) delivery.getLink());
+        } else {
+            throw new IllegalStateException("Delivery has unknown link type");
+        }
+    }
+
+    @Override
+    public DeliveryState getLocalState() {
+        return delivery.getLocalState();
+    }
+
+    @Override
+    public DeliveryState getRemoteState() {
+        return delivery.getRemoteState();
+    }
+
+    @Override
+    public int getMessageFormat() {
+        return delivery.getMessageFormat();
+    }
+
+    @Override
+    public void disposition(DeliveryState state) {
+        throw new UnsupportedOperationException("Cannot alter the Delivery state");
+    }
+
+    @Override
+    public void settle() {
+        throw new UnsupportedOperationException("Cannot alter the Delivery state");
+    }
+
+    @Override
+    public boolean isSettled() {
+        return delivery.isSettled();
+    }
+
+    @Override
+    public boolean remotelySettled() {
+        return delivery.remotelySettled();
+    }
+
+    @Override
+    public void free() {
+        throw new UnsupportedOperationException("Cannot alter the Delivery state");
+    }
+
+    @Override
+    public Delivery getWorkNext() {
+        return new UnmodifiableDelivery(delivery.getWorkNext());
+    }
+
+    @Override
+    public Delivery next() {
+        return new UnmodifiableDelivery(delivery.next());
+    }
+
+    @Override
+    public boolean isWritable() {
+        return delivery.isWritable();
+    }
+
+    @Override
+    public boolean isReadable() {
+        return delivery.isReadable();
+    }
+
+    @Override
+    public void setContext(Object o) {
+        throw new UnsupportedOperationException("Cannot alter the Delivery state");
+    }
+
+    @Override
+    public Object getContext() {
+        return delivery.getContext();
+    }
+
+    @Override
+    public boolean isUpdated() {
+        return delivery.isUpdated();
+    }
+
+    @Override
+    public void clear() {
+        throw new UnsupportedOperationException("Cannot alter the Delivery state");
+    }
+
+    @Override
+    public boolean isPartial() {
+        return delivery.isPartial();
+    }
+
+    @Override
+    public int pending() {
+        return delivery.pending();
+    }
+
+    @Override
+    public boolean isBuffered() {
+        return delivery.isBuffered();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java
new file mode 100644
index 0000000..70665c0
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+import java.util.EnumSet;
+
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.amqp.transport.Source;
+import org.apache.qpid.proton.amqp.transport.Target;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+
+/**
+ * Unmodifiable Session wrapper used to prevent test code from accidentally
+ * modifying Session state.
+ */
+public class UnmodifiableLink implements Link {
+
+    private final Link link;
+
+    public UnmodifiableLink(Link link) {
+        this.link = link;
+    }
+
+    @Override
+    public EndpointState getLocalState() {
+        return link.getLocalState();
+    }
+
+    @Override
+    public EndpointState getRemoteState() {
+        return link.getRemoteState();
+    }
+
+    @Override
+    public ErrorCondition getCondition() {
+        return link.getCondition();
+    }
+
+    @Override
+    public void setCondition(ErrorCondition condition) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public ErrorCondition getRemoteCondition() {
+        return link.getRemoteCondition();
+    }
+
+    @Override
+    public void free() {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public void open() {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public void close() {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public void setContext(Object o) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public Object getContext() {
+        return link.getContext();
+    }
+
+    @Override
+    public String getName() {
+        return link.getName();
+    }
+
+    @Override
+    public Delivery delivery(byte[] tag) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public Delivery delivery(byte[] tag, int offset, int length) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public Delivery head() {
+        return new UnmodifiableDelivery(link.head());
+    }
+
+    @Override
+    public Delivery current() {
+        return new UnmodifiableDelivery(link.current());
+    }
+
+    @Override
+    public boolean advance() {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public Source getSource() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Target getTarget() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void setSource(Source address) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public void setTarget(Target address) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public Source getRemoteSource() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Target getRemoteTarget() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Link next(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
+        Link next = link.next(local, remote);
+
+        if (next != null) {
+            if (next instanceof Sender) {
+                next = new UnmodifiableSender((Sender) next);
+            } else {
+                next = new UnmodifiableReceiver((Receiver) next);
+            }
+        }
+
+        return next;
+    }
+
+    @Override
+    public int getCredit() {
+        return link.getCredit();
+    }
+
+    @Override
+    public int getQueued() {
+        return link.getQueued();
+    }
+
+    @Override
+    public int getUnsettled() {
+        return link.getUnsettled();
+    }
+
+    @Override
+    public Session getSession() {
+        return new UnmodifiableSession(link.getSession());
+    }
+
+    @Override
+    public SenderSettleMode getSenderSettleMode() {
+        return link.getSenderSettleMode();
+    }
+
+    @Override
+    public void setSenderSettleMode(SenderSettleMode senderSettleMode) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public SenderSettleMode getRemoteSenderSettleMode() {
+        return link.getRemoteSenderSettleMode();
+    }
+
+    @Override
+    public ReceiverSettleMode getReceiverSettleMode() {
+        return link.getReceiverSettleMode();
+    }
+
+    @Override
+    public void setReceiverSettleMode(ReceiverSettleMode receiverSettleMode) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public ReceiverSettleMode getRemoteReceiverSettleMode() {
+        return link.getRemoteReceiverSettleMode();
+    }
+
+    @Override
+    public void setRemoteSenderSettleMode(SenderSettleMode remoteSenderSettleMode) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public int drained() {
+        return link.drained();  // TODO - Is this a mutating call?
+    }
+
+    @Override
+    public int getRemoteCredit() {
+        return link.getRemoteCredit();
+    }
+
+    @Override
+    public boolean getDrain() {
+        return link.getDrain();
+    }
+
+    @Override
+    public void detach() {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java
new file mode 100644
index 0000000..1b07ed0
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+import org.apache.qpid.proton.engine.Receiver;
+
+/**
+ * Unmodifiable Receiver wrapper used to prevent test code from accidentally
+ * modifying Receiver state.
+ */
+public class UnmodifiableReceiver extends UnmodifiableLink implements Receiver {
+
+    private final Receiver receiver;
+
+    public UnmodifiableReceiver(Receiver receiver) {
+        super(receiver);
+
+        this.receiver = receiver;
+    }
+
+    @Override
+    public void flow(int credits) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public int recv(byte[] bytes, int offset, int size) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public void drain(int credit) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public boolean draining() {
+        return receiver.draining();
+    }
+
+    @Override
+    public void setDrain(boolean drain) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java
new file mode 100644
index 0000000..1517a93
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+import org.apache.qpid.proton.engine.Sender;
+
+/**
+ * Unmodifiable Sender wrapper used to prevent test code from accidentally
+ * modifying Sender state.
+ */
+public class UnmodifiableSender extends UnmodifiableLink implements Sender {
+
+    public UnmodifiableSender(Sender sender) {
+        super(sender);
+    }
+
+    @Override
+    public void offer(int credits) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public int send(byte[] bytes, int offset, int length) {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+
+    @Override
+    public void abort() {
+        throw new UnsupportedOperationException("Cannot alter the Link state");
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java
new file mode 100644
index 0000000..6a73e0f
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+import java.util.EnumSet;
+
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+
+/**
+ * Unmodifiable Session wrapper used to prevent test code from accidentally
+ * modifying Session state.
+ */
+public class UnmodifiableSession implements Session {
+
+    private final Session session;
+
+    public UnmodifiableSession(Session session) {
+        this.session = session;
+    }
+
+    @Override
+    public EndpointState getLocalState() {
+        return session.getLocalState();
+    }
+
+    @Override
+    public EndpointState getRemoteState() {
+        return session.getRemoteState();
+    }
+
+    @Override
+    public ErrorCondition getCondition() {
+        return session.getCondition();
+    }
+
+    @Override
+    public void setCondition(ErrorCondition condition) {
+        throw new UnsupportedOperationException("Cannot alter the Session");
+    }
+
+    @Override
+    public ErrorCondition getRemoteCondition() {
+        return session.getRemoteCondition();
+    }
+
+    @Override
+    public void free() {
+        throw new UnsupportedOperationException("Cannot alter the Session");
+    }
+
+    @Override
+    public void open() {
+        throw new UnsupportedOperationException("Cannot alter the Session");
+    }
+
+    @Override
+    public void close() {
+        throw new UnsupportedOperationException("Cannot alter the Session");
+    }
+
+    @Override
+    public void setContext(Object o) {
+        throw new UnsupportedOperationException("Cannot alter the Session");
+    }
+
+    @Override
+    public Object getContext() {
+        return session.getContext();
+    }
+
+    @Override
+    public Sender sender(String name) {
+        throw new UnsupportedOperationException("Cannot alter the Session");
+    }
+
+    @Override
+    public Receiver receiver(String name) {
+        throw new UnsupportedOperationException("Cannot alter the Session");
+    }
+
+    @Override
+    public Session next(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
+        Session next = session.next(local, remote);
+        if (next != null) {
+            next = new UnmodifiableSession(next);
+        }
+
+        return next;
+    }
+
+    @Override
+    public Connection getConnection() {
+        return new UnmodifiableConnection(session.getConnection());
+    }
+
+    @Override
+    public int getIncomingCapacity() {
+        return session.getIncomingCapacity();
+    }
+
+    @Override
+    public void setIncomingCapacity(int bytes) {
+        throw new UnsupportedOperationException("Cannot alter the Session");
+    }
+
+    @Override
+    public int getIncomingBytes() {
+        return session.getIncomingBytes();
+    }
+
+    @Override
+    public int getOutgoingBytes() {
+        return session.getOutgoingBytes();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/WrappedAsyncResult.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/WrappedAsyncResult.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/WrappedAsyncResult.java
new file mode 100644
index 0000000..bb2d983
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/WrappedAsyncResult.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+/**
+ * Base class used to wrap one AsyncResult with another.
+ */
+public abstract class WrappedAsyncResult implements AsyncResult {
+
+    protected final AsyncResult wrapped;
+
+    /**
+     * Create a new WrappedAsyncResult for the target AsyncResult
+     */
+    public WrappedAsyncResult(AsyncResult wrapped) {
+        this.wrapped = wrapped;
+    }
+
+    @Override
+    public void onFailure(Throwable result) {
+        if (wrapped != null) {
+            wrapped.onFailure(result);
+        }
+    }
+
+    @Override
+    public void onSuccess() {
+        if (wrapped != null) {
+            wrapped.onSuccess();
+        }
+    }
+
+    @Override
+    public boolean isComplete() {
+        if (wrapped != null) {
+            return wrapped.isComplete();
+        }
+
+        return false;
+    }
+
+    public AsyncResult getWrappedRequest() {
+        return wrapped;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java
new file mode 100644
index 0000000..a35709d
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
+import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.Map;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpStateInspector;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Connection;
+import org.junit.Test;
+
+/**
+ * Test broker handling of AMQP connections with various configurations.
+ */
+public class AmqpConnectionsTest extends AmqpClientTestSupport {
+
+    private static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
+    private static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
+    private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
+
+    @Test(timeout = 60000)
+    public void testCanConnect() throws Exception {
+        AmqpClient client = createAmqpClient();
+        assertNotNull(client);
+
+        AmqpConnection connection = client.connect();
+        assertNotNull(connection);
+
+        assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
+
+        connection.close();
+
+        assertEquals(0, getProxyToBroker().getCurrentConnectionsCount());
+    }
+
+    @Test(timeout = 60000)
+    public void testConnectionCarriesExpectedCapabilities() throws Exception {
+        AmqpClient client = createAmqpClient();
+        assertNotNull(client);
+
+        client.setStateInspector(new AmqpStateInspector() {
+
+            @Override
+            public void inspectOpenedResource(Connection connection) {
+
+                Symbol[] offered = connection.getRemoteOfferedCapabilities();
+                if (!contains(offered, ANONYMOUS_RELAY)) {
+                    markAsInvalid("Broker did not indicate it support anonymous relay");
+                }
+
+                Map<Symbol, Object> properties = connection.getRemoteProperties();
+                if (!properties.containsKey(QUEUE_PREFIX)) {
+                    markAsInvalid("Broker did not send a queue prefix value");
+                }
+
+                if (!properties.containsKey(TOPIC_PREFIX)) {
+                    markAsInvalid("Broker did not send a queue prefix value");
+                }
+            }
+        });
+
+        AmqpConnection connection = client.connect();
+        assertNotNull(connection);
+
+        assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
+
+        connection.close();
+
+        assertEquals(0, getProxyToBroker().getCurrentConnectionsCount());
+    }
+
+    @Test(timeout = 60000)
+    public void testCanConnectWithDifferentContainerIds() throws Exception {
+        AmqpClient client = createAmqpClient();
+        assertNotNull(client);
+
+        AmqpConnection connection1 = client.createConnection();
+        AmqpConnection connection2 = client.createConnection();
+
+        connection1.setContainerId(getTestName() + "-Client:1");
+        connection2.setContainerId(getTestName() + "-Client:2");
+
+        connection1.connect();
+        assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
+
+        connection2.connect();
+        assertEquals(2, getProxyToBroker().getCurrentConnectionsCount());
+
+        connection1.close();
+        assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
+
+        connection2.close();
+        assertEquals(0, getProxyToBroker().getCurrentConnectionsCount());
+    }
+
+    @Test(timeout = 60000)
+    public void testCannotConnectWithSameContainerId() throws Exception {
+        AmqpClient client = createAmqpClient();
+        assertNotNull(client);
+
+        AmqpConnection connection1 = client.createConnection();
+        AmqpConnection connection2 = client.createConnection();
+
+        connection1.setContainerId(getTestName());
+        connection2.setContainerId(getTestName());
+
+        connection1.connect();
+        assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
+
+        connection2.setStateInspector(new AmqpStateInspector() {
+
+            @Override
+            public void inspectOpenedResource(Connection connection) {
+                if (!connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
+                    markAsInvalid("Broker did not set connection establishment failed property");
+                }
+            }
+
+            @Override
+            public void inspectClosedResource(Connection connection) {
+                ErrorCondition remoteError = connection.getRemoteCondition();
+                if (remoteError == null) {
+                    markAsInvalid("Broker dd not add error condition for duplicate client ID");
+                }
+
+                if (!remoteError.getCondition().equals(AmqpError.INVALID_FIELD)) {
+                    markAsInvalid("Broker dd not set condition to " + AmqpError.INVALID_FIELD);
+                }
+            }
+        });
+
+        try {
+            connection2.connect();
+            //fail("Should not be able to connect with same container Id.");
+        } catch (Exception ex) {
+            LOG.info("Second connection with same container Id failed as expected.");
+        }
+
+        connection2.getStateInspector().assertIfStateChecksFailed();
+
+        assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
+
+        connection1.close();
+        assertEquals(0, getProxyToBroker().getCurrentConnectionsCount());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
new file mode 100644
index 0000000..1245811
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.util.Wait;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverTest extends AmqpClientTestSupport {
+
+    @Override
+    protected boolean isUseOpenWireConnector() {
+        return true;
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateQueueReceiver() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        assertEquals(0, brokerService.getAdminView().getQueues().length);
+
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+
+        assertEquals(1, brokerService.getAdminView().getQueues().length);
+        assertNotNull(getProxyToQueue(getTestName()));
+        assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length);
+        receiver.close();
+        assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length);
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateTopicReceiver() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        assertEquals(0, brokerService.getAdminView().getTopics().length);
+
+        AmqpReceiver receiver = session.createReceiver("topic://" + getTestName());
+
+        assertEquals(1, brokerService.getAdminView().getTopics().length);
+        assertNotNull(getProxyToTopic(getTestName()));
+        assertEquals(1, brokerService.getAdminView().getTopicSubscribers().length);
+        receiver.close();
+        assertEquals(0, brokerService.getAdminView().getTopicSubscribers().length);
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testQueueReceiverReadMessage() throws Exception {
+        sendMessages(getTestName(), 1, false);
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+
+        QueueViewMBean queueView = getProxyToQueue(getTestName());
+        assertEquals(1, queueView.getQueueSize());
+        assertEquals(0, queueView.getDispatchCount());
+
+        receiver.flow(1);
+        assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
+        receiver.close();
+
+        assertEquals(1, queueView.getQueueSize());
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
+        int MSG_COUNT = 4;
+        sendMessages(getTestName(), MSG_COUNT, false);
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName());
+
+        QueueViewMBean queueView = getProxyToQueue(getTestName());
+        assertEquals(MSG_COUNT, queueView.getQueueSize());
+
+        receiver1.flow(2);
+        assertNotNull(receiver1.receive(5, TimeUnit.SECONDS));
+        assertNotNull(receiver1.receive(5, TimeUnit.SECONDS));
+
+        AmqpReceiver receiver2 = session.createReceiver("queue://" + getTestName());
+
+        assertEquals(2, brokerService.getAdminView().getQueueSubscribers().length);
+
+        receiver2.flow(2);
+        assertNotNull(receiver2.receive(5, TimeUnit.SECONDS));
+        assertNotNull(receiver2.receive(5, TimeUnit.SECONDS));
+
+        assertEquals(MSG_COUNT, queueView.getDispatchCount());
+        assertEquals(0, queueView.getDequeueCount());
+
+        receiver1.close();
+        receiver2.close();
+
+        assertEquals(MSG_COUNT, queueView.getQueueSize());
+
+        connection.close();
+    }
+
+    @Ignore("Fails due to issues with accept and no credit")
+    @Test(timeout = 60000)
+    public void testTwoQueueReceiversOnSameConnectionReadMessagesAcceptOnEach() throws Exception {
+        int MSG_COUNT = 4;
+        sendMessages(getTestName(), MSG_COUNT, false);
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName());
+
+        final QueueViewMBean queueView = getProxyToQueue(getTestName());
+        assertEquals(MSG_COUNT, queueView.getQueueSize());
+
+        receiver1.flow(2);
+        AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
+        assertNotNull(message);
+        message.accept();
+        message = receiver1.receive(5, TimeUnit.SECONDS);
+        assertNotNull(message);
+        message.accept();
+
+        assertTrue("Should have ack'd two", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return queueView.getDequeueCount() == 2;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50)));
+
+        AmqpReceiver receiver2 = session.createReceiver("queue://" + getTestName());
+
+        assertEquals(2, brokerService.getAdminView().getQueueSubscribers().length);
+
+        receiver2.flow(2);
+        message = receiver2.receive(5, TimeUnit.SECONDS);
+        assertNotNull(message);
+        message.accept();
+        message = receiver2.receive(5, TimeUnit.SECONDS);
+        assertNotNull(message);
+        message.accept();
+
+        assertEquals(MSG_COUNT, queueView.getDispatchCount());
+        assertTrue("Queue should be empty now", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return queueView.getDequeueCount() == 4;
+            }
+        }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(10)));
+
+        receiver1.close();
+        receiver2.close();
+
+        assertEquals(0, queueView.getQueueSize());
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testSecondReceiverOnQueueGetsAllUnconsumedMessages() throws Exception {
+        int MSG_COUNT = 20;
+        sendMessages(getTestName(), MSG_COUNT, false);
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName());
+
+        final QueueViewMBean queueView = getProxyToQueue(getTestName());
+        assertEquals(MSG_COUNT, queueView.getQueueSize());
+
+        receiver1.flow(20);
+
+        assertTrue("Should have dispatch to prefetch", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return queueView.getInFlightCount() >= 2;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50)));
+
+        receiver1.close();
+
+        AmqpReceiver receiver2 = session.createReceiver("queue://" + getTestName());
+
+        assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length);
+
+        receiver2.flow(MSG_COUNT * 2);
+        AmqpMessage message = receiver2.receive(5, TimeUnit.SECONDS);
+        assertNotNull(message);
+        message.accept();
+        message = receiver2.receive(5, TimeUnit.SECONDS);
+        assertNotNull(message);
+        message.accept();
+
+        assertTrue("Should have ack'd two", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return queueView.getDequeueCount() == 2;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50)));
+
+        receiver2.close();
+
+        assertEquals(MSG_COUNT - 2, queueView.getQueueSize());
+
+        connection.close();
+    }
+
+    @Ignore("Test fails currently due to improper implementation of drain.")
+    @Test(timeout = 60000)
+    public void testReceiverCanDrainMessages() throws Exception {
+        int MSG_COUNT = 20;
+        sendMessages(getTestName(), MSG_COUNT, false);
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+
+        QueueViewMBean queueView = getProxyToQueue(getTestName());
+        assertEquals(MSG_COUNT, queueView.getQueueSize());
+        assertEquals(0, queueView.getDispatchCount());
+
+        receiver.drain(MSG_COUNT);
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            message.accept();
+        }
+        receiver.close();
+
+        assertEquals(0, queueView.getQueueSize());
+
+        connection.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java
new file mode 100644
index 0000000..3f6a454
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+/**
+ * Test broker behavior when creating AMQP senders
+ */
+public class AmqpSenderTest extends AmqpClientTestSupport {
+
+    @Test(timeout = 60000)
+    public void testCreateQueueSender() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        assertEquals(0, brokerService.getAdminView().getQueues().length);
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        assertEquals(1, brokerService.getAdminView().getQueues().length);
+        assertNotNull(getProxyToQueue(getTestName()));
+        assertEquals(1, brokerService.getAdminView().getQueueProducers().length);
+        sender.close();
+        assertEquals(0, brokerService.getAdminView().getQueueProducers().length);
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateTopicSender() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        assertEquals(0, brokerService.getAdminView().getTopics().length);
+
+        AmqpSender sender = session.createSender("topic://" + getTestName());
+
+        assertEquals(1, brokerService.getAdminView().getTopics().length);
+        assertNotNull(getProxyToTopic(getTestName()));
+        assertEquals(1, brokerService.getAdminView().getTopicProducers().length);
+        sender.close();
+        assertEquals(0, brokerService.getAdminView().getTopicProducers().length);
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testSendMessageToQueue() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+        AmqpMessage message = new AmqpMessage();
+
+        message.setText("Test-Message");
+
+        sender.send(message);
+
+        QueueViewMBean queue = getProxyToQueue(getTestName());
+
+        assertEquals(1, queue.getQueueSize());
+
+        sender.close();
+        connection.close();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSessionTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSessionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSessionTest.java
new file mode 100644
index 0000000..b8f456f
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSessionTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+/**
+ * Test for creation and configuration of AMQP sessions.
+ */
+public class AmqpSessionTest extends AmqpClientTestSupport {
+
+    @Test
+    public void testCreateSession() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+        assertNotNull(session);
+        connection.close();
+    }
+}


[2/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5602

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
new file mode 100644
index 0000000..ec37710
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -0,0 +1,599 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME;
+import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.InvalidDestinationException;
+
+import org.apache.activemq.transport.amqp.client.util.ClientFuture;
+import org.apache.activemq.transport.amqp.client.util.UnmodifiableReceiver;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.message.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Receiver class that manages a Proton receiver endpoint.
+ */
+public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpReceiver.class);
+
+    // TODO: Use constants available from Proton 0.9
+    private static final Symbol ACCEPTED_DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:accepted:list");
+    private static final Symbol REJECTED_DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:rejected:list");
+    private static final Symbol MODIFIED_DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:modified:list");
+    private static final Symbol RELEASED_DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:released:list");
+
+    private final AtomicBoolean closed = new AtomicBoolean();
+
+    private final BlockingQueue<AmqpMessage> prefetch = new LinkedBlockingDeque<AmqpMessage>();
+
+    private final AmqpSession session;
+    private final String address;
+    private final String receiverId;
+
+    private String subscriptionName;
+    private String selector;
+    private boolean presettle;
+    private boolean noLocal;
+
+    /**
+     * Create a new receiver instance.
+     *
+     * @param session
+     * 		  The parent session that created the receiver.
+     * @param address
+     *        The address that this receiver should listen on.
+     * @param receiverId
+     *        The unique ID assigned to this receiver.
+     */
+    public AmqpReceiver(AmqpSession session, String address, String receiverId) {
+        this.session = session;
+        this.address = address;
+        this.receiverId = receiverId;
+    }
+
+    /**
+     * Close the sender, a closed sender will throw exceptions if any further send
+     * calls are made.
+     *
+     * @throws IOException if an error occurs while closing the sender.
+     */
+    public void close() throws IOException {
+        if (closed.compareAndSet(false, true)) {
+            final ClientFuture request = new ClientFuture();
+            session.getScheduler().execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    checkClosed();
+                    close(request);
+                    session.pumpToProtonTransport();
+                }
+            });
+
+            request.sync();
+        }
+    }
+
+    /**
+     * @return this session's parent AmqpSession.
+     */
+    public AmqpSession getSession() {
+        return session;
+    }
+
+    /**
+     * @return the address that this receiver has been configured to listen on.
+     */
+    public String getAddress() {
+        return address;
+    }
+
+    /**
+     * Attempts to wait on a message to be delivered to this receiver.  The receive
+     * call will wait indefinitely for a message to be delivered.
+     *
+     * @return a newly received message sent to this receiver.
+     *
+     * @throws Exception if an error occurs during the receive attempt.
+     */
+    public AmqpMessage receive() throws Exception {
+        checkClosed();
+        return prefetch.take();
+    }
+
+    /**
+     * Attempts to receive a message sent to this receiver, waiting for the given
+     * timeout value before giving up and returning null.
+     *
+     * @param timeout
+     * 	      the time to wait for a new message to arrive.
+     * @param unit
+     * 		  the unit of time that the timeout value represents.
+     *
+     * @return a newly received message or null if the time to wait period expires.
+     *
+     * @throws Exception if an error occurs during the receive attempt.
+     */
+    public AmqpMessage receive(long timeout, TimeUnit unit) throws Exception {
+        checkClosed();
+        return prefetch.poll(timeout, unit);
+    }
+
+    /**
+     * If a message is already available in this receiver's prefetch buffer then
+     * it is returned immediately otherwise this methods return null without waiting.
+     *
+     * @return a newly received message or null if there is no currently available message.
+     *
+     * @throws Exception if an error occurs during the receive attempt.
+     */
+    public AmqpMessage receiveNoWait() throws Exception {
+        checkClosed();
+        return prefetch.poll();
+    }
+
+    /**
+     * Controls the amount of credit given to the receiver link.
+     *
+     * @param credit
+     *        the amount of credit to grant.
+     *
+     * @throws IOException if an error occurs while sending the flow.
+     */
+    public void flow(final int credit) throws IOException {
+        checkClosed();
+        final ClientFuture request = new ClientFuture();
+        session.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                checkClosed();
+                try {
+                    getEndpoint().flow(credit);
+                    session.pumpToProtonTransport();
+                    request.onSuccess();
+                } catch (Exception e) {
+                    request.onFailure(e);
+                }
+            }
+        });
+
+        request.sync();
+    }
+
+    /**
+     * Attempts to drain a given amount of credit from the link.
+     *
+     * @param credit
+     *        the amount of credit to drain.
+     *
+     * @throws IOException if an error occurs while sending the drain.
+     */
+    public void drain(final int credit) throws IOException {
+        checkClosed();
+        final ClientFuture request = new ClientFuture();
+        session.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                checkClosed();
+                try {
+                    getEndpoint().drain(credit);
+                    session.pumpToProtonTransport();
+                    request.onSuccess();
+                } catch (Exception e) {
+                    request.onFailure(e);
+                }
+            }
+        });
+
+        request.sync();
+    }
+
+    /**
+     * Accepts a message that was dispatched under the given Delivery instance.
+     *
+     * @param delivery
+     *        the Delivery instance to accept.
+     *
+     * @throws IOException if an error occurs while sending the accept.
+     */
+    public void accept(final Delivery delivery) throws IOException {
+        checkClosed();
+
+        if (delivery == null) {
+            throw new IllegalArgumentException("Delivery to accept cannot be null");
+        }
+
+        final ClientFuture request = new ClientFuture();
+        session.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                checkClosed();
+                try {
+                    if (!delivery.isSettled()) {
+                        delivery.disposition(Accepted.getInstance());
+                        delivery.settle();
+                        session.pumpToProtonTransport();
+                    }
+                    request.onSuccess();
+                } catch (Exception e) {
+                    request.onFailure(e);
+                }
+            }
+        });
+
+        request.sync();
+    }
+
+    /**
+     * Reject a message that was dispatched under the given Delivery instance.
+     *
+     * @param delivery
+     *        the Delivery instance to reject.
+     * @param undeliverableHere
+     *        marks the delivery as not being able to be process by link it was sent to.
+     * @param deliveryFailed
+     *        indicates that the delivery failed for some reason.
+     *
+     * @throws IOException if an error occurs while sending the reject.
+     */
+    public void reject(final Delivery delivery, final boolean undeliverableHere, final boolean deliveryFailed) throws IOException {
+        checkClosed();
+
+        if (delivery == null) {
+            throw new IllegalArgumentException("Delivery to reject cannot be null");
+        }
+
+        final ClientFuture request = new ClientFuture();
+        session.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                checkClosed();
+                try {
+                    if (!delivery.isSettled()) {
+                        Modified disposition = new Modified();
+                        disposition.setUndeliverableHere(undeliverableHere);
+                        disposition.setDeliveryFailed(deliveryFailed);
+                        delivery.disposition(disposition);
+                        delivery.settle();
+                        session.pumpToProtonTransport();
+                    }
+                    request.onSuccess();
+                } catch (Exception e) {
+                    request.onFailure(e);
+                }
+            }
+        });
+
+        request.sync();
+    }
+
+    /**
+     * Release a message that was dispatched under the given Delivery instance.
+     *
+     * @param delivery
+     *        the Delivery instance to release.
+     *
+     * @throws IOException if an error occurs while sending the release.
+     */
+    public void release(final Delivery delivery) throws IOException {
+        checkClosed();
+
+        if (delivery == null) {
+            throw new IllegalArgumentException("Delivery to release cannot be null");
+        }
+
+        final ClientFuture request = new ClientFuture();
+        session.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                checkClosed();
+                try {
+                    if (!delivery.isSettled()) {
+                        delivery.disposition(Released.getInstance());
+                        delivery.settle();
+                        session.pumpToProtonTransport();
+                    }
+                    request.onSuccess();
+                } catch (Exception e) {
+                    request.onFailure(e);
+                }
+            }
+        });
+
+        request.sync();
+    }
+
+    /**
+     * @return an unmodifiable view of the underlying Receiver instance.
+     */
+    public Receiver getReceiver() {
+        return new UnmodifiableReceiver(getEndpoint());
+    }
+
+    //----- Receiver configuration properties --------------------------------//
+
+    public boolean isPresettle() {
+        return presettle;
+    }
+
+    public void setPresettle(boolean presettle) {
+        this.presettle = presettle;
+    }
+
+    public boolean isDurable() {
+        return subscriptionName != null;
+    }
+
+    public String getSubscriptionName() {
+        return subscriptionName;
+    }
+
+    public void setSubscriptionName(String subscriptionName) {
+        this.subscriptionName = subscriptionName;
+    }
+
+    public String getSelector() {
+        return selector;
+    }
+
+    public void setSelector(String selector) {
+        this.selector = selector;
+    }
+
+    public boolean isNoLocal() {
+        return noLocal;
+    }
+
+    public void setNoLocal(boolean noLocal) {
+        this.noLocal = noLocal;
+    }
+
+    //----- Internal implementation ------------------------------------------//
+
+    @Override
+    protected void doOpen() {
+
+        Source source = new Source();
+        source.setAddress(address);
+        Target target = new Target();
+
+        configureSource(source);
+
+        String receiverName = receiverId + ":" + address;
+
+        if (getSubscriptionName() != null && !getSubscriptionName().isEmpty()) {
+            // In the case of Durable Topic Subscriptions the client must use the same
+            // receiver name which is derived from the subscription name property.
+            receiverName = getSubscriptionName();
+        }
+
+        Receiver receiver = session.getEndpoint().receiver(receiverName);
+        receiver.setSource(source);
+        receiver.setTarget(target);
+        if (isPresettle()) {
+            receiver.setSenderSettleMode(SenderSettleMode.SETTLED);
+        } else {
+            receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+        }
+        receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+        setEndpoint(receiver);
+
+        super.doOpen();
+    }
+
+    @Override
+    protected void doOpenCompletion() {
+        // Verify the attach response contained a non-null Source
+        org.apache.qpid.proton.amqp.transport.Source s = getEndpoint().getRemoteSource();
+        if (s != null) {
+            super.doOpenCompletion();
+        } else {
+            // No link terminus was created, the peer will now detach/close us.
+        }
+    }
+
+    @Override
+    protected void doClose() {
+        if (isDurable()) {
+            getEndpoint().detach();
+        } else {
+            getEndpoint().close();
+        }
+    }
+
+    @Override
+    protected Exception getOpenAbortException() {
+        // Verify the attach response contained a non-null Source
+        org.apache.qpid.proton.amqp.transport.Source s = getEndpoint().getRemoteSource();
+        if (s != null) {
+            return super.getOpenAbortException();
+        } else {
+            // No link terminus was created, the peer has detach/closed us, create IDE.
+            return new InvalidDestinationException("Link creation was refused");
+        }
+    }
+
+    @Override
+    protected void doOpenInspection() {
+        getStateInspector().inspectOpenedResource(getReceiver());
+    }
+
+    @Override
+    protected void doClosedInspection() {
+        getStateInspector().inspectClosedResource(getReceiver());
+    }
+
+    @Override
+    protected void doDetachedInspection() {
+        getStateInspector().inspectDetachedResource(getReceiver());
+    }
+
+    protected void configureSource(Source source) {
+        Map<Symbol, DescribedType> filters = new HashMap<Symbol, DescribedType>();
+        Symbol[] outcomes = new Symbol[]{ACCEPTED_DESCRIPTOR_SYMBOL, REJECTED_DESCRIPTOR_SYMBOL,
+                                         RELEASED_DESCRIPTOR_SYMBOL, MODIFIED_DESCRIPTOR_SYMBOL};
+
+        if (getSubscriptionName() != null && !getSubscriptionName().isEmpty()) {
+            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+            source.setDurable(TerminusDurability.UNSETTLED_STATE);
+            source.setDistributionMode(COPY);
+        } else {
+            source.setDurable(TerminusDurability.NONE);
+            source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+        }
+
+        source.setOutcomes(outcomes);
+
+        Modified modified = new Modified();
+        modified.setDeliveryFailed(true);
+        modified.setUndeliverableHere(false);
+
+        source.setDefaultOutcome(modified);
+
+        if (isNoLocal()) {
+            filters.put(NO_LOCAL_NAME, AmqpNoLocalType.NO_LOCAL);
+        }
+
+        if (getSelector() != null && !getSelector().trim().equals("")) {
+            filters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorType(getSelector()));
+        }
+
+        if (!filters.isEmpty()) {
+            source.setFilter(filters);
+        }
+    }
+
+    @Override
+    public void processDeliveryUpdates(AmqpConnection connection) throws IOException {
+        Delivery incoming = null;
+        do {
+            incoming = getEndpoint().current();
+            if (incoming != null) {
+                if(incoming.isReadable() && !incoming.isPartial()) {
+                    LOG.trace("{} has incoming Message(s).", this);
+                    try {
+                        processDelivery(incoming);
+                    } catch (Exception e) {
+                        throw IOExceptionSupport.create(e);
+                    }
+                    getEndpoint().advance();
+                } else {
+                    LOG.trace("{} has a partial incoming Message(s), deferring.", this);
+                    incoming = null;
+                }
+            }
+        } while (incoming != null);
+
+        super.processDeliveryUpdates(connection);
+    }
+
+    private void processDelivery(Delivery incoming) throws Exception {
+        Message message = null;
+        try {
+            message = decodeIncomingMessage(incoming);
+        } catch (Exception e) {
+            LOG.warn("Error on transform: {}", e.getMessage());
+            deliveryFailed(incoming, true);
+            return;
+        }
+
+        AmqpMessage amqpMessage = new AmqpMessage(this, message, incoming);
+        // Store reference to envelope in delivery context for recovery
+        incoming.setContext(amqpMessage);
+        prefetch.add(amqpMessage);
+    }
+
+    protected Message decodeIncomingMessage(Delivery incoming) {
+        int count;
+
+        byte[] chunk = new byte[2048];
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+
+        while ((count = getEndpoint().recv(chunk, 0, chunk.length)) > 0) {
+            stream.write(chunk, 0, count);
+        }
+
+        byte[] messageBytes = stream.toByteArray();
+
+        try {
+            Message protonMessage = Message.Factory.create();
+            protonMessage.decode(messageBytes, 0, messageBytes.length);
+            return protonMessage;
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+            }
+        }
+    }
+
+    protected void deliveryFailed(Delivery incoming, boolean expandCredit) {
+        Modified disposition = new Modified();
+        disposition.setUndeliverableHere(true);
+        disposition.setDeliveryFailed(true);
+        incoming.disposition(disposition);
+        incoming.settle();
+        if (expandCredit) {
+            getEndpoint().flow(1);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + "{ address = " + address + "}";
+    }
+
+    private void checkClosed() {
+        if (isClosed()) {
+            throw new IllegalStateException("Receiver is already closed");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
new file mode 100644
index 0000000..b4e6215
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.io.IOException;
+
+import org.apache.activemq.transport.amqp.client.util.AsyncResult;
+
+/**
+ * AmqpResource specification.
+ *
+ * All AMQP types should implement this interface to allow for control of state
+ * and configuration details.
+ */
+public interface AmqpResource {
+
+    /**
+     * Perform all the work needed to open this resource and store the request
+     * until such time as the remote peer indicates the resource has become active.
+     *
+     * @param request
+     *        The initiating request that triggered this open call.
+     */
+    void open(AsyncResult request);
+
+    /**
+     * @return if the resource has moved to the opened state on the remote.
+     */
+    boolean isOpen();
+
+    /**
+     * Called to indicate that this resource is now remotely opened.  Once opened a
+     * resource can start accepting incoming requests.
+     */
+    void opened();
+
+    /**
+     * Perform all work needed to close this resource and store the request
+     * until such time as the remote peer indicates the resource has been closed.
+     *
+     * @param request
+     *        The initiating request that triggered this close call.
+     */
+    void close(AsyncResult request);
+
+    /**
+     * @return if the resource has moved to the closed state on the remote.
+     */
+    boolean isClosed();
+
+    /**
+     * Called to indicate that this resource is now remotely closed.  Once closed a
+     * resource can not accept any incoming requests.
+     */
+    void closed();
+
+    /**
+     * Sets the failed state for this Resource and triggers a failure signal for
+     * any pending ProduverRequest.
+     */
+    void failed();
+
+    /**
+     * Called to indicate that the remote end has become closed but the resource
+     * was not awaiting a close.  This could happen during an open request where
+     * the remote does not set an error condition or during normal operation.
+     *
+     * @param connection
+     *        The connection that owns this resource.
+     */
+    void remotelyClosed(AmqpConnection connection);
+
+    /**
+     * Sets the failed state for this Resource and triggers a failure signal for
+     * any pending ProduverRequest.
+     *
+     * @param cause
+     *        The Exception that triggered the failure.
+     */
+    void failed(Exception cause);
+
+    /**
+     * Event handler for remote peer open of this resource.
+     *
+     * @param connection
+     *        The connection that owns this resource.
+     *
+     * @throws IOException if an error occurs while processing the update.
+     */
+    void processRemoteOpen(AmqpConnection connection) throws IOException;
+
+    /**
+     * Event handler for remote peer detach of this resource.
+     *
+     * @param connection
+     *        The connection that owns this resource.
+     *
+     * @throws IOException if an error occurs while processing the update.
+     */
+    void processRemoteDetach(AmqpConnection connection) throws IOException;
+
+    /**
+     * Event handler for remote peer close of this resource.
+     *
+     * @param connection
+     *        The connection that owns this resource.
+     *
+     * @throws IOException if an error occurs while processing the update.
+     */
+    void processRemoteClose(AmqpConnection connection) throws IOException;
+
+    /**
+     * Called when the Proton Engine signals an Delivery related event has been triggered
+     * for the given endpoint.
+     *
+     * @param connection
+     *        The connection that owns this resource.
+     *
+     * @throws IOException if an error occurs while processing the update.
+     */
+    void processDeliveryUpdates(AmqpConnection connection) throws IOException;
+
+    /**
+     * Called when the Proton Engine signals an Flow related event has been triggered
+     * for the given endpoint.
+     *
+     * @param connection
+     *        The connection that owns this resource.
+     *
+     * @throws IOException if an error occurs while processing the update.
+     */
+    void processFlowUpdates(AmqpConnection connection) throws IOException;
+
+    /**
+     * @returns true if the remote end has sent an error
+     */
+    boolean hasRemoteError();
+
+    /**
+     * @return an Exception derived from the error state of the endpoint's Remote Condition.
+     */
+    Exception getRemoteError();
+
+    /**
+     * @return an Error message derived from the error state of the endpoint's Remote Condition.
+     */
+    String getRemoteErrorMessage();
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
new file mode 100644
index 0000000..95b0743
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.InvalidDestinationException;
+
+import org.apache.activemq.transport.amqp.client.util.AsyncResult;
+import org.apache.activemq.transport.amqp.client.util.ClientFuture;
+import org.apache.activemq.transport.amqp.client.util.UnmodifiableSender;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Outcome;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.message.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sender class that manages a Proton sender endpoint.
+ */
+public class AmqpSender extends AmqpAbstractResource<Sender> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpSender.class);
+    private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
+    //TODO: Use constants available from Proton 0.9
+    private static final Symbol ACCEPTED_DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:accepted:list");
+    private static final Symbol REJECTED_DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:rejected:list");
+
+    public static final long DEFAULT_SEND_TIMEOUT = 15000;
+
+    private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator(true);
+    private final AtomicBoolean closed = new AtomicBoolean();
+
+    private final AmqpSession session;
+    private final String address;
+    private final String senderId;
+
+    private boolean presettle;
+    private long sendTimeout = DEFAULT_SEND_TIMEOUT;
+
+    private final Set<Delivery> pending = new LinkedHashSet<Delivery>();
+    private byte[] encodeBuffer = new byte[1024 * 8];
+
+    /**
+     * Create a new sender instance.
+     *
+     * @param session
+     * 		  The parent session that created the session.
+     * @param address
+     *        The address that this sender produces to.
+     * @param senderId
+     *        The unique ID assigned to this sender.
+     */
+    public AmqpSender(AmqpSession session, String address, String senderId) {
+        this.session = session;
+        this.address = address;
+        this.senderId = senderId;
+    }
+
+    /**
+     * Sends the given message to this senders assigned address.
+     *
+     * @param message
+     *        the message to send.
+     *
+     * @throws IOException if an error occurs during the send.
+     */
+    public void send(final AmqpMessage message) throws IOException {
+        checkClosed();
+        final ClientFuture sendRequest = new ClientFuture();
+
+        session.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    doSend(message, sendRequest);
+                    session.pumpToProtonTransport();
+                } catch (Exception e) {
+                    sendRequest.onFailure(e);
+                    session.getConnection().fireClientException(e);
+                }
+            }
+        });
+
+        if (sendTimeout <= 0) {
+            sendRequest.sync();
+        } else {
+            sendRequest.sync(sendTimeout, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Close the sender, a closed sender will throw exceptions if any further send
+     * calls are made.
+     *
+     * @throws IOException if an error occurs while closing the sender.
+     */
+    public void close() throws IOException {
+        if (closed.compareAndSet(false, true)) {
+            final ClientFuture request = new ClientFuture();
+            session.getScheduler().execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    checkClosed();
+                    close(request);
+                    session.pumpToProtonTransport();
+                }
+            });
+
+            request.sync();
+        }
+    }
+
+    /**
+     * @return this session's parent AmqpSession.
+     */
+    public AmqpSession getSession() {
+        return session;
+    }
+
+    /**
+     * @return an unmodifiable view of the underlying Sender instance.
+     */
+    public Sender getSender() {
+        return new UnmodifiableSender(getEndpoint());
+    }
+
+    /**
+     * @return the assigned address of this sender.
+     */
+    public String getAddress() {
+        return address;
+    }
+
+    //----- Sender configuration ---------------------------------------------//
+
+    /**
+     * @return will messages be settle on send.
+     */
+    public boolean isPresettle() {
+        return presettle;
+    }
+
+    /**
+     * Configure is sent messages are marked as settled on send, defaults to false.
+     *
+     * @param presettle
+     * 		  configure if this sender will presettle all sent messages.
+     */
+    public void setPresettle(boolean presettle) {
+        this.presettle = presettle;
+    }
+
+    /**
+     * @return the currently configured send timeout.
+     */
+    public long getSendTimeout() {
+        return sendTimeout;
+    }
+
+    /**
+     * Sets the amount of time the sender will block on a send before failing.
+     *
+     * @param sendTimeout
+     *        time in milliseconds to wait.
+     */
+    public void setSendTimeout(long sendTimeout) {
+        this.sendTimeout = sendTimeout;
+    }
+
+    //----- Private Sender implementation ------------------------------------//
+
+    private void checkClosed() {
+        if (isClosed()) {
+            throw new IllegalStateException("Sender is already closed");
+        }
+    }
+
+    @Override
+    protected void doOpen() {
+
+        Symbol[] outcomes = new Symbol[]{ACCEPTED_DESCRIPTOR_SYMBOL, REJECTED_DESCRIPTOR_SYMBOL};
+        Source source = new Source();
+        source.setAddress(senderId);
+        source.setOutcomes(outcomes);
+
+        Target target = new Target();
+        target.setAddress(address);
+
+        String senderName = senderId + ":" + address;
+
+        Sender sender = session.getEndpoint().sender(senderName);
+        sender.setSource(source);
+        sender.setTarget(target);
+        if (presettle) {
+            sender.setSenderSettleMode(SenderSettleMode.SETTLED);
+        } else {
+            sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+        }
+        sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+        setEndpoint(sender);
+
+        super.doOpen();
+    }
+
+    @Override
+    protected void doOpenCompletion() {
+        // Verify the attach response contained a non-null target
+        org.apache.qpid.proton.amqp.transport.Target t = getEndpoint().getRemoteTarget();
+        if (t != null) {
+            super.doOpenCompletion();
+        } else {
+            // No link terminus was created, the peer will now detach/close us.
+        }
+    }
+
+    @Override
+    protected void doOpenInspection() {
+        getStateInspector().inspectOpenedResource(getSender());
+    }
+
+    @Override
+    protected void doClosedInspection() {
+        getStateInspector().inspectClosedResource(getSender());
+    }
+
+    @Override
+    protected void doDetachedInspection() {
+        getStateInspector().inspectDetachedResource(getSender());
+    }
+
+    @Override
+    protected Exception getOpenAbortException() {
+        // Verify the attach response contained a non-null target
+        org.apache.qpid.proton.amqp.transport.Target t = getEndpoint().getRemoteTarget();
+        if (t != null) {
+            return super.getOpenAbortException();
+        } else {
+            // No link terminus was created, the peer has detach/closed us, create IDE.
+            return new InvalidDestinationException("Link creation was refused");
+        }
+    }
+
+    private void doSend(AmqpMessage message, AsyncResult request) throws Exception {
+
+        LOG.trace("Producer sending message: {}", message);
+
+        byte[] tag = tagGenerator.getNextTag();
+        Delivery delivery = null;
+
+        if (presettle) {
+            delivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0);
+        } else {
+            delivery = getEndpoint().delivery(tag, 0, tag.length);
+        }
+
+        delivery.setContext(request);
+
+        encodeAndSend(message.getWrappedMessage(), delivery);
+
+        if (presettle) {
+            delivery.settle();
+            request.onSuccess();
+        } else {
+            pending.add(delivery);
+            getEndpoint().advance();
+        }
+    }
+
+    private void encodeAndSend(Message message, Delivery delivery) throws IOException {
+
+        int encodedSize;
+        while (true) {
+            try {
+                encodedSize = message.encode(encodeBuffer, 0, encodeBuffer.length);
+                break;
+            } catch (java.nio.BufferOverflowException e) {
+                encodeBuffer = new byte[encodeBuffer.length * 2];
+            }
+        }
+
+        int sentSoFar = 0;
+
+        while (true) {
+            int sent = getEndpoint().send(encodeBuffer, sentSoFar, encodedSize - sentSoFar);
+            if (sent > 0) {
+                sentSoFar += sent;
+                if ((encodedSize - sentSoFar) == 0) {
+                    break;
+                }
+            } else {
+                LOG.warn("{} failed to send any data from current Message.", this);
+            }
+        }
+    }
+
+    @Override
+    public void processDeliveryUpdates(AmqpConnection connection) throws IOException {
+        List<Delivery> toRemove = new ArrayList<Delivery>();
+
+        for (Delivery delivery : pending) {
+            DeliveryState state = delivery.getRemoteState();
+            if (state == null) {
+                continue;
+            }
+
+            Outcome outcome = null;
+            if (state instanceof TransactionalState) {
+                LOG.trace("State of delivery is Transactional, retrieving outcome: {}", state);
+                outcome = ((TransactionalState) state).getOutcome();
+            } else if (state instanceof Outcome) {
+                outcome = (Outcome) state;
+            } else {
+                LOG.warn("Message send updated with unsupported state: {}", state);
+                continue;
+            }
+
+            AsyncResult request = (AsyncResult) delivery.getContext();
+
+            if (outcome instanceof Accepted) {
+                toRemove.add(delivery);
+                LOG.trace("Outcome of delivery was accepted: {}", delivery);
+                tagGenerator.returnTag(delivery.getTag());
+                if (request != null && !request.isComplete()) {
+                    request.onSuccess();
+                }
+            } else if (outcome instanceof Rejected) {
+                Exception remoteError = getRemoteError();
+                toRemove.add(delivery);
+                LOG.trace("Outcome of delivery was rejected: {}", delivery);
+                tagGenerator.returnTag(delivery.getTag());
+                if (request != null && !request.isComplete()) {
+                    request.onFailure(remoteError);
+                } else {
+                    connection.fireClientException(getRemoteError());
+                }
+            } else {
+                LOG.warn("Message send updated with unsupported outcome: {}", outcome);
+            }
+        }
+
+        pending.removeAll(toRemove);
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + "{ address = " + address + "}";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
new file mode 100644
index 0000000..9368b26
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.transport.amqp.client.util.ClientFuture;
+import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Session;
+
+/**
+ * Session class that manages a Proton session endpoint.
+ */
+public class AmqpSession extends AmqpAbstractResource<Session> {
+
+    private final AtomicLong receiverIdGenerator = new AtomicLong();
+    private final AtomicLong senderIdGenerator = new AtomicLong();
+
+    private final AmqpConnection connection;
+    private final String sessionId;
+
+    /**
+     * Create a new session instance.
+     *
+     * @param connection
+     * 		  The parent connection that created the session.
+     * @param sessionId
+     *        The unique ID value assigned to this session.
+     */
+    public AmqpSession(AmqpConnection connection, String sessionId) {
+        this.connection = connection;
+        this.sessionId = sessionId;
+    }
+
+    /**
+     * Create a sender instance using the given address
+     *
+     * @param address
+     * 	      the address to which the sender will produce its messages.
+     *
+     * @return a newly created sender that is ready for use.
+     *
+     * @throws Exception if an error occurs while creating the sender.
+     */
+    public AmqpSender createSender(final String address) throws Exception {
+        checkClosed();
+
+        final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId());
+        final ClientFuture request = new ClientFuture();
+
+        connection.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                checkClosed();
+                sender.open(request);
+                pumpToProtonTransport();
+            }
+        });
+
+        request.sync();
+
+        return sender;
+    }
+
+    /**
+     * Create a receiver instance using the given address
+     *
+     * @param address
+     * 	      the address to which the receiver will subscribe for its messages.
+     *
+     * @return a newly created receiver that is ready for use.
+     *
+     * @throws Exception if an error occurs while creating the receiver.
+     */
+    public AmqpReceiver createReceiver(String address) throws Exception {
+        checkClosed();
+
+        final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, getNextReceiverId());
+        final ClientFuture request = new ClientFuture();
+
+        connection.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                checkClosed();
+                receiver.open(request);
+                pumpToProtonTransport();
+            }
+        });
+
+        request.sync();
+
+        return receiver;
+    }
+
+    /**
+     * @return this session's parent AmqpConnection.
+     */
+    public AmqpConnection getConnection() {
+        return connection;
+    }
+
+    public Session getSession() {
+        return new UnmodifiableSession(getEndpoint());
+    }
+
+    //----- Internal getters used from the child AmqpResource classes --------//
+
+    ScheduledExecutorService getScheduler() {
+        return connection.getScheduler();
+    }
+
+    Connection getProtonConnection() {
+        return connection.getProtonConnection();
+    }
+
+    void pumpToProtonTransport() {
+        connection.pumpToProtonTransport();
+    }
+
+    //----- Private implementation details -----------------------------------//
+
+    @Override
+    protected void doOpenInspection() {
+        getStateInspector().inspectOpenedResource(getSession());
+    }
+
+    @Override
+    protected void doClosedInspection() {
+        getStateInspector().inspectClosedResource(getSession());
+    }
+
+    private String getNextSenderId() {
+        return sessionId + ":" + senderIdGenerator.incrementAndGet();
+    }
+
+    private String getNextReceiverId() {
+        return sessionId + ":" + receiverIdGenerator.incrementAndGet();
+    }
+
+    private void checkClosed() {
+        if (isClosed()) {
+            throw new IllegalStateException("Session is already closed");
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "AmqpSession { " + sessionId + " }";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpStateInspector.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpStateInspector.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpStateInspector.java
new file mode 100644
index 0000000..5471876
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpStateInspector.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Session;
+
+/**
+ * Abstract base for a validation hook that is used in tests to check
+ * the state of a remote resource after a variety of lifecycle events.
+ */
+public class AmqpStateInspector {
+
+    private boolean valid = true;
+    private String errorMessage;
+
+    public void inspectOpenedResource(Connection connection) {
+
+    }
+
+    public void inspectOpenedResource(Session session) {
+
+    }
+
+    public void inspectOpenedResource(Link link) {
+
+    }
+
+    public void inspectClosedResource(Connection remoteConnection) {
+
+    }
+
+    public void inspectClosedResource(Session session) {
+
+    }
+
+    public void inspectClosedResource(Link link) {
+
+    }
+
+    public void inspectDetachedResource(Link link) {
+
+    }
+
+    public boolean isValid() {
+        return valid;
+    }
+
+    protected void setValid(boolean valid) {
+        this.valid = valid;
+    }
+
+    public String getErrorMessage() {
+        return errorMessage;
+    }
+
+    protected void setErrorMessage(String errorMessage) {
+        this.errorMessage = errorMessage;
+    }
+
+    protected void markAsInvalid(String errorMessage) {
+        if (valid) {
+            setValid(false);
+            setErrorMessage(errorMessage);
+        }
+    }
+
+    public void assertIfStateChecksFailed() {
+        if (!isValid()) {
+            throw new AssertionError(errorMessage);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransferTagGenerator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransferTagGenerator.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransferTagGenerator.java
new file mode 100644
index 0000000..08db018
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransferTagGenerator.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * Utility class that can generate and if enabled pool the binary tag values
+ * used to identify transfers over an AMQP link.
+ */
+public final class AmqpTransferTagGenerator {
+
+    public static final int DEFAULT_TAG_POOL_SIZE = 1024;
+
+    private long nextTagId;
+    private int maxPoolSize = DEFAULT_TAG_POOL_SIZE;
+
+    private final Set<byte[]> tagPool;
+
+    public AmqpTransferTagGenerator() {
+        this(false);
+    }
+
+    public AmqpTransferTagGenerator(boolean pool) {
+        if (pool) {
+            this.tagPool = new LinkedHashSet<byte[]>();
+        } else {
+            this.tagPool = null;
+        }
+    }
+
+    /**
+     * Retrieves the next available tag.
+     *
+     * @return a new or unused tag depending on the pool option.
+     */
+    public byte[] getNextTag() {
+        byte[] rc;
+        if (tagPool != null && !tagPool.isEmpty()) {
+            final Iterator<byte[]> iterator = tagPool.iterator();
+            rc = iterator.next();
+            iterator.remove();
+        } else {
+            try {
+                rc = Long.toHexString(nextTagId++).getBytes("UTF-8");
+            } catch (UnsupportedEncodingException e) {
+                // This should never happen since we control the input.
+                throw new RuntimeException(e);
+            }
+        }
+        return rc;
+    }
+
+    /**
+     * When used as a pooled cache of tags the unused tags should always be returned once
+     * the transfer has been settled.
+     *
+     * @param data
+     *        a previously borrowed tag that is no longer in use.
+     */
+    public void returnTag(byte[] data) {
+        if (tagPool != null && tagPool.size() < maxPoolSize) {
+            tagPool.add(data);
+        }
+    }
+
+    /**
+     * Gets the current max pool size value.
+     *
+     * @return the current max tag pool size.
+     */
+    public int getMaxPoolSize() {
+        return maxPoolSize;
+    }
+
+    /**
+     * Sets the max tag pool size.  If the size is smaller than the current number
+     * of pooled tags the pool will drain over time until it matches the max.
+     *
+     * @param maxPoolSize
+     *        the maximum number of tags to hold in the pool.
+     */
+    public void setMaxPoolSize(int maxPoolSize) {
+        this.maxPoolSize = maxPoolSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/AbstractMechanism.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/AbstractMechanism.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/AbstractMechanism.java
new file mode 100644
index 0000000..953038a
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/AbstractMechanism.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.sasl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base class for SASL Authentication Mechanism that implements the basic
+ * methods of a Mechanism class.
+ */
+public abstract class AbstractMechanism implements Mechanism {
+
+    protected static final byte[] EMPTY = new byte[0];
+
+    private String username;
+    private String password;
+    private Map<String, Object> properties = new HashMap<String, Object>();
+
+    @Override
+    public int compareTo(Mechanism other) {
+
+        if (getPriority() < other.getPriority()) {
+            return -1;
+        } else if (getPriority() > other.getPriority()) {
+            return 1;
+        }
+
+        return 0;
+    }
+
+    @Override
+    public void setUsername(String value) {
+        this.username = value;
+    }
+
+    @Override
+    public String getUsername() {
+        return username;
+    }
+
+    @Override
+    public void setPassword(String value) {
+        this.password = value;
+    }
+
+    @Override
+    public String getPassword() {
+        return this.password;
+    }
+
+    @Override
+    public void setProperties(Map<String, Object> properties) {
+        this.properties = properties;
+    }
+
+    @Override
+    public Map<String, Object> getProperties() {
+        return this.properties;
+    }
+
+    @Override
+    public String toString() {
+        return "SASL-" + getName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/AnonymousMechanism.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/AnonymousMechanism.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/AnonymousMechanism.java
new file mode 100644
index 0000000..8ac8b61
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/AnonymousMechanism.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.sasl;
+
+/**
+ * Implements the Anonymous SASL authentication mechanism.
+ */
+public class AnonymousMechanism extends AbstractMechanism {
+
+    @Override
+    public byte[] getInitialResponse() {
+        return EMPTY;
+    }
+
+    @Override
+    public byte[] getChallengeResponse(byte[] challenge) {
+        return EMPTY;
+    }
+
+    @Override
+    public int getPriority() {
+        return PRIORITY.LOWEST.getValue();
+    }
+
+    @Override
+    public String getName() {
+        return "ANONYMOUS";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/CramMD5Mechanism.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/CramMD5Mechanism.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/CramMD5Mechanism.java
new file mode 100644
index 0000000..638fb2e
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/CramMD5Mechanism.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.sasl;
+
+import java.io.UnsupportedEncodingException;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import javax.security.sasl.SaslException;
+
+/**
+ * Implements the SASL PLAIN authentication Mechanism.
+ *
+ * User name and Password values are sent without being encrypted.
+ */
+public class CramMD5Mechanism extends AbstractMechanism {
+
+    private static final String ASCII = "ASCII";
+    private static final String HMACMD5 = "HMACMD5";
+    private boolean sentResponse;
+
+    @Override
+    public int getPriority() {
+        return PRIORITY.HIGH.getValue();
+    }
+
+    @Override
+    public String getName() {
+        return "CRAM-MD5";
+    }
+
+    @Override
+    public byte[] getInitialResponse() {
+        return EMPTY;
+    }
+
+    @Override
+    public byte[] getChallengeResponse(byte[] challenge) throws SaslException {
+        if (!sentResponse && challenge != null && challenge.length != 0) {
+            try {
+                SecretKeySpec key = new SecretKeySpec(getPassword().getBytes(ASCII), HMACMD5);
+                Mac mac = Mac.getInstance(HMACMD5);
+                mac.init(key);
+
+                byte[] bytes = mac.doFinal(challenge);
+
+                StringBuffer hash = new StringBuffer(getUsername());
+                hash.append(' ');
+                for (int i = 0; i < bytes.length; i++) {
+                    String hex = Integer.toHexString(0xFF & bytes[i]);
+                    if (hex.length() == 1) {
+                        hash.append('0');
+                    }
+                    hash.append(hex);
+                }
+
+                sentResponse = true;
+                return hash.toString().getBytes(ASCII);
+            } catch (UnsupportedEncodingException e) {
+                throw new SaslException("Unable to utilise required encoding", e);
+            } catch (InvalidKeyException e) {
+                throw new SaslException("Unable to utilise key", e);
+            } catch (NoSuchAlgorithmException e) {
+                throw new SaslException("Unable to utilise required algorithm", e);
+            }
+        } else {
+            return EMPTY;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/Mechanism.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/Mechanism.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/Mechanism.java
new file mode 100644
index 0000000..e1296f9
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/Mechanism.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.sasl;
+
+import java.util.Map;
+
+import javax.security.sasl.SaslException;
+
+/**
+ * Interface for all SASL authentication mechanism implementations.
+ */
+public interface Mechanism extends Comparable<Mechanism> {
+
+    /**
+     * Relative priority values used to arrange the found SASL
+     * mechanisms in a preferred order where the level of security
+     * generally defines the preference.
+     */
+    public enum PRIORITY {
+        LOWEST(0),
+        LOW(1),
+        MEDIUM(2),
+        HIGH(3),
+        HIGHEST(4);
+
+        private final int value;
+
+        private PRIORITY(int value) {
+            this.value = value;
+        }
+
+        public int getValue() {
+            return value;
+       }
+    };
+
+    /**
+     * @return return the relative priority of this SASL mechanism.
+     */
+    int getPriority();
+
+    /**
+     * @return the well known name of this SASL mechanism.
+     */
+    String getName();
+
+    /**
+     * @return the response buffer used to answer the initial SASL cycle.
+     * @throws SaslException if an error occurs computing the response.
+     */
+    byte[] getInitialResponse() throws SaslException;
+
+    /**
+     * Create a response based on a given challenge from the remote peer.
+     *
+     * @param challenge
+     *        the challenge that this Mechanism should response to.
+     *
+     * @return the response that answers the given challenge.
+     * @throws SaslException if an error occurs computing the response.
+     */
+    byte[] getChallengeResponse(byte[] challenge) throws SaslException;
+
+    /**
+     * Sets the user name value for this Mechanism.  The Mechanism can ignore this
+     * value if it does not utilize user name in it's authentication processing.
+     *
+     * @param username
+     *        The user name given.
+     */
+    void setUsername(String value);
+
+    /**
+     * Returns the configured user name value for this Mechanism.
+     *
+     * @return the currently set user name value for this Mechanism.
+     */
+    String getUsername();
+
+    /**
+     * Sets the password value for this Mechanism.  The Mechanism can ignore this
+     * value if it does not utilize a password in it's authentication processing.
+     *
+     * @param username
+     *        The user name given.
+     */
+    void setPassword(String value);
+
+    /**
+     * Returns the configured password value for this Mechanism.
+     *
+     * @return the currently set password value for this Mechanism.
+     */
+    String getPassword();
+
+    /**
+     * Sets any additional Mechanism specific properties using a Map<String, Object>
+     *
+     * @param options
+     *        the map of additional properties that this Mechanism should utilize.
+     */
+    void setProperties(Map<String, Object> options);
+
+    /**
+     * The currently set Properties for this Mechanism.
+     *
+     * @return the current set of configuration Properties for this Mechanism.
+     */
+    Map<String, Object> getProperties();
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/PlainMechanism.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/PlainMechanism.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/PlainMechanism.java
new file mode 100644
index 0000000..ce26124
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/PlainMechanism.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.sasl;
+
+/**
+ * Implements the SASL PLAIN authentication Mechanism.
+ *
+ * User name and Password values are sent without being encrypted.
+ */
+public class PlainMechanism extends AbstractMechanism {
+
+    @Override
+    public int getPriority() {
+        return PRIORITY.MEDIUM.getValue();
+    }
+
+    @Override
+    public String getName() {
+        return "PLAIN";
+    }
+
+    @Override
+    public byte[] getInitialResponse() {
+
+        String username = getUsername();
+        String password = getPassword();
+
+        if (username == null) {
+            username = "";
+        }
+
+        if (password == null) {
+            password = "";
+        }
+
+        byte[] usernameBytes = username.getBytes();
+        byte[] passwordBytes = password.getBytes();
+        byte[] data = new byte[usernameBytes.length + passwordBytes.length + 2];
+        System.arraycopy(usernameBytes, 0, data, 1, usernameBytes.length);
+        System.arraycopy(passwordBytes, 0, data, 2 + usernameBytes.length, passwordBytes.length);
+        return data;
+    }
+
+    @Override
+    public byte[] getChallengeResponse(byte[] challenge) {
+        return EMPTY;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/SaslAuthenticator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/SaslAuthenticator.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/SaslAuthenticator.java
new file mode 100644
index 0000000..818ddff
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/SaslAuthenticator.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.sasl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import javax.jms.JMSSecurityException;
+import javax.security.sasl.SaslException;
+
+import org.apache.qpid.proton.engine.Sasl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manage the SASL authentication process
+ */
+public class SaslAuthenticator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SaslAuthenticator.class);
+
+    private final Sasl sasl;
+    private final String username;
+    private final String password;
+    private Mechanism mechanism;
+
+    /**
+     * Create the authenticator and initialize it.
+     *
+     * @param sasl
+     *        The Proton SASL entry point this class will use to manage the authentication.
+     * @param username
+     *        The user name that will be used to authenticate.
+     * @param password
+     *        The password that will be used to authenticate.
+     */
+    public SaslAuthenticator(Sasl sasl, String username, String password) {
+        this.sasl = sasl;
+        this.username = username;
+        this.password = password;
+    }
+
+    /**
+     * Process the SASL authentication cycle until such time as an outcome is determine. This
+     * method must be called by the managing entity until the return value is true indicating a
+     * successful authentication or a JMSSecurityException is thrown indicating that the
+     * handshake failed.
+     *
+     * @throws JMSSecurityException
+     */
+    public boolean authenticate() throws SecurityException {
+        switch (sasl.getState()) {
+            case PN_SASL_IDLE:
+                handleSaslInit();
+                break;
+            case PN_SASL_STEP:
+                handleSaslStep();
+                break;
+            case PN_SASL_FAIL:
+                handleSaslFail();
+                break;
+            case PN_SASL_PASS:
+                return true;
+            default:
+        }
+
+        return false;
+    }
+
+    private void handleSaslInit() throws SecurityException {
+        try {
+            String[] remoteMechanisms = sasl.getRemoteMechanisms();
+            if (remoteMechanisms != null && remoteMechanisms.length != 0) {
+                mechanism = findMatchingMechanism(remoteMechanisms);
+                if (mechanism != null) {
+                    mechanism.setUsername(username);
+                    mechanism.setPassword(password);
+                    // TODO - set additional options from URI.
+                    // TODO - set a host value.
+
+                    sasl.setMechanisms(mechanism.getName());
+                    byte[] response = mechanism.getInitialResponse();
+                    if (response != null && response.length != 0) {
+                        sasl.send(response, 0, response.length);
+                    }
+                } else {
+                    // TODO - Better error message.
+                    throw new SecurityException("Could not find a matching SASL mechanism for the remote peer.");
+                }
+            }
+        } catch (SaslException se) {
+            // TODO - Better error message.
+            SecurityException jmsse = new SecurityException("Exception while processing SASL init.");
+            jmsse.initCause(se);
+            throw jmsse;
+        }
+    }
+
+    private Mechanism findMatchingMechanism(String...remoteMechanisms) {
+
+        Mechanism match = null;
+        List<Mechanism> found = new ArrayList<Mechanism>();
+
+        for (String remoteMechanism : remoteMechanisms) {
+            if (remoteMechanism.equalsIgnoreCase("PLAIN")) {
+                found.add(new PlainMechanism());
+            } else if (remoteMechanism.equalsIgnoreCase("ANONYMOUS")) {
+                found.add(new AnonymousMechanism());
+            } else if (remoteMechanism.equalsIgnoreCase("CRAM-MD5")) {
+                found.add(new CramMD5Mechanism());
+            } else {
+                LOG.debug("Unknown remote mechanism {}, skipping", remoteMechanism);
+            }
+        }
+
+        if (!found.isEmpty()) {
+            // Sorts by priority using Mechanism comparison and return the last value in
+            // list which is the Mechanism deemed to be the highest priority match.
+            Collections.sort(found);
+            match = found.get(found.size() - 1);
+        }
+
+        LOG.info("Best match for SASL auth was: {}", match);
+
+        return match;
+    }
+
+    private void handleSaslStep() throws SecurityException {
+        try {
+            if (sasl.pending() != 0) {
+                byte[] challenge = new byte[sasl.pending()];
+                sasl.recv(challenge, 0, challenge.length);
+                byte[] response = mechanism.getChallengeResponse(challenge);
+                sasl.send(response, 0, response.length);
+            }
+        } catch (SaslException se) {
+            // TODO - Better error message.
+            SecurityException jmsse = new SecurityException("Exception while processing SASL step.");
+            jmsse.initCause(se);
+            throw jmsse;
+        }
+    }
+
+    private void handleSaslFail() throws SecurityException {
+        // TODO - Better error message.
+        throw new SecurityException("Client failed to authenticate");
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/AsyncResult.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/AsyncResult.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/AsyncResult.java
new file mode 100644
index 0000000..7327ba6
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/AsyncResult.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+/**
+ * Defines a result interface for Asynchronous operations.
+ */
+public interface AsyncResult {
+
+    /**
+     * If the operation fails this method is invoked with the Exception
+     * that caused the failure.
+     *
+     * @param result
+     *        The error that resulted in this asynchronous operation failing.
+     */
+    void onFailure(Throwable result);
+
+    /**
+     * If the operation succeeds the resulting value produced is set to null and
+     * the waiting parties are signaled.
+     */
+    void onSuccess();
+
+    /**
+     * Returns true if the AsyncResult has completed.  The task is considered complete
+     * regardless if it succeeded or failed.
+     *
+     * @return returns true if the asynchronous operation has completed.
+     */
+    boolean isComplete();
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java
new file mode 100644
index 0000000..9f83a1d
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.util.IOExceptionSupport;
+
+/**
+ * Asynchronous Client Future class.
+ */
+public class ClientFuture extends WrappedAsyncResult {
+
+    protected final CountDownLatch latch = new CountDownLatch(1);
+    protected Throwable error;
+
+    public ClientFuture() {
+        super(null);
+    }
+
+    public ClientFuture(AsyncResult watcher) {
+        super(watcher);
+    }
+
+    @Override
+    public boolean isComplete() {
+        return latch.getCount() == 0;
+    }
+
+    @Override
+    public void onFailure(Throwable result) {
+        error = result;
+        latch.countDown();
+        super.onFailure(result);
+    }
+
+    @Override
+    public void onSuccess() {
+        latch.countDown();
+        super.onSuccess();
+    }
+
+    /**
+     * Timed wait for a response to a pending operation.
+     *
+     * @param amount
+     *        The amount of time to wait before abandoning the wait.
+     * @param unit
+     *        The unit to use for this wait period.
+     *
+     * @throws IOException if an error occurs while waiting for the response.
+     */
+    public void sync(long amount, TimeUnit unit) throws IOException {
+        try {
+            latch.await(amount, unit);
+        } catch (InterruptedException e) {
+            Thread.interrupted();
+            throw IOExceptionSupport.create(e);
+        }
+
+        failOnError();
+    }
+
+    /**
+     * Waits for a response to some pending operation.
+     *
+     * @throws IOException if an error occurs while waiting for the response.
+     */
+    public void sync() throws IOException {
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            Thread.interrupted();
+            throw IOExceptionSupport.create(e);
+        }
+
+        failOnError();
+    }
+
+    private void failOnError() throws IOException {
+        Throwable cause = error;
+        if (cause != null) {
+            throw IOExceptionSupport.create(cause);
+        }
+    }
+}


[3/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5602

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5602

Functional client with added tests to start to cover various
expectations of an AMQP broker and some tests for expectations of a JMS
mapping compliant broker. 

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/72839b78
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/72839b78
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/72839b78

Branch: refs/heads/master
Commit: 72839b78a727bdec96dbd0a824a9f39a745b4d87
Parents: 10c47d6
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Mar 13 15:47:30 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Mar 13 15:47:30 2015 -0400

----------------------------------------------------------------------
 .../transport/amqp/AmqpProtocolConverter.java   |  92 +--
 .../activemq/transport/amqp/AmqpSupport.java    | 130 +++-
 .../transport/amqp/AmqpTestSupport.java         |  48 +-
 .../amqp/client/AmqpAbstractResource.java       | 314 ++++++++++
 .../transport/amqp/client/AmqpClient.java       | 240 ++++++++
 .../amqp/client/AmqpClientListener.java         |  32 +
 .../amqp/client/AmqpClientTestSupport.java      |  77 +++
 .../transport/amqp/client/AmqpConnection.java   | 532 ++++++++++++++++
 .../amqp/client/AmqpDefaultClientListener.java  |  28 +
 .../amqp/client/AmqpJmsSelectorType.java        |  47 ++
 .../transport/amqp/client/AmqpMessage.java      | 179 ++++++
 .../transport/amqp/client/AmqpNoLocalType.java  |  44 ++
 .../transport/amqp/client/AmqpReceiver.java     | 599 +++++++++++++++++++
 .../transport/amqp/client/AmqpResource.java     | 163 +++++
 .../transport/amqp/client/AmqpSender.java       | 382 ++++++++++++
 .../transport/amqp/client/AmqpSession.java      | 168 ++++++
 .../amqp/client/AmqpStateInspector.java         |  88 +++
 .../amqp/client/AmqpTransferTagGenerator.java   | 103 ++++
 .../amqp/client/sasl/AbstractMechanism.java     |  80 +++
 .../amqp/client/sasl/AnonymousMechanism.java    |  43 ++
 .../amqp/client/sasl/CramMD5Mechanism.java      |  86 +++
 .../transport/amqp/client/sasl/Mechanism.java   | 125 ++++
 .../amqp/client/sasl/PlainMechanism.java        |  62 ++
 .../amqp/client/sasl/SaslAuthenticator.java     | 163 +++++
 .../transport/amqp/client/util/AsyncResult.java |  47 ++
 .../amqp/client/util/ClientFuture.java          | 102 ++++
 .../amqp/client/util/ClientTcpTransport.java    | 384 ++++++++++++
 .../client/util/UnmodifiableConnection.java     | 179 ++++++
 .../amqp/client/util/UnmodifiableDelivery.java  | 147 +++++
 .../amqp/client/util/UnmodifiableLink.java      | 248 ++++++++
 .../amqp/client/util/UnmodifiableReceiver.java  |  59 ++
 .../amqp/client/util/UnmodifiableSender.java    |  45 ++
 .../amqp/client/util/UnmodifiableSession.java   | 134 +++++
 .../amqp/client/util/WrappedAsyncResult.java    |  59 ++
 .../amqp/interop/AmqpConnectionsTest.java       | 170 ++++++
 .../amqp/interop/AmqpReceiverTest.java          | 285 +++++++++
 .../transport/amqp/interop/AmqpSenderTest.java  |  94 +++
 .../transport/amqp/interop/AmqpSessionTest.java |  40 ++
 38 files changed, 5730 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index b9b2ff2..39c8c2b 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -16,6 +16,20 @@
  */
 package org.apache.activemq.transport.amqp;
 
+import static org.apache.activemq.transport.amqp.AmqpSupport.ANONYMOUS_RELAY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
+import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
+import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
+import static org.apache.activemq.transport.amqp.AmqpSupport.QUEUE_PREFIX;
+import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILITY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.TOPIC_PREFIX;
+import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
+import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
+import static org.apache.activemq.transport.amqp.AmqpSupport.toBytes;
+import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
+
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
@@ -90,7 +104,6 @@ import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.UnsignedLong;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Modified;
@@ -136,19 +149,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class);
     private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
     private static final int CHANNEL_MAX = 32767;
-    private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
-    private static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
-    private static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
-    private static final Symbol COPY = Symbol.getSymbol("copy");
-    private static final UnsignedLong JMS_SELECTOR_CODE = UnsignedLong.valueOf(0x0000468C00000004L);
-    private static final Symbol JMS_SELECTOR_NAME = Symbol.valueOf("apache.org:selector-filter:string");
-    private static final Object[] JMS_SELECTOR_FILTER_IDS = new Object[] { JMS_SELECTOR_CODE, JMS_SELECTOR_NAME };
-    private static final UnsignedLong NO_LOCAL_CODE = UnsignedLong.valueOf(0x0000468C00000003L);
-    private static final Symbol NO_LOCAL_NAME = Symbol.valueOf("apache.org:selector-filter:string");
-    private static final Object[] NO_LOCAL_FILTER_IDS = new Object[] { NO_LOCAL_CODE, NO_LOCAL_NAME };
-    private static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue");
-    private static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic");
-    private static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
 
     private final AmqpTransport amqpTransport;
     private final AmqpWireFormat amqpWireFormat;
@@ -874,17 +874,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
 
     private final AtomicLong nextTransactionId = new AtomicLong();
 
-    public byte[] toBytes(long value) {
-        Buffer buffer = new Buffer(8);
-        buffer.bigEndianEditor().writeLong(value);
-        return buffer.data;
-    }
-
-    private long toLong(Binary value) {
-        Buffer buffer = new Buffer(value.getArray(), value.getArrayOffset(), value.getLength());
-        return buffer.bigEndianEditor().readLong();
-    }
-
     AmqpDeliveryListener coordinatorContext = new BaseProducerContext() {
 
         @Override
@@ -946,7 +935,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                         if (response.isException()) {
                             ExceptionResponse er = (ExceptionResponse) response;
                             Rejected rejected = new Rejected();
-                            rejected.setError(createErrorCondition("failed", er.getException().getMessage()));
+                            rejected.setError(new ErrorCondition(Symbol.valueOf("failed"), er.getException().getMessage()));
                             delivery.disposition(rejected);
                         } else {
                             delivery.disposition(Accepted.getInstance());
@@ -1639,46 +1628,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         });
     }
 
-    private boolean contains(Symbol[] symbols, Symbol key) {
-        if (symbols == null || symbols.length == 0) {
-            return false;
-        }
-
-        for (Symbol symbol : symbols) {
-            if (symbol.equals(key)) {
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-    private DescribedType findFilter(Map<Symbol, Object> filters, Object[] filterIds) {
-
-        if (filterIds == null || filterIds.length == 0) {
-            throw new IllegalArgumentException("Invalid Filter Ids array passed: " + filterIds);
-        }
-
-        if (filters == null || filters.isEmpty()) {
-            return null;
-        }
-
-        for (Object value : filters.values()) {
-            if (value instanceof DescribedType) {
-                DescribedType describedType = ((DescribedType) value);
-                Object descriptor = describedType.getDescriptor();
-
-                for (Object filterId : filterIds) {
-                    if (descriptor.equals(filterId)) {
-                        return describedType;
-                    }
-                }
-            }
-        }
-
-        return null;
-    }
-
     // //////////////////////////////////////////////////////////////////////////
     //
     // Implementation methods
@@ -1707,17 +1656,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         }
     }
 
-    ErrorCondition createErrorCondition(String name) {
-        return createErrorCondition(name, "");
-    }
-
-    ErrorCondition createErrorCondition(String name, String description) {
-        ErrorCondition condition = new ErrorCondition();
-        condition.setCondition(Symbol.valueOf(name));
-        condition.setDescription(description);
-        return condition;
-    }
-
     @Override
     public void setPrefetch(int prefetch) {
         this.prefetch = prefetch;

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
index 9a01f7b..c0cfb94 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
@@ -17,19 +17,116 @@
 package org.apache.activemq.transport.amqp;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedLong;
 import org.fusesource.hawtbuf.Buffer;
 
 /**
- *
+ * Set of useful methods and definitions used in the AMQP protocol handling
  */
 public class AmqpSupport {
 
-    static public Buffer toBuffer(ByteBuffer data) {
+    // Identification values used to locating JMS selector types.
+    public static final UnsignedLong JMS_SELECTOR_CODE = UnsignedLong.valueOf(0x0000468C00000004L);
+    public static final Symbol JMS_SELECTOR_NAME = Symbol.valueOf("apache.org:selector-filter:string");
+    public static final Object[] JMS_SELECTOR_FILTER_IDS = new Object[] { JMS_SELECTOR_CODE, JMS_SELECTOR_NAME };
+    public static final UnsignedLong NO_LOCAL_CODE = UnsignedLong.valueOf(0x0000468C00000003L);
+    public static final Symbol NO_LOCAL_NAME = Symbol.valueOf("apache.org:selector-filter:string");
+    public static final Object[] NO_LOCAL_FILTER_IDS = new Object[] { NO_LOCAL_CODE, NO_LOCAL_NAME };
+
+    // Capabilities used to identify destination type in some requests.
+    public static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue");
+    public static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic");
+
+    // Symbols used to announce connection information to remote peer.
+    public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
+    public static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
+    public static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
+    public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
+
+    // Symbols used in configuration of newly opened links.
+    public static final Symbol COPY = Symbol.getSymbol("copy");
+
+    /**
+     * Search for a given Symbol in a given array of Symbol object.
+     *
+     * @param symbols
+     *        the set of Symbols to search.
+     * @param key
+     *        the value to try and find in the Symbol array.
+     *
+     * @return true if the key is found in the given Symbol array.
+     */
+    public static boolean contains(Symbol[] symbols, Symbol key) {
+        if (symbols == null || symbols.length == 0) {
+            return false;
+        }
+
+        for (Symbol symbol : symbols) {
+            if (symbol.equals(key)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Search for a particular filter using a set of known indentification values
+     * in the Map of filters.
+     *
+     * @param filters
+     *        The filters map that should be searched.
+     * @param filterIds
+     *        The aliases for the target filter to be located.
+     *
+     * @return the filter if found in the mapping or null if not found.
+     */
+    public static DescribedType findFilter(Map<Symbol, Object> filters, Object[] filterIds) {
+
+        if (filterIds == null || filterIds.length == 0) {
+            throw new IllegalArgumentException("Invalid Filter Ids array passed: " + filterIds);
+        }
+
+        if (filters == null || filters.isEmpty()) {
+            return null;
+        }
+
+        for (Object value : filters.values()) {
+            if (value instanceof DescribedType) {
+                DescribedType describedType = ((DescribedType) value);
+                Object descriptor = describedType.getDescriptor();
+
+                for (Object filterId : filterIds) {
+                    if (descriptor.equals(filterId)) {
+                        return describedType;
+                    }
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Conversion from Java ByteBuffer to a HawtBuf buffer.
+     *
+     * @param data
+     *        the ByteBuffer instance to convert.
+     *
+     * @return a new HawtBuf buffer converted from the given ByteBuffer.
+     */
+    public static Buffer toBuffer(ByteBuffer data) {
         if (data == null) {
             return null;
         }
+
         Buffer rc;
+
         if (data.isDirect()) {
             rc = new Buffer(data.remaining());
             data.get(rc.data);
@@ -37,6 +134,35 @@ public class AmqpSupport {
             rc = new Buffer(data);
             data.position(data.position() + data.remaining());
         }
+
         return rc;
     }
+
+    /**
+     * Given a long value, convert it to a byte array for marshalling.
+     *
+     * @param value
+     *        the value to convert.
+     *
+     * @return a new byte array that holds the big endian value of the long.
+     */
+    public static byte[] toBytes(long value) {
+        Buffer buffer = new Buffer(8);
+        buffer.bigEndianEditor().writeLong(value);
+        return buffer.data;
+    }
+
+    /**
+     * Converts a Binary value to a long assuming that the contained value is
+     * stored in Big Endian encoding.
+     *
+     * @param value
+     *        the Binary object whose payload is converted to a long.
+     *
+     * @return a long value constructed from the bytes of the Binary instance.
+     */
+    public static long toLong(Binary value) {
+        Buffer buffer = new Buffer(value.getArray(), value.getArrayOffset(), value.getLength());
+        return buffer.bigEndianEditor().readLong();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
index e20168c..33ae799 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
@@ -36,6 +36,7 @@ import javax.net.ssl.KeyManager;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManager;
 
+import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.jmx.BrokerViewMBean;
@@ -242,18 +243,47 @@ public class AmqpTestSupport {
         LOG.info("========== tearDown " + getTestName() + " ==========");
     }
 
-    public void sendMessages(Connection connection, Destination destination, int count) throws Exception {
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer p = session.createProducer(destination);
+    public Connection createJMSConnection() throws JMSException {
+        if (!isUseOpenWireConnector()) {
+            throw new javax.jms.IllegalStateException("OpenWire TransportConnector was not configured.");
+        }
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(openwireURI);
+
+        return factory.createConnection();
+    }
 
-        for (int i = 1; i <= count; i++) {
-            TextMessage message = session.createTextMessage();
-            message.setText("TextMessage: " + i);
-            message.setIntProperty(MESSAGE_NUMBER, i);
-            p.send(message);
+    public void sendMessages(String destinationName, int count, boolean topic) throws Exception {
+        Connection connection = createJMSConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination destination = null;
+            if (topic) {
+                destination = session.createTopic(destinationName);
+            } else {
+                destination = session.createQueue(destinationName);
+            }
+
+            sendMessages(connection, destination, count);
+        } finally {
+            connection.close();
         }
+    }
 
-        session.close();
+    public void sendMessages(Connection connection, Destination destination, int count) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        try {
+            MessageProducer p = session.createProducer(destination);
+
+            for (int i = 1; i <= count; i++) {
+                TextMessage message = session.createTextMessage();
+                message.setText("TextMessage: " + i);
+                message.setIntProperty(MESSAGE_NUMBER, i);
+                p.send(message);
+            }
+        } finally {
+            session.close();
+        }
     }
 
     public String getTestName() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
new file mode 100644
index 0000000..8a5a587
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.io.IOException;
+
+import org.apache.activemq.transport.amqp.client.util.AsyncResult;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base for all AmqpResource implementations to extend.
+ *
+ * This abstract class wraps up the basic state management bits so that the concrete
+ * object don't have to reproduce it.  Provides hooks for the subclasses to initialize
+ * and shutdown.
+ */
+public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpAbstractResource.class);
+
+    protected AsyncResult openRequest;
+    protected AsyncResult closeRequest;
+
+    private AmqpStateInspector amqpStateInspector = new AmqpStateInspector();
+
+    private E endpoint;
+
+    @Override
+    public void open(AsyncResult request) {
+        this.openRequest = request;
+        doOpen();
+        getEndpoint().setContext(this);
+    }
+
+    @Override
+    public boolean isOpen() {
+        return getEndpoint().getRemoteState() == EndpointState.ACTIVE;
+    }
+
+    @Override
+    public void opened() {
+        if (this.openRequest != null) {
+            this.openRequest.onSuccess();
+            this.openRequest = null;
+        }
+    }
+
+    @Override
+    public void close(AsyncResult request) {
+        // If already closed signal success or else the caller might never get notified.
+        if (getEndpoint().getLocalState() == EndpointState.CLOSED ||
+            getEndpoint().getRemoteState() == EndpointState.CLOSED) {
+
+            if (getEndpoint().getLocalState() != EndpointState.CLOSED) {
+                // Remote already closed this resource, close locally and free.
+                if (getEndpoint().getLocalState() != EndpointState.CLOSED) {
+                    doClose();
+                    getEndpoint().free();
+                }
+            }
+
+            request.onSuccess();
+            return;
+        }
+
+        this.closeRequest = request;
+        doClose();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return getEndpoint().getLocalState() == EndpointState.CLOSED;
+    }
+
+    @Override
+    public void closed() {
+        getEndpoint().close();
+        getEndpoint().free();
+
+        if (this.closeRequest != null) {
+            this.closeRequest.onSuccess();
+            this.closeRequest = null;
+        }
+    }
+
+    @Override
+    public void failed() {
+        failed(new Exception("Remote request failed."));
+    }
+
+    @Override
+    public void failed(Exception cause) {
+        if (openRequest != null) {
+            if (endpoint != null) {
+                // TODO: if this is a producer/consumer link then we may only be detached,
+                // rather than fully closed, and should respond appropriately.
+                endpoint.close();
+            }
+            openRequest.onFailure(cause);
+            openRequest = null;
+        }
+
+        if (closeRequest != null) {
+            closeRequest.onFailure(cause);
+            closeRequest = null;
+        }
+    }
+
+    @Override
+    public void remotelyClosed(AmqpConnection connection) {
+        Exception error = getRemoteError();
+        if (error == null) {
+            error = new IOException("Remote has closed without error information");
+        }
+
+        if (endpoint != null) {
+            // TODO: if this is a producer/consumer link then we may only be detached,
+            // rather than fully closed, and should respond appropriately.
+            endpoint.close();
+        }
+
+        LOG.info("Resource {} was remotely closed", this);
+
+        connection.fireClientException(error);
+    }
+
+    public E getEndpoint() {
+        return this.endpoint;
+    }
+
+    public void setEndpoint(E endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    public AmqpStateInspector getStateInspector() {
+        return amqpStateInspector;
+    }
+
+    public void setStateInspector(AmqpStateInspector stateInspector) {
+        if (stateInspector == null) {
+            stateInspector = new AmqpStateInspector();
+        }
+
+        this.amqpStateInspector = stateInspector;
+    }
+
+    public EndpointState getLocalState() {
+        if (getEndpoint() == null) {
+            return EndpointState.UNINITIALIZED;
+        }
+        return getEndpoint().getLocalState();
+    }
+
+    public EndpointState getRemoteState() {
+        if (getEndpoint() == null) {
+            return EndpointState.UNINITIALIZED;
+        }
+        return getEndpoint().getRemoteState();
+    }
+
+    @Override
+    public boolean hasRemoteError() {
+        return getEndpoint().getRemoteCondition().getCondition() != null;
+    }
+
+    @Override
+    public Exception getRemoteError() {
+        String message = getRemoteErrorMessage();
+        Exception remoteError = null;
+        Symbol error = getEndpoint().getRemoteCondition().getCondition();
+        if (error != null) {
+            if (error.equals(AmqpError.UNAUTHORIZED_ACCESS)) {
+                remoteError = new SecurityException(message);
+            } else {
+                remoteError = new Exception(message);
+            }
+        }
+
+        return remoteError;
+    }
+
+    @Override
+    public String getRemoteErrorMessage() {
+        String message = "Received unkown error from remote peer";
+        if (getEndpoint().getRemoteCondition() != null) {
+            ErrorCondition error = getEndpoint().getRemoteCondition();
+            if (error.getDescription() != null && !error.getDescription().isEmpty()) {
+                message = error.getDescription();
+            }
+        }
+
+        return message;
+    }
+
+    @Override
+    public void processRemoteOpen(AmqpConnection connection) throws IOException {
+        doOpenInspection();
+        doOpenCompletion();
+    }
+
+    @Override
+    public void processRemoteDetach(AmqpConnection connection) throws IOException {
+        doDetachedInspection();
+        if (isAwaitingClose()) {
+            LOG.debug("{} is now closed: ", this);
+            closed();
+        } else {
+            remotelyClosed(connection);
+        }
+    }
+
+    @Override
+    public void processRemoteClose(AmqpConnection connection) throws IOException {
+        doClosedInspection();
+        if (isAwaitingClose()) {
+            LOG.debug("{} is now closed: ", this);
+            closed();
+        } else if (isAwaitingOpen()) {
+            // Error on Open, create exception and signal failure.
+            LOG.warn("Open of {} failed: ", this);
+            Exception openError;
+            if (hasRemoteError()) {
+                openError = getRemoteError();
+            } else {
+                openError = getOpenAbortException();
+            }
+
+            failed(openError);
+        } else {
+            remotelyClosed(connection);
+        }
+    }
+
+    @Override
+    public void processDeliveryUpdates(AmqpConnection connection) throws IOException {
+    }
+
+    @Override
+    public void processFlowUpdates(AmqpConnection connection) throws IOException {
+    }
+
+    /**
+     * Perform the open operation on the managed endpoint.  A subclass may
+     * override this method to provide additional open actions or configuration
+     * updates.
+     */
+    protected void doOpen() {
+        getEndpoint().open();
+    }
+
+    /**
+     * Perform the close operation on the managed endpoint.  A subclass may
+     * override this method to provide additional close actions or alter the
+     * standard close path such as endpoint detach etc.
+     */
+    protected void doClose() {
+        getEndpoint().close();
+    }
+
+    /**
+     * Complete the open operation on the managed endpoint. A subclass may
+     * override this method to provide additional verification actions or configuration
+     * updates.
+     */
+    protected void doOpenCompletion() {
+        LOG.debug("{} is now open: ", this);
+        opened();
+    }
+
+    /**
+     * When aborting the open operation, and there isnt an error condition,
+     * provided by the peer, the returned exception will be used instead.
+     * A subclass may override this method to provide alternative behaviour.
+     */
+    protected Exception getOpenAbortException() {
+        return new IOException("Open failed unexpectedly.");
+    }
+
+    // TODO - Fina a more generic way to do this.
+    protected abstract void doOpenInspection();
+    protected abstract void doClosedInspection();
+
+    protected void doDetachedInspection() {}
+
+    //----- Private implementation utility methods ---------------------------//
+
+    private boolean isAwaitingOpen() {
+        return this.openRequest != null;
+    }
+
+    private boolean isAwaitingClose() {
+        return this.closeRequest != null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
new file mode 100644
index 0000000..0b299e4
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.transport.amqp.client.util.ClientTcpTransport;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Connection instance used to connect to the Broker using Proton as
+ * the AMQP protocol handler.
+ */
+public class AmqpClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpClient.class);
+
+    private final String username;
+    private final String password;
+    private final URI remoteURI;
+
+    private AmqpStateInspector stateInspector = new AmqpStateInspector();
+    private List<Symbol> offeredCapabilities = Collections.emptyList();
+    private Map<Symbol, Object> offeredProperties = Collections.emptyMap();
+
+    /**
+     * Creates an AmqpClient instance which can be used as a factory for connections.
+     *
+     * @param remoteURI
+     *        The address of the remote peer to connect to.
+     * @param username
+     *	      The user name to use when authenticating the client.
+     * @param password
+     *		  The password to use when authenticating the client.
+     */
+    public AmqpClient(URI remoteURI, String username, String password) {
+        this.remoteURI = remoteURI;
+        this.password = password;
+        this.username = username;
+    }
+
+    /**
+     * Creates a connection with the broker at the given location, this method initiates a
+     * connect attempt immediately and will fail if the remote peer cannot be reached.
+     *
+     * @returns a new connection object used to interact with the connected peer.
+     *
+     * @throws Exception if an error occurs attempting to connect to the Broker.
+     */
+    public AmqpConnection connect() throws Exception {
+
+        AmqpConnection connection = createConnection();
+
+        LOG.debug("Attempting to create new connection to peer: {}", remoteURI);
+        connection.connect();
+
+        return connection;
+    }
+
+    /**
+     * Creates a connection object using the configured values for user, password, remote URI
+     * etc.  This method does not immediately initiate a connection to the remote leaving that
+     * to the caller which provides a connection object that can have additional configuration
+     * changes applied before the <code>connect</code> method is invoked.
+     *
+     * @returns a new connection object used to interact with the connected peer.
+     *
+     * @throws Exception if an error occurs attempting to connect to the Broker.
+     */
+    public AmqpConnection createConnection() throws Exception {
+        if (username == null && password != null) {
+            throw new IllegalArgumentException("Password must be null if user name value is null");
+        }
+
+        ClientTcpTransport transport = null;
+
+        if (remoteURI.getScheme().equals("tcp")) {
+            transport = new ClientTcpTransport(remoteURI);
+        } else {
+            throw new IllegalArgumentException("Client only support TCP currently.");
+        }
+
+        AmqpConnection connection = new AmqpConnection(transport, username, password);
+
+        connection.setOfferedCapabilities(getOfferedCapabilities());
+        connection.setOfferedProperties(getOfferedProperties());
+        connection.setStateInspector(getStateInspector());
+
+        return connection;
+    }
+
+    /**
+     * @return the user name value given when connect was called, always null before connect.
+     */
+    public String getUsername() {
+        return username;
+    }
+
+    /**
+     * @return the password value given when connect was called, always null before connect.
+     */
+    public String getPassword() {
+        return password;
+    }
+
+    /**
+     * @return the currently set address to use to connect to the AMQP peer.
+     */
+    public URI getRemoteURI() {
+        return remoteURI;
+    }
+
+    /**
+     * Sets the offered capabilities that should be used when a new connection attempt
+     * is made.
+     *
+     * @param offeredCapabilities
+     *        the list of capabilities to offer when connecting.
+     */
+    public void setOfferedCapabilities(List<Symbol> offeredCapabilities) {
+        if (offeredCapabilities != null) {
+            offeredCapabilities = Collections.emptyList();
+        }
+
+        this.offeredCapabilities = offeredCapabilities;
+    }
+
+    /**
+     * @return an unmodifiable view of the currently set offered capabilities
+     */
+    public List<Symbol> getOfferedCapabilities() {
+        return Collections.unmodifiableList(offeredCapabilities);
+    }
+
+    /**
+     * Sets the offered connection properties that should be used when a new connection
+     * attempt is made.
+     *
+     * @param connectionProperties
+     *        the map of properties to offer when connecting.
+     */
+    public void setOfferedProperties(Map<Symbol, Object> offeredProperties) {
+        if (offeredProperties != null) {
+            offeredProperties = Collections.emptyMap();
+        }
+
+        this.offeredProperties = offeredProperties;
+    }
+
+    /**
+     * @return an unmodifiable view of the currently set connection properties.
+     */
+    public Map<Symbol, Object> getOfferedProperties() {
+        return Collections.unmodifiableMap(offeredProperties);
+    }
+
+    /**
+     * @return the currently set state inspector used to check state after various events.
+     */
+    public AmqpStateInspector getStateInspector() {
+        return stateInspector;
+    }
+
+    /**
+     * Sets the state inspector used to check that the AMQP resource is valid after
+     * specific lifecycle events such as open and close.
+     *
+     * @param stateInspector
+     *        the new state inspector to use.
+     */
+    public void setStateInspector(AmqpStateInspector stateInspector) {
+        if (stateInspector == null) {
+            stateInspector = new AmqpStateInspector();
+        }
+
+        this.stateInspector = stateInspector;
+    }
+
+    @Override
+    public String toString() {
+        return "AmqpClient: " + getRemoteURI().getHost() + ":" + getRemoteURI().getPort();
+    }
+
+    /**
+     * Creates an anonymous connection with the broker at the given location.
+     *
+     * @param broker
+     *        the address of the remote broker instance.
+     *
+     * @returns a new connection object used to interact with the connected peer.
+     *
+     * @throws Exception if an error occurs attempting to connect to the Broker.
+     */
+    public static AmqpConnection connect(URI broker) throws Exception {
+        return connect(broker, null, null);
+    }
+
+    /**
+     * Creates a connection with the broker at the given location.
+     *
+     * @param broker
+     *        the address of the remote broker instance.
+     * @param username
+     *        the user name to use to connect to the broker or null for anonymous.
+     * @param password
+     *        the password to use to connect to the broker, must be null if user name is null.
+     *
+     * @returns a new connection object used to interact with the connected peer.
+     *
+     * @throws Exception if an error occurs attempting to connect to the Broker.
+     */
+    public static AmqpConnection connect(URI broker, String username, String password) throws Exception {
+        if (username == null && password != null) {
+            throw new IllegalArgumentException("Password must be null if user name value is null");
+        }
+
+        AmqpClient client = new AmqpClient(broker, username, password);
+
+        return client.connect();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientListener.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientListener.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientListener.java
new file mode 100644
index 0000000..3df7cf4
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientListener.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+/**
+ * Events points exposed by the AmqpClient object.
+ */
+public interface AmqpClientListener {
+
+    /**
+     * Indicates some error has occurred during client operations.
+     *
+     * @param ex
+     *        The error that triggered this event.
+     */
+    void onClientException(Throwable ex);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java
new file mode 100644
index 0000000..4d3f571
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import java.net.URI;
+
+import org.apache.activemq.transport.amqp.AmqpTestSupport;
+
+/**
+ * Test support class for tests that will be using the AMQP Proton wrapper client.
+ */
+public class AmqpClientTestSupport extends AmqpTestSupport {
+
+    public String getAmqpConnectionURIOptions() {
+        return "";
+    }
+
+    public URI getBrokerAmqpConnectionURI() {
+        try {
+            String uri = "tcp://127.0.0.1:" + amqpPort;
+
+            if (!getAmqpConnectionURIOptions().isEmpty()) {
+                uri = uri + "?" + getAmqpConnectionURIOptions();
+            }
+
+            return new URI(uri);
+        } catch (Exception e) {
+            throw new RuntimeException();
+        }
+    }
+
+    public AmqpConnection createAmqpConnection() throws Exception {
+        return createAmqpConnection(getBrokerAmqpConnectionURI());
+    }
+
+    public AmqpConnection createAmqpConnection(String username, String password) throws Exception {
+        return createAmqpConnection(getBrokerAmqpConnectionURI(), username, password);
+    }
+
+    public AmqpConnection createAmqpConnection(URI brokerURI) throws Exception {
+        return createAmqpConnection(brokerURI, null, null);
+    }
+
+    public AmqpConnection createAmqpConnection(URI brokerURI, String username, String password) throws Exception {
+        return createAmqpClient(brokerURI, username, password).connect();
+    }
+
+    public AmqpClient createAmqpClient() throws Exception {
+        return createAmqpClient(getBrokerAmqpConnectionURI(), null, null);
+    }
+
+    public AmqpClient createAmqpClient(URI brokerURI) throws Exception {
+        return createAmqpClient(brokerURI, null, null);
+    }
+
+    public AmqpClient createAmqpClient(String username, String password) throws Exception {
+        return createAmqpClient(getBrokerAmqpConnectionURI(), username, password);
+    }
+
+    public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception {
+        return new AmqpClient(brokerURI, username, password);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
new file mode 100644
index 0000000..a98f711
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -0,0 +1,532 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator;
+import org.apache.activemq.transport.amqp.client.util.ClientFuture;
+import org.apache.activemq.transport.amqp.client.util.ClientTcpTransport;
+import org.apache.activemq.transport.amqp.client.util.UnmodifiableConnection;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Event.Type;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.engine.impl.CollectorImpl;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AmqpConnection extends AmqpAbstractResource<Connection> implements ClientTcpTransport.TransportListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
+
+    private static final int DEFAULT_MAX_FRAME_SIZE = 1024 * 1024 * 1;
+    // NOTE: Limit default channel max to signed short range to deal with
+    //       brokers that don't currently handle the unsigned range well.
+    private static final int DEFAULT_CHANNEL_MAX = 32767;
+    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
+
+    public static final long DEFAULT_CONNECT_TIMEOUT = 15000;
+    public static final long DEFAULT_CLOSE_TIMEOUT = 30000;
+
+    private final ScheduledExecutorService serializer;
+    private final AtomicBoolean closed = new AtomicBoolean();
+    private final AtomicBoolean connected = new AtomicBoolean();
+    private final AtomicLong sessionIdGenerator = new AtomicLong();
+    private final Collector protonCollector = new CollectorImpl();
+    private final ClientTcpTransport transport;
+    private final Transport protonTransport = Transport.Factory.create();
+
+    private final String username;
+    private final String password;
+    private final URI remoteURI;
+    private final String connectionId;
+    private List<Symbol> offeredCapabilities = Collections.emptyList();
+    private Map<Symbol, Object> offeredProperties = Collections.emptyMap();
+
+    private AmqpClientListener listener;
+    private SaslAuthenticator authenticator;
+
+    private String containerId;
+    private boolean authenticated;
+    private int channelMax = DEFAULT_CHANNEL_MAX;
+    private long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
+    private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
+
+    public AmqpConnection(ClientTcpTransport transport, String username, String password) {
+        setEndpoint(Connection.Factory.create());
+        getEndpoint().collect(protonCollector);
+
+        this.transport = transport;
+        this.username = username;
+        this.password = password;
+        this.connectionId = CONNECTION_ID_GENERATOR.generateId();
+        this.remoteURI = transport.getRemoteURI();
+
+        this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+
+            @Override
+            public Thread newThread(Runnable runner) {
+                Thread serial = new Thread(runner);
+                serial.setDaemon(true);
+                serial.setName(toString());
+                return serial;
+            }
+        });
+
+        this.transport.setTransportListener(this);
+    }
+
+    public void connect() throws Exception {
+        if (connected.compareAndSet(false, true)) {
+            transport.connect();
+
+            final ClientFuture future = new ClientFuture();
+            serializer.execute(new Runnable() {
+                @Override
+                public void run() {
+                    getEndpoint().setContainer(safeGetContainerId());
+                    getEndpoint().setHostname(remoteURI.getHost());
+                    if (!getOfferedCapabilities().isEmpty()) {
+                        getEndpoint().setOfferedCapabilities(getOfferedCapabilities().toArray(new Symbol[0]));
+                    }
+                    if (!getOfferedProperties().isEmpty()) {
+                        getEndpoint().setProperties(getOfferedProperties());
+                    }
+
+                    protonTransport.setMaxFrameSize(getMaxFrameSize());
+                    protonTransport.setChannelMax(getChannelMax());
+                    protonTransport.bind(getEndpoint());
+                    Sasl sasl = protonTransport.sasl();
+                    if (sasl != null) {
+                        sasl.client();
+                    }
+                    authenticator = new SaslAuthenticator(sasl, username, password);
+                    open(future);
+
+                    pumpToProtonTransport();
+                }
+            });
+
+            if (connectTimeout <= 0) {
+                future.sync();
+            } else {
+                future.sync(connectTimeout, TimeUnit.MILLISECONDS);
+            }
+        }
+    }
+
+    public boolean isConnected() {
+        return transport.isConnected() && connected.get();
+    }
+
+    public void close() {
+        if (closed.compareAndSet(false, true)) {
+            final ClientFuture request = new ClientFuture();
+            serializer.execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+
+                        // If we are not connected then there is nothing we can do now
+                        // just signal success.
+                        if (!transport.isConnected()) {
+                            request.onSuccess();
+                        }
+
+                        if (getEndpoint() != null) {
+                            close(request);
+                        } else {
+                            request.onSuccess();
+                        }
+
+                        pumpToProtonTransport();
+                    } catch (Exception e) {
+                        LOG.debug("Caught exception while closing proton connection");
+                    }
+                }
+            });
+
+            try {
+                if (closeTimeout <= 0) {
+                    request.sync();
+                } else {
+                    request.sync(closeTimeout, TimeUnit.MILLISECONDS);
+                }
+            } catch (IOException e) {
+                LOG.warn("Error caught while closing Provider: ", e.getMessage());
+            } finally {
+                if (transport != null) {
+                    try {
+                        transport.close();
+                    } catch (Exception e) {
+                        LOG.debug("Cuaght exception while closing down Transport: {}", e.getMessage());
+                    }
+                }
+
+                serializer.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Creates a new Session instance used to create AMQP resources like
+     * senders and receivers.
+     *
+     * @return a new AmqpSession that can be used to create links.
+     *
+     * @throws Exception if an error occurs during creation.
+     */
+    public AmqpSession createSession() throws Exception {
+        checkClosed();
+
+        final AmqpSession session = new AmqpSession(AmqpConnection.this, getNextSessionId());
+        final ClientFuture request = new ClientFuture();
+
+        serializer.execute(new Runnable() {
+
+            @Override
+            public void run() {
+                checkClosed();
+
+                session.setEndpoint(getEndpoint().session());
+                session.open(request);
+
+                pumpToProtonTransport();
+            }
+        });
+
+        request.sync();
+
+        return session;
+    }
+
+    //----- Configuration accessors ------------------------------------------//
+
+    /**
+     * @return the user name that was used to authenticate this connection.
+     */
+    public String getUsername() {
+        return username;
+    }
+
+    /**
+     * @return the password that was used to authenticate this connection.
+     */
+    public String getPassword() {
+        return password;
+    }
+
+    /**
+     * @return the URI of the remote peer this connection attached to.
+     */
+    public URI getRemoteURI() {
+        return remoteURI;
+    }
+
+    /**
+     * @return the container ID that will be set as the container Id.
+     */
+    public String getContainerId() {
+        return this.containerId;
+    }
+
+    /**
+     * Sets the container Id that will be configured on the connection prior to
+     * connecting to the remote peer.  Calling this after connect has no effect.
+     *
+     * @param containerId
+     * 		  the container Id to use on the connection.
+     */
+    public void setContainerId(String containerId) {
+        this.containerId = containerId;
+    }
+
+    /**
+     * @return the currently set Max Frame Size value.
+     */
+    public int getMaxFrameSize() {
+        return DEFAULT_MAX_FRAME_SIZE;
+    }
+
+    public int getChannelMax() {
+        return channelMax;
+    }
+
+    public void setChannelMax(int channelMax) {
+        this.channelMax = channelMax;
+    }
+
+    public long getConnectTimeout() {
+        return connectTimeout;
+    }
+
+    public void setConnectTimeout(long connectTimeout) {
+        this.connectTimeout = connectTimeout;
+    }
+
+    public long getCloseTimeout() {
+        return closeTimeout;
+    }
+
+    public void setCloseTimeout(long closeTimeout) {
+        this.closeTimeout = closeTimeout;
+    }
+
+    public List<Symbol> getOfferedCapabilities() {
+        return offeredCapabilities;
+    }
+
+    public void setOfferedCapabilities(List<Symbol> offeredCapabilities) {
+        if (offeredCapabilities != null) {
+            offeredCapabilities = Collections.emptyList();
+        }
+
+        this.offeredCapabilities = offeredCapabilities;
+    }
+
+    public Map<Symbol, Object> getOfferedProperties() {
+        return offeredProperties;
+    }
+
+    public void setOfferedProperties(Map<Symbol, Object> offeredProperties) {
+        if (offeredProperties != null) {
+            offeredProperties = Collections.emptyMap();
+        }
+
+        this.offeredProperties = offeredProperties;
+    }
+
+    public Connection getConnection() {
+        return new UnmodifiableConnection(getEndpoint());
+    }
+
+    //----- Internal getters used from the child AmqpResource classes --------//
+
+    ScheduledExecutorService getScheduler() {
+        return this.serializer;
+    }
+
+    Connection getProtonConnection() {
+        return getEndpoint();
+    }
+
+    void pumpToProtonTransport() {
+        try {
+            boolean done = false;
+            while (!done) {
+                ByteBuffer toWrite = protonTransport.getOutputBuffer();
+                if (toWrite != null && toWrite.hasRemaining()) {
+                    transport.send(toWrite);
+                    protonTransport.outputConsumed();
+                } else {
+                    done = true;
+                }
+            }
+        } catch (IOException e) {
+            fireClientException(e);
+        }
+    }
+
+    //----- Transport listener event hooks -----------------------------------//
+
+    @Override
+    public void onData(final Buffer input) {
+        serializer.execute(new Runnable() {
+
+            @Override
+            public void run() {
+                ByteBuffer source = input.toByteBuffer();
+                LOG.trace("Received from Broker {} bytes:", source.remaining());
+
+                do {
+                    ByteBuffer buffer = protonTransport.getInputBuffer();
+                    int limit = Math.min(buffer.remaining(), source.remaining());
+                    ByteBuffer duplicate = source.duplicate();
+                    duplicate.limit(source.position() + limit);
+                    buffer.put(duplicate);
+                    protonTransport.processInput();
+                    source.position(source.position() + limit);
+                } while (source.hasRemaining());
+
+                // Process the state changes from the latest data and then answer back
+                // any pending updates to the Broker.
+                processUpdates();
+                pumpToProtonTransport();
+            }
+        });
+    }
+
+    @Override
+    public void onTransportClosed() {
+        LOG.debug("The transport has unexpectedly closed");
+    }
+
+    @Override
+    public void onTransportError(Throwable cause) {
+        fireClientException(cause);
+    }
+
+    //----- Internal implementation ------------------------------------------//
+
+    @Override
+    protected void doOpenCompletion() {
+        // If the remote indicates that a close is pending, don't open.
+        if (!getEndpoint().getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
+            super.doOpenCompletion();
+        }
+    }
+
+    @Override
+    protected void doOpenInspection() {
+        getStateInspector().inspectOpenedResource(getConnection());
+    }
+
+    @Override
+    protected void doClosedInspection() {
+        getStateInspector().inspectClosedResource(getConnection());
+    }
+
+    protected void fireClientException(Throwable ex) {
+        AmqpClientListener listener = this.listener;
+        if (listener != null) {
+            listener.onClientException(ex);
+        }
+    }
+
+    protected void checkClosed() throws IllegalStateException {
+        if (closed.get()) {
+            throw new IllegalStateException("The Connection is already closed");
+        }
+    }
+
+    private void processUpdates() {
+        try {
+            Event protonEvent = null;
+            while ((protonEvent = protonCollector.peek()) != null) {
+                if (!protonEvent.getType().equals(Type.TRANSPORT)) {
+                    LOG.trace("New Proton Event: {}", protonEvent.getType());
+                }
+
+                AmqpResource amqpResource = null;
+                switch (protonEvent.getType()) {
+                    case CONNECTION_REMOTE_CLOSE:
+                        amqpResource = (AmqpConnection) protonEvent.getConnection().getContext();
+                        amqpResource.processRemoteClose(this);
+                        break;
+                    case CONNECTION_REMOTE_OPEN:
+                        amqpResource = (AmqpConnection) protonEvent.getConnection().getContext();
+                        amqpResource.processRemoteOpen(this);
+                        break;
+                    case SESSION_REMOTE_CLOSE:
+                        amqpResource = (AmqpSession) protonEvent.getSession().getContext();
+                        amqpResource.processRemoteClose(this);
+                        break;
+                    case SESSION_REMOTE_OPEN:
+                        amqpResource = (AmqpSession) protonEvent.getSession().getContext();
+                        amqpResource.processRemoteOpen(this);
+                        break;
+                    case LINK_REMOTE_CLOSE:
+                        amqpResource = (AmqpResource) protonEvent.getLink().getContext();
+                        amqpResource.processRemoteClose(this);
+                        break;
+                    case LINK_REMOTE_DETACH:
+                        amqpResource = (AmqpResource) protonEvent.getLink().getContext();
+                        amqpResource.processRemoteDetach(this);
+                        break;
+                    case LINK_REMOTE_OPEN:
+                        amqpResource = (AmqpResource) protonEvent.getLink().getContext();
+                        amqpResource.processRemoteOpen(this);
+                        break;
+                    case LINK_FLOW:
+                        amqpResource = (AmqpResource) protonEvent.getLink().getContext();
+                        amqpResource.processFlowUpdates(this);
+                        break;
+                    case DELIVERY:
+                        amqpResource = (AmqpResource) protonEvent.getLink().getContext();
+                        amqpResource.processDeliveryUpdates(this);
+                        break;
+                    default:
+                        break;
+                }
+
+                protonCollector.pop();
+            }
+
+            // We have to do this to pump SASL bytes in as SASL is not event driven yet.
+            if (!authenticated) {
+                processSaslAuthentication();
+            }
+        } catch (Exception ex) {
+            LOG.warn("Caught Exception during update processing: {}", ex.getMessage(), ex);
+            fireClientException(ex);
+        }
+    }
+
+    private void processSaslAuthentication() {
+        if (authenticated || authenticator == null) {
+            return;
+        }
+
+        try {
+            if (authenticator.authenticate()) {
+                authenticator = null;
+                authenticated = true;
+            }
+        } catch (SecurityException ex) {
+            failed(ex);
+        }
+    }
+
+    private String getNextSessionId() {
+        return connectionId + ":" + sessionIdGenerator.incrementAndGet();
+    }
+
+    private String safeGetContainerId() {
+        String containerId = getContainerId();
+        if (containerId == null || containerId.isEmpty()) {
+            containerId = UUID.randomUUID().toString();
+        }
+
+        return containerId;
+    }
+
+    @Override
+    public String toString() {
+        return "AmqpConnection { " + connectionId + " }";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpDefaultClientListener.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpDefaultClientListener.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpDefaultClientListener.java
new file mode 100644
index 0000000..9b2394c
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpDefaultClientListener.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+/**
+ * Default listener implementation that stubs out all the event methods.
+ */
+public class AmqpDefaultClientListener implements AmqpClientListener {
+
+    @Override
+    public void onClientException(Throwable ex) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java
new file mode 100644
index 0000000..d93e052
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorType.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+
+/**
+ * A Described Type wrapper for JMS selector values.
+ */
+public class AmqpJmsSelectorType implements DescribedType {
+
+    private final String selector;
+
+    public AmqpJmsSelectorType(String selector) {
+        this.selector = selector;
+    }
+
+    @Override
+    public Object getDescriptor() {
+        return UnsignedLong.valueOf(0x0000468C00000004L);
+    }
+
+    @Override
+    public Object getDescribed() {
+        return this.selector;
+    }
+
+    @Override
+    public String toString() {
+        return "AmqpJmsSelectorType{" + selector + "}";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
new file mode 100644
index 0000000..52e5eaf
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.activemq.transport.amqp.client.util.UnmodifiableDelivery;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
+
+public class AmqpMessage {
+
+    private final AmqpReceiver receiver;
+    private final Message message;
+    private final Delivery delivery;
+
+    /**
+     * Creates a new AmqpMessage that wraps the information necessary to handle
+     * an outgoing message.
+     */
+    public AmqpMessage() {
+        receiver = null;
+        delivery = null;
+
+        message = Proton.message();
+        message.setDurable(true);
+    }
+
+    /**
+     * Creates a new AmqpMessage that wraps the information necessary to handle
+     * an outgoing message.
+     *
+     * @param message
+     *        the Proton message that is to be sent.
+     */
+    public AmqpMessage(Message message) {
+        this(null, message, null);
+    }
+
+    /**
+     * Creates a new AmqpMessage that wraps the information necessary to handle
+     * an incoming delivery.
+     *
+     * @param receiver
+     *        the AmqpReceiver that received this message.
+     * @param message
+     *        the Proton message that was received.
+     * @param delivery
+     *        the Delivery instance that produced this message.
+     */
+    public AmqpMessage(AmqpReceiver receiver, Message message, Delivery delivery) {
+        this.receiver = receiver;
+        this.message = message;
+        this.delivery = delivery;
+    }
+
+    /**
+     * Accepts the message marking it as consumed on the remote peer.
+     *
+     * @throws Exception if an error occurs during the accept.
+     */
+    public void accept() throws Exception {
+        if (receiver == null) {
+            throw new IllegalStateException("Can't accept non-received message.");
+        }
+
+        receiver.accept(delivery);
+    }
+
+    /**
+     * Rejects the message, marking it as not deliverable here and failed to deliver.
+     *
+     * @throws Exception if an error occurs during the reject.
+     */
+    public void reject() throws Exception {
+        reject(true, true);
+    }
+
+    /**
+     * Rejects the message, marking it as failed to deliver and applying the given value
+     * to the undeliverable here tag.
+     *
+     * @param undeliverableHere
+     *        marks the delivery as not being able to be process by link it was sent to.
+     *
+     * @throws Exception if an error occurs during the reject.
+     */
+    public void reject(boolean undeliverableHere) throws Exception {
+        reject(undeliverableHere, true);
+    }
+
+    /**
+     * Rejects the message, marking it as not deliverable here and failed to deliver.
+     *
+     * @param undeliverableHere
+     *        marks the delivery as not being able to be process by link it was sent to.
+     * @param deliveryFailed
+     *        indicates that the delivery failed for some reason.
+     *
+     * @throws Exception if an error occurs during the reject.
+     */
+    public void reject(boolean undeliverableHere, boolean deliveryFailed) throws Exception {
+        if (receiver == null) {
+            throw new IllegalStateException("Can't reject non-received message.");
+        }
+
+        receiver.reject(delivery, undeliverableHere, deliveryFailed);
+    }
+
+    /**
+     * Release the message, remote can redeliver it elsewhere.
+     *
+     * @throws Exception if an error occurs during the reject.
+     */
+    public void release() throws Exception {
+        if (receiver == null) {
+            throw new IllegalStateException("Can't release non-received message.");
+        }
+
+        receiver.release(delivery);
+    }
+
+    /**
+     * @return the AMQP Delivery object linked to a received message.
+     */
+    public Delivery getWrappedDelivery() {
+        if (delivery != null) {
+            return new UnmodifiableDelivery(delivery);
+        }
+
+        return null;
+    }
+
+    /**
+     * @return the AMQP Message that is wrapped by this object.
+     */
+    public Message getWrappedMessage() {
+        return message;
+    }
+
+    /**
+     * @return the AmqpReceiver that consumed this message.
+     */
+    public AmqpReceiver getAmqpReceiver() {
+        return receiver;
+    }
+
+    /**
+     * Sets a String value into the body of an outgoing Message, throws
+     * an exception if this is an incoming message instance.
+     *
+     * @param value
+     *        the String value to store in the Message body.
+     *
+     * @throws IllegalStateException if the message is read only.
+     */
+    public void setText(String value) throws IllegalStateException {
+        if (delivery != null) {
+            throw new IllegalStateException("Message is read only.");
+        }
+
+        AmqpValue body = new AmqpValue(value);
+        getWrappedMessage().setBody(body);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java
new file mode 100644
index 0000000..2d61b83
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalType.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+
+/**
+ * A Described Type wrapper for JMS no local option for MessageConsumer.
+ */
+public class AmqpNoLocalType implements DescribedType {
+
+    public static final AmqpNoLocalType NO_LOCAL = new AmqpNoLocalType();
+
+    private final String noLocal;
+
+    public AmqpNoLocalType() {
+        this.noLocal = "NoLocalFilter{}";
+    }
+
+    @Override
+    public Object getDescriptor() {
+        return UnsignedLong.valueOf(0x0000468C00000003L);
+    }
+
+    @Override
+    public Object getDescribed() {
+        return this.noLocal;
+    }
+}