You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/23 20:20:42 UTC

[18/27] Initial drop of donated AMQP Client Code.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java
new file mode 100644
index 0000000..b1d0782
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.provider.ProviderFactory;
+import org.apache.qpid.jms.util.PropertyUtil;
+
+/**
+ * Factory for creating the AMQP provider.
+ */
+public class AmqpProviderFactory extends ProviderFactory {
+
+    /** Builds an AmqpProvider for the URI, applying any "provider." query options to it. */
+    @Override
+    public Provider createAsyncProvider(URI remoteURI) throws Exception {
+        Map<String, String> queryOptions = PropertyUtil.parseQuery(remoteURI.getQuery());
+        Map<String, String> providerOptions = PropertyUtil.filterProperties(queryOptions, "provider.");
+
+        // Rebuild the URI from the query map as left by the filtering step above.
+        URI providerURI = PropertyUtil.replaceQuery(remoteURI, queryOptions);
+        Provider provider = new AmqpProvider(providerURI);
+
+        if (PropertyUtil.setProperties(provider, providerOptions)) {
+            return provider;
+        }
+
+        throw new IllegalArgumentException(
+            " Not all provider options could be set on the AMQP Provider."
+            + " Check the options are spelled correctly."
+            + " Given parameters=[" + providerOptions + "]."
+            + " This provider instance cannot be started.");
+    }
+
+    /** @return the fixed name of this factory, "AMQP". */
+    @Override
+    public String getName() {
+        return "AMQP";
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
new file mode 100644
index 0000000..e23b4c0
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.IOException;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsConsumerInfo;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Queue Browser implementation for AMQP
+ */
+public class AmqpQueueBrowser extends AmqpConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpQueueBrowser.class);
+
+    /** Creates the browser; both arguments are passed straight to the parent consumer. */
+    public AmqpQueueBrowser(AmqpSession session, JmsConsumerInfo info) {
+        super(session, info);
+    }
+
+    /**
+     * Starts the QueueBrowser by issuing the configured prefetch size as credit and
+     * then signalling the given request as successful.
+     */
+    @Override
+    public void start(AsyncResult request) {
+        this.endpoint.flow(info.getPrefetchSize());
+        request.onSuccess();
+    }
+
+    /**
+     * QueueBrowser will attempt to initiate a pull whenever there are no pending Messages.
+     *
+     * We need to initiate a drain to see if there are any messages and if the remote sender
+     * indicates it is drained then we can send end of browse.  We only do this when there
+     * are no pending incoming deliveries and all delivered messages have become settled
+     * in order to give the remote a chance to dispatch more messages once all deliveries
+     * have been settled.  In every other case the local drain flag is cleared.
+     *
+     * @param timeout
+     *        ignored in this context.
+     */
+    @Override
+    public void pull(long timeout) {
+        if (!endpoint.getDrain() && endpoint.current() == null && endpoint.getUnsettled() == 0) {
+            LOG.trace("QueueBrowser {} will try to drain remote.", getConsumerId());
+            this.endpoint.drain(info.getPrefetchSize());
+        } else {
+            endpoint.setDrain(false);
+        }
+    }
+
+    /** Runs the end of browse check first, then lets the parent handle the flow event. */
+    @Override
+    public void processFlowUpdates() throws IOException {
+        signalBrowseDoneIfDrained();
+        super.processFlowUpdates();
+    }
+
+    /** Cancels a drain overtaken by a delivery, defers to the parent, then checks for end of browse. */
+    @Override
+    public void processDeliveryUpdates() throws IOException {
+        if (endpoint.getDrain() && endpoint.current() != null) {
+            LOG.trace("{} incoming delivery, cancel drain.", getConsumerId());
+            endpoint.setDrain(false);
+        }
+
+        super.processDeliveryUpdates();
+        signalBrowseDoneIfDrained();
+    }
+
+    /**
+     * Shared end of browse check.  While draining with local and remote credit in
+     * agreement an empty dispatch carrying only the consumer id is delivered to mark
+     * the browse as done; otherwise the drain flag is cleared.
+     *
+     * @throws IOException wrapping any error raised while delivering the marker.
+     */
+    private void signalBrowseDoneIfDrained() throws IOException {
+        if (endpoint.getDrain() && endpoint.getCredit() == endpoint.getRemoteCredit()) {
+            JmsInboundMessageDispatch browseDone = new JmsInboundMessageDispatch();
+            browseDone.setConsumerId(getConsumerId());
+            try {
+                deliver(browseDone);
+            } catch (Exception e) {
+                throw IOExceptionSupport.create(e);
+            }
+        } else {
+            endpoint.setDrain(false);
+        }
+    }
+
+    /** Requests the COPY distribution mode when the info is a browser, then defers to the parent. */
+    @Override
+    protected void configureSource(Source source) {
+        if (info.isBrowser()) {
+            source.setDistributionMode(COPY);
+        }
+
+        super.configureSource(source);
+    }
+
+    @Override
+    public boolean isBrowser() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
new file mode 100644
index 0000000..46f8130
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.IOException;
+
+import org.apache.qpid.jms.provider.AsyncResult;
+
+/**
+ * AmqpResource specification.
+ *
+ * All AMQP types should implement this interface to allow for control of state
+ * and configuration details.
+ */
+public interface AmqpResource {
+
+    /**
+     * Perform all the work needed to open this resource and store the request
+     * until such time as the remote peer indicates the resource has become active.
+     *
+     * @param request
+     *        The initiating request that triggered this open call.
+     */
+    void open(AsyncResult request);
+
+    /**
+     * @return true if the resource has moved to the opened state on the remote.
+     */
+    boolean isOpen();
+
+    /**
+     * @return true if the resource is awaiting the remote end to signal opened.
+     */
+    boolean isAwaitingOpen();
+
+    /**
+     * Called to indicate that this resource is now remotely opened.  Once opened a
+     * resource can start accepting incoming requests.
+     */
+    void opened();
+
+    /**
+     * Perform all work needed to close this resource and store the request
+     * until such time as the remote peer indicates the resource has been closed.
+     *
+     * @param request
+     *        The initiating request that triggered this close call.
+     */
+    void close(AsyncResult request);
+
+    /**
+     * @return true if the resource has moved to the closed state on the remote.
+     */
+    boolean isClosed();
+
+    /**
+     * @return true if the resource is awaiting the remote end to signal closed.
+     */
+    boolean isAwaitingClose();
+
+    /**
+     * Called to indicate that this resource is now remotely closed.  Once closed a
+     * resource can not accept any incoming requests.
+     */
+    void closed();
+
+    /**
+     * Sets the failed state for this Resource and triggers a failure signal for
+     * any pending request.
+     */
+    void failed();
+
+    /**
+     * Sets the failed state for this Resource and triggers a failure signal for
+     * any pending request.
+     *
+     * @param cause
+     *        The Exception that triggered the failure.
+     */
+    void failed(Exception cause);
+
+    /**
+     * Called when the Proton Engine signals that the state of the given resource has
+     * changed on the remote side.
+     *
+     * @throws IOException if an error occurs while processing the update.
+     */
+    void processStateChange() throws IOException;
+
+    /**
+     * Called when the Proton Engine signals a Delivery related event has been triggered
+     * for the given endpoint.
+     *
+     * @throws IOException if an error occurs while processing the update.
+     */
+    void processDeliveryUpdates() throws IOException;
+
+    /**
+     * Called when the Proton Engine signals a Flow related event has been triggered
+     * for the given endpoint.
+     *
+     * @throws IOException if an error occurs while processing the update.
+     */
+    void processFlowUpdates() throws IOException;
+
+    /**
+     * @return an Exception derived from the error state of the endpoint's Remote Condition.
+     */
+    Exception getRemoteError();
+
+    /**
+     * @return an Error message derived from the error state of the endpoint's Remote Condition.
+     */
+    String getRemoteErrorMessage();
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java
new file mode 100644
index 0000000..3416c22
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import javax.jms.JMSSecurityException;
+import javax.security.sasl.SaslException;
+
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
+import org.apache.qpid.jms.sasl.Mechanism;
+import org.apache.qpid.jms.sasl.SaslMechanismFinder;
+import org.apache.qpid.proton.engine.Sasl;
+
+/**
+ * Manage the SASL authentication process
+ */
+public class AmqpSaslAuthenticator {
+
+    private final Sasl sasl;
+    private final JmsConnectionInfo info;
+    private Mechanism mechanism;
+
+    /**
+     * Create the authenticator and initialize it.
+     *
+     * @param sasl The Proton SASL entry point this class will use to manage the authentication.
+     * @param info The Connection information used to provide credentials to the remote peer.
+     */
+    public AmqpSaslAuthenticator(Sasl sasl, JmsConnectionInfo info) {
+        this.sasl = sasl;
+        this.info = info;
+    }
+
+    /**
+     * Process the SASL authentication cycle until such time as an outcome is determined. This
+     * method must be called by the managing entity until the return value is true indicating a
+     * successful authentication or a JMSSecurityException is thrown indicating that the
+     * handshake failed.
+     *
+     * @return true once the SASL state is PN_SASL_PASS, false while no outcome has been reached.
+     * @throws JMSSecurityException if the SASL state is PN_SASL_FAIL or processing a state fails.
+     */
+    public boolean authenticate() throws JMSSecurityException {
+        switch (sasl.getState()) {
+            case PN_SASL_IDLE:
+                handleSaslInit();
+                break;
+            case PN_SASL_STEP:
+                handleSaslStep();
+                break;
+            case PN_SASL_FAIL:
+                handleSaslFail();
+                break;
+            case PN_SASL_PASS:
+                return true;
+            default:
+        }
+
+        return false;
+    }
+
+    /** Picks a mechanism matching the remote's offer and sends its initial response, if any. */
+    private void handleSaslInit() throws JMSSecurityException {
+        try {
+            String[] remoteMechanisms = sasl.getRemoteMechanisms();
+            if (remoteMechanisms != null && remoteMechanisms.length != 0) {
+                mechanism = SaslMechanismFinder.findMatchingMechanism(remoteMechanisms);
+                if (mechanism == null) {
+                    throw new JMSSecurityException("Could not find a matching SASL mechanism for the remote peer.");
+                }
+                mechanism.setUsername(info.getUsername());
+                mechanism.setPassword(info.getPassword());
+                // TODO - set additional options from URI and set a host value.
+                sasl.setMechanisms(mechanism.getName());
+                byte[] response = mechanism.getInitialResponse();
+                if (response != null && response.length != 0) {
+                    sasl.send(response, 0, response.length);
+                }
+            }
+        } catch (SaslException se) {
+            throw securityException("Exception while processing SASL init.", se);
+        }
+    }
+
+    /** Answers a pending challenge; one arriving before a mechanism was selected fails the handshake. */
+    private void handleSaslStep() throws JMSSecurityException {
+        try {
+            if (sasl.pending() != 0) {
+                if (mechanism == null) {
+                    throw new JMSSecurityException("Received a SASL challenge before a mechanism was selected.");
+                }
+                byte[] challenge = new byte[sasl.pending()];
+                sasl.recv(challenge, 0, challenge.length);
+                byte[] response = mechanism.getChallengeResponse(challenge);
+                sasl.send(response, 0, response.length);
+            }
+        } catch (SaslException se) {
+            throw securityException("Exception while processing SASL step.", se);
+        }
+    }
+
+    private void handleSaslFail() throws JMSSecurityException {
+        // TODO - Better error message.
+        throw new JMSSecurityException("Client failed to authenticate");
+    }
+
+    /** Wraps a failure, set both as linked exception and as cause (TODO - better error messages). */
+    private static JMSSecurityException securityException(String message, Exception cause) {
+        JMSSecurityException jmsse = new JMSSecurityException(message);
+        jmsse.setLinkedException(cause);
+        jmsse.initCause(cause);
+        return jmsse;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
new file mode 100644
index 0000000..44e864a
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.IllegalStateException;
+
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.meta.JmsConsumerId;
+import org.apache.qpid.jms.meta.JmsConsumerInfo;
+import org.apache.qpid.jms.meta.JmsProducerId;
+import org.apache.qpid.jms.meta.JmsProducerInfo;
+import org.apache.qpid.jms.meta.JmsSessionId;
+import org.apache.qpid.jms.meta.JmsSessionInfo;
+import org.apache.qpid.jms.meta.JmsTransactionId;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.proton.engine.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpSession.class);
+
+    private final AmqpConnection connection;
+    private final AmqpTransactionContext txContext;
+
+    private final Map<JmsConsumerId, AmqpConsumer> consumers = new HashMap<JmsConsumerId, AmqpConsumer>();
+    private final Map<JmsProducerId, AmqpProducer> producers = new HashMap<JmsProducerId, AmqpProducer>();
+
+    /**
+     * Creates the session on a new Proton session taken from the connection and stores itself
+     * as the provider hint of the session id.  Only transacted sessions get a transaction context.
+     */
+    public AmqpSession(AmqpConnection connection, JmsSessionInfo info) {
+        super(info, connection.getProtonConnection().session());
+        this.connection = connection;
+
+        this.info.getSessionId().setProviderHint(this);
+        this.txContext = this.info.isTransacted() ? new AmqpTransactionContext(this) : null;
+    }
+
+    /** Opens the transaction context with the open request if there is one, else defers to the parent. */
+    @Override
+    public void opened() {
+        if (this.txContext != null) {
+            this.txContext.open(openRequest);
+        } else {
+            super.opened();
+        }
+    }
+
+    /** Sets the incoming capacity to Integer.MAX_VALUE and adds this session to the connection. */
+    @Override
+    protected void doOpen() {
+        this.endpoint.setIncomingCapacity(Integer.MAX_VALUE);
+        this.connection.addSession(this);
+    }
+
+    /** Removes this session from the connection. */
+    @Override
+    protected void doClose() {
+        this.connection.removeSession(this);
+    }
+
+    /**
+     * Perform an acknowledge of all delivered messages for all consumers active in this
+     * Session.
+     */
+    public void acknowledge() {
+        for (AmqpConsumer consumer : consumers.values()) {
+            consumer.acknowledge();
+        }
+    }
+
+    /**
+     * Perform re-send of all delivered but not yet acknowledged messages for all consumers
+     * active in this Session.
+     *
+     * @throws Exception if an error occurs while performing the recover.
+     */
+    public void recover() throws Exception {
+        for (AmqpConsumer consumer : consumers.values()) {
+            consumer.recover();
+        }
+    }
+
+    /** Creates a fixed producer when a destination is given, otherwise an anonymous producer. */
+    public AmqpProducer createProducer(JmsProducerInfo producerInfo) {
+        final AmqpProducer producer;
+
+        // TODO - There seems to be an issue with Proton not allowing links with a Target
+        //        that has no address.  Otherwise we could just ensure that messages sent
+        //        to these anonymous targets have their 'to' value set to the destination.
+        if (producerInfo.getDestination() != null) {
+            LOG.debug("Creating fixed Producer for: {}", producerInfo.getDestination());
+            producer = new AmqpFixedProducer(this, producerInfo);
+        } else {
+            LOG.debug("Creating an Anonymous Producer: ");
+            producer = new AmqpAnonymousProducer(this, producerInfo);
+        }
+
+        producer.setPresettle(connection.isPresettleProducers());
+
+        return producer;
+    }
+
+    public AmqpProducer getProducer(JmsProducerInfo producerInfo) {
+        return getProducer(producerInfo.getProducerId());
+    }
+
+    /** Prefers an AmqpProducer stored as the id's provider hint over a lookup in the producer map. */
+    public AmqpProducer getProducer(JmsProducerId producerId) {
+        if (producerId.getProviderHint() instanceof AmqpProducer) {
+            return (AmqpProducer) producerId.getProviderHint();
+        }
+        return this.producers.get(producerId);
+    }
+
+    /** Creates a queue browser or a plain consumer and applies the connection's presettle setting. */
+    public AmqpConsumer createConsumer(JmsConsumerInfo consumerInfo) {
+        AmqpConsumer result = consumerInfo.isBrowser()
+            ? new AmqpQueueBrowser(this, consumerInfo) : new AmqpConsumer(this, consumerInfo);
+        result.setPresettle(connection.isPresettleConsumers());
+        return result;
+    }
+
+    public AmqpConsumer getConsumer(JmsConsumerInfo consumerInfo) {
+        return getConsumer(consumerInfo.getConsumerId());
+    }
+
+    /** Prefers an AmqpConsumer stored as the id's provider hint over a lookup in the consumer map. */
+    public AmqpConsumer getConsumer(JmsConsumerId consumerId) {
+        if (consumerId.getProviderHint() instanceof AmqpConsumer) {
+            return (AmqpConsumer) consumerId.getProviderHint();
+        }
+        return this.consumers.get(consumerId);
+    }
+
+    /** @return the transaction context, which is null when the session is not transacted. */
+    public AmqpTransactionContext getTransactionContext() {
+        return this.txContext;
+    }
+
+    /**
+     * Begins a new Transaction using the given Transaction Id as the identifier.  The AMQP
+     * binary Transaction Id will be stored in the provider hint value of the given transaction.
+     *
+     * @param txId
+     *        The JMS Framework's assigned Transaction Id for the new TX.
+     * @param request
+     *        The request that will be signaled on completion of this operation.
+     *
+     * @throws Exception if an error occurs while performing the operation.
+     */
+    public void begin(JmsTransactionId txId, AsyncResult request) throws Exception {
+        if (!this.info.isTransacted()) {
+            throw new IllegalStateException("Non-transacted Session cannot start a TX.");
+        }
+
+        getTransactionContext().begin(txId, request);
+    }
+
+    /**
+     * Commit the currently running Transaction.
+     *
+     * @param request
+     *        The request that will be signaled on completion of this operation.
+     *
+     * @throws Exception if an error occurs while performing the operation.
+     */
+    public void commit(AsyncResult request) throws Exception {
+        if (!this.info.isTransacted()) {
+            throw new IllegalStateException("Non-transacted Session cannot commit a TX.");
+        }
+
+        getTransactionContext().commit(request);
+    }
+
+    /**
+     * Roll back the currently running Transaction
+     *
+     * @param request
+     *        The request that will be signaled on completion of this operation.
+     *
+     * @throws Exception if an error occurs while performing the operation.
+     */
+    public void rollback(AsyncResult request) throws Exception {
+        if (!this.info.isTransacted()) {
+            throw new IllegalStateException("Non-transacted Session cannot rollback a TX.");
+        }
+
+        getTransactionContext().rollback(request);
+    }
+
+    void addResource(AmqpConsumer consumer) {
+        consumers.put(consumer.getConsumerId(), consumer);
+    }
+
+    void removeResource(AmqpConsumer consumer) {
+        consumers.remove(consumer.getConsumerId());
+    }
+
+    void addResource(AmqpProducer producer) {
+        producers.put(producer.getProducerId(), producer);
+    }
+
+    void removeResource(AmqpProducer producer) {
+        producers.remove(producer.getProducerId());
+    }
+
+    /**
+     * Adds Topic or Queue qualifiers to the destination target.  We don't add qualifiers to
+     * Temporary Topics and Queues since AMQP works a bit differently.
+     *
+     * @param destination
+     *        The destination to Qualify.
+     *
+     * @return the qualified destination name, or null when no destination is given.
+     */
+    public String getQualifiedName(JmsDestination destination) {
+        if (destination == null) {
+            return null;
+        }
+
+        String name = destination.getName();
+        if (destination.isTemporary()) {
+            return name;
+        }
+
+        String prefix = destination.isTopic() ? connection.getTopicPrefix() : connection.getQueuePrefix();
+        return prefix + name;
+    }
+
+    public AmqpProvider getProvider() {
+        return this.connection.getProvider();
+    }
+
+    public AmqpConnection getConnection() {
+        return this.connection;
+    }
+
+    public JmsSessionId getSessionId() {
+        return this.info.getSessionId();
+    }
+
+    /** @return the Proton session that serves as this resource's endpoint. */
+    public Session getProtonSession() {
+        return this.endpoint;
+    }
+
+    boolean isTransacted() {
+        return this.info.isTransacted();
+    }
+
+    /** @return true when the session info asks for async acks or the session is transacted. */
+    boolean isAsyncAck() {
+        return this.info.isSendAcksAsync() || isTransacted();
+    }
+
+    @Override
+    public String toString() {
+        return "AmqpSession { " + getSessionId() + " }";
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProvider.java
new file mode 100644
index 0000000..af7fe7f
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProvider.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.qpid.jms.JmsSslContext;
+import org.apache.qpid.jms.transports.SslTransport;
+import org.apache.qpid.jms.transports.Transport;
+
+/**
+ * AmqpProvider extension that enables SSL based transports.
+ */
+public class AmqpSslProvider extends AmqpProvider {
+
+    // Captured once per instance, right after the superclass constructor returns.
+    private final JmsSslContext sslContext = JmsSslContext.getCurrentSslContext();
+
+    public AmqpSslProvider(URI remoteURI) {
+        super(remoteURI);
+    }
+
+    public AmqpSslProvider(URI remoteURI, Map<String, String> extraOptions) {
+        super(remoteURI, extraOptions);
+    }
+
+    /** Supplies an SslTransport built from this provider, the location and the captured SSL context. */
+    @Override
+    protected Transport createTransport(URI remoteLocation) {
+        return new SslTransport(this, remoteLocation, sslContext);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProviderFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProviderFactory.java
new file mode 100644
index 0000000..7606792
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProviderFactory.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.net.URI;
+
+import org.apache.qpid.jms.provider.Provider;
+
+/**
+ * Extends the AmqpProviderFactory to create an SSL based Provider instance.
+ */
+public class AmqpSslProviderFactory extends AmqpProviderFactory {
+
+    /**
+     * Creates the provider for the given remote location, always an AmqpSslProvider.
+     *
+     * @param remoteURI
+     *        the URI handed to the new AmqpSslProvider.
+     *
+     * @return a new AmqpSslProvider instance.
+     *
+     * @throws Exception declared by the overridden factory method.
+     */
+    @Override
+    public Provider createAsyncProvider(URI remoteURI) throws Exception {
+        final Provider sslProvider = new AmqpSslProvider(remoteURI);
+        return sslProvider;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
new file mode 100644
index 0000000..d76e3d7
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Sender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages a Temporary Destination linked to a given Connection.
+ *
+ * In order to create a temporary destination and keep it active for the life of the connection
+ * we must create a sender with a dynamic target value.  Once the sender is open we can read
+ * the actual name assigned by the broker from the target and that is the real temporary
+ * destination that we will return.
+ *
+ * The open of the Sender instance will also allow us to catch any security errors from
+ * the broker in the case where the user does not have authorization to access temporary
+ * destinations.
+ */
+public class AmqpTemporaryDestination extends AbstractAmqpResource<JmsDestination, Sender> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpTemporaryDestination.class);
+
+    private final AmqpConnection connection;
+    private final AmqpSession session;
+
+    public AmqpTemporaryDestination(AmqpSession session, JmsDestination destination) {
+        super(destination);
+        this.session = session;
+        this.connection = session.getConnection();
+    }
+
+    @Override
+    public void processStateChange() {
+        // TODO - We might want to check on our producer to see if it becomes closed
+        //        which might indicate that the broker purged the temporary destination.
+
+        EndpointState remoteState = endpoint.getRemoteState();
+        if (remoteState == EndpointState.ACTIVE) {
+            LOG.trace("Temporary Destination: {} is now open", this.info);
+            opened();
+        } else if (remoteState == EndpointState.CLOSED) {
+            LOG.trace("Temporary Destination: {} is now closed", this.info);
+            closed();
+        }
+    }
+
+    @Override
+    public void opened() {
+
+        // Once our producer is opened we can read the updated name from the target address.
+        String oldDestinationName = info.getName();
+        String destinationName = this.endpoint.getRemoteTarget().getAddress();
+
+        this.info.setName(destinationName);
+
+        LOG.trace("Updated temp destination to: {} from: {}", info, oldDestinationName);
+
+        super.opened();
+    }
+
+    @Override
+    protected void doOpen() {
+
+        String sourceAddress = info.getName();
+        if (info.isQueue()) {
+            sourceAddress = connection.getTempQueuePrefix() + sourceAddress;
+        } else {
+            sourceAddress = connection.getTempQueuePrefix() + sourceAddress;
+        }
+        Source source = new Source();
+        source.setAddress(sourceAddress);
+        Target target = new Target();
+        target.setDynamic(true);
+
+        String senderName = sourceAddress;
+        endpoint = session.getProtonSession().sender(senderName);
+        endpoint.setSource(source);
+        endpoint.setTarget(target);
+        endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+        endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+        this.connection.addTemporaryDestination(this);
+    }
+
+    @Override
+    protected void doClose() {
+        this.connection.removeTemporaryDestination(this);
+    }
+
+    public AmqpConnection getConnection() {
+        return this.connection;
+    }
+
+    public AmqpSession getSession() {
+        return this.session;
+    }
+
+    public Sender getProtonSender() {
+        return this.endpoint;
+    }
+
+    public JmsDestination getJmsDestination() {
+        return this.info;
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + " { " + info + "}";
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
new file mode 100644
index 0000000..1cc6fd1
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import javax.jms.IllegalStateException;
+import javax.jms.TransactionRolledBackException;
+
+import org.apache.qpid.jms.meta.JmsSessionInfo;
+import org.apache.qpid.jms.meta.JmsTransactionId;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.transaction.Coordinator;
+import org.apache.qpid.proton.amqp.transaction.Declare;
+import org.apache.qpid.proton.amqp.transaction.Declared;
+import org.apache.qpid.proton.amqp.transaction.Discharge;
+import org.apache.qpid.proton.amqp.transaction.TxnCapability;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.message.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles the operations surrounding AMQP transaction control.
+ *
+ * The Transaction will carry a JmsTransactionId while the Transaction is open, once a
+ * transaction has been committed or rolled back the Transaction Id is cleared.
+ */
+public class AmqpTransactionContext extends AbstractAmqpResource<JmsSessionInfo, Sender> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionContext.class);
+
+    private static final Boolean ROLLBACK_MARKER = Boolean.FALSE;
+    private static final Boolean COMMIT_MARKER = Boolean.TRUE;
+
+    private final AmqpSession session;
+    private JmsTransactionId current;
+    private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator();
+    private final Set<AmqpConsumer> txConsumers = new LinkedHashSet<AmqpConsumer>();
+
+    private Delivery pendingDelivery;
+    private AsyncResult pendingRequest;
+
+    /**
+     * Creates a new AmqpTransaction instance.
+     *
+     * @param session
+     *        The session that owns this transaction
+     * @param info
+     *        The JmsTransactionInfo that defines this Transaction.
+     */
+    public AmqpTransactionContext(AmqpSession session) {
+        super(session.getJmsResource());
+        this.session = session;
+    }
+
+    @Override
+    public void processDeliveryUpdates() throws IOException {
+        try {
+            if (pendingDelivery != null && pendingDelivery.remotelySettled()) {
+                DeliveryState state = pendingDelivery.getRemoteState();
+                if (state instanceof Declared) {
+                    Declared declared = (Declared) state;
+                    current.setProviderHint(declared.getTxnId());
+                    pendingDelivery.settle();
+                    LOG.info("New TX started: {}", current.getProviderHint());
+                    AsyncResult request = this.pendingRequest;
+                    this.pendingRequest = null;
+                    this.pendingDelivery = null;
+                    request.onSuccess();
+                } else if (state instanceof Rejected) {
+                    LOG.info("Last TX request failed: {}", current.getProviderHint());
+                    pendingDelivery.settle();
+                    Rejected rejected = (Rejected) state;
+                    TransactionRolledBackException ex =
+                        new TransactionRolledBackException(rejected.getError().getDescription());
+                    AsyncResult request = this.pendingRequest;
+                    this.current = null;
+                    this.pendingRequest = null;
+                    this.pendingDelivery = null;
+                    postRollback();
+                    request.onFailure(ex);
+                } else {
+                    LOG.info("Last TX request succeeded: {}", current.getProviderHint());
+                    pendingDelivery.settle();
+                    AsyncResult request = this.pendingRequest;
+                    if (pendingDelivery.getContext() != null) {
+                        if (pendingDelivery.getContext().equals(COMMIT_MARKER)) {
+                            postCommit();
+                        } else {
+                            postRollback();
+                        }
+                    }
+                    this.current = null;
+                    this.pendingRequest = null;
+                    this.pendingDelivery = null;
+                    request.onSuccess();
+                }
+            }
+        } catch (Exception e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    @Override
+    protected void doOpen() {
+        Coordinator coordinator = new Coordinator();
+        coordinator.setCapabilities(TxnCapability.LOCAL_TXN, TxnCapability.MULTI_TXNS_PER_SSN);
+        Source source = new Source();
+
+        String coordinatorName = info.getSessionId().toString();
+        endpoint = session.getProtonSession().sender(coordinatorName);
+        endpoint.setSource(source);
+        endpoint.setTarget(coordinator);
+        endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+        endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+    }
+
+    @Override
+    protected void doClose() {
+    }
+
+    public void begin(JmsTransactionId txId, AsyncResult request) throws Exception {
+        if (current != null) {
+            throw new IOException("Begin called while a TX is still Active.");
+        }
+
+        Message message = Message.Factory.create();
+        Declare declare = new Declare();
+        message.setBody(new AmqpValue(declare));
+
+        pendingDelivery = endpoint.delivery(tagGenerator.getNextTag());
+        pendingRequest = request;
+        current = txId;
+
+        sendTxCommand(message);
+    }
+
+    public void commit(AsyncResult request) throws Exception {
+        if (current == null) {
+            throw new IllegalStateException("Rollback called with no active Transaction.");
+        }
+
+        preCommit();
+
+        Message message = Message.Factory.create();
+        Discharge discharge = new Discharge();
+        discharge.setFail(false);
+        discharge.setTxnId((Binary) current.getProviderHint());
+        message.setBody(new AmqpValue(discharge));
+
+        pendingDelivery = endpoint.delivery(tagGenerator.getNextTag());
+        pendingDelivery.setContext(COMMIT_MARKER);
+        pendingRequest = request;
+
+        sendTxCommand(message);
+    }
+
+    public void rollback(AsyncResult request) throws Exception {
+        if (current == null) {
+            throw new IllegalStateException("Rollback called with no active Transaction.");
+        }
+
+        preRollback();
+
+        Message message = Message.Factory.create();
+        Discharge discharge = new Discharge();
+        discharge.setFail(true);
+        discharge.setTxnId((Binary) current.getProviderHint());
+        message.setBody(new AmqpValue(discharge));
+
+        pendingDelivery = endpoint.delivery(tagGenerator.getNextTag());
+        pendingDelivery.setContext(ROLLBACK_MARKER);
+        pendingRequest = request;
+
+        sendTxCommand(message);
+    }
+
+    public void registerTxConsumer(AmqpConsumer consumer) {
+        this.txConsumers.add(consumer);
+    }
+
+    public AmqpSession getSession() {
+        return this.session;
+    }
+
+    public JmsTransactionId getTransactionId() {
+        return this.current;
+    }
+
+    public Binary getAmqpTransactionId() {
+        Binary result = null;
+        if (current != null) {
+            result = (Binary) current.getProviderHint();
+        }
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return this.session.getSessionId() + ": txContext";
+    }
+
+    private void preCommit() throws Exception {
+        for (AmqpConsumer consumer : txConsumers) {
+            consumer.preCommit();
+        }
+    }
+
+    private void preRollback() throws Exception {
+        for (AmqpConsumer consumer : txConsumers) {
+            consumer.preRollback();
+        }
+    }
+
+    private void postCommit() throws Exception {
+        for (AmqpConsumer consumer : txConsumers) {
+            consumer.postCommit();
+        }
+    }
+
+    private void postRollback() throws Exception {
+        for (AmqpConsumer consumer : txConsumers) {
+            consumer.postRollback();
+        }
+    }
+
+    private void sendTxCommand(Message message) throws IOException {
+        int encodedSize = 0;
+        byte[] buffer = new byte[4 * 1024];
+        while (true) {
+            try {
+                encodedSize = message.encode(buffer, 0, buffer.length);
+                break;
+            } catch (BufferOverflowException e) {
+                buffer = new byte[buffer.length * 2];
+            }
+        }
+
+        this.endpoint.send(buffer, 0, encodedSize);
+        this.endpoint.advance();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransferTagGenerator.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransferTagGenerator.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransferTagGenerator.java
new file mode 100644
index 0000000..309561c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransferTagGenerator.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * Utility class that can generate and if enabled pool the binary tag values
+ * used to identify transfers over an AMQP link.
+ *
+ * New tags are the UTF-8 bytes of the hexadecimal form of an incrementing counter.
+ * When pooling is enabled, tags handed back through returnTag are reused before any
+ * new tag is generated.
+ */
+public final class AmqpTransferTagGenerator {
+
+    public static final int DEFAULT_TAG_POOL_SIZE = 1024;
+
+    private long nextTagId;
+    private int maxPoolSize = DEFAULT_TAG_POOL_SIZE;
+
+    // Null when pooling is disabled.
+    private final Set<byte[]> tagPool;
+
+    /**
+     * Creates a generator that does not pool tags.
+     */
+    public AmqpTransferTagGenerator() {
+        this(false);
+    }
+
+    /**
+     * Creates a generator with pooling switched on or off.
+     *
+     * @param pool
+     *        true if returned tags should be kept for reuse.
+     */
+    public AmqpTransferTagGenerator(boolean pool) {
+        if (pool) {
+            this.tagPool = new LinkedHashSet<byte[]>();
+        } else {
+            this.tagPool = null;
+        }
+    }
+
+    /**
+     * Retrieves the next available tag.
+     *
+     * @return a new or unused tag depending on the pool option.
+     */
+    public byte[] getNextTag() {
+        byte[] rc;
+        if (tagPool != null && !tagPool.isEmpty()) {
+            final Iterator<byte[]> iterator = tagPool.iterator();
+            rc = iterator.next();
+            iterator.remove();
+        } else {
+            try {
+                rc = Long.toHexString(nextTagId++).getBytes("UTF-8");
+            } catch (UnsupportedEncodingException e) {
+                // This should never happen since we control the input.
+                throw new RuntimeException(e);
+            }
+        }
+        return rc;
+    }
+
+    /**
+     * When used as a pooled cache of tags the unused tags should always be returned once
+     * the transfer has been settled.  The call is ignored when pooling is disabled, when
+     * the pool already holds the maximum number of tags, or when the tag is null.
+     *
+     * @param data
+     *        a previously borrowed tag that is no longer in use.
+     */
+    public void returnTag(byte[] data) {
+        // A null entry in the pool would later be handed out by getNextTag() as a tag.
+        if (data == null) {
+            return;
+        }
+
+        if (tagPool != null && tagPool.size() < maxPoolSize) {
+            tagPool.add(data);
+        }
+    }
+
+    /**
+     * Gets the current max pool size value.
+     *
+     * @return the current max tag pool size.
+     */
+    public int getMaxPoolSize() {
+        return maxPoolSize;
+    }
+
+    /**
+     * Sets the max tag pool size.  If the size is smaller than the current number
+     * of pooled tags the pool will drain over time until it matches the max.
+     *
+     * @param maxPoolSize
+     *        the maximum number of tags to hold in the pool.
+     */
+    public void setMaxPoolSize(int maxPoolSize) {
+        this.maxPoolSize = maxPoolSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
new file mode 100644
index 0000000..1cefc2a
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_BYTES_MESSAGE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
+
+import org.apache.qpid.jms.message.facade.JmsBytesMessageFacade;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.message.Message;
+import org.fusesource.hawtbuf.Buffer;
+
+/**
+ * A JmsBytesMessageFacade that wraps around Proton AMQP Message instances to provide
+ * access to the underlying bytes contained in the message.
+ *
+ * The bytes are read from either a Data body or an amqp-value body that holds a Binary;
+ * content set through this facade is always stored as a Data body.
+ */
+public class AmqpJmsBytesMessageFacade extends AmqpJmsMessageFacade implements JmsBytesMessageFacade {
+
+    private static final String CONTENT_TYPE = "application/octet-stream";
+    private static final Buffer EMPTY_BUFFER = new Buffer(new byte[0]);
+    private static final Data EMPTY_DATA = new Data(new Binary(new byte[0]));
+
+    /**
+     * Creates a new facade instance and marks it with the bytes message type annotation.
+     *
+     * @param connection
+     *        the connection that created this Facade.
+     */
+    public AmqpJmsBytesMessageFacade(AmqpConnection connection) {
+        super(connection);
+        setAnnotation(JMS_MSG_TYPE, JMS_BYTES_MESSAGE);
+    }
+
+    /**
+     * Creates a new Facade around an incoming AMQP Message for dispatch to the
+     * JMS Consumer instance.
+     *
+     * @param connection
+     *        the connection that created this Facade.
+     * @param message
+     *        the incoming Message instance that is being wrapped.
+     */
+    public AmqpJmsBytesMessageFacade(AmqpConnection connection, Message message) {
+        super(connection, message);
+    }
+
+    /**
+     * @return a new facade for the same connection that carries a deep copy of the content.
+     */
+    @Override
+    public JmsBytesMessageFacade copy() {
+        AmqpJmsBytesMessageFacade copy = new AmqpJmsBytesMessageFacade(connection);
+        copyInto(copy);
+
+        copy.setContent(getContent().deepCopy());
+
+        return copy;
+    }
+
+    @Override
+    public byte getJmsMsgType() {
+        return JMS_BYTES_MESSAGE;
+    }
+
+    @Override
+    public String getContentType() {
+        return CONTENT_TYPE;
+    }
+
+    /**
+     * @return true if the body holds no bytes, false if there is at least one byte.
+     *
+     * @throws IllegalStateException if the body is of a type that cannot hold bytes.
+     */
+    @Override
+    public boolean isEmpty() {
+        // Empty means the absence of a payload, the same sense in which the map
+        // message facade reports an empty map.
+        Binary payload = getBinaryFromBody();
+        return payload == null || payload.getLength() == 0;
+    }
+
+    /**
+     * @return a Buffer over the body bytes, or a shared empty Buffer when there are none.
+     *
+     * @throws IllegalStateException if the body is of a type that cannot hold bytes.
+     */
+    @Override
+    public Buffer getContent() {
+        Buffer result = EMPTY_BUFFER;
+        Binary payload = getBinaryFromBody();
+        if (payload != null && payload.getLength() > 0) {
+            result = new Buffer(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+        }
+
+        return result;
+    }
+
+    /**
+     * Replaces the message body with a Data section over the given content.
+     *
+     * @param content
+     *        the bytes to store, null stores an empty Data section.
+     */
+    @Override
+    public void setContent(Buffer content) {
+        Data body = EMPTY_DATA;
+        if (content != null) {
+            body = new Data(new Binary(content.data, content.offset, content.length));
+        }
+
+        getAmqpMessage().setBody(body);
+    }
+
+    /**
+     * Extracts the Binary payload from the message body.
+     *
+     * @return the Binary held by a Data or amqp-value body, or null when the body is
+     *         missing or holds no bytes.
+     *
+     * @throws IllegalStateException if the body is neither a Data nor an amqp-value
+     *         section, or if the amqp-value holds something other than a Binary.
+     */
+    private Binary getBinaryFromBody() {
+        Section body = getAmqpMessage().getBody();
+        Binary result = null;
+
+        if (body == null) {
+            return result;
+        }
+
+        if (body instanceof Data) {
+            Binary payload = ((Data) body).getValue();
+            if (payload != null && payload.getLength() != 0) {
+                result = payload;
+            }
+        } else if(body instanceof AmqpValue) {
+            Object value = ((AmqpValue) body).getValue();
+            if (value == null) {
+                return result;
+            }
+
+            if (value instanceof Binary) {
+                Binary payload = (Binary)value;
+                if (payload != null && payload.getLength() != 0) {
+                    result = payload;
+                }
+            } else {
+                throw new IllegalStateException("Unexpected amqp-value body content type: " + value.getClass().getSimpleName());
+            }
+        } else {
+            throw new IllegalStateException("Unexpected body content type: " + body.getClass().getSimpleName());
+        }
+
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java
new file mode 100644
index 0000000..511d1e3
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MAP_MESSAGE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.qpid.jms.message.facade.JmsMapMessageFacade;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.message.Message;
+
+/**
+ * Wrapper around an AMQP Message instance that will be treated as a JMS MapMessage
+ * type.
+ *
+ * The map entries live in the Map carried by the amqp-value body of the message;
+ * byte[] values are stored as Binary and converted back to byte[] when read.
+ */
+public class AmqpJmsMapMessageFacade extends AmqpJmsMessageFacade implements JmsMapMessageFacade {
+
+    private Map<String,Object> messageBodyMap;
+
+    /**
+     * Create a new facade ready for sending.
+     *
+     * @param connection
+     *        the connection instance that created this facade.
+     */
+    public AmqpJmsMapMessageFacade(AmqpConnection connection) {
+        super(connection);
+        initializeEmptyBody();
+        setAnnotation(JMS_MSG_TYPE, JMS_MAP_MESSAGE);
+    }
+
+    /**
+     * Creates a new Facade around an incoming AMQP Message for dispatch to the
+     * JMS Consumer instance.
+     *
+     * A missing body, or an amqp-value body holding null, is replaced by an empty map.
+     *
+     * @param connection
+     *        the connection that created this Facade.
+     * @param message
+     *        the incoming Message instance that is being wrapped.
+     *
+     * @throws IllegalStateException if the body is not an amqp-value section or the
+     *         amqp-value holds something other than a Map.
+     */
+    @SuppressWarnings("unchecked")
+    public AmqpJmsMapMessageFacade(AmqpConnection connection, Message message) {
+        super(connection, message);
+
+        Section body = getAmqpMessage().getBody();
+        if (body == null) {
+            initializeEmptyBody();
+        } else if (body instanceof AmqpValue) {
+            Object o = ((AmqpValue) body).getValue();
+            if (o == null) {
+                initializeEmptyBody();
+            } else if (o instanceof Map) {
+                messageBodyMap = (Map<String, Object>) o;
+            } else {
+                // Name the offending value, the section itself is always an AmqpValue here.
+                throw new IllegalStateException("Unexpected message body type: " + o.getClass().getSimpleName());
+            }
+        } else {
+            throw new IllegalStateException("Unexpected message body type: " + body.getClass().getSimpleName());
+        }
+    }
+
+    /**
+     * @return the appropriate byte value that indicates the type of message this is.
+     */
+    @Override
+    public byte getJmsMsgType() {
+        return JMS_MAP_MESSAGE;
+    }
+
+    /**
+     * @return a new facade for the same connection whose map holds the same entries;
+     *         the entry values themselves are not copied.
+     */
+    @Override
+    public JmsMapMessageFacade copy() {
+        AmqpJmsMapMessageFacade copy = new AmqpJmsMapMessageFacade(connection);
+        copyInto(copy);
+        copy.messageBodyMap.putAll(messageBodyMap);
+        return copy;
+    }
+
+    @Override
+    public Enumeration<String> getMapNames() {
+        return Collections.enumeration(messageBodyMap.keySet());
+    }
+
+    @Override
+    public boolean itemExists(String key) {
+        return messageBodyMap.containsKey(key);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return messageBodyMap.isEmpty();
+    }
+
+    /**
+     * Reads the value stored under the given key.
+     *
+     * @param key
+     *        the name of the map entry.
+     *
+     * @return the stored value, a Binary value is returned as a byte[] copy of exactly
+     *         the bytes it refers to; null if there is no such entry.
+     */
+    @Override
+    public Object get(String key) {
+        Object value = messageBodyMap.get(key);
+        if (value instanceof Binary) {
+            // Copy to a byte[], ensure we copy only the required portion.  The end index
+            // of copyOfRange is exclusive and absolute, so it is offset + length.
+            Binary bin = ((Binary) value);
+            int start = bin.getArrayOffset();
+            value = Arrays.copyOfRange(bin.getArray(), start, start + bin.getLength());
+        }
+
+        return value;
+    }
+
+    /**
+     * Stores the value under the given key, a byte[] is wrapped in a Binary first.
+     *
+     * @param key
+     *        the name of the map entry.
+     * @param value
+     *        the value to store.
+     */
+    @Override
+    public void put(String key, Object value) {
+        Object entry = value;
+        if (value instanceof byte[]) {
+            entry = new Binary((byte[]) value);
+        }
+
+        messageBodyMap.put(key, entry);
+    }
+
+    @Override
+    public Object remove(String key) {
+        return messageBodyMap.remove(key);
+    }
+
+    @Override
+    public void clearBody() {
+        messageBodyMap.clear();
+    }
+
+    private void initializeEmptyBody() {
+        // Using LinkedHashMap because AMQP map equality considers order,
+        // so we should behave in as predictable a manner as possible
+        messageBodyMap = new LinkedHashMap<String, Object>();
+        getAmqpMessage().setBody(new AmqpValue(messageBodyMap));
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java
new file mode 100644
index 0000000..c31c50b
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_BYTES_MESSAGE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MAP_MESSAGE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MESSAGE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_OBJECT_MESSAGE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_STREAM_MESSAGE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_TEXT_MESSAGE;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.qpid.jms.message.JmsBytesMessage;
+import org.apache.qpid.jms.message.JmsMapMessage;
+import org.apache.qpid.jms.message.JmsMessage;
+import org.apache.qpid.jms.message.JmsObjectMessage;
+import org.apache.qpid.jms.message.JmsStreamMessage;
+import org.apache.qpid.jms.message.JmsTextMessage;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.message.Message;
+
+/**
+ * Builder class used to construct the appropriate JmsMessage / JmsMessageFacade
+ * objects to wrap an incoming AMQP Message.
+ *
+ * The class exposes only static methods and cannot be instantiated.
+ */
+public class AmqpJmsMessageBuilder {
+
+    private AmqpJmsMessageBuilder() {
+    }
+
+    /**
+     * Create a new JmsMessage and underlying JmsMessageFacade that represents the proper
+     * message type for the incoming AMQP message.
+     *
+     * @param connection
+     *        The provider AMQP Connection instance where this message arrived at.
+     * @param message
+     *        The Proton Message object that will be wrapped.
+     *
+     * @return a JmsMessage instance properly configured for dispatch to the provider listener.
+     *
+     * @throws IOException if an error occurs while creating the message objects, or if no
+     *         JMS message type could be determined for the incoming message.
+     */
+    public static JmsMessage createJmsMessage(AmqpConnection connection, Message message) throws IOException {
+
+        // First we try the easy way, if the annotation is there we don't have to work hard.
+        JmsMessage result = createFromMsgAnnotation(connection, message);
+        if (result != null) {
+            return result;
+        }
+
+        // TODO
+        throw new IOException("Could not create a JMS message from incoming message");
+    }
+
+    /**
+     * Creates the JmsMessage matching the JMS message type annotation carried by the
+     * incoming message, if that annotation is present.
+     *
+     * @param connection
+     *        The provider AMQP Connection instance handed to the created facade.
+     * @param message
+     *        The Proton Message object that will be wrapped.
+     *
+     * @return the JmsMessage for the annotated type, or null if the message carries no
+     *         JMS message type annotation.
+     *
+     * @throws IOException if the annotation is present but is not a byte value naming
+     *         one of the known JMS message types.
+     */
+    private static JmsMessage createFromMsgAnnotation(AmqpConnection connection, Message message) throws IOException {
+        Object annotation = getMessageAnnotation(JMS_MSG_TYPE, message);
+        if (annotation == null) {
+            return null;
+        }
+
+        // The annotation value is supplied by the remote peer, so check its type
+        // explicitly instead of letting a blind cast escape as an unchecked
+        // ClassCastException that callers expecting IOException would not handle.
+        if (!(annotation instanceof Byte)) {
+            throw new IOException("Invalid JMS Message Type annotation found in message");
+        }
+
+        switch (((Byte) annotation).byteValue()) {
+            case JMS_MESSAGE:
+                return new JmsMessage(new AmqpJmsMessageFacade(connection, message));
+            case JMS_BYTES_MESSAGE:
+                return new JmsBytesMessage(new AmqpJmsBytesMessageFacade(connection, message));
+            case JMS_TEXT_MESSAGE:
+                return new JmsTextMessage(new AmqpJmsTextMessageFacade(connection, message));
+            case JMS_MAP_MESSAGE:
+                return new JmsMapMessage(new AmqpJmsMapMessageFacade(connection, message));
+            case JMS_STREAM_MESSAGE:
+                return new JmsStreamMessage(new AmqpJmsStreamMessageFacade(connection, message));
+            case JMS_OBJECT_MESSAGE:
+                return new JmsObjectMessage(new AmqpJmsObjectMessageFacade(connection, message));
+            default:
+                throw new IOException("Invalid JMS Message Type annotation found in message");
+        }
+    }
+
+    /**
+     * Reads a single value from the message annotations section of the given message.
+     *
+     * @param key
+     *        The annotation name, converted to a Symbol for the lookup.
+     * @param message
+     *        The Proton Message object whose annotations are searched.
+     *
+     * @return the annotation value, or null if the message has no annotations section,
+     *         the section carries no map, or the map has no value for the key.
+     */
+    private static Object getMessageAnnotation(String key, Message message) {
+        if (message.getMessageAnnotations() != null) {
+            Map<Symbol, Object> annotations = message.getMessageAnnotations().getValue();
+            // Guard against an annotations section that was created without a map.
+            if (annotations != null) {
+                return annotations.get(AmqpMessageSupport.getSymbol(key));
+            }
+        }
+
+        return null;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org