You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/03/24 23:09:42 UTC

[1/4] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5591

Repository: activemq
Updated Branches:
  refs/heads/master e33b3f593 -> 3306467a6


http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
new file mode 100644
index 0000000..068a170
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.protocol;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
+import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
+import static org.apache.activemq.transport.amqp.AmqpSupport.createDestination;
+import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.InvalidSelectorException;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTempDestination;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
+import org.apache.activemq.transport.amqp.AmqpProtocolException;
+import org.apache.activemq.transport.amqp.ResponseHandler;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wraps the AMQP Session and provides the services needed to manage the remote
+ * peer requests for link establishment.
+ */
+public class AmqpSession implements AmqpResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpSession.class);
+
+    private final Map<ConsumerId, AmqpSender> consumers = new HashMap<ConsumerId, AmqpSender>();
+
+    private final AmqpConnection connection;
+    private final Session protonSession;
+    private final SessionId sessionId;
+
+    private long nextProducerId = 0;
+    private long nextConsumerId = 0;
+
+    /**
+     * Create new AmqpSession instance whose parent is the given AmqpConnection.
+     *
+     * @param connection
+     *        the parent connection for this session.
+     * @param sessionId
+     *        the ActiveMQ SessionId that is used to identify this session.
+     * @param session
+     *        the AMQP Session that this class manages.
+     */
+    public AmqpSession(AmqpConnection connection, SessionId sessionId, Session session) {
+        this.connection = connection;
+        this.sessionId = sessionId;
+        this.protonSession = session;
+    }
+
+    @Override
+    public void open() {
+        LOG.trace("Session {} opened", getSessionId());
+
+        getEndpoint().setContext(this);
+        getEndpoint().setIncomingCapacity(Integer.MAX_VALUE);
+        getEndpoint().open();
+
+        connection.sendToActiveMQ(new SessionInfo(getSessionId()));
+    }
+
+    @Override
+    public void close() {
+        LOG.trace("Session {} closed", getSessionId());
+
+        getEndpoint().setContext(null);
+        getEndpoint().close();
+        getEndpoint().free();
+
+        connection.sendToActiveMQ(new RemoveInfo(getSessionId()));
+    }
+
+    /**
+     * Commits all pending work for all resources managed under this session.
+     *
+     * @throws Exception if an error occurs while attempting to commit work.
+     */
+    public void commit() throws Exception {
+        for (AmqpSender consumer : consumers.values()) {
+            consumer.commit();
+        }
+    }
+
+    /**
+     * Rolls back any pending work being done under this session.
+     *
+     * @throws Exception if an error occurs while attempting to roll back work.
+     */
+    public void rollback() throws Exception {
+        for (AmqpSender consumer : consumers.values()) {
+            consumer.rollback();
+        }
+    }
+
+    /**
+     * Used to direct all Session managed Senders to push any queued Messages
+     * out to the remote peer.
+     *
+     * @throws Exception if an error occurs while flushing the messages.
+     */
+    public void flushPendingMessages() throws Exception {
+        for (AmqpSender consumer : consumers.values()) {
+            consumer.pumpOutbound();
+        }
+    }
+
+    public void createCoordinator(final Receiver protonReceiver) throws Exception {
+        AmqpTransactionCoordinator txCoordinator = new AmqpTransactionCoordinator(this, protonReceiver);
+        txCoordinator.flow(connection.getConfiguredReceiverCredit());
+        txCoordinator.open();
+    }
+
+    public void createReceiver(final Receiver protonReceiver) throws Exception {
+        org.apache.qpid.proton.amqp.transport.Target remoteTarget = protonReceiver.getRemoteTarget();
+
+        ProducerInfo producerInfo = new ProducerInfo(getNextProducerId());
+        final AmqpReceiver receiver = new AmqpReceiver(this, protonReceiver, producerInfo);
+
+        try {
+            Target target = (Target) remoteTarget;
+            ActiveMQDestination destination = null;
+            String targetNodeName = target.getAddress();
+
+            if (target.getDynamic()) {
+                destination = connection.createTemporaryDestination(protonReceiver, target.getCapabilities());
+                Target actualTarget = new Target();
+                actualTarget.setAddress(destination.getQualifiedName());
+                actualTarget.setDynamic(true);
+                protonReceiver.setTarget(actualTarget);
+                receiver.addCloseAction(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        connection.deleteTemporaryDestination((ActiveMQTempDestination) receiver.getDestination());
+                    }
+                });
+            } else if (targetNodeName != null && !targetNodeName.isEmpty()) {
+                destination = createDestination(remoteTarget);
+            }
+
+            receiver.setDestination(destination);
+            connection.sendToActiveMQ(producerInfo, new ResponseHandler() {
+                @Override
+                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                    if (response.isException()) {
+                        ErrorCondition error = null;
+                        Throwable exception = ((ExceptionResponse) response).getException();
+                        if (exception instanceof SecurityException) {
+                            error = new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage());
+                        } else {
+                            error = new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage());
+                        }
+
+                        receiver.close(error);
+                    } else {
+                        receiver.flow(connection.getConfiguredReceiverCredit());
+                        receiver.open();
+                    }
+                    pumpProtonToSocket();
+                }
+            });
+
+        } catch (AmqpProtocolException exception) {
+            receiver.close(new ErrorCondition(Symbol.getSymbol(exception.getSymbolicName()), exception.getMessage()));
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public void createSender(final Sender protonSender) throws Exception {
+        org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) protonSender.getRemoteSource();
+
+        ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
+        final AmqpSender sender = new AmqpSender(this, protonSender, consumerInfo);
+
+        try {
+            final Map<Symbol, Object> supportedFilters = new HashMap<Symbol, Object>();
+            protonSender.setContext(sender);
+
+            boolean noLocal = false;
+            String selector = null;
+
+            if (source != null) {
+                Map.Entry<Symbol, DescribedType> filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS);
+                if (filter != null) {
+                    selector = filter.getValue().getDescribed().toString();
+                    // Validate the Selector.
+                    try {
+                        SelectorParser.parse(selector);
+                    } catch (InvalidSelectorException e) {
+                        sender.close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
+                        return;
+                    }
+
+                    supportedFilters.put(filter.getKey(), filter.getValue());
+                }
+
+                filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS);
+                if (filter != null) {
+                    noLocal = true;
+                    supportedFilters.put(filter.getKey(), filter.getValue());
+                }
+            }
+
+            ActiveMQDestination destination;
+            if (source == null) {
+                // Attempt to recover previous subscription
+                destination = connection.lookupSubscription(protonSender.getName());
+
+                if (destination != null) {
+                    source = new org.apache.qpid.proton.amqp.messaging.Source();
+                    source.setAddress(destination.getQualifiedName());
+                    source.setDurable(TerminusDurability.UNSETTLED_STATE);
+                    source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+                    source.setDistributionMode(COPY);
+                } else {
+                    sender.close(new ErrorCondition(AmqpError.NOT_FOUND, "Unknown subscription link: " + protonSender.getName()));
+                    return;
+                }
+            } else if (source.getDynamic()) {
+                // let's create a temp dest.
+                destination = connection.createTemporaryDestination(protonSender, source.getCapabilities());
+                source = new org.apache.qpid.proton.amqp.messaging.Source();
+                source.setAddress(destination.getQualifiedName());
+                source.setDynamic(true);
+                sender.addCloseAction(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        connection.deleteTemporaryDestination((ActiveMQTempDestination) sender.getDestination());
+                    }
+                });
+            } else {
+                destination = createDestination(source);
+            }
+
+            source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);
+            protonSender.setSource(source);
+
+            int senderCredit = protonSender.getRemoteCredit();
+
+            consumerInfo.setSelector(selector);
+            consumerInfo.setNoRangeAcks(true);
+            consumerInfo.setDestination(destination);
+            consumerInfo.setPrefetchSize(senderCredit >= 0 ? senderCredit : 0);
+            consumerInfo.setDispatchAsync(true);
+            consumerInfo.setNoLocal(noLocal);
+
+            if (source.getDistributionMode() == COPY && destination.isQueue()) {
+                consumerInfo.setBrowser(true);
+            }
+
+            if ((TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) ||
+                 TerminusDurability.CONFIGURATION.equals(source.getDurable())) && destination.isTopic()) {
+                consumerInfo.setSubscriptionName(protonSender.getName());
+            }
+
+            connection.sendToActiveMQ(consumerInfo, new ResponseHandler() {
+                @Override
+                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                    if (response.isException()) {
+                        ErrorCondition error = null;
+                        Throwable exception = ((ExceptionResponse) response).getException();
+                        if (exception instanceof SecurityException) {
+                            error = new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage());
+                        } else if (exception instanceof InvalidSelectorException) {
+                            error = new ErrorCondition(AmqpError.INVALID_FIELD, exception.getMessage());
+                        } else {
+                            error = new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage());
+                        }
+
+                        sender.close(error);
+                    } else {
+                        sender.open();
+                    }
+                    pumpProtonToSocket();
+                }
+            });
+
+        } catch (AmqpProtocolException e) {
+            sender.close(new ErrorCondition(Symbol.getSymbol(e.getSymbolicName()), e.getMessage()));
+        }
+    }
+
+    /**
+     * Send all pending work out to the remote peer.
+     */
+    public void pumpProtonToSocket() {
+        connection.pumpProtonToSocket();
+    }
+
+    public void regosterSender(ConsumerId consumerId, AmqpSender sender) {
+        consumers.put(consumerId, sender);
+        connection.regosterSender(consumerId, sender);
+    }
+
+    public void unregisterSender(ConsumerId consumerId) {
+        consumers.remove(consumerId);
+        connection.unregosterSender(consumerId);
+    }
+
+    //----- Configuration accessors ------------------------------------------//
+
+    public AmqpConnection getConnection() {
+        return connection;
+    }
+
+    public SessionId getSessionId() {
+        return sessionId;
+    }
+
+    public Session getEndpoint() {
+        return protonSession;
+    }
+
+    //----- Internal Implementation ------------------------------------------//
+
+    protected ConsumerId getNextConsumerId() {
+        return new ConsumerId(sessionId, nextConsumerId++);
+    }
+
+    protected ProducerId getNextProducerId() {
+        return new ProducerId(sessionId, nextProducerId++);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
new file mode 100644
index 0000000..14fa3ad
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.protocol;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.toBytes;
+import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
+
+import java.io.IOException;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
+import org.apache.activemq.transport.amqp.ResponseHandler;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.transaction.Declare;
+import org.apache.qpid.proton.amqp.transaction.Declared;
+import org.apache.qpid.proton.amqp.transaction.Discharge;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.message.Message;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements the AMQP Transaction Coordinator support to manage local
+ * transactions between an AMQP client and the broker.
+ */
+public class AmqpTransactionCoordinator extends AmqpAbstractReceiver {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionCoordinator.class);
+
+    private long nextTransactionId;
+
+    /**
+     * Creates a new Transaction coordinator used to manage AMQP transactions.
+     *
+     * @param session
+     *        the AmqpSession under which the coordinator was created.
+     * @param endpoint
+     *        the AMQP receiver link endpoint for this coordinator.
+     */
+    public AmqpTransactionCoordinator(AmqpSession session, Receiver endpoint) {
+        super(session, endpoint);
+    }
+
+    @Override
+    protected void processDelivery(final Delivery delivery, Buffer deliveryBytes) throws Exception {
+        Message message = Proton.message();
+        int offset = deliveryBytes.offset;
+        int len = deliveryBytes.length;
+
+        while (len > 0) {
+            final int decoded = message.decode(deliveryBytes.data, offset, len);
+            assert decoded > 0 : "Make progress decoding the message";
+            offset += decoded;
+            len -= decoded;
+        }
+
+        final AmqpSession session = (AmqpSession) getEndpoint().getSession().getContext();
+        ConnectionId connectionId = session.getConnection().getConnectionId();
+        final Object action = ((AmqpValue) message.getBody()).getValue();
+
+        LOG.debug("COORDINATOR received: {}, [{}]", action, deliveryBytes);
+        if (action instanceof Declare) {
+            Declare declare = (Declare) action;
+            if (declare.getGlobalId() != null) {
+                throw new Exception("don't know how to handle a declare /w a set GlobalId");
+            }
+
+            long txid = getNextTransactionId();
+            TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), TransactionInfo.BEGIN);
+            sendToActiveMQ(txinfo, null);
+            LOG.trace("started transaction {}", txid);
+
+            Declared declared = new Declared();
+            declared.setTxnId(new Binary(toBytes(txid)));
+            delivery.disposition(declared);
+            delivery.settle();
+        } else if (action instanceof Discharge) {
+            Discharge discharge = (Discharge) action;
+            long txid = toLong(discharge.getTxnId());
+
+            final byte operation;
+            if (discharge.getFail()) {
+                LOG.trace("rollback transaction {}", txid);
+                operation = TransactionInfo.ROLLBACK;
+            } else {
+                LOG.trace("commit transaction {}", txid);
+                operation = TransactionInfo.COMMIT_ONE_PHASE;
+            }
+
+            if (operation == TransactionInfo.ROLLBACK) {
+                session.rollback();
+            } else {
+                session.commit();
+            }
+
+            TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), operation);
+            sendToActiveMQ(txinfo, new ResponseHandler() {
+                @Override
+                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                    if (response.isException()) {
+                        ExceptionResponse er = (ExceptionResponse) response;
+                        Rejected rejected = new Rejected();
+                        rejected.setError(new ErrorCondition(Symbol.valueOf("failed"), er.getException().getMessage()));
+                        delivery.disposition(rejected);
+                    } else {
+                        delivery.disposition(Accepted.getInstance());
+                    }
+                    LOG.debug("TX: {} settling {}", operation, action);
+                    delivery.settle();
+                    session.pumpProtonToSocket();
+                }
+            });
+
+            if (operation == TransactionInfo.ROLLBACK) {
+                session.flushPendingMessages();
+            }
+
+        } else {
+            throw new Exception("Expected coordinator message type: " + action.getClass());
+        }
+    }
+
+    private long getNextTransactionId() {
+        return ++nextTransactionId;
+    }
+
+    @Override
+    public ActiveMQDestination getDestination() {
+        return null;
+    }
+
+    @Override
+    public void setDestination(ActiveMQDestination destination) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransferTagGenerator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransferTagGenerator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransferTagGenerator.java
new file mode 100644
index 0000000..a1e3d84
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransferTagGenerator.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.protocol;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * Utility class that can generate and, if enabled, pool the binary tag values
+ * used to identify transfers over an AMQP link.
+ */
+public final class AmqpTransferTagGenerator {
+
+    public static final int DEFAULT_TAG_POOL_SIZE = 1024;
+
+    private long nextTagId;
+    private int maxPoolSize = DEFAULT_TAG_POOL_SIZE;
+
+    private final Set<byte[]> tagPool;
+
+    public AmqpTransferTagGenerator() {
+        this(false);
+    }
+
+    public AmqpTransferTagGenerator(boolean pool) {
+        if (pool) {
+            this.tagPool = new LinkedHashSet<byte[]>();
+        } else {
+            this.tagPool = null;
+        }
+    }
+
+    /**
+     * Retrieves the next available tag.
+     *
+     * @return a new or unused tag depending on the pool option.
+     */
+    public byte[] getNextTag() {
+        byte[] rc;
+        if (tagPool != null && !tagPool.isEmpty()) {
+            final Iterator<byte[]> iterator = tagPool.iterator();
+            rc = iterator.next();
+            iterator.remove();
+        } else {
+            try {
+                rc = Long.toHexString(nextTagId++).getBytes("UTF-8");
+            } catch (UnsupportedEncodingException e) {
+                // This should never happen since we control the input.
+                throw new RuntimeException(e);
+            }
+        }
+        return rc;
+    }
+
+    /**
+     * When used as a pooled cache of tags the unused tags should always be returned once
+     * the transfer has been settled.
+     *
+     * @param data
+     *        a previously borrowed tag that is no longer in use.
+     */
+    public void returnTag(byte[] data) {
+        if (tagPool != null && tagPool.size() < maxPoolSize) {
+            tagPool.add(data);
+        }
+    }
+
+    /**
+     * Gets the current max pool size value.
+     *
+     * @return the current max tag pool size.
+     */
+    public int getMaxPoolSize() {
+        return maxPoolSize;
+    }
+
+    /**
+     * Sets the max tag pool size.  If the size is smaller than the current number
+     * of pooled tags the pool will drain over time until it matches the max.
+     *
+     * @param maxPoolSize
+     *        the maximum number of tags to hold in the pool.
+     */
+    public void setMaxPoolSize(int maxPoolSize) {
+        this.maxPoolSize = maxPoolSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl
index 9c47006..30c79bf 100644
--- a/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl
+++ b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl
@@ -5,13 +5,13 @@
 ## The ASF licenses this file to You under the Apache License, Version 2.0
 ## (the "License"); you may not use this file except in compliance with
 ## the License.  You may obtain a copy of the License at
-## 
+##
 ## http://www.apache.org/licenses/LICENSE-2.0
-## 
+##
 ## Unless required by applicable law or agreed to in writing, software
 ## distributed under the License is distributed on an "AS IS" BASIS,
 ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-class=org.apache.activemq.transport.amqp.AMQPSslTransportFactory
+class=org.apache.activemq.transport.amqp.AmqpSslTransportFactory

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
index cf4fa95..4f1c861 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
@@ -46,6 +46,7 @@ import org.apache.activemq.broker.jmx.TopicViewMBean;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.spring.SpringSslContext;
 import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.transport.amqp.protocol.AmqpConnection;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -111,7 +112,7 @@ public class AmqpTestSupport {
         SSLContext.setDefault(ctx);
 
         // Setup SSL context...
-        final File classesDir = new File(AmqpProtocolConverter.class.getProtectionDomain().getCodeSource().getLocation().getFile());
+        final File classesDir = new File(AmqpConnection.class.getProtectionDomain().getCodeSource().getLocation().getFile());
         File keystore = new File(classesDir, "../../src/test/resources/keystore");
         final SpringSslContext sslContext = new SpringSslContext();
         sslContext.setKeyStore(keystore.getCanonicalPath());

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java
index dfbbc0b..2df5141 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.transport.amqp.interop;
 
-import static org.apache.activemq.transport.amqp.AmqpSupport.DYNAMIC_NODE_LIFETIME_POLICY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.LIFETIME_POLICY;
 import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILITY;
 import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY;
 import static org.junit.Assert.assertEquals;
@@ -194,7 +194,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
 
         // Set the dynamic node lifetime-policy
         Map<Symbol, Object> dynamicNodeProperties = new HashMap<Symbol, Object>();
-        dynamicNodeProperties.put(DYNAMIC_NODE_LIFETIME_POLICY, DeleteOnClose.getInstance());
+        dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance());
         source.setDynamicNodeProperties(dynamicNodeProperties);
 
         // Set the capability to indicate the node type being created
@@ -216,7 +216,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
 
         // Set the dynamic node lifetime-policy
         Map<Symbol, Object> dynamicNodeProperties = new HashMap<Symbol, Object>();
-        dynamicNodeProperties.put(DYNAMIC_NODE_LIFETIME_POLICY, DeleteOnClose.getInstance());
+        dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance());
         target.setDynamicNodeProperties(dynamicNodeProperties);
 
         // Set the capability to indicate the node type being created


[4/4] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5591

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5591

Refactoring of the AMQP protocol stack to allow for more flexibility in
adding support for some additional AMQP semantics and group together
common functionality handling to avoid having to fix similar issues in
multiple places.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3306467a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3306467a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3306467a

Branch: refs/heads/master
Commit: 3306467a6407e164fb7b8304eb6c0dc5cb67a696
Parents: e33b3f5
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Mar 24 18:09:28 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Mar 24 18:09:28 2015 -0400

----------------------------------------------------------------------
 .../amqp/AMQPProtocolDiscriminator.java         |  121 --
 .../transport/amqp/AMQPSslTransportFactory.java |   75 -
 .../activemq/transport/amqp/AmqpHeader.java     |    2 +
 .../transport/amqp/AmqpInactivityMonitor.java   |   10 +-
 .../transport/amqp/AmqpProtocolConverter.java   | 1699 +-----------------
 .../amqp/AmqpProtocolDiscriminator.java         |  114 ++
 .../transport/amqp/AmqpSslTransportFactory.java |   75 +
 .../activemq/transport/amqp/AmqpSupport.java    |   39 +-
 .../activemq/transport/amqp/AmqpTransport.java  |    5 +-
 .../transport/amqp/AmqpTransportFilter.java     |   22 +-
 .../activemq/transport/amqp/AmqpWireFormat.java |   20 +
 .../transport/amqp/AmqpWireFormatFactory.java   |    2 +-
 .../transport/amqp/IAmqpProtocolConverter.java  |   36 -
 .../transport/amqp/ResponseHandler.java         |   19 +-
 .../amqp/protocol/AmqpAbstractLink.java         |  167 ++
 .../amqp/protocol/AmqpAbstractReceiver.java     |  106 ++
 .../transport/amqp/protocol/AmqpConnection.java |  742 ++++++++
 .../transport/amqp/protocol/AmqpLink.java       |   96 +
 .../transport/amqp/protocol/AmqpReceiver.java   |  254 +++
 .../transport/amqp/protocol/AmqpResource.java   |   34 +
 .../transport/amqp/protocol/AmqpSender.java     |  451 +++++
 .../transport/amqp/protocol/AmqpSession.java    |  365 ++++
 .../protocol/AmqpTransactionCoordinator.java    |  162 ++
 .../amqp/protocol/AmqpTransferTagGenerator.java |  103 ++
 .../org/apache/activemq/transport/amqp+ssl      |    6 +-
 .../transport/amqp/AmqpTestSupport.java         |    3 +-
 .../amqp/interop/AmqpTempDestinationTest.java   |    6 +-
 27 files changed, 2803 insertions(+), 1931 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
deleted file mode 100644
index f5b457b..0000000
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.Command;
-
-/**
- * Used to assign the best implementation of a AmqpProtocolConverter to the
- * AmqpTransport based on the AmqpHeader that the client sends us.
- */
-public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
-
-    public static final int DEFAULT_PREFETCH = 1000;
-
-    private final AmqpTransport transport;
-    private final BrokerService brokerService;
-
-    private int producerCredit = DEFAULT_PREFETCH;
-
-    interface Discriminator {
-        boolean matches(AmqpHeader header);
-
-        IAmqpProtocolConverter create(AmqpTransport transport, BrokerService brokerService);
-    }
-
-    static final private ArrayList<Discriminator> DISCRIMINATORS = new ArrayList<Discriminator>();
-    static {
-        DISCRIMINATORS.add(new Discriminator() {
-
-            @Override
-            public IAmqpProtocolConverter create(AmqpTransport transport, BrokerService brokerService) {
-                return new AmqpProtocolConverter(transport, brokerService);
-            }
-
-            @Override
-            public boolean matches(AmqpHeader header) {
-                switch (header.getProtocolId()) {
-                    case 0:
-                    case 3:
-                        if (header.getMajor() == 1 && header.getMinor() == 0 && header.getRevision() == 0) {
-                            return true;
-                        }
-                }
-                return false;
-            }
-        });
-    }
-
-    final private ArrayList<Command> pendingCommands = new ArrayList<Command>();
-
-    public AMQPProtocolDiscriminator(AmqpTransport transport, BrokerService brokerService) {
-        this.transport = transport;
-        this.brokerService = brokerService;
-    }
-
-    @Override
-    public void onAMQPData(Object command) throws Exception {
-        if (command.getClass() == AmqpHeader.class) {
-            AmqpHeader header = (AmqpHeader) command;
-
-            Discriminator match = null;
-            for (Discriminator discriminator : DISCRIMINATORS) {
-                if (discriminator.matches(header)) {
-                    match = discriminator;
-                }
-            }
-
-            // Lets use first in the list if none are a good match.
-            if (match == null) {
-                match = DISCRIMINATORS.get(0);
-            }
-
-            IAmqpProtocolConverter next = match.create(transport, brokerService);
-            next.setProducerCredit(producerCredit);
-            transport.setProtocolConverter(next);
-            for (Command send : pendingCommands) {
-                next.onActiveMQCommand(send);
-            }
-            pendingCommands.clear();
-            next.onAMQPData(command);
-        } else {
-            throw new IllegalStateException();
-        }
-    }
-
-    @Override
-    public void onAMQPException(IOException error) {
-    }
-
-    @Override
-    public void onActiveMQCommand(Command command) throws Exception {
-        pendingCommands.add(command);
-    }
-
-    @Override
-    public void updateTracer() {
-    }
-
-    @Override
-    public void setProducerCredit(int producerCredit) {
-        this.producerCredit = producerCredit;
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
deleted file mode 100644
index 12bd526..0000000
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.BrokerServiceAware;
-import org.apache.activemq.transport.MutexTransport;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.tcp.SslTransportFactory;
-import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.wireformat.WireFormat;
-
-/**
- * A <a href="http://amqp.org/">AMQP</a> over SSL transport factory
- */
-public class AMQPSslTransportFactory extends SslTransportFactory implements BrokerServiceAware {
-
-    private BrokerService brokerService = null;
-
-    @Override
-    protected String getDefaultWireFormatType() {
-        return "amqp";
-    }
-
-    @Override
-    @SuppressWarnings("rawtypes")
-    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-        transport = new AmqpTransportFilter(transport, format, brokerService);
-        IntrospectionSupport.setProperties(transport, options);
-        return super.compositeConfigure(transport, format, options);
-    }
-
-    @SuppressWarnings("rawtypes")
-    @Override
-    public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
-        transport = super.serverConfigure(transport, format, options);
-
-        // strip off the mutex transport.
-        if (transport instanceof MutexTransport) {
-            transport = ((MutexTransport) transport).getNext();
-        }
-
-        return transport;
-    }
-
-    @Override
-    public void setBrokerService(BrokerService brokerService) {
-        this.brokerService = brokerService;
-    }
-
-    @Override
-    protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
-        AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
-        AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
-        filter.setInactivityMonitor(monitor);
-        return monitor;
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java
index 2597b2d..d019277 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java
@@ -19,6 +19,8 @@ package org.apache.activemq.transport.amqp;
 import org.fusesource.hawtbuf.Buffer;
 
 /**
+ * Represents the AMQP protocol handshake packet that is sent during the
+ * initial exchange with a remote peer.
  */
 public class AmqpHeader {
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpInactivityMonitor.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpInactivityMonitor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpInactivityMonitor.java
index 065559d..8cf6488 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpInactivityMonitor.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpInactivityMonitor.java
@@ -44,7 +44,7 @@ public class AmqpInactivityMonitor extends TransportFilter {
     private static Timer ACTIVITY_CHECK_TIMER;
 
     private final AtomicBoolean failed = new AtomicBoolean(false);
-    private IAmqpProtocolConverter protocolConverter;
+    private AmqpProtocolConverter protocolConverter;
 
     private long connectionTimeout = AmqpWireFormat.DEFAULT_CONNECTION_TIMEOUT;
     private SchedulerTimerTask connectCheckerTask;
@@ -98,15 +98,15 @@ public class AmqpInactivityMonitor extends TransportFilter {
         }
     }
 
-    public void setProtocolConverter(IAmqpProtocolConverter protocolConverter) {
+    public void setProtocolConverter(AmqpProtocolConverter protocolConverter) {
         this.protocolConverter = protocolConverter;
     }
 
-    public IAmqpProtocolConverter getProtocolConverter() {
+    public AmqpProtocolConverter getProtocolConverter() {
         return protocolConverter;
     }
 
-    synchronized void startConnectChecker(long connectionTimeout) {
+    public synchronized void startConnectChecker(long connectionTimeout) {
         this.connectionTimeout = connectionTimeout;
         if (connectionTimeout > 0 && connectCheckerTask == null) {
             connectCheckerTask = new SchedulerTimerTask(connectChecker);
@@ -124,7 +124,7 @@ public class AmqpInactivityMonitor extends TransportFilter {
         }
     }
 
-    synchronized void stopConnectChecker() {
+    public synchronized void stopConnectChecker() {
         if (connectCheckerTask != null) {
             connectCheckerTask.cancel();
             connectCheckerTask = null;


[2/4] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5591

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
index 5dfdf75..c65145a 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
@@ -25,7 +25,6 @@ import org.apache.activemq.command.Command;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.transport.amqp.message.InboundTransformer;
 import org.apache.activemq.transport.tcp.SslTransport;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.WireFormat;
@@ -41,18 +40,17 @@ import org.slf4j.LoggerFactory;
 public class AmqpTransportFilter extends TransportFilter implements AmqpTransport {
     private static final Logger LOG = LoggerFactory.getLogger(AmqpTransportFilter.class);
     static final Logger TRACE_BYTES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".BYTES");
-    static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".FRAMES");
-    private IAmqpProtocolConverter protocolConverter;
+    public static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".FRAMES");
+    private AmqpProtocolConverter protocolConverter;
     private AmqpWireFormat wireFormat;
     private AmqpInactivityMonitor monitor;
 
     private boolean trace;
-    private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
     private final ReentrantLock lock = new ReentrantLock();
 
     public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerService brokerService) {
         super(next);
-        this.protocolConverter = new AMQPProtocolDiscriminator(this, brokerService);
+        this.protocolConverter = new AmqpProtocolDiscriminator(this, brokerService);
         if (wireFormat instanceof AmqpWireFormat) {
             this.wireFormat = (AmqpWireFormat) wireFormat;
         }
@@ -170,20 +168,20 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
 
     @Override
     public String getTransformer() {
-        return transformer;
+        return wireFormat.getTransformer();
     }
 
     public void setTransformer(String transformer) {
-        this.transformer = transformer;
+        wireFormat.setTransformer(transformer);
     }
 
     @Override
-    public IAmqpProtocolConverter getProtocolConverter() {
+    public AmqpProtocolConverter getProtocolConverter() {
         return protocolConverter;
     }
 
     @Override
-    public void setProtocolConverter(IAmqpProtocolConverter protocolConverter) {
+    public void setProtocolConverter(AmqpProtocolConverter protocolConverter) {
         this.protocolConverter = protocolConverter;
     }
 
@@ -195,7 +193,11 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
     }
 
     public void setProducerCredit(int producerCredit) {
-        protocolConverter.setProducerCredit(producerCredit);
+        wireFormat.setProducerCredit(producerCredit);
+    }
+
+    public int getProducerCredit() {
+        return wireFormat.getProducerCredit();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
index dc0e3d5..3734cc5 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 
+import org.apache.activemq.transport.amqp.message.InboundTransformer;
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
@@ -37,6 +38,7 @@ public class AmqpWireFormat implements WireFormat {
     public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE;
     public static final int NO_AMQP_MAX_FRAME_SIZE = -1;
     public static final long DEFAULT_CONNECTION_TIMEOUT = 30000L;
+    public static final int DEFAULT_PRODUCER_CREDIT = 1000;
 
     private static final int SASL_PROTOCOL = 3;
 
@@ -44,6 +46,8 @@ public class AmqpWireFormat implements WireFormat {
     private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
     private int maxAmqpFrameSize = NO_AMQP_MAX_FRAME_SIZE;
     private long connectAttemptTimeout = DEFAULT_CONNECTION_TIMEOUT;
+    private int producerCredit = DEFAULT_PRODUCER_CREDIT;
+    private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
 
     private boolean magicRead = false;
     private ResetListener resetListener;
@@ -207,4 +211,20 @@ public class AmqpWireFormat implements WireFormat {
     public void setConnectAttemptTimeout(long connectAttemptTimeout) {
         this.connectAttemptTimeout = connectAttemptTimeout;
     }
+
+    public void setProducerCredit(int producerCredit) {
+        this.producerCredit = producerCredit;
+    }
+
+    public int getProducerCredit() {
+        return producerCredit;
+    }
+
+    public String getTransformer() {
+        return transformer;
+    }
+
+    public void setTransformer(String transformer) {
+        this.transformer = transformer;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
index 75856da..f4de950 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
@@ -20,7 +20,7 @@ import org.apache.activemq.wireformat.WireFormat;
 import org.apache.activemq.wireformat.WireFormatFactory;
 
 /**
- * Creates WireFormat objects that marshalls the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ * Creates the default AMQP WireFormat object used to configure the protocol support.
  */
 public class AmqpWireFormatFactory implements WireFormatFactory {
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
deleted file mode 100644
index 8296ef2..0000000
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp;
-
-import java.io.IOException;
-
-import org.apache.activemq.command.Command;
-
-/**
- */
-public interface IAmqpProtocolConverter {
-
-    void onAMQPData(Object command) throws Exception;
-
-    void onAMQPException(IOException error);
-
-    void onActiveMQCommand(Command command) throws Exception;
-
-    void updateTracer();
-
-    void setProducerCredit(int producerCredit);
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java
index 392ed77..901fd69 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java
@@ -21,8 +21,21 @@ import java.io.IOException;
 import org.apache.activemq.command.Response;
 
 /**
- * Interface used by the AMQPProtocolConverter for callbacks.
+ * Interface used by the AmqpProtocolConverter for callbacks from the broker.
  */
-interface ResponseHandler {
-    void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException;
+public interface ResponseHandler {
+
+    /**
+     * Called when the Broker has handled a previously issued request and
+     * has a response ready.
+     *
+     * @param converter
+     *        the protocol converter that is awaiting the response.
+     * @param response
+     *        the response from the broker.
+     *
+     * @throws IOException if an error occurs while processing the response.
+     */
+    void onResponse(AmqpProtocolConverter converter, Response response) throws IOException;
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractLink.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractLink.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractLink.java
new file mode 100644
index 0000000..d4fe301
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractLink.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.protocol;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.amqp.ResponseHandler;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Sender;
+
+/**
+ * Abstract AmqpLink implementation that provides basic Link services.
+ */
+public abstract class AmqpAbstractLink<LINK_TYPE extends Link> implements AmqpLink {
+
+    protected final AmqpSession session;
+    protected final LINK_TYPE endpoint;
+
+    protected boolean closed;
+    protected boolean opened;
+    protected List<Runnable> closeActions = new ArrayList<Runnable>();
+
+    /**
+     * Creates a new AmqpLink type.
+     *
+     * @param session
+     *        the AmqpSession that serves as the parent of this Link.
+     * @param endpoint
+     *        the link endpoint this object represents.
+     */
+    public AmqpAbstractLink(AmqpSession session, LINK_TYPE endpoint) {
+        this.session = session;
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    public void open() {
+        if (!opened) {
+            getEndpoint().setContext(this);
+            getEndpoint().open();
+
+            opened = true;
+        }
+    }
+
+    @Override
+    public void detach() {
+        if (!closed) {
+            if (getEndpoint() != null) {
+                getEndpoint().setContext(null);
+                getEndpoint().detach();
+                getEndpoint().free();
+            }
+        }
+    }
+
+    @Override
+    public void close(ErrorCondition error) {
+        if (!closed) {
+
+            if (getEndpoint() != null) {
+                if (getEndpoint() instanceof Sender) {
+                    getEndpoint().setSource(null);
+                } else {
+                    getEndpoint().setTarget(null);
+                }
+                getEndpoint().setCondition(error);
+            }
+
+            close();
+        }
+    }
+
+    @Override
+    public void close() {
+        if (!closed) {
+
+            if (getEndpoint() != null) {
+                getEndpoint().setContext(null);
+                getEndpoint().close();
+                getEndpoint().free();
+            }
+
+            for (Runnable action : closeActions) {
+                action.run();
+            }
+
+            closeActions.clear();
+            opened = false;
+            closed = true;
+        }
+    }
+
+    /**
+     * @return true if this link has already been opened.
+     */
+    public boolean isOpened() {
+        return opened;
+    }
+
+    /**
+     * @return true if this link has already been closed.
+     */
+    public boolean isClosed() {
+        return closed;
+    }
+
+    /**
+     * @return the Proton Link type this link represents.
+     */
+    public LINK_TYPE getEndpoint() {
+        return endpoint;
+    }
+
+    /**
+     * @return the parent AmqpSession for this Link instance.
+     */
+    public AmqpSession getSession() {
+        return session;
+    }
+
+    @Override
+    public void addCloseAction(Runnable action) {
+        closeActions.add(action);
+    }
+
+    /**
+     * Shortcut method to hand off an ActiveMQ Command to the broker without
+     * assigning a ResponseHandler (a null handler is passed along).
+     *
+     * @param command
+     *        the Command object to send to the Broker.
+     */
+    protected void sendToActiveMQ(Command command) {
+        session.getConnection().sendToActiveMQ(command, null);
+    }
+
+    /**
+     * Shortcut method to hand off an ActiveMQ Command to the broker and assign
+     * a ResponseHandler to deal with any reply from the broker.
+     *
+     * @param command
+     *        the Command object to send to the Broker.
+     * @param handler
+     *        the ResponseHandler that will handle the Broker's response.
+     */
+    protected void sendToActiveMQ(Command command, ResponseHandler handler) {
+        session.getConnection().sendToActiveMQ(command, handler);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
new file mode 100644
index 0000000..7436a78
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.protocol;
+
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base that provides common services for AMQP Receiver types.
+ *
+ * Bytes for an incoming delivery are read from the wrapped Receiver into an
+ * accumulation buffer and then handed to
+ * {@link #processDelivery(Delivery, Buffer)}. The detach, flow, commit and
+ * rollback callbacks are implemented here as no-ops.
+ */
+public abstract class AmqpAbstractReceiver extends AmqpAbstractLink<Receiver> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpAbstractReceiver.class);
+
+    // Accumulates the bytes read so far for the delivery being received. It is set
+    // to null once a delivery has been processed and re-created on demand.
+    protected ByteArrayOutputStream current = new ByteArrayOutputStream();
+    // Scratch buffer (8 * 1024 bytes) passed to each recv() call on the endpoint.
+    protected final byte[] recvBuffer = new byte[1024 * 8];
+
+    /**
+     * Handle create of new AMQP Receiver instance.
+     *
+     * @param session
+     *        the AmqpSession that serves as the parent of this Link.
+     * @param endpoint
+     *        the Receiver endpoint being managed by this class.
+     */
+    public AmqpAbstractReceiver(AmqpSession session, Receiver endpoint) {
+        super(session, endpoint);
+    }
+
+    /**
+     * No-op in this base class.
+     */
+    @Override
+    public void detach() {
+    }
+
+    /**
+     * No-op in this base class.
+     */
+    @Override
+    public void flow() throws Exception {
+    }
+
+    /**
+     * Provide the receiver endpoint with the given amount of credits.
+     *
+     * @param credits
+     *        the credit value to pass on to the wrapped Receiver.
+     */
+    public void flow(int credits) {
+        getEndpoint().flow(credits);
+    }
+
+    /**
+     * No-op in this base class.
+     */
+    @Override
+    public void commit() throws Exception {
+    }
+
+    /**
+     * No-op in this base class.
+     */
+    @Override
+    public void rollback() throws Exception {
+    }
+
+    /**
+     * Reads the bytes currently available for the given delivery and, once the
+     * read loop ends with a negative count, passes the accumulated bytes to
+     * {@link #processDelivery(Delivery, Buffer)} and advances the endpoint.
+     *
+     * @param delivery
+     *        the delivery reported for this link.
+     *
+     * @throws Exception if processDelivery fails; the endpoint is still advanced
+     *         and the accumulation buffer discarded in that case.
+     */
+    @Override
+    public void delivery(Delivery delivery) throws Exception {
+
+        if (!delivery.isReadable()) {
+            LOG.debug("Delivery was not readable!");
+            return;
+        }
+
+        // The buffer is dropped after every processed delivery, so re-create it.
+        if (current == null) {
+            current = new ByteArrayOutputStream();
+        }
+
+        // Drain the endpoint into the accumulation buffer for as long as recv()
+        // reports a positive byte count.
+        int count;
+        while ((count = getEndpoint().recv(recvBuffer, 0, recvBuffer.length)) > 0) {
+            current.write(recvBuffer, 0, count);
+        }
+
+        // Expecting more deliveries..
+        // A zero count keeps the accumulated bytes and waits for a later call;
+        // presumably this means the delivery is not complete yet - TODO confirm
+        // against the Proton Receiver.recv contract.
+        if (count == 0) {
+            return;
+        }
+
+        try {
+            processDelivery(delivery, current.toBuffer());
+        } finally {
+            // Runs even when processDelivery throws: move the endpoint on and
+            // discard the buffer so the next delivery starts empty.
+            getEndpoint().advance();
+            current = null;
+        }
+    }
+
+    /**
+     * Hook for subclasses to handle the bytes accumulated for a delivery.
+     *
+     * @param delivery
+     *        the delivery the bytes were read for.
+     * @param deliveryBytes
+     *        the accumulated bytes, as produced by the accumulation buffer's toBuffer().
+     *
+     * @throws Exception if the delivery cannot be processed.
+     */
+    protected abstract void processDelivery(Delivery delivery, Buffer deliveryBytes) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
new file mode 100644
index 0000000..a902315
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
@@ -0,0 +1,742 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.protocol;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.ANONYMOUS_RELAY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
+import static org.apache.activemq.transport.amqp.AmqpSupport.QUEUE_PREFIX;
+import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILITY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.TOPIC_PREFIX;
+import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.security.cert.X509Certificate;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.InvalidClientIDException;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.TopicRegion;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTempDestination;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionError;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.security.AuthenticationBroker;
+import org.apache.activemq.security.SecurityContext;
+import org.apache.activemq.transport.amqp.AmqpHeader;
+import org.apache.activemq.transport.amqp.AmqpInactivityMonitor;
+import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
+import org.apache.activemq.transport.amqp.AmqpProtocolException;
+import org.apache.activemq.transport.amqp.AmqpTransport;
+import org.apache.activemq.transport.amqp.AmqpTransportFilter;
+import org.apache.activemq.transport.amqp.AmqpWireFormat;
+import org.apache.activemq.transport.amqp.ResponseHandler;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transaction.Coordinator;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.engine.impl.CollectorImpl;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
+import org.apache.qpid.proton.engine.impl.TransportImpl;
+import org.apache.qpid.proton.framing.TransportFrame;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements the mechanics of managing a single remote peer connection.
+ */
+public class AmqpConnection implements AmqpProtocolConverter {
+
+    // Logger used by the protocol tracer installed in updateTracer().
+    private static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES;
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
+    // Value handed to protonTransport.setChannelMax() in the constructor.
+    private static final int CHANNEL_MAX = 32767;
+
+    // Proton engine objects; the transport is bound to the connection and the
+    // collector is attached to the connection in the constructor.
+    private final Transport protonTransport = Proton.transport();
+    private final Connection protonConnection = Proton.connection();
+    private final Collector eventCollector = new CollectorImpl();
+
+    private final AmqpTransport amqpTransport;
+    private final AmqpWireFormat amqpWireFormat;
+    private final BrokerService brokerService;
+    // Resolved lazily by getAuthenticator().
+    private AuthenticationBroker authenticator;
+    // Non-null only while a SASL handshake is in progress (set in onAMQPData,
+    // cleared in onFrame once the handshake completes).
+    private Sasl sasl;
+
+    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
+    // Source of the command ids assigned in sendToActiveMQ().
+    private final AtomicInteger lastCommandId = new AtomicInteger();
+    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
+    private final ConnectionInfo connectionInfo = new ConnectionInfo();
+    private long nextSessionId = 0;
+    private long nextTempDestinationId = 0;
+    // Set once the remote close has been handled (processConnectionClose).
+    private boolean closing = false;
+    // Set when an IO error is reported through onAMQPException().
+    private boolean closedSocket = false;
+
+    // Handlers keyed by command id: registered in sendToActiveMQ() and removed
+    // in onActiveMQCommand() when the matching response arrives.
+    private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
+    // Senders keyed by consumer id, used to route MessageDispatch commands.
+    private final ConcurrentMap<ConsumerId, AmqpSender> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, AmqpSender>();
+
+    /**
+     * Creates the connection wrapper, wiring the Proton transport and connection
+     * together and registering this object with the transport's inactivity
+     * monitor when one is present.
+     *
+     * @param transport
+     *        the AmqpTransport that supplies the wire format and the socket.
+     * @param brokerService
+     *        the BrokerService this connection works against.
+     */
+    public AmqpConnection(AmqpTransport transport, BrokerService brokerService) {
+        this.amqpTransport = transport;
+        AmqpInactivityMonitor monitor = transport.getInactivityMonitor();
+        if (monitor != null) {
+            monitor.setProtocolConverter(this);
+        }
+        this.amqpWireFormat = transport.getWireFormat();
+        this.brokerService = brokerService;
+
+        // the configured maxFrameSize on the URI.
+        // Only applied when it is greater than NO_AMQP_MAX_FRAME_SIZE.
+        int maxFrameSize = amqpWireFormat.getMaxAmqpFrameSize();
+        if (maxFrameSize > AmqpWireFormat.NO_AMQP_MAX_FRAME_SIZE) {
+            this.protonTransport.setMaxFrameSize(maxFrameSize);
+        }
+
+        this.protonTransport.bind(this.protonConnection);
+        this.protonTransport.setChannelMax(CHANNEL_MAX);
+
+        this.protonConnection.collect(eventCollector);
+
+        // NOTE(review): updateTracer() is a public, overridable method invoked
+        // from the constructor; a subclass override would run before the
+        // subclass is fully initialised.
+        updateTracer();
+    }
+
+    /**
+     * Load and return a {@code Symbol[]} that contains the connection capabilities
+     * offered to new connections. This implementation offers only
+     * {@code ANONYMOUS_RELAY}.
+     *
+     * @return the capabilities that are offered to new clients on connect.
+     */
+    protected Symbol[] getConnectionCapabilitiesOffered() {
+        return new Symbol[]{ ANONYMOUS_RELAY };
+    }
+
+    /**
+     * Load and return a <code>Map<Symbol, Object></code> that contains the properties
+     * that this connection supplies to incoming connections.
+     *
+     * @return the properties that are offered to the incoming connection.
+     */
+    protected Map<Symbol, Object> getConnetionProperties() {
+        Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
+
+        properties.put(QUEUE_PREFIX, "queue://");
+        properties.put(TOPIC_PREFIX, "topic://");
+
+        return properties;
+    }
+
+    /**
+     * Load and return a <code>Map<Symbol, Object></code> that contains the properties
+     * that this connection supplies to incoming connections when the open has failed
+     * and the remote should expect a close to follow.
+     *
+     * @return the properties that are offered to the incoming connection.
+     */
+    protected Map<Symbol, Object> getFailedConnetionProperties() {
+        Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
+
+        properties.put(CONNECTION_OPEN_FAILED, true);
+
+        return properties;
+    }
+
+    /**
+     * Installs a frame tracer on the Proton transport when tracing is enabled on
+     * the AMQP transport; does nothing otherwise.
+     */
+    @Override
+    public void updateTracer() {
+        if (!amqpTransport.isTrace()) {
+            return;
+        }
+
+        // Logs every frame received or sent, tagged with the remote address.
+        final ProtocolTracer frameTracer = new ProtocolTracer() {
+            @Override
+            public void receivedFrame(TransportFrame transportFrame) {
+                TRACE_FRAMES.trace("{} | RECV: {}", AmqpConnection.this.amqpTransport.getRemoteAddress(), transportFrame.getBody());
+            }
+
+            @Override
+            public void sentFrame(TransportFrame transportFrame) {
+                TRACE_FRAMES.trace("{} | SENT: {}", AmqpConnection.this.amqpTransport.getRemoteAddress(), transportFrame.getBody());
+            }
+        };
+
+        ((TransportImpl) protonTransport).setProtocolTracer(frameTracer);
+    }
+
+    //----- Connection Properties Accessors ----------------------------------//
+
+    /**
+     * Reads the producer credit setting from the AMQP wire format.
+     *
+     * @return the amount of credit assigned to AMQP receiver links created from
+     *         sender links on the remote peer.
+     */
+    public int getConfiguredReceiverCredit() {
+        return amqpWireFormat.getProducerCredit();
+    }
+
+    /**
+     * Reads the transformer setting from the AMQP wire format.
+     *
+     * @return the transformer type that was configured for this AMQP transport.
+     */
+    public String getConfiguredTransformer() {
+        return amqpWireFormat.getTransformer();
+    }
+
+    /**
+     * Returns the id generated for this connection when it was constructed.
+     *
+     * @return the ActiveMQ ConnectionId that identifies this AMQP Connection.
+     */
+    public ConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    /**
+     * Returns the client id stored on this connection's ConnectionInfo. It is set
+     * from the remote container id during connection open when that id is
+     * non-empty.
+     *
+     * @return the Client ID used to create the connection with ActiveMQ
+     */
+    public String getClientId() {
+        return connectionInfo.getClientId();
+    }
+
+    //----- Proton Event handling and IO support -----------------------------//
+
+    void pumpProtonToSocket() {
+        try {
+            boolean done = false;
+            while (!done) {
+                ByteBuffer toWrite = protonTransport.getOutputBuffer();
+                if (toWrite != null && toWrite.hasRemaining()) {
+                    LOG.trace("Sending {} bytes out", toWrite.limit());
+                    amqpTransport.sendToAmqp(toWrite);
+                    protonTransport.outputConsumed();
+                } else {
+                    done = true;
+                }
+            }
+        } catch (IOException e) {
+            amqpTransport.onException(e);
+        }
+    }
+
+    /**
+     * Entry point for data read from the AMQP socket. The argument is either an
+     * AmqpHeader (protocol header) or a Buffer of frame bytes.
+     *
+     * A valid header selects whether SASL is used (protocol id 3) and is then fed
+     * to Proton like any other frame. An invalid header is answered with the
+     * minimally supported header, the connection is failed through
+     * handleException, and processing stops there.
+     *
+     * @param command
+     *        the AmqpHeader or Buffer received from the wire.
+     *
+     * @throws Exception if the reply header cannot be sent or frame handling fails.
+     */
+    @Override
+    public void onAMQPData(Object command) throws Exception {
+        Buffer frame;
+        if (command.getClass() == AmqpHeader.class) {
+            AmqpHeader header = (AmqpHeader) command;
+
+            if (amqpWireFormat.isHeaderValid(header)) {
+                LOG.trace("Connection from an AMQP v1.0 client initiated. {}", header);
+            } else {
+                LOG.warn("Connection attempt from non AMQP v1.0 client. {}", header);
+                AmqpHeader reply = amqpWireFormat.getMinimallySupportedHeader();
+                amqpTransport.sendToAmqp(reply.getBuffer());
+                handleException(new AmqpProtocolException(
+                    "Connection from client using unsupported AMQP attempted", true));
+                // The connection has been rejected and the transport stopped, so do
+                // not configure SASL or hand the unsupported header to Proton.
+                return;
+            }
+
+            switch (header.getProtocolId()) {
+                case 0:
+                    break; // nothing to do..
+                case 3: // Client will be using SASL for auth..
+                    sasl = protonTransport.sasl();
+                    sasl.setMechanisms(new String[] { "ANONYMOUS", "PLAIN" });
+                    sasl.server();
+                    break;
+                default:
+            }
+            frame = header.getBuffer();
+        } else {
+            frame = (Buffer) command;
+        }
+
+        onFrame(frame);
+    }
+
+    /**
+     * Feeds the given frame into the Proton transport, then completes a pending
+     * SASL handshake if one is in progress, dispatches the events Proton has
+     * collected and flushes Proton's output to the socket. This repeats until the
+     * frame has been fully consumed.
+     *
+     * @param frame
+     *        the bytes read from the wire; its head is moved forward by the number
+     *        of bytes Proton reports as consumed.
+     *
+     * @throws Exception declared for callers; failures raised inside the decode
+     *         and event-processing steps are caught and passed to handleException.
+     */
+    public void onFrame(Buffer frame) throws Exception {
+        while (frame.length > 0) {
+            try {
+                int count = protonTransport.input(frame.data, frame.offset, frame.length);
+                frame.moveHead(count);
+            } catch (Throwable e) {
+                // Undecodable input: fail the connection and abandon the frame.
+                handleException(new AmqpProtocolException("Could not decode AMQP frame: " + frame, true, e));
+                return;
+            }
+
+            try {
+                if (sasl != null) {
+                    // Lets try to complete the sasl handshake.
+                    // Only the first mechanism named by the remote is considered.
+                    if (sasl.getRemoteMechanisms().length > 0) {
+                        if ("PLAIN".equals(sasl.getRemoteMechanisms()[0])) {
+                            // The pending SASL data is split on zero bytes; the first
+                            // part is used as user name and the second as password.
+                            byte[] data = new byte[sasl.pending()];
+                            sasl.recv(data, 0, data.length);
+                            Buffer[] parts = new Buffer(data).split((byte) 0);
+                            if (parts.length > 0) {
+                                connectionInfo.setUserName(parts[0].utf8().toString());
+                            }
+                            if (parts.length > 1) {
+                                connectionInfo.setPassword(parts[1].utf8().toString());
+                            }
+
+                            if (tryAuthenticate(connectionInfo, amqpTransport.getPeerCertificates())) {
+                                sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
+                            } else {
+                                sasl.done(Sasl.SaslOutcome.PN_SASL_AUTH);
+                            }
+
+                            amqpTransport.getWireFormat().resetMagicRead();
+                            sasl = null;
+                            LOG.debug("SASL [PLAIN] Handshake complete.");
+                        } else if ("ANONYMOUS".equals(sasl.getRemoteMechanisms()[0])) {
+                            // No credentials are read; connectionInfo is used as is.
+                            if (tryAuthenticate(connectionInfo, amqpTransport.getPeerCertificates())) {
+                                sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
+                            } else {
+                                sasl.done(Sasl.SaslOutcome.PN_SASL_AUTH);
+                            }
+                            amqpTransport.getWireFormat().resetMagicRead();
+                            sasl = null;
+                            LOG.debug("SASL [ANONYMOUS] Handshake complete.");
+                        }
+                    }
+                }
+
+                // Events are peeked, handled and only then popped, so an event is
+                // removed from the collector only after its handler returned normally.
+                Event event = null;
+                while ((event = eventCollector.peek()) != null) {
+                    if (amqpTransport.isTrace()) {
+                        LOG.trace("Processing event: {}", event.getType());
+                    }
+                    switch (event.getType()) {
+                        case CONNECTION_REMOTE_OPEN:
+                            processConnectionOpen(event.getConnection());
+                            break;
+                        case CONNECTION_REMOTE_CLOSE:
+                            processConnectionClose(event.getConnection());
+                            break;
+                        case SESSION_REMOTE_OPEN:
+                            processSessionOpen(event.getSession());
+                            break;
+                        case SESSION_REMOTE_CLOSE:
+                            processSessionClose(event.getSession());
+                            break;
+                        case LINK_REMOTE_OPEN:
+                            processLinkOpen(event.getLink());
+                            break;
+                        case LINK_REMOTE_DETACH:
+                            processLinkDetach(event.getLink());
+                            break;
+                        case LINK_REMOTE_CLOSE:
+                            processLinkClose(event.getLink());
+                            break;
+                        case LINK_FLOW:
+                            processLinkFlow(event.getLink());
+                            break;
+                        case DELIVERY:
+                            processDelivery(event.getDelivery());
+                            break;
+                        default:
+                            break;
+                    }
+
+                    eventCollector.pop();
+                }
+
+            } catch (Throwable e) {
+                // NOTE(review): unlike the decode failure above this path does not
+                // return, so the output pump below and the enclosing loop still run
+                // after handleException has stopped the transport.
+                handleException(new AmqpProtocolException("Could not process AMQP commands", true, e));
+            }
+
+            pumpProtonToSocket();
+        }
+    }
+
+    /**
+     * Handles the remote peer's open: fills in the ConnectionInfo (connection id,
+     * client id from the remote container, peer certificates) and sends it to the
+     * broker. The broker's response decides whether the Proton connection is
+     * opened normally or opened and immediately closed with an error condition.
+     *
+     * @param connection
+     *        the Proton connection the event was raised for; the method works on
+     *        the protonConnection field rather than on this argument.
+     *
+     * @throws Exception if the ConnectionInfo cannot be handed to the broker.
+     */
+    protected void processConnectionOpen(Connection connection) throws Exception {
+
+        connectionInfo.setResponseRequired(true);
+        connectionInfo.setConnectionId(connectionId);
+
+        configureInactivityMonitor();
+
+        // The remote container id doubles as the client id when it is non-empty.
+        String clientId = protonConnection.getRemoteContainer();
+        if (clientId != null && !clientId.isEmpty()) {
+            connectionInfo.setClientId(clientId);
+        }
+
+        connectionInfo.setTransportContext(amqpTransport.getPeerCertificates());
+
+        sendToActiveMQ(connectionInfo, new ResponseHandler() {
+            @Override
+            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                // Stays null on the success path; only read below when the
+                // response is an exception.
+                Throwable exception = null;
+                try {
+                    if (response.isException()) {
+                        // Open with the "open failed" properties, attach an error
+                        // condition derived from the exception type, then close.
+                        protonConnection.setProperties(getFailedConnetionProperties());
+                        protonConnection.open();
+
+                        exception = ((ExceptionResponse) response).getException();
+                        if (exception instanceof SecurityException) {
+                            protonConnection.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
+                        } else if (exception instanceof InvalidClientIDException) {
+                            protonConnection.setCondition(new ErrorCondition(AmqpError.INVALID_FIELD, exception.getMessage()));
+                        } else {
+                            protonConnection.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, exception.getMessage()));
+                        }
+
+                        protonConnection.close();
+                    } else {
+                        protonConnection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
+                        protonConnection.setProperties(getConnetionProperties());
+                        protonConnection.open();
+                    }
+                } finally {
+                    // Flush the open (and close) frames first, then report the
+                    // failure to the transport.
+                    pumpProtonToSocket();
+
+                    if (response.isException()) {
+                        amqpTransport.onException(IOExceptionSupport.create(exception));
+                    }
+                }
+            }
+        });
+    }
+
+    protected void processConnectionClose(Connection connection) throws Exception {
+        if (!closing) {
+            closing = true;
+            sendToActiveMQ(new RemoveInfo(connectionId), new ResponseHandler() {
+                @Override
+                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                    protonConnection.close();
+                    protonConnection.free();
+
+                    if (!closedSocket) {
+                        pumpProtonToSocket();
+                    }
+                }
+            });
+
+            sendToActiveMQ(new ShutdownInfo(), null);
+        }
+    }
+
+    /**
+     * Handles a remote session open by wrapping the Proton session in a new
+     * AmqpSession, created with the next session id, and opening it.
+     *
+     * @param protonSession
+     *        the Proton session opened by the remote peer.
+     *
+     * @throws Exception if the session cannot be opened.
+     */
+    protected void processSessionOpen(Session protonSession) throws Exception {
+        new AmqpSession(this, getNextSessionId(), protonSession).open();
+    }
+
+    protected void processSessionClose(Session protonSession) throws Exception {
+        if (protonSession.getContext() != null) {
+            ((AmqpResource) protonSession.getContext()).close();
+        } else {
+            protonSession.close();
+            protonSession.free();
+        }
+    }
+
+    protected void processLinkOpen(Link link) throws Exception {
+        link.setSource(link.getRemoteSource());
+        link.setTarget(link.getRemoteTarget());
+
+        AmqpSession session = (AmqpSession) link.getSession().getContext();
+        if (link instanceof Receiver) {
+            if (link.getRemoteTarget() instanceof Coordinator) {
+                session.createCoordinator((Receiver) link);
+            } else {
+                session.createReceiver((Receiver) link);
+            }
+        } else {
+            session.createSender((Sender) link);
+        }
+    }
+
+    protected void processLinkDetach(Link link) throws Exception {
+        Object context = link.getContext();
+
+        if (context instanceof AmqpLink) {
+            ((AmqpLink) context).detach();
+        } else {
+            link.detach();
+            link.free();
+        }
+    }
+
+    protected void processLinkClose(Link link) throws Exception {
+        Object context = link.getContext();
+
+        if (context instanceof AmqpLink) {
+            ((AmqpLink) context).close();;
+        } else {
+            link.close();
+            link.free();
+        }
+    }
+
+    protected void processLinkFlow(Link link) throws Exception {
+        Object context = link.getContext();
+        if (context instanceof AmqpLink) {
+            ((AmqpLink) context).flow();
+        }
+    }
+
+    protected void processDelivery(Delivery delivery) throws Exception {
+        if (!delivery.isPartial()) {
+            Object context = delivery.getLink().getContext();
+            if (context instanceof AmqpLink) {
+                AmqpLink amqpLink = (AmqpLink) context;
+                amqpLink.delivery(delivery);
+            }
+        }
+    }
+
+    //----- Event entry points for ActiveMQ commands and errors --------------//
+
+    /**
+     * Called when the AMQP socket reports an IO error. The socket is marked as
+     * closed; if the connection is already closing the transport is stopped,
+     * otherwise the error is passed on to ActiveMQ.
+     *
+     * @param error
+     *        the IO error reported for the socket.
+     */
+    @Override
+    public void onAMQPException(IOException error) {
+        closedSocket = true;
+
+        if (closing) {
+            try {
+                amqpTransport.stop();
+            } catch (Exception ignore) {
+                // Best effort: a failure to stop is deliberately not reported.
+            }
+            return;
+        }
+
+        amqpTransport.sendToActiveMQ(error);
+    }
+
+    /**
+     * Entry point for commands arriving from the broker. Responses are routed to
+     * the ResponseHandler registered for their correlation id, message dispatches
+     * to the AmqpSender registered for their consumer id, and connection errors
+     * to handleException. BrokerInfo commands are ignored and anything else is
+     * logged at debug level.
+     *
+     * @param command
+     *        the command received from the broker.
+     *
+     * @throws Exception if a response handler or sender fails.
+     */
+    @Override
+    public void onActiveMQCommand(Command command) throws Exception {
+        if (command.isResponse()) {
+            Response response = (Response) command;
+            // Handlers are one-shot: the entry is removed as it is looked up.
+            ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
+            if (rh != null) {
+                rh.onResponse(this, response);
+            } else {
+                // Pass down any unexpected errors. Should this close the connection?
+                if (response.isException()) {
+                    Throwable exception = ((ExceptionResponse) response).getException();
+                    handleException(exception);
+                }
+            }
+        } else if (command.isMessageDispatch()) {
+            MessageDispatch dispatch = (MessageDispatch) command;
+            AmqpSender sender = subscriptionsByConsumerId.get(dispatch.getConsumerId());
+            // Dispatches for consumers with no registered sender are dropped silently.
+            if (sender != null) {
+                // End of Queue Browse will have no Message object.
+                if (dispatch.getMessage() != null) {
+                    LOG.trace("Dispatching MessageId: {} to consumer", dispatch.getMessage().getMessageId());
+                } else {
+                    LOG.trace("Dispatching End of Browse Command to consumer {}", dispatch.getConsumerId());
+                }
+                sender.onMessageDispatch(dispatch);
+                if (dispatch.getMessage() != null) {
+                    LOG.trace("Finished Dispatch of MessageId: {} to consumer", dispatch.getMessage().getMessageId());
+                }
+            }
+        } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
+            // Pass down any unexpected async errors. Should this close the connection?
+            Throwable exception = ((ConnectionError) command).getException();
+            handleException(exception);
+        } else if (command.isBrokerInfo()) {
+            // ignore
+        } else {
+            LOG.debug("Do not know how to process ActiveMQ Command {}", command);
+        }
+    }
+
+    //----- Utility methods for connection resources to use ------------------//
+
+    /**
+     * Registers the sender that should receive message dispatches for the given
+     * consumer id. (The method name is a misspelling of "registerSender"; it is
+     * kept as is because callers outside this view use it.)
+     *
+     * @param consumerId
+     *        the consumer id the broker will dispatch messages to.
+     * @param sender
+     *        the AmqpSender that handles those dispatches.
+     */
+    void regosterSender(ConsumerId consumerId, AmqpSender sender) {
+        subscriptionsByConsumerId.put(consumerId, sender);
+    }
+
+    /**
+     * Removes the sender registered for the given consumer id, if any. (The
+     * method name is a misspelling of "unregisterSender"; it is kept as is
+     * because callers outside this view use it.)
+     *
+     * @param consumerId
+     *        the consumer id whose sender registration is removed.
+     */
+    void unregosterSender(ConsumerId consumerId) {
+        subscriptionsByConsumerId.remove(consumerId);
+    }
+
+    /**
+     * Looks up the destination of a durable topic subscription that belongs to
+     * this connection's client id.
+     *
+     * @param subscriptionName
+     *        the name of the durable subscription to find.
+     *
+     * @return the subscription's destination, or null when no such subscription exists.
+     *
+     * @throws AmqpProtocolException if the RegionBroker cannot be obtained from the broker.
+     */
+    ActiveMQDestination lookupSubscription(String subscriptionName) throws AmqpProtocolException {
+        final RegionBroker regionBroker;
+        try {
+            regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
+        } catch (Exception e) {
+            throw new AmqpProtocolException("Error finding subscription: " + subscriptionName + ": " + e.getMessage(), false, e);
+        }
+
+        final TopicRegion topics = (TopicRegion) regionBroker.getTopicRegion();
+        final DurableTopicSubscription found =
+            topics.lookupSubscription(subscriptionName, connectionInfo.getClientId());
+
+        return found == null ? null : found.getActiveMQDestination();
+    }
+
+    ActiveMQDestination createTemporaryDestination(final Link link, Symbol[] capabilities) {
+        ActiveMQDestination rc = null;
+        if (contains(capabilities, TEMP_TOPIC_CAPABILITY)) {
+            rc = new ActiveMQTempTopic(connectionId, nextTempDestinationId++);
+        } else if (contains(capabilities, TEMP_QUEUE_CAPABILITY)) {
+            rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
+        } else {
+            LOG.debug("Dynamic link request with no type capability, defaults to Temporary Queue");
+            rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
+        }
+
+        DestinationInfo info = new DestinationInfo();
+        info.setConnectionId(connectionId);
+        info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
+        info.setDestination(rc);
+
+        sendToActiveMQ(info, new ResponseHandler() {
+
+            @Override
+            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                if (response.isException()) {
+                    link.setSource(null);
+
+                    Throwable exception = ((ExceptionResponse) response).getException();
+                    if (exception instanceof SecurityException) {
+                        link.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
+                    } else {
+                        link.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
+                    }
+
+                    link.close();
+                    link.free();
+                }
+            }
+        });
+
+        return rc;
+    }
+
+    void deleteTemporaryDestination(ActiveMQTempDestination destination) {
+        DestinationInfo info = new DestinationInfo();
+        info.setConnectionId(connectionId);
+        info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
+        info.setDestination(destination);
+
+        sendToActiveMQ(info, new ResponseHandler() {
+
+            @Override
+            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                if (response.isException()) {
+                    Throwable exception = ((ExceptionResponse) response).getException();
+                    LOG.debug("Error during temp destination removeal: {}", exception.getMessage());
+                }
+            }
+        });
+    }
+
+    /**
+     * Sends a command to the broker without registering a response handler.
+     *
+     * @param command
+     *        the command to send.
+     */
+    void sendToActiveMQ(Command command) {
+        sendToActiveMQ(command, null);
+    }
+
+    /**
+     * Assigns the next command id to the command and sends it to the broker. When
+     * a handler is supplied the command is marked as requiring a response and the
+     * handler is registered under the command id before the command is sent.
+     *
+     * @param command
+     *        the command to send.
+     * @param handler
+     *        the handler for the broker's response, or null if none is wanted.
+     */
+    void sendToActiveMQ(Command command, ResponseHandler handler) {
+        command.setCommandId(lastCommandId.incrementAndGet());
+        if (handler != null) {
+            command.setResponseRequired(true);
+            resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
+        }
+        amqpTransport.sendToActiveMQ(command);
+    }
+
+    /**
+     * Handles a fatal error on this connection by logging it and stopping the
+     * AMQP transport. A failure to stop the transport is logged and not rethrown.
+     *
+     * @param exception
+     *        the error that makes the connection unusable.
+     */
+    void handleException(Throwable exception) {
+        // Reported through the logger only; writing the stack trace straight to
+        // stderr bypassed the logging configuration and duplicated this entry.
+        LOG.debug("Exception detail", exception);
+        try {
+            amqpTransport.stop();
+        } catch (Throwable e) {
+            LOG.error("Failed to stop AMQP Transport ", e);
+        }
+    }
+
+    //----- Internal implementation ------------------------------------------//
+
+    /**
+     * @return a new SessionId built from this connection's id and the next value
+     *         of the session counter, which is incremented by the call.
+     */
+    private SessionId getNextSessionId() {
+        return new SessionId(connectionId, nextSessionId++);
+    }
+
+    private void configureInactivityMonitor() {
+        AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor();
+        if (monitor == null) {
+            return;
+        }
+
+        monitor.stopConnectChecker();
+    }
+
+    /**
+     * Attempts to authenticate the given connection against the broker's
+     * authenticator.
+     *
+     * @param info
+     *        the ConnectionInfo carrying the user name and password to check.
+     * @param peerCertificates
+     *        the peer certificates of the transport, passed on to the authenticator.
+     *
+     * @return true if the authenticator returned a security context, false if it
+     *         returned null or failed with any error.
+     */
+    private boolean tryAuthenticate(ConnectionInfo info, X509Certificate[] peerCertificates) {
+        try {
+            return getAuthenticator().authenticate(info.getUserName(), info.getPassword(), peerCertificates) != null;
+        } catch (Throwable error) {
+            // Any failure counts as rejected credentials, but record the cause so
+            // that authentication problems can be diagnosed.
+            LOG.debug("Authentication attempt failed", error);
+            return false;
+        }
+    }
+
+    /**
+     * Returns the authenticator for this connection, resolving it on first use.
+     * The broker's AuthenticationBroker adaptor is used when it can be obtained;
+     * otherwise a DefaultAuthenticationBroker is created.
+     *
+     * @return the cached or newly resolved authenticator, never null.
+     */
+    private AuthenticationBroker getAuthenticator() {
+        if (authenticator != null) {
+            return authenticator;
+        }
+
+        try {
+            authenticator = (AuthenticationBroker) brokerService.getBroker().getAdaptor(AuthenticationBroker.class);
+        } catch (Exception e) {
+            LOG.debug("Failed to lookup AuthenticationBroker from Broker, will use a default Noop version.");
+        }
+
+        // Covers both a failed lookup and a lookup that yielded null.
+        if (authenticator == null) {
+            authenticator = new DefaultAuthenticationBroker();
+        }
+
+        return authenticator;
+    }
+
+    /**
+     * Fallback authenticator used when the broker does not supply an
+     * AuthenticationBroker. It accepts every request and returns a security
+     * context for the given user name whose principal set is null.
+     *
+     * Declared static because it uses no state of the enclosing AmqpConnection,
+     * so instances need not hold a reference to it.
+     */
+    private static class DefaultAuthenticationBroker implements AuthenticationBroker {
+
+        @Override
+        public SecurityContext authenticate(String username, String password, X509Certificate[] peerCertificates) throws SecurityException {
+            return new SecurityContext(username) {
+
+                @Override
+                public Set<Principal> getPrincipals() {
+                    return null;
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpLink.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpLink.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpLink.java
new file mode 100644
index 0000000..d245769
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpLink.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.protocol;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Delivery;
+
+/**
+ * Interface used to define the operations needed to implement an AMQP
+ * Link based endpoint, i.e. Sender, Receiver or Coordinator.
+ */
+public interface AmqpLink extends AmqpResource {
+
+    /**
+     * Close the Link with an error indicating the reason for the close.
+     *
+     * @param error
+     *        the error that prompted the close.
+     */
+    void close(ErrorCondition error);
+
+    /**
+     * Request from the remote peer to detach this resource.
+     */
+    void detach();
+
+    /**
+     * Handles an incoming flow control.
+     *
+     * @throws Exception if an error occurs during the flow processing.
+     */
+    void flow() throws Exception;
+
+    /**
+     * Called when a new Delivery arrives for the given Link.
+     *
+     * @param delivery
+     *        the newly arrived delivery on this link.
+     *
+     * @throws Exception if an error occurs while processing the new Delivery.
+     */
+    void delivery(Delivery delivery) throws Exception;
+
+    /**
+     * Handle work necessary on commit of transacted resources associated with
+     * this Link instance.
+     *
+     * @throws Exception if an error occurs while performing the commit.
+     */
+    void commit() throws Exception;
+
+    /**
+     * Handle work necessary on rollback of transacted resources associated with
+     * this Link instance.
+     *
+     * @throws Exception if an error occurs while performing the rollback.
+     */
+    void rollback() throws Exception;
+
+    /**
+     * @return the ActiveMQDestination that this link is servicing.
+     */
+    ActiveMQDestination getDestination();
+
+    /**
+     * Sets the ActiveMQDestination that this link will be servicing.
+     *
+     * @param destination
+     *        the ActiveMQDestination that this link services.
+     */
+    void setDestination(ActiveMQDestination destination);
+
+    /**
+     * Adds a new Runnable that is called on close of this link.
+     *
+     * @param action
+     *        a Runnable that will be executed when the link closes or detaches.
+     */
+    void addCloseAction(Runnable action);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
new file mode 100644
index 0000000..9ab7ebe
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.protocol;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
+
+import java.io.IOException;
+
+import javax.jms.Destination;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
+import org.apache.activemq.transport.amqp.ResponseHandler;
+import org.apache.activemq.transport.amqp.message.AMQPNativeInboundTransformer;
+import org.apache.activemq.transport.amqp.message.AMQPRawInboundTransformer;
+import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
+import org.apache.activemq.transport.amqp.message.EncodedMessage;
+import org.apache.activemq.transport.amqp.message.InboundTransformer;
+import org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An AmqpReceiver wraps the AMQP Receiver end of a link from the remote peer
+ * which holds the corresponding Sender which transfers messages across the
+ * link.  The AmqpReceiver handles all incoming deliveries by converting them
+ * or wrapping them into an ActiveMQ message object and forwarding that message
+ * on to the appropriate ActiveMQ Destination.
+ */
+public class AmqpReceiver extends AmqpAbstractReceiver {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpReceiver.class);
+
+    // Fraction of the configured credit at or below which the link credit is topped up.
+    private static final double CREDIT_REFILL_THRESHOLD = .2;
+
+    private final ProducerInfo producerInfo;
+    private final int configuredCredit;
+    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
+
+    private InboundTransformer inboundTransformer;
+
+    /**
+     * Create a new instance of an AmqpReceiver
+     *
+     * @param session
+     *        the Session that is the parent of this AmqpReceiver instance.
+     * @param endpoint
+     *        the AMQP receiver endpoint that the class manages.
+     * @param producerInfo
+     *        the ProducerInfo instance that contains this receiver's configuration.
+     */
+    public AmqpReceiver(AmqpSession session, Receiver endpoint, ProducerInfo producerInfo) {
+        super(session, endpoint);
+
+        this.producerInfo = producerInfo;
+        this.configuredCredit = session.getConnection().getConfiguredReceiverCredit();
+    }
+
+    /**
+     * Closes this receiver.  If it is opened and not yet closed a RemoveInfo for
+     * its ProducerId is sent to ActiveMQ before the close is passed on to the
+     * parent class.
+     */
+    @Override
+    public void close() {
+        if (!isClosed() && isOpened()) {
+            sendToActiveMQ(new RemoveInfo(getProducerId()));
+        }
+
+        super.close();
+    }
+
+    //----- Configuration accessors ------------------------------------------//
+
+    /**
+     * @return the ActiveMQ ProducerId used to register this Receiver on the Broker.
+     */
+    public ProducerId getProducerId() {
+        return producerInfo.getProducerId();
+    }
+
+    @Override
+    public ActiveMQDestination getDestination() {
+        return producerInfo.getDestination();
+    }
+
+    @Override
+    public void setDestination(ActiveMQDestination destination) {
+        producerInfo.setDestination(destination);
+    }
+
+    /**
+     * If the Sender that initiated this Receiver endpoint did not define an address
+     * then it is using anonymous mode and messages are to be routed to the address
+     * that is defined in the AMQP message 'To' field.
+     *
+     * @return true if this Receiver should operate in anonymous mode.
+     */
+    public boolean isAnonymous() {
+        return producerInfo.getDestination() == null;
+    }
+
+    /**
+     * Returns the amount of receiver credit that has been configured for this AMQP
+     * transport.  The value is read from the parent connection when this receiver
+     * is created.
+     *
+     * @return the configured receiver credit to grant.
+     */
+    public int getConfiguredReceiverCredit() {
+        return configuredCredit;
+    }
+
+    //----- Internal Implementation ------------------------------------------//
+
+    /**
+     * Lazily creates the InboundTransformer named by the connection's configured
+     * transformer.  An unknown or missing name results in a warning and the
+     * native transformer being used.
+     *
+     * @return the InboundTransformer used to convert incoming AMQP messages.
+     */
+    protected InboundTransformer getInboundTransformer() {
+        if (inboundTransformer == null) {
+            String transformer = session.getConnection().getConfiguredTransformer();
+            // Compare constant-first so that a missing (null) configuration value
+            // falls through to the default instead of throwing.
+            if (InboundTransformer.TRANSFORMER_JMS.equals(transformer)) {
+                inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+            } else if (InboundTransformer.TRANSFORMER_NATIVE.equals(transformer)) {
+                inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+            } else if (InboundTransformer.TRANSFORMER_RAW.equals(transformer)) {
+                inboundTransformer = new AMQPRawInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+            } else {
+                LOG.warn("Unknown transformer type {} using native one instead", transformer);
+                inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+            }
+        }
+        return inboundTransformer;
+    }
+
+    /**
+     * Converts a fully received delivery into an ActiveMQ message and forwards it
+     * to the Broker.  Nothing is done if this receiver is already closed.
+     *
+     * @param delivery
+     *        the Delivery whose disposition is updated once the message is handled.
+     * @param deliveryBytes
+     *        the encoded bytes of the message carried by the delivery.
+     *
+     * @throws Exception if an error occurs while transforming or sending the message.
+     */
+    @Override
+    protected void processDelivery(final Delivery delivery, Buffer deliveryBytes) throws Exception {
+        if (isClosed()) {
+            return;
+        }
+
+        EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), deliveryBytes.data, deliveryBytes.offset, deliveryBytes.length);
+        final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
+        current = null;
+
+        if (isAnonymous()) {
+            // instanceof is false for null, so this also covers a missing 'To' field.
+            Destination toDestination = message.getJMSDestination();
+            if (!(toDestination instanceof ActiveMQDestination)) {
+                delivery.disposition(createRejected("Missing to field for message sent to an anonymous producer"));
+                return;
+            }
+        } else {
+            message.setJMSDestination(getDestination());
+        }
+
+        message.setProducerId(getProducerId());
+
+        // Always override the AMQP client's MessageId with our own.  Preserve
+        // the original in the TextView property for later Ack.
+        MessageId messageId = new MessageId(getProducerId(), messageIdGenerator.getNextSequenceId());
+
+        MessageId amqpMessageId = message.getMessageId();
+        if (amqpMessageId != null) {
+            String originalText = amqpMessageId.getTextView();
+            messageId.setTextView(originalText != null ? originalText : amqpMessageId.toString());
+        }
+
+        message.setMessageId(messageId);
+
+        // Pass the parts as arguments so nothing is concatenated when trace is off.
+        LOG.trace("Inbound Message:{} from Producer:{}:{}",
+                  message.getMessageId(), getProducerId(), messageId.getProducerSequenceId());
+
+        final DeliveryState remoteState = delivery.getRemoteState();
+        if (remoteState instanceof TransactionalState) {
+            long txid = toLong(((TransactionalState) remoteState).getTxnId());
+            message.setTransactionId(new LocalTransactionId(session.getConnection().getConnectionId(), txid));
+        }
+
+        message.onSend();
+
+        if (delivery.remotelySettled()) {
+            // The remote already settled, no disposition is sent back so the
+            // message is forwarded without waiting on a response.
+            if (replenishCreditIfLow()) {
+                session.pumpProtonToSocket();
+            }
+            sendToActiveMQ(message);
+            return;
+        }
+
+        sendToActiveMQ(message, new ResponseHandler() {
+
+            @Override
+            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                if (response.isException()) {
+                    ExceptionResponse er = (ExceptionResponse) response;
+                    delivery.disposition(createRejected(er.getException().getMessage()));
+                } else {
+                    replenishCreditIfLow();
+
+                    if (remoteState instanceof TransactionalState) {
+                        TransactionalState txAccepted = new TransactionalState();
+                        txAccepted.setOutcome(Accepted.getInstance());
+                        txAccepted.setTxnId(((TransactionalState) remoteState).getTxnId());
+
+                        delivery.disposition(txAccepted);
+                    } else {
+                        delivery.disposition(Accepted.getInstance());
+                    }
+
+                    delivery.settle();
+                }
+
+                session.pumpProtonToSocket();
+            }
+        });
+    }
+
+    /**
+     * Builds a Rejected outcome carrying a "failed" error condition.
+     *
+     * @param description
+     *        the text placed in the error condition's description.
+     *
+     * @return a new Rejected instance with the error condition set.
+     */
+    private Rejected createRejected(String description) {
+        ErrorCondition condition = new ErrorCondition();
+        condition.setCondition(Symbol.valueOf("failed"));
+        condition.setDescription(description);
+
+        Rejected rejected = new Rejected();
+        rejected.setError(condition);
+        return rejected;
+    }
+
+    /**
+     * Tops the link credit back up to the configured amount once the remaining
+     * credit has dropped to the refill threshold or below.
+     *
+     * @return true if additional credit was granted, false if none was needed.
+     */
+    private boolean replenishCreditIfLow() {
+        int remaining = getEndpoint().getCredit();
+        if (remaining > (getConfiguredReceiverCredit() * CREDIT_REFILL_THRESHOLD)) {
+            return false;
+        }
+
+        int topUp = getConfiguredReceiverCredit() - remaining;
+        LOG.trace("Sending more credit ({}) to producer: {}", topUp, getProducerId());
+        getEndpoint().flow(topUp);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpResource.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpResource.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpResource.java
new file mode 100644
index 0000000..6ee68dc
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpResource.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.protocol;
+
+/**
+ * Root interface for all endpoint objects.
+ */
+public interface AmqpResource {
+
+    /**
+     * Request from the remote peer to open this resource.  Takes no arguments
+     * and declares no checked exceptions.
+     */
+    void open();
+
+    /**
+     * Request from the remote peer to close this resource.  Takes no arguments
+     * and declares no checked exceptions.
+     */
+    void close();
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
new file mode 100644
index 0000000..75b6758
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.protocol;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
+
+import java.io.IOException;
+import java.util.LinkedList;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConsumerControl;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
+import org.apache.activemq.transport.amqp.ResponseHandler;
+import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
+import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer;
+import org.apache.activemq.transport.amqp.message.EncodedMessage;
+import org.apache.activemq.transport.amqp.message.OutboundTransformer;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Outcome;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An AmqpSender wraps the AMQP Sender end of a link from the remote peer
+ * which holds the corresponding Receiver which receives messages transferred
+ * across the link from the Broker.
+ *
+ * An AmqpSender is in turn a message consumer subscribed to some destination
+ * on the broker.  As messages are dispatched to this sender they are sent on
+ * to the remote Receiver end of the link.
+ */
+public class AmqpSender extends AmqpAbstractLink<Sender> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpSender.class);
+
+    private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
+
+    private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
+    private final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator();
+    private final LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>();
+    private final LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
+    private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";
+
+    private final ConsumerInfo consumerInfo;
+    private final boolean presettle;
+
+    private boolean closed;
+    private boolean endOfBrowse;
+    private int currentCredit;
+    private long lastDeliveredSequenceId;
+
+    private Buffer currentBuffer;
+    private Delivery currentDelivery;
+
+    /**
+     * Creates a new AmqpSender instance that manages the given Sender
+     *
+     * @param session
+     *        the AmqpSession object that is the parent of this instance.
+     * @param endpoint
+     *        the AMQP Sender instance that this class manages.
+     * @param consumerInfo
+     *        the ConsumerInfo instance that holds configuration for this sender.
+     */
+    public AmqpSender(AmqpSession session, Sender endpoint, ConsumerInfo consumerInfo) {
+        super(session, endpoint);
+
+        this.currentCredit = endpoint.getRemoteCredit();
+        this.consumerInfo = consumerInfo;
+        this.presettle = getEndpoint().getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
+    }
+
+    /**
+     * Registers this sender with its session under its ConsumerId, unless the
+     * closed flag is already set, and then hands the open to the parent class.
+     */
+    @Override
+    public void open() {
+        final boolean alreadyClosed = closed;
+        if (!alreadyClosed) {
+            session.regosterSender(getConsumerId(), this);
+        }
+
+        super.open();
+    }
+
+    /**
+     * Detaches this sender.  When the link is opened and not yet closed the
+     * consumer is removed from ActiveMQ and this sender is unregistered from
+     * the session before the detach is passed on to the parent class.
+     */
+    @Override
+    public void detach() {
+        final boolean inactive = isClosed() || !isOpened();
+        if (!inactive) {
+            final RemoveInfo removal = new RemoveInfo(getConsumerId());
+            removal.setLastDeliveredSequenceId(lastDeliveredSequenceId);
+            sendToActiveMQ(removal, null);
+
+            session.unregisterSender(getConsumerId());
+        }
+
+        super.detach();
+    }
+
+    /**
+     * Closes this sender.  When the link is opened and not yet closed the
+     * consumer is removed from ActiveMQ, a durable subscription is additionally
+     * removed, and this sender is unregistered from the session before the
+     * close is passed on to the parent class.
+     */
+    @Override
+    public void close() {
+        if (!isClosed() && isOpened()) {
+            RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
+            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
+            sendToActiveMQ(removeCommand, null);
+
+            if (consumerInfo.isDurable()) {
+                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
+                rsi.setConnectionId(session.getConnection().getConnectionId());
+                rsi.setSubscriptionName(getEndpoint().getName());
+                rsi.setClientId(session.getConnection().getClientId());
+
+                sendToActiveMQ(rsi, null);
+            }
+
+            // Unregister for every kind of subscription, as detach() does, so a
+            // closed non-durable sender does not stay registered with the session.
+            session.unregisterSender(getConsumerId());
+        }
+
+        super.close();
+    }
+
+    /**
+     * Handles a flow update.  When the endpoint's credit differs from the last
+     * recorded value, the new value (never less than zero) is recorded and sent
+     * to ActiveMQ as the consumer's prefetch.  A drain check always follows.
+     */
+    @Override
+    public void flow() throws Exception {
+        final int reportedCredit = getEndpoint().getCredit();
+
+        if (reportedCredit != currentCredit) {
+            currentCredit = Math.max(reportedCredit, 0);
+
+            final ConsumerControl prefetchUpdate = new ConsumerControl();
+            prefetchUpdate.setConsumerId(getConsumerId());
+            prefetchUpdate.setDestination(getDestination());
+            prefetchUpdate.setPrefetch(currentCredit);
+            sendToActiveMQ(prefetchUpdate, null);
+        }
+
+        drainCheck();
+    }
+
+    /**
+     * Reacts to a change in the remote state of a Delivery sent from this
+     * sender, settling it with the ack type matching the remote outcome and
+     * then pumping any pending outbound messages.
+     */
+    @Override
+    public void delivery(Delivery delivery) throws Exception {
+        final MessageDispatch dispatch = (MessageDispatch) delivery.getContext();
+        final DeliveryState remoteState = delivery.getRemoteState();
+
+        if (remoteState instanceof TransactionalState) {
+            final TransactionalState txState = (TransactionalState) remoteState;
+            LOG.trace("onDelivery: TX delivery state = {}", remoteState);
+            // Only an Accepted outcome is acted on inside a transaction.
+            if (txState.getOutcome() instanceof Accepted) {
+                if (!delivery.remotelySettled()) {
+                    final TransactionalState reply = new TransactionalState();
+                    reply.setOutcome(Accepted.getInstance());
+                    reply.setTxnId(txState.getTxnId());
+
+                    delivery.disposition(reply);
+                }
+                settle(delivery, MessageAck.DELIVERED_ACK_TYPE);
+            }
+        } else if (remoteState instanceof Accepted) {
+            LOG.trace("onDelivery: accepted state = {}", remoteState);
+            if (!delivery.remotelySettled()) {
+                delivery.disposition(new Accepted());
+            }
+            settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE);
+        } else if (remoteState instanceof Rejected) {
+            // Redeliver with an incremented delivery counter.
+            dispatch.setRedeliveryCounter(dispatch.getRedeliveryCounter() + 1);
+            LOG.trace("onDelivery: Rejected state = {}, delivery count now {}", remoteState, dispatch.getRedeliveryCounter());
+            settle(delivery, -1);
+        } else if (remoteState instanceof Released) {
+            LOG.trace("onDelivery: Released state = {}", remoteState);
+            // Redeliver without touching the delivery counter.
+            settle(delivery, -1);
+        } else if (remoteState instanceof Modified) {
+            final Modified modified = (Modified) remoteState;
+            if (modified.getDeliveryFailed()) {
+                // A failed delivery counts against the message.
+                dispatch.setRedeliveryCounter(dispatch.getRedeliveryCounter() + 1);
+            }
+            LOG.trace("onDelivery: Modified state = {}, delivery count now {}", remoteState, dispatch.getRedeliveryCounter());
+
+            // The receiver does not want the message when it is flagged as
+            // undeliverable here; perhaps it should go to the DLQ?
+            final boolean unwanted = Boolean.TRUE.equals(modified.getUndeliverableHere());
+            final int ackType = unwanted ? MessageAck.POSION_ACK_TYPE : -1;
+            settle(delivery, ackType);
+        }
+
+        pumpOutbound();
+    }
+
+    /**
+     * Sends an individual ack, carrying the message's transaction id, to
+     * ActiveMQ for every message that was dispatched inside the transaction and
+     * then clears the list of such messages.  If the Broker answers an ack with
+     * an exception the failure is logged and the link endpoint is closed.
+     *
+     * @throws Exception if an error occurs while sending the acks.
+     */
+    @Override
+    public void commit() throws Exception {
+        if (dispatchedInTx.isEmpty()) {
+            return;
+        }
+
+        for (MessageDispatch md : dispatchedInTx) {
+            MessageAck pendingTxAck = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
+            pendingTxAck.setFirstMessageId(md.getMessage().getMessageId());
+            pendingTxAck.setTransactionId(md.getMessage().getTransactionId());
+
+            LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck);
+
+            sendToActiveMQ(pendingTxAck, new ResponseHandler() {
+                @Override
+                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                    if (response.isException()) {
+                        Throwable exception = ((ExceptionResponse) response).getException();
+                        // Report through the logger rather than printing to stderr.
+                        LOG.warn("Commit Ack was answered with an error, closing the link.", exception);
+                        getEndpoint().close();
+                    }
+                    session.pumpProtonToSocket();
+                }
+            });
+        }
+
+        dispatchedInTx.clear();
+    }
+
+    /**
+     * Puts every message dispatched inside the transaction back at the head of
+     * the outbound queue with its redelivery counter incremented and its
+     * transaction id cleared, then empties the list of such messages.
+     *
+     * @throws Exception declared by the AmqpLink contract.
+     */
+    @Override
+    public void rollback() throws Exception {
+        // Same lock as onMessageDispatch() so new dispatches do not interleave.
+        synchronized (outbound) {
+
+            LOG.trace("Rolling back {} messages for redelivery. ", dispatchedInTx.size());
+
+            // NOTE(review): addFirst in iteration order leaves these messages at the
+            // head of outbound in the reverse of their order here; confirm intended.
+            for (MessageDispatch pending : dispatchedInTx) {
+                final int bumpedCount = pending.getRedeliveryCounter() + 1;
+                pending.setRedeliveryCounter(bumpedCount);
+                pending.getMessage().setTransactionId(null);
+                outbound.addFirst(pending);
+            }
+
+            dispatchedInTx.clear();
+        }
+    }
+
+    /**
+     * Event point for incoming message from ActiveMQ on this Sender's
+     * corresponding subscription.
+     *
+     * @param dispatch
+     *        the MessageDispatch to process and send across the link.
+     *
+     * @throws Exception if an error occurs while encoding the message for send.
+     */
+    public void onMessageDispatch(MessageDispatch dispatch) throws Exception {
+        // Dispatches that arrive after the link has closed are dropped.
+        if (isClosed()) {
+            return;
+        }
+
+        // Lock to prevent stepping on TX redelivery
+        synchronized (outbound) {
+            outbound.addLast(dispatch);
+        }
+
+        // Encode what can be sent, then flush the result to the socket.
+        pumpOutbound();
+        session.pumpProtonToSocket();
+    }
+
+    /**
+     * @return a description of this sender that includes its ConsumerId.
+     */
+    @Override
+    public String toString() {
+        final StringBuilder description = new StringBuilder("AmqpSender {");
+        description.append(getConsumerId()).append("}");
+        return description.toString();
+    }
+
+    //----- Property getters and setters -------------------------------------//
+
+    /**
+     * @return the ConsumerId held by this sender's ConsumerInfo.
+     */
+    public ConsumerId getConsumerId() {
+        return this.consumerInfo.getConsumerId();
+    }
+
+    /**
+     * @return the ActiveMQDestination held by this sender's ConsumerInfo.
+     */
+    @Override
+    public ActiveMQDestination getDestination() {
+        return this.consumerInfo.getDestination();
+    }
+
+    /**
+     * Stores the given destination in this sender's ConsumerInfo.
+     *
+     * @param destination
+     *        the ActiveMQDestination that this sender services.
+     */
+    @Override
+    public void setDestination(ActiveMQDestination destination) {
+        this.consumerInfo.setDestination(destination);
+    }
+
+    //----- Internal Implementation ------------------------------------------//
+
+    /**
+     * Pushes queued messages to the remote peer for as long as the endpoint
+     * accepts data.
+     *
+     * Each pass first finishes writing the partially sent buffer, if any, and
+     * then takes the next MessageDispatch from the outbound queue, encodes it
+     * and starts a new Delivery for it.  The method returns when the endpoint
+     * accepts no more bytes or when the outbound queue is empty.
+     *
+     * NOTE(review): outbound is read here without the synchronized (outbound)
+     * block used in onMessageDispatch() and rollback(); confirm that callers
+     * serialize access.
+     *
+     * @throws Exception if an error occurs while sending or settling a delivery.
+     */
+    public void pumpOutbound() throws Exception {
+        while (!closed) {
+            // Finish the in-flight message before starting another one.
+            while (currentBuffer != null) {
+                int sent = getEndpoint().send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
+                if (sent > 0) {
+                    currentBuffer.moveHead(sent);
+                    if (currentBuffer.length == 0) {
+                        // Whole message written: presettled deliveries are settled
+                        // right away, others are advanced.
+                        if (presettle) {
+                            settle(currentDelivery, MessageAck.INDIVIDUAL_ACK_TYPE);
+                        } else {
+                            getEndpoint().advance();
+                        }
+                        currentBuffer = null;
+                        currentDelivery = null;
+                    }
+                } else {
+                    // The endpoint took nothing, stop until this is called again.
+                    return;
+                }
+            }
+
+            if (outbound.isEmpty()) {
+                return;
+            }
+
+            final MessageDispatch md = outbound.removeFirst();
+            try {
+
+                ActiveMQMessage temp = null;
+                if (md.getMessage() != null) {
+
+                    // Topics can dispatch the same Message to more than one consumer
+                    // so we must copy to prevent concurrent read / write to the same
+                    // message object.
+                    if (md.getDestination().isTopic()) {
+                        synchronized (md.getMessage()) {
+                            temp = (ActiveMQMessage) md.getMessage().copy();
+                        }
+                    } else {
+                        temp = (ActiveMQMessage) md.getMessage();
+                    }
+
+                    // Default the message format property to 0 when it is not set.
+                    if (!temp.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
+                        temp.setProperty(MESSAGE_FORMAT_KEY, 0);
+                    }
+                }
+
+                final ActiveMQMessage jms = temp;
+                if (jms == null) {
+                    // It's the end of browse signal.
+                    endOfBrowse = true;
+                    drainCheck();
+                } else {
+                    jms.setRedeliveryCounter(md.getRedeliveryCounter());
+                    jms.setReadOnlyBody(true);
+                    final EncodedMessage amqp = outboundTransformer.transform(jms);
+                    if (amqp != null && amqp.getLength() > 0) {
+                        currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
+                        // Presettled deliveries use an empty tag, others take the
+                        // next tag from the tag cache.
+                        if (presettle) {
+                            currentDelivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0);
+                        } else {
+                            final byte[] tag = tagCache.getNextTag();
+                            currentDelivery = getEndpoint().delivery(tag, 0, tag.length);
+                        }
+                        // delivery() reads the dispatch back from the context.
+                        currentDelivery.setContext(md);
+                    } else {
+                        // TODO: message could not be generated what now?
+                    }
+                }
+            } catch (Exception e) {
+                // The failure is logged and the loop moves on to the next dispatch.
+                LOG.warn("Error detected while flushing outbound messages: {}", e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Settles the given delivery and, unless a redelivery was requested, acknowledges
+     * the dispatched message to the broker.
+     * <p>
+     * When {@code ackType} is {@code -1} the delivery is settled immediately and its
+     * {@link MessageDispatch} context is passed back to {@code onMessageDispatch}; no ack
+     * is sent.  For any other value a single-message {@link MessageAck} is built from the
+     * delivery's context and sent to ActiveMQ.  The delivery is settled only after a
+     * non-exception response arrives; on an exception response the endpoint is closed.
+     *
+     * @param delivery
+     *        the delivery to settle; its context must be the {@link MessageDispatch} it carries.
+     * @param ackType
+     *        the ack type to send to the broker, or {@code -1} to settle and redispatch.
+     *
+     * @throws Exception propagated from the redispatch or from building and sending the ack.
+     */
+    private void settle(final Delivery delivery, final int ackType) throws Exception {
+        // Hand a non-empty tag back to the tag cache once the remote has settled.
+        byte[] tag = delivery.getTag();
+        if (tag != null && tag.length > 0 && delivery.remotelySettled()) {
+            tagCache.returnTag(tag);
+        }
+
+        if (ackType == -1) {
+            // we are going to settle, but redeliver.. we won't yet ack to ActiveMQ
+            delivery.settle();
+            onMessageDispatch((MessageDispatch) delivery.getContext());
+        } else {
+            MessageDispatch md = (MessageDispatch) delivery.getContext();
+            lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
+            MessageAck ack = new MessageAck();
+            ack.setConsumerId(getConsumerId());
+            ack.setFirstMessageId(md.getMessage().getMessageId());
+            ack.setLastMessageId(md.getMessage().getMessageId());
+            ack.setMessageCount(1);
+            ack.setAckType((byte) ackType);
+            ack.setDestination(md.getDestination());
+
+            // instanceof is false for null, so no separate null check is needed.
+            DeliveryState remoteState = delivery.getRemoteState();
+            if (remoteState instanceof TransactionalState) {
+                TransactionalState s = (TransactionalState) remoteState;
+                long txid = toLong(s.getTxnId());
+                LocalTransactionId localTxId = new LocalTransactionId(session.getConnection().getConnectionId(), txid);
+                ack.setTransactionId(localTxId);
+
+                // Store the message sent in this TX we might need to
+                // re-send on rollback
+                md.getMessage().setTransactionId(localTxId);
+                dispatchedInTx.addFirst(md);
+            }
+
+            LOG.trace("Sending Ack to ActiveMQ: {}", ack);
+
+            sendToActiveMQ(ack, new ResponseHandler() {
+                @Override
+                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                    if (response.isException()) {
+                        // Report through the class logger rather than stderr, then close the link.
+                        Throwable exception = ((ExceptionResponse) response).getException();
+                        LOG.warn("Error detected while acknowledging message: {}", exception.getMessage(), exception);
+                        getEndpoint().close();
+                    } else {
+                        delivery.settle();
+                    }
+                    session.pumpProtonToSocket();
+                }
+            });
+        }
+    }
+
+    /**
+     * Signals the endpoint as drained when nothing is left to send.
+     * <p>
+     * A browser is never reported as drained before its end of browse
+     * message has been seen.
+     */
+    private void drainCheck() {
+        final boolean awaitingEndOfBrowse = consumerInfo.isBrowser() && !endOfBrowse;
+
+        if (!awaitingEndOfBrowse && outbound.isEmpty()) {
+            getEndpoint().drained();
+        }
+    }
+}


[3/4] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5591

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index d1e9f5a..5c33ed1 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -16,1694 +16,53 @@
  */
 package org.apache.activemq.transport.amqp;
 
-import static org.apache.activemq.transport.amqp.AmqpSupport.ANONYMOUS_RELAY;
-import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
-import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
-import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
-import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
-import static org.apache.activemq.transport.amqp.AmqpSupport.QUEUE_PREFIX;
-import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILITY;
-import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY;
-import static org.apache.activemq.transport.amqp.AmqpSupport.TOPIC_PREFIX;
-import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
-import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
-import static org.apache.activemq.transport.amqp.AmqpSupport.toBytes;
-import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
-
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.security.cert.X509Certificate;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.Destination;
-import javax.jms.InvalidClientIDException;
-import javax.jms.InvalidSelectorException;
 
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.DurableTopicSubscription;
-import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.broker.region.TopicRegion;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTempDestination;
-import org.apache.activemq.command.ActiveMQTempQueue;
-import org.apache.activemq.command.ActiveMQTempTopic;
 import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionError;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerControl;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.ExceptionResponse;
-import org.apache.activemq.command.LocalTransactionId;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveInfo;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionId;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.command.SubscriptionInfo;
-import org.apache.activemq.command.TransactionInfo;
-import org.apache.activemq.security.AuthenticationBroker;
-import org.apache.activemq.security.SecurityContext;
-import org.apache.activemq.selector.SelectorParser;
-import org.apache.activemq.store.PersistenceAdapterSupport;
-import org.apache.activemq.transport.amqp.message.AMQPNativeInboundTransformer;
-import org.apache.activemq.transport.amqp.message.AMQPRawInboundTransformer;
-import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
-import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer;
-import org.apache.activemq.transport.amqp.message.EncodedMessage;
-import org.apache.activemq.transport.amqp.message.InboundTransformer;
-import org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer;
-import org.apache.activemq.transport.amqp.message.OutboundTransformer;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.IdGenerator;
-import org.apache.activemq.util.LongSequenceGenerator;
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.DescribedType;
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.apache.qpid.proton.amqp.messaging.AmqpValue;
-import org.apache.qpid.proton.amqp.messaging.Modified;
-import org.apache.qpid.proton.amqp.messaging.Outcome;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.messaging.Released;
-import org.apache.qpid.proton.amqp.messaging.Target;
-import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
-import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
-import org.apache.qpid.proton.amqp.transaction.Coordinator;
-import org.apache.qpid.proton.amqp.transaction.Declare;
-import org.apache.qpid.proton.amqp.transaction.Declared;
-import org.apache.qpid.proton.amqp.transaction.Discharge;
-import org.apache.qpid.proton.amqp.transaction.TransactionalState;
-import org.apache.qpid.proton.amqp.transport.AmqpError;
-import org.apache.qpid.proton.amqp.transport.DeliveryState;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
-import org.apache.qpid.proton.engine.Collector;
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.engine.Link;
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.engine.Sasl;
-import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.engine.Session;
-import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.engine.impl.CollectorImpl;
-import org.apache.qpid.proton.engine.impl.ProtocolTracer;
-import org.apache.qpid.proton.engine.impl.TransportImpl;
-import org.apache.qpid.proton.framing.TransportFrame;
-import org.apache.qpid.proton.message.Message;
-import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtbuf.ByteArrayOutputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-class AmqpProtocolConverter implements IAmqpProtocolConverter {
-
-    private static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES;
-    private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class);
-    private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
-    private static final int CHANNEL_MAX = 32767;
-
-    private final AmqpTransport amqpTransport;
-    private final AmqpWireFormat amqpWireFormat;
-    private final BrokerService brokerService;
-    private AuthenticationBroker authenticator;
-
-    protected int producerCredit;
-    protected Transport protonTransport = Proton.transport();
-    protected Connection protonConnection = Proton.connection();
-    protected Collector eventCollector = new CollectorImpl();
-
-    public AmqpProtocolConverter(AmqpTransport transport, BrokerService brokerService) {
-        this.amqpTransport = transport;
-        AmqpInactivityMonitor monitor = transport.getInactivityMonitor();
-        if (monitor != null) {
-            monitor.setProtocolConverter(this);
-        }
-        this.amqpWireFormat = transport.getWireFormat();
-        this.brokerService = brokerService;
 
-        // the configured maxFrameSize on the URI.
-        int maxFrameSize = transport.getWireFormat().getMaxAmqpFrameSize();
-        if (maxFrameSize > AmqpWireFormat.NO_AMQP_MAX_FRAME_SIZE) {
-            this.protonTransport.setMaxFrameSize(maxFrameSize);
-        }
-
-        this.protonTransport.bind(this.protonConnection);
-
-        // NOTE: QPid JMS client has a bug where the channel max is stored as a
-        //       short value in the Connection class which means that if we allow
-        //       the default channel max of 65535 to be sent then no new sessions
-        //       can be created because the value would be -1 when checked.
-        this.protonTransport.setChannelMax(CHANNEL_MAX);
-
-        this.protonConnection.collect(eventCollector);
-
-        updateTracer();
-    }
+/**
+ * Interface that defines the API for any AMQP protocol converter used to
+ * map AMQP mechanics to ActiveMQ and back.
+ */
+public interface AmqpProtocolConverter {
 
     /**
-     * Load and return a <code>[]Symbol</code> that contains the connection capabilities
-     * offered to new connections
+     * A new incoming data packet from the remote peer is handed off to the
+     * protocol converter for processing.  The type can vary and be either an
+     * AmqpHeader at the handshake phase or a byte buffer containing the next
+     * incoming frame data from the remote.
+     *
+     * @param data
+     *        the next incoming data object from the remote peer.
      *
-     * @return the capabilities that are offered to new clients on connect.
+     * @throws Exception if an error occurs processing the incoming data packet.
      */
-    protected Symbol[] getConnectionCapabilitiesOffered() {
-        return new Symbol[]{ ANONYMOUS_RELAY };
-    }
+    void onAMQPData(Object data) throws Exception;
 
     /**
-     * Load and return a <code>Map<Symbol, Object></code> that contains the properties
-     * that this connection supplies to incoming connections.
+     * Called when the transport detects an exception that the converter
+     * needs to respond to.
      *
-     * @return the properties that are offered to the incoming connection.
+     * @param error
+     *        the error that triggered this call.
      */
-    protected Map<Symbol, Object> getConnetionProperties() {
-        Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
-
-        properties.put(QUEUE_PREFIX, "queue://");
-        properties.put(TOPIC_PREFIX, "topic://");
-
-        return properties;
-    }
+    void onAMQPException(IOException error);
 
     /**
-     * Load and return a <code>Map<Symbol, Object></code> that contains the properties
-     * that this connection supplies to incoming connections when the open has failed
-     * and the remote should expect a close to follow.
+     * Incoming Command object from ActiveMQ.
      *
-     * @return the properties that are offered to the incoming connection.
+     * @param command
+     *        the next incoming command from the broker.
+     *
+     * @throws Exception if an error occurs processing the command.
      */
-    protected Map<Symbol, Object> getFailedConnetionProperties() {
-        Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
-
-        properties.put(CONNECTION_OPEN_FAILED, true);
-
-        return properties;
-    }
-
-    @Override
-    public void updateTracer() {
-        if (amqpTransport.isTrace()) {
-            ((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
-                @Override
-                public void receivedFrame(TransportFrame transportFrame) {
-                    TRACE_FRAMES.trace("{} | RECV: {}", AmqpProtocolConverter.this.amqpTransport.getRemoteAddress(), transportFrame.getBody());
-                }
-
-                @Override
-                public void sentFrame(TransportFrame transportFrame) {
-                    TRACE_FRAMES.trace("{} | SENT: {}", AmqpProtocolConverter.this.amqpTransport.getRemoteAddress(), transportFrame.getBody());
-                }
-            });
-        }
-    }
-
-    void pumpProtonToSocket() {
-        try {
-            boolean done = false;
-            while (!done) {
-                ByteBuffer toWrite = protonTransport.getOutputBuffer();
-                if (toWrite != null && toWrite.hasRemaining()) {
-                    LOG.trace("Sending {} bytes out", toWrite.limit());
-                    amqpTransport.sendToAmqp(toWrite);
-                    protonTransport.outputConsumed();
-                } else {
-                    done = true;
-                }
-            }
-        } catch (IOException e) {
-            amqpTransport.onException(e);
-        }
-    }
-
-    static class AmqpSessionContext {
-        private final SessionId sessionId;
-        long nextProducerId = 0;
-        long nextConsumerId = 0;
-
-        final Map<ConsumerId, ConsumerContext> consumers = new HashMap<ConsumerId, ConsumerContext>();
-
-        public AmqpSessionContext(ConnectionId connectionId, long id) {
-            sessionId = new SessionId(connectionId, id);
-        }
-    }
-
-    Sasl sasl;
+    void onActiveMQCommand(Command command) throws Exception;
 
     /**
-     * Convert a AMQP command
+     * On changes to the transport tracing options the Protocol Converter
+     * should update its internal state so that the proper AMQP data is
+     * logged.
      */
-    @Override
-    public void onAMQPData(Object command) throws Exception {
-        Buffer frame;
-        if (command.getClass() == AmqpHeader.class) {
-            AmqpHeader header = (AmqpHeader) command;
-
-            if (amqpWireFormat.isHeaderValid(header)) {
-                LOG.trace("Connection from an AMQP v1.0 client initiated. {}", header);
-            } else {
-                LOG.warn("Connection attempt from non AMQP v1.0 client. {}", header);
-                AmqpHeader reply = amqpWireFormat.getMinimallySupportedHeader();
-                amqpTransport.sendToAmqp(reply.getBuffer());
-                handleException(new AmqpProtocolException(
-                    "Connection from client using unsupported AMQP attempted", true));
-            }
-
-            switch (header.getProtocolId()) {
-                case 0:
-                    break; // nothing to do..
-                case 3: // Client will be using SASL for auth..
-                    sasl = protonTransport.sasl();
-                    sasl.setMechanisms(new String[] { "ANONYMOUS", "PLAIN" });
-                    sasl.server();
-                    break;
-                default:
-            }
-            frame = header.getBuffer();
-        } else {
-            frame = (Buffer) command;
-        }
-        onFrame(frame);
-    }
-
-    public void onFrame(Buffer frame) throws Exception {
-        while (frame.length > 0) {
-            try {
-                int count = protonTransport.input(frame.data, frame.offset, frame.length);
-                frame.moveHead(count);
-            } catch (Throwable e) {
-                handleException(new AmqpProtocolException("Could not decode AMQP frame: " + frame, true, e));
-                return;
-            }
-
-            try {
-                if (sasl != null) {
-                    // Lets try to complete the sasl handshake.
-                    if (sasl.getRemoteMechanisms().length > 0) {
-                        if ("PLAIN".equals(sasl.getRemoteMechanisms()[0])) {
-                            byte[] data = new byte[sasl.pending()];
-                            sasl.recv(data, 0, data.length);
-                            Buffer[] parts = new Buffer(data).split((byte) 0);
-                            if (parts.length > 0) {
-                                connectionInfo.setUserName(parts[0].utf8().toString());
-                            }
-                            if (parts.length > 1) {
-                                connectionInfo.setPassword(parts[1].utf8().toString());
-                            }
-
-                            if (tryAuthenticate(connectionInfo, amqpTransport.getPeerCertificates())) {
-                                sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
-                            } else {
-                                sasl.done(Sasl.SaslOutcome.PN_SASL_AUTH);
-                            }
-
-                            amqpTransport.getWireFormat().resetMagicRead();
-                            sasl = null;
-                            LOG.debug("SASL [PLAIN] Handshake complete.");
-                        } else if ("ANONYMOUS".equals(sasl.getRemoteMechanisms()[0])) {
-                            if (tryAuthenticate(connectionInfo, amqpTransport.getPeerCertificates())) {
-                                sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
-                            } else {
-                                sasl.done(Sasl.SaslOutcome.PN_SASL_AUTH);
-                            }
-                            amqpTransport.getWireFormat().resetMagicRead();
-                            sasl = null;
-                            LOG.debug("SASL [ANONYMOUS] Handshake complete.");
-                        }
-                    }
-                }
-
-                Event event = null;
-                while ((event = eventCollector.peek()) != null) {
-                    if (amqpTransport.isTrace()) {
-                        LOG.trace("Processing event: {}", event.getType());
-                    }
-                    switch (event.getType()) {
-                        case CONNECTION_REMOTE_OPEN:
-                        case CONNECTION_REMOTE_CLOSE:
-                            processConnectionEvent(event.getConnection());
-                            break;
-                        case SESSION_REMOTE_OPEN:
-                        case SESSION_REMOTE_CLOSE:
-                            processSessionEvent(event.getSession());
-                            break;
-                        case LINK_REMOTE_OPEN:
-                            processLinkOpen(event.getLink());
-                            break;
-                        case LINK_REMOTE_DETACH:
-                            processLinkDetach(event.getLink());
-                            break;
-                        case LINK_REMOTE_CLOSE:
-                            processLinkClose(event.getLink());
-                            break;
-                        case LINK_FLOW:
-                            processLinkFlow(event.getLink());
-                            break;
-                        case DELIVERY:
-                            processDelivery(event.getDelivery());
-                            break;
-                        default:
-                            break;
-                    }
-
-                    eventCollector.pop();
-                }
-
-            } catch (Throwable e) {
-                handleException(new AmqpProtocolException("Could not process AMQP commands", true, e));
-            }
-
-            pumpProtonToSocket();
-        }
-    }
-
-    protected void processLinkFlow(Link link) throws Exception {
-        Object context = link.getContext();
-        int credit = link.getCredit();
-        if (context instanceof ConsumerContext) {
-            ConsumerContext consumerContext = (ConsumerContext)context;
-
-            if (credit != consumerContext.credit) {
-                consumerContext.credit = credit >= 0 ? credit : 0;
-                ConsumerControl control = new ConsumerControl();
-                control.setConsumerId(consumerContext.consumerId);
-                control.setDestination(consumerContext.destination);
-                control.setPrefetch(consumerContext.credit);
-                sendToActiveMQ(control, null);
-            }
-        }
-        ((AmqpDeliveryListener) link.getContext()).drainCheck();
-    }
-
-    protected void processConnectionEvent(Connection connection) throws Exception {
-        EndpointState remoteState = connection.getRemoteState();
-        if (remoteState == EndpointState.ACTIVE) {
-            onConnectionOpen();
-        } else if (remoteState == EndpointState.CLOSED) {
-            doClose();
-        }
-    }
-
-    protected void processLinkOpen(Link link) throws Exception {
-        onLinkOpen(link);
-    }
-
-    protected void processLinkDetach(Link link) throws Exception {
-        AmqpDeliveryListener context = (AmqpDeliveryListener) link.getContext();
-        if (context != null) {
-            context.onDetach();
-        }
-        link.detach();
-        link.free();
-    }
-
-    protected void processLinkClose(Link link) throws Exception {
-        AmqpDeliveryListener context = (AmqpDeliveryListener) link.getContext();
-        if (context != null) {
-            context.onClose();
-        }
-        link.close();
-        link.free();
-    }
-
-    protected void processSessionEvent(Session session) throws Exception {
-        EndpointState remoteState = session.getRemoteState();
-        if (remoteState == EndpointState.ACTIVE) {
-            onSessionOpen(session);
-        } else if (remoteState == EndpointState.CLOSED) {
-            // TODO - close links?
-            onSessionClose(session);
-        }
-    }
-
-    protected void processDelivery(Delivery delivery) throws Exception {
-        if (!delivery.isPartial()) {
-            AmqpDeliveryListener listener = (AmqpDeliveryListener) delivery.getLink().getContext();
-            if (listener != null) {
-                listener.onDelivery(delivery);
-            }
-        }
-    }
-
-    boolean closing = false;
-    boolean closedSocket = false;
-
-    private void doClose() {
-        if (!closing) {
-            closing = true;
-            sendToActiveMQ(new RemoveInfo(connectionId), new ResponseHandler() {
-                @Override
-                public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
-                    protonConnection.close();
-                    if (!closedSocket) {
-                        pumpProtonToSocket();
-                    }
-                }
-            });
-            sendToActiveMQ(new ShutdownInfo(), null);
-        }
-    }
-
-    @Override
-    public void onAMQPException(IOException error) {
-        closedSocket = true;
-        if (!closing) {
-            amqpTransport.sendToActiveMQ(error);
-        } else {
-            try {
-                amqpTransport.stop();
-            } catch (Exception ignore) {
-            }
-        }
-    }
-
-    @Override
-    public void onActiveMQCommand(Command command) throws Exception {
-        if (command.isResponse()) {
-            Response response = (Response) command;
-            ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
-            if (rh != null) {
-                rh.onResponse(this, response);
-            } else {
-                // Pass down any unexpected errors. Should this close the connection?
-                if (response.isException()) {
-                    Throwable exception = ((ExceptionResponse) response).getException();
-                    handleException(exception);
-                }
-            }
-        } else if (command.isMessageDispatch()) {
-            MessageDispatch md = (MessageDispatch) command;
-            ConsumerContext consumerContext = subscriptionsByConsumerId.get(md.getConsumerId());
-            if (consumerContext != null) {
-                // End of Queue Browse will have no Message object.
-                if (md.getMessage() != null) {
-                    LOG.trace("Dispatching MessageId: {} to consumer", md.getMessage().getMessageId());
-                } else {
-                    LOG.trace("Dispatching End of Browse Command to consumer {}", md.getConsumerId());
-                }
-                consumerContext.onMessageDispatch(md);
-                if (md.getMessage() != null) {
-                    LOG.trace("Finished Dispatch of MessageId: {} to consumer", md.getMessage().getMessageId());
-                }
-            }
-        } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
-            // Pass down any unexpected async errors. Should this close the connection?
-            Throwable exception = ((ConnectionError) command).getException();
-            handleException(exception);
-        } else if (command.isBrokerInfo()) {
-            // ignore
-        } else {
-            LOG.debug("Do not know how to process ActiveMQ Command {}", command);
-        }
-    }
-
-    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
-    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
-    private final ConnectionInfo connectionInfo = new ConnectionInfo();
-    private long nextSessionId = 0;
-    private long nextTempDestinationId = 0;
-
-    static abstract class AmqpDeliveryListener {
-
-        protected ActiveMQDestination destination;
-        protected List<Runnable> closeActions = new ArrayList<Runnable>();
-
-        abstract public void onDelivery(Delivery delivery) throws Exception;
-
-        public void onDetach() throws Exception {
-        }
-
-        public void onClose() throws Exception {
-
-            for (Runnable action : closeActions) {
-                action.run();
-            }
-
-            closeActions.clear();
-        }
-
-        public void drainCheck() {
-        }
-
-        abstract void doCommit() throws Exception;
-
-        abstract void doRollback() throws Exception;
-
-        public void addCloseAction(Runnable action) {
-            closeActions.add(action);
-        }
-
-        public ActiveMQDestination getDestination() {
-            return destination;
-        }
-
-        public void setDestination(ActiveMQDestination destination) {
-            this.destination = destination;
-        }
-    }
-
-    private void onConnectionOpen() throws AmqpProtocolException {
-
-        connectionInfo.setResponseRequired(true);
-        connectionInfo.setConnectionId(connectionId);
-
-        configureInactivityMonitor();
-
-        String clientId = protonConnection.getRemoteContainer();
-        if (clientId != null && !clientId.isEmpty()) {
-            connectionInfo.setClientId(clientId);
-        }
-
-        connectionInfo.setTransportContext(amqpTransport.getPeerCertificates());
-
-        sendToActiveMQ(connectionInfo, new ResponseHandler() {
-            @Override
-            public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
-                Throwable exception = null;
-                try {
-                    if (response.isException()) {
-                        protonConnection.setProperties(getFailedConnetionProperties());
-                        protonConnection.open();
-
-                        exception = ((ExceptionResponse) response).getException();
-                        if (exception instanceof SecurityException) {
-                            protonConnection.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
-                        } else if (exception instanceof InvalidClientIDException) {
-                            protonConnection.setCondition(new ErrorCondition(AmqpError.INVALID_FIELD, exception.getMessage()));
-                        } else {
-                            protonConnection.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, exception.getMessage()));
-                        }
-
-                        protonConnection.close();
-                    } else {
-                        protonConnection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
-                        protonConnection.setProperties(getConnetionProperties());
-                        protonConnection.open();
-                    }
-                } finally {
-                    pumpProtonToSocket();
-
-                    if (response.isException()) {
-                        amqpTransport.onException(IOExceptionSupport.create(exception));
-                    }
-                }
-            }
-        });
-    }
-
-    private void onSessionOpen(Session session) {
-        AmqpSessionContext sessionContext = new AmqpSessionContext(connectionId, nextSessionId++);
-        session.setContext(sessionContext);
-        sendToActiveMQ(new SessionInfo(sessionContext.sessionId), null);
-        session.setIncomingCapacity(Integer.MAX_VALUE);
-        session.open();
-    }
-
-    private void onSessionClose(Session session) {
-        AmqpSessionContext sessionContext = (AmqpSessionContext) session.getContext();
-        if (sessionContext != null) {
-            LOG.trace("Session {} closed", sessionContext.sessionId);
-            sendToActiveMQ(new RemoveInfo(sessionContext.sessionId), null);
-            session.setContext(null);
-        }
-        session.close();
-        session.free();
-    }
-
-    private void onLinkOpen(Link link) {
-        link.setSource(link.getRemoteSource());
-        link.setTarget(link.getRemoteTarget());
-
-        AmqpSessionContext sessionContext = (AmqpSessionContext) link.getSession().getContext();
-        if (link instanceof Receiver) {
-            onReceiverOpen((Receiver) link, sessionContext);
-        } else {
-            onSenderOpen((Sender) link, sessionContext);
-        }
-    }
-
-    private void configureInactivityMonitor() {
-        AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor();
-        if (monitor == null) {
-            return;
-        }
-
-        monitor.stopConnectChecker();
-    }
-
-    // Lazily created by getInboundTransformer() on first use.
-    InboundTransformer inboundTransformer;
-
-    /**
-     * Returns the transformer used to convert inbound AMQP messages, creating
-     * it on first use from the transformer name configured on the transport.
-     *
-     * An unknown or missing transformer name falls back to the native
-     * transformer, with a warning.
-     *
-     * @return the cached inbound transformer, never null
-     */
-    protected InboundTransformer getInboundTransformer() {
-        if (inboundTransformer == null) {
-            String transformer = amqpTransport.getTransformer();
-            // Constant-first comparisons so an unset (null) name takes the
-            // fallback branch instead of throwing a NullPointerException.
-            if (InboundTransformer.TRANSFORMER_JMS.equals(transformer)) {
-                inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE);
-            } else if (InboundTransformer.TRANSFORMER_NATIVE.equals(transformer)) {
-                inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
-            } else if (InboundTransformer.TRANSFORMER_RAW.equals(transformer)) {
-                inboundTransformer = new AMQPRawInboundTransformer(ActiveMQJMSVendor.INSTANCE);
-            } else {
-                LOG.warn("Unknown transformer type {} using native one instead", transformer);
-                inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
-            }
-        }
-        return inboundTransformer;
-    }
-
-    /**
-     * Link context for receivers: accumulates the bytes of an incoming
-     * delivery and hands the assembled payload to {@link #onMessage}.
-     * Commit and rollback are no-ops at this level.
-     */
-    abstract class BaseProducerContext extends AmqpDeliveryListener {
-
-        /** Accumulates the bytes read so far for the delivery in progress. */
-        ByteArrayOutputStream current = new ByteArrayOutputStream();
-
-        /** Scratch space for each read from the receiver. */
-        private final byte[] recvBuffer = new byte[1024 * 8];
-
-        /**
-         * Receives the assembled payload of one delivery.
-         */
-        abstract protected void onMessage(Receiver receiver, Delivery delivery, Buffer buffer) throws Exception;
-
-        @Override
-        public void onDelivery(Delivery delivery) throws Exception {
-            final Receiver link = (Receiver) delivery.getLink();
-            if (!delivery.isReadable()) {
-                LOG.debug("Delivery was not readable!");
-                return;
-            }
-
-            if (current == null) {
-                current = new ByteArrayOutputStream();
-            }
-
-            // Drain everything the receiver currently has for this delivery.
-            int read = link.recv(recvBuffer, 0, recvBuffer.length);
-            while (read > 0) {
-                current.write(recvBuffer, 0, read);
-                read = link.recv(recvBuffer, 0, recvBuffer.length);
-            }
-
-            // A zero result means more data is still expected for this
-            // delivery, so keep what has been buffered and wait.
-            if (read == 0) {
-                return;
-            }
-
-            link.advance();
-            final Buffer payload = current.toBuffer();
-            current = null;
-            onMessage(link, delivery, payload);
-        }
-
-        @Override
-        void doCommit() throws Exception {
-        }
-
-        @Override
-        void doRollback() throws Exception {
-        }
-    }
-
-    class ProducerContext extends BaseProducerContext {
-        private final ProducerId producerId;
-        private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
-        private boolean closed;
-        private boolean anonymous;
-
-        public ProducerContext(ProducerId producerId) {
-            this.producerId = producerId;
-        }
-
-        @Override
-        public String toString() {
-            return "ProducerContext { producerId = " + producerId + ", destination = " + destination + " }";
-        }
-
-        @Override
-        protected void onMessage(final Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception {
-            if (!closed) {
-                EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
-                final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
-                current = null;
-
-                if (destination != null) {
-                    message.setJMSDestination(destination);
-                } else if (isAnonymous()) {
-                    Destination toDestination = message.getJMSDestination();
-                    if (toDestination == null || !(toDestination instanceof ActiveMQDestination)) {
-                        Rejected rejected = new Rejected();
-                        ErrorCondition condition = new ErrorCondition();
-                        condition.setCondition(Symbol.valueOf("failed"));
-                        condition.setDescription("Missing to field for message sent to an anonymous producer");
-                        rejected.setError(condition);
-                        delivery.disposition(rejected);
-                        return;
-                    }
-                }
-                message.setProducerId(producerId);
-
-                // Always override the AMQP client's MessageId with our own.  Preserve
-                // the original in the TextView property for later Ack.
-                MessageId messageId = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
-
-                MessageId amqpMessageId = message.getMessageId();
-                if (amqpMessageId != null) {
-                    if (amqpMessageId.getTextView() != null) {
-                        messageId.setTextView(amqpMessageId.getTextView());
-                    } else {
-                        messageId.setTextView(amqpMessageId.toString());
-                    }
-                }
-
-                message.setMessageId(messageId);
-
-                LOG.trace("Inbound Message:{} from Producer:{}", message.getMessageId(), producerId + ":" + messageId.getProducerSequenceId());
-
-                final DeliveryState remoteState = delivery.getRemoteState();
-                if (remoteState != null && remoteState instanceof TransactionalState) {
-                    TransactionalState s = (TransactionalState) remoteState;
-                    long txid = toLong(s.getTxnId());
-                    message.setTransactionId(new LocalTransactionId(connectionId, txid));
-                }
-
-                message.onSend();
-                if (!delivery.remotelySettled()) {
-                    sendToActiveMQ(message, new ResponseHandler() {
-
-                        @Override
-                        public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
-                            if (response.isException()) {
-                                ExceptionResponse er = (ExceptionResponse) response;
-                                Rejected rejected = new Rejected();
-                                ErrorCondition condition = new ErrorCondition();
-                                condition.setCondition(Symbol.valueOf("failed"));
-                                condition.setDescription(er.getException().getMessage());
-                                rejected.setError(condition);
-                                delivery.disposition(rejected);
-                            } else {
-                                if (receiver.getCredit() <= (producerCredit * .2)) {
-                                    LOG.trace("Sending more credit ({}) to producer: {}", producerCredit - receiver.getCredit(), producerId);
-                                    receiver.flow(producerCredit - receiver.getCredit());
-                                }
-
-                                if (remoteState != null && remoteState instanceof TransactionalState) {
-                                    TransactionalState txAccepted = new TransactionalState();
-                                    txAccepted.setOutcome(Accepted.getInstance());
-                                    txAccepted.setTxnId(((TransactionalState) remoteState).getTxnId());
-
-                                    delivery.disposition(txAccepted);
-                                } else {
-                                    delivery.disposition(Accepted.getInstance());
-                                }
-
-                                delivery.settle();
-                            }
-
-                            pumpProtonToSocket();
-                        }
-                    });
-                } else {
-                    if (receiver.getCredit() <= (producerCredit * .2)) {
-                        LOG.trace("Sending more credit ({}) to producer: {}", producerCredit - receiver.getCredit(), producerId);
-                        receiver.flow(producerCredit - receiver.getCredit());
-                        pumpProtonToSocket();
-                    }
-                    sendToActiveMQ(message, null);
-                }
-            }
-        }
-
-        @Override
-        public void onClose() throws Exception {
-            if (!closed) {
-                sendToActiveMQ(new RemoveInfo(producerId), null);
-            }
-
-            super.onClose();
-        }
-
-        public void close() {
-            closed = true;
-        }
-
-        public boolean isAnonymous() {
-            return anonymous;
-        }
-    }
-
-    // Source of the ids handed out for transactions declared over the coordinator link.
-    private final AtomicLong nextTransactionId = new AtomicLong();
-
-    /**
-     * Link context for the transaction coordinator. Handles Declare by
-     * beginning a broker transaction and returning its id, and Discharge by
-     * committing or rolling back that transaction.
-     */
-    AmqpDeliveryListener coordinatorContext = new BaseProducerContext() {
-
-        @Override
-        protected void onMessage(Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception {
-
-            Message msg = Proton.message();
-            int offset = buffer.offset;
-            int len = buffer.length;
-            while (len > 0) {
-                final int decoded = msg.decode(buffer.data, offset, len);
-                // Enforced at runtime rather than with an assert: with assertions
-                // disabled a decoder that makes no progress would loop forever.
-                if (decoded <= 0) {
-                    throw new Exception("Unable to make progress decoding the coordinator message");
-                }
-                offset += decoded;
-                len -= decoded;
-            }
-
-            // A missing or non-AmqpValue body leaves action null and is reported
-            // below as an unexpected message instead of failing on the cast.
-            final Object body = msg.getBody();
-            final Object action = (body instanceof AmqpValue) ? ((AmqpValue) body).getValue() : null;
-            LOG.debug("COORDINATOR received: {}, [{}]", action, buffer);
-            if (action instanceof Declare) {
-                Declare declare = (Declare) action;
-                if (declare.getGlobalId() != null) {
-                    throw new Exception("don't know how to handle a declare /w a set GlobalId");
-                }
-
-                long txid = nextTransactionId.incrementAndGet();
-                TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), TransactionInfo.BEGIN);
-                sendToActiveMQ(txinfo, null);
-                LOG.trace("started transaction {}", txid);
-
-                Declared declared = new Declared();
-                declared.setTxnId(new Binary(toBytes(txid)));
-                delivery.disposition(declared);
-                delivery.settle();
-            } else if (action instanceof Discharge) {
-                Discharge discharge = (Discharge) action;
-                long txid = toLong(discharge.getTxnId());
-
-                final byte operation;
-                if (discharge.getFail()) {
-                    LOG.trace("rollback transaction {}", txid);
-                    operation = TransactionInfo.ROLLBACK;
-                } else {
-                    LOG.trace("commit transaction {}", txid);
-                    operation = TransactionInfo.COMMIT_ONE_PHASE;
-                }
-
-                // Let every consumer on the session apply the outcome before
-                // the broker is told to finish the transaction.
-                AmqpSessionContext context = (AmqpSessionContext) receiver.getSession().getContext();
-                for (ConsumerContext consumer : context.consumers.values()) {
-                    if (operation == TransactionInfo.ROLLBACK) {
-                        consumer.doRollback();
-                    } else {
-                        consumer.doCommit();
-                    }
-                }
-
-                TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), operation);
-                sendToActiveMQ(txinfo, new ResponseHandler() {
-                    @Override
-                    public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
-                        if (response.isException()) {
-                            ExceptionResponse er = (ExceptionResponse) response;
-                            Rejected rejected = new Rejected();
-                            rejected.setError(new ErrorCondition(Symbol.valueOf("failed"), er.getException().getMessage()));
-                            delivery.disposition(rejected);
-                        } else {
-                            delivery.disposition(Accepted.getInstance());
-                        }
-                        LOG.debug("TX: {} settling {}", operation, action);
-                        delivery.settle();
-                        pumpProtonToSocket();
-                    }
-                });
-
-                // After a rollback, give each consumer a chance to send again.
-                for (ConsumerContext consumer : context.consumers.values()) {
-                    if (operation == TransactionInfo.ROLLBACK) {
-                        consumer.pumpOutbound();
-                    }
-                }
-
-            } else {
-                throw new Exception("Expected coordinator message type: " + (action == null ? null : action.getClass()));
-            }
-        }
-    };
-
-    /**
-     * Handles the attach of a link on which the remote peer sends to us.
-     *
-     * A link whose target is a Coordinator is bound to the shared
-     * coordinatorContext. Any other link gets its own ProducerContext and a
-     * broker-side producer; the link is opened only after the broker has
-     * accepted the producer, and is closed with an error condition otherwise.
-     *
-     * @param receiver the local receiving end of the link being opened
-     * @param sessionContext context of the session that owns the link
-     */
-    void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) {
-        // Client is producing to this receiver object
-        org.apache.qpid.proton.amqp.transport.Target remoteTarget = receiver.getRemoteTarget();
-        int flow = producerCredit;
-
-        try {
-            if (remoteTarget instanceof Coordinator) {
-                // Transaction coordinator link: no broker producer is needed.
-                pumpProtonToSocket();
-                receiver.setContext(coordinatorContext);
-                receiver.flow(flow);
-                receiver.open();
-                pumpProtonToSocket();
-            } else {
-                Target target = (Target) remoteTarget;
-                ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
-                final ProducerContext producerContext = new ProducerContext(producerId);
-                ActiveMQDestination destination = null;
-                String targetNodeName = target.getAddress();
-
-                if ((targetNodeName == null || targetNodeName.length() == 0) && !target.getDynamic()) {
-                    // No address and not dynamic: each message must name its own destination.
-                    producerContext.anonymous = true;
-                } else if (target.getDynamic()) {
-                    // Dynamic target: create a temporary destination and report its
-                    // name back to the peer through the local target.
-                    destination = createTemporaryDestination(receiver, target.getCapabilities());
-                    Target actualTarget = new Target();
-                    actualTarget.setAddress(destination.getQualifiedName());
-                    actualTarget.setDynamic(true);
-                    receiver.setTarget(actualTarget);
-                    // The temporary destination is deleted when the link closes.
-                    producerContext.addCloseAction(new Runnable() {
-
-                        @Override
-                        public void run() {
-                            deleteTemporaryDestination((ActiveMQTempDestination) producerContext.getDestination());
-                        }
-                    });
-                } else {
-                    destination = createDestination(remoteTarget);
-                }
-
-                receiver.setContext(producerContext);
-                receiver.flow(flow);
-
-                ProducerInfo producerInfo = new ProducerInfo(producerId);
-                producerInfo.setDestination(destination);
-                producerContext.setDestination(destination);
-                sendToActiveMQ(producerInfo, new ResponseHandler() {
-                    @Override
-                    public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
-                        if (response.isException()) {
-                            // Broker refused the producer: clear the target and close
-                            // the link with a condition describing the failure.
-                            receiver.setTarget(null);
-                            Throwable exception = ((ExceptionResponse) response).getException();
-                            if (exception instanceof SecurityException) {
-                                receiver.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
-                            } else {
-                                receiver.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
-                            }
-                            // Marked closed so that closing the link does not send a
-                            // RemoveInfo for a producer the broker never registered.
-                            producerContext.closed = true;
-                            receiver.close();
-                            receiver.free();
-                        } else {
-                            receiver.open();
-                        }
-                        pumpProtonToSocket();
-                    }
-                });
-            }
-        } catch (AmqpProtocolException exception) {
-            // Raised while resolving the target; reject the attach using the
-            // exception's symbolic name as the error condition.
-            receiver.setTarget(null);
-            receiver.setCondition(new ErrorCondition(Symbol.getSymbol(exception.getSymbolicName()), exception.getMessage()));
-            receiver.close();
-        }
-    }
-
-    private ActiveMQDestination createDestination(Object endpoint) throws AmqpProtocolException {
-        if (endpoint == null) {
-            return null;
-        } else if (endpoint instanceof Coordinator) {
-            return null;
-        } else if (endpoint instanceof org.apache.qpid.proton.amqp.messaging.Terminus) {
-            org.apache.qpid.proton.amqp.messaging.Terminus terminus = (org.apache.qpid.proton.amqp.messaging.Terminus) endpoint;
-            if (terminus.getAddress() == null || terminus.getAddress().length() == 0) {
-                if (terminus instanceof org.apache.qpid.proton.amqp.messaging.Source) {
-                    throw new AmqpProtocolException("amqp:invalid-field", "source address not set");
-                } else {
-                    throw new AmqpProtocolException("amqp:invalid-field", "target address not set");
-                }
-            }
-
-            return ActiveMQDestination.createDestination(terminus.getAddress(), ActiveMQDestination.QUEUE_TYPE);
-        } else {
-            throw new RuntimeException("Unexpected terminus type: " + endpoint);
-        }
-    }
-
-    OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
-
-    class ConsumerContext extends AmqpDeliveryListener {
-        // Broker-side id of this consumer; used for acks and for its removal.
-        private final ConsumerId consumerId;
-        // Proton sender over which messages are delivered to the remote peer.
-        private final Sender sender;
-        // True when the remote peer asked for the SETTLED sender settle mode.
-        private final boolean presettle;
-        // Set once the consumer has been detached or closed.
-        private boolean closed;
-        public ConsumerInfo info;
-        // Set when a dispatch without a message (the end of browse signal) is seen.
-        private boolean endOfBrowse = false;
-        // NOTE(review): presumably the link credit tracked for this consumer; not used in the visible code.
-        public int credit;
-        // Broker sequence id of the last acknowledged message; reported when the consumer is removed.
-        private long lastDeliveredSequenceId;
-
-        // Dispatches acknowledged inside a transaction, newest first, kept in
-        // case they need to be re-sent on rollback.
-        protected LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
-
-        /**
-         * Creates the context for one consumer link.
-         *
-         * @param consumerId broker-side id assigned to the consumer
-         * @param sender Proton sender used to deliver messages to the peer
-         */
-        public ConsumerContext(ConsumerId consumerId, Sender sender) {
-            this.consumerId = consumerId;
-            this.sender = sender;
-            this.presettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
-        }
-
-        // Counter used to mint a fresh tag when the pool is empty.
-        long nextTagId = 0;
-        // Pool of delivery tags that have been handed back for reuse.
-        HashSet<byte[]> tagCache = new HashSet<byte[]>();
-
-        /**
-         * Returns a delivery tag, reusing a pooled one when available and
-         * otherwise encoding the next counter value as hexadecimal text.
-         */
-        byte[] nextTag() {
-            if (tagCache == null || tagCache.isEmpty()) {
-                try {
-                    return Long.toHexString(nextTagId++).getBytes("UTF-8");
-                } catch (UnsupportedEncodingException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-
-            final Iterator<byte[]> pooled = tagCache.iterator();
-            final byte[] tag = pooled.next();
-            pooled.remove();
-            return tag;
-        }
-
-        /**
-         * Hands a delivery tag back for reuse; the tag is dropped once the
-         * pool already holds 1024 entries.
-         */
-        void checkinTag(byte[] data) {
-            if (tagCache.size() >= 1024) {
-                return;
-            }
-            tagCache.add(data);
-        }
-
-        /**
-         * Describes this context by its consumer info.
-         */
-        @Override
-        public String toString() {
-            final StringBuilder text = new StringBuilder("ConsumerContext { ");
-            text.append(info).append(" }");
-            return text.toString();
-        }
-
-        /**
-         * Unregisters this consumer locally and removes it from the broker.
-         * Calls after the first are ignored.
-         */
-        @Override
-        public void onDetach() throws Exception {
-            if (closed) {
-                return;
-            }
-
-            closed = true;
-            sender.setContext(null);
-            subscriptionsByConsumerId.remove(consumerId);
-
-            final AmqpSessionContext owner = (AmqpSessionContext) sender.getSession().getContext();
-            if (owner != null) {
-                owner.consumers.remove(info.getConsumerId());
-            }
-
-            // Tell the broker how far delivery got so it can account for the rest.
-            final RemoveInfo removal = new RemoveInfo(consumerId);
-            removal.setLastDeliveredSequenceId(lastDeliveredSequenceId);
-            sendToActiveMQ(removal, null);
-        }
-
-        /**
-         * Closes the consumer: performs the same teardown as
-         * {@link #onDetach()} and, for a durable subscription, also removes
-         * the subscription from the broker. The registered close actions
-         * always run, even when the teardown fails.
-         */
-        @Override
-        public void onClose() throws Exception {
-            try {
-                // Capture the state first: onDetach() flips the flag and is a
-                // no-op when the consumer was already detached or closed.
-                final boolean wasOpen = !closed;
-                onDetach();
-
-                if (wasOpen && info.isDurable()) {
-                    RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
-                    rsi.setConnectionId(connectionId);
-                    rsi.setSubscriptionName(sender.getName());
-                    rsi.setClientId(connectionInfo.getClientId());
-
-                    sendToActiveMQ(rsi, null);
-                }
-            } finally {
-                super.onClose();
-            }
-        }
-
-        // Dispatches waiting to be written to the sender, oldest first.
-        LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>();
-
-        /**
-         * Called when the connection receives a JMS message from ActiveMQ:
-         * queues the dispatch and tries to push it to the remote peer.
-         * Dispatches arriving after the consumer has closed are ignored.
-         */
-        public void onMessageDispatch(MessageDispatch md) throws Exception {
-            if (closed) {
-                return;
-            }
-
-            // Lock to prevent stepping on TX redelivery
-            synchronized (outbound) {
-                outbound.add(md);
-            }
-
-            pumpOutbound();
-            pumpProtonToSocket();
-        }
-
-        // Encoded bytes of the message currently being written, or null when idle.
-        Buffer currentBuffer;
-        // Proton delivery that currentBuffer is being written into.
-        Delivery currentDelivery;
-        final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";
-
-        /**
-         * Writes as many queued dispatches to the sender as it will accept.
-         *
-         * Returns when the sender stops accepting bytes, when the queue is
-         * empty, or when the consumer is closed. A failure while preparing a
-         * message is logged and the loop moves on to the next dispatch.
-         */
-        public void pumpOutbound() throws Exception {
-            while (!closed) {
-                // Finish the partially written message before starting another.
-                while (currentBuffer != null) {
-                    int sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
-                    if (sent > 0) {
-                        currentBuffer.moveHead(sent);
-                        if (currentBuffer.length == 0) {
-                            if (presettle) {
-                                settle(currentDelivery, MessageAck.INDIVIDUAL_ACK_TYPE);
-                            } else {
-                                sender.advance();
-                            }
-                            currentBuffer = null;
-                            currentDelivery = null;
-                        }
-                    } else {
-                        // The sender accepted nothing; try again on the next pump.
-                        return;
-                    }
-                }
-
-                if (outbound.isEmpty()) {
-                    return;
-                }
-
-                final MessageDispatch md = outbound.removeFirst();
-                try {
-
-                    ActiveMQMessage temp = null;
-                    if (md.getMessage() != null) {
-
-                        // Topics can dispatch the same Message to more than one consumer
-                        // so we must copy to prevent concurrent read / write to the same
-                        // message object.
-                        if (md.getDestination().isTopic()) {
-                            synchronized (md.getMessage()) {
-                                temp = (ActiveMQMessage) md.getMessage().copy();
-                            }
-                        } else {
-                            temp = (ActiveMQMessage) md.getMessage();
-                        }
-
-                        if (!temp.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
-                            temp.setProperty(MESSAGE_FORMAT_KEY, 0);
-                        }
-                    }
-
-                    final ActiveMQMessage jms = temp;
-                    if (jms == null) {
-                        // It's the end of browse signal.
-                        endOfBrowse = true;
-                        drainCheck();
-                    } else {
-                        jms.setRedeliveryCounter(md.getRedeliveryCounter());
-                        jms.setReadOnlyBody(true);
-                        final EncodedMessage amqp = outboundTransformer.transform(jms);
-                        if (amqp != null && amqp.getLength() > 0) {
-                            currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
-                            if (presettle) {
-                                currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0);
-                            } else {
-                                final byte[] tag = nextTag();
-                                currentDelivery = sender.delivery(tag, 0, tag.length);
-                            }
-                            currentDelivery.setContext(md);
-                        } else {
-                            // TODO: message could not be generated what now?
-                        }
-                    }
-                } catch (Exception e) {
-                    // Routed through the logger rather than printStackTrace() so the
-                    // failure shows up in the broker logs; the loop continues with
-                    // the next queued dispatch, as before.
-                    LOG.warn("Error while preparing outbound message for consumer " + consumerId, e);
-                }
-            }
-        }
-
-        /**
-         * Settles a delivery and tells the broker about its outcome.
-         *
-         * @param delivery the delivery to settle; its context holds the
-         *        MessageDispatch it was created for
-         * @param ackType the MessageAck type to send to the broker, or -1 to
-         *        settle locally and queue the message for redelivery without
-         *        acknowledging it
-         */
-        private void settle(final Delivery delivery, final int ackType) throws Exception {
-            // Recycle the tag only once the peer has settled as well.
-            byte[] tag = delivery.getTag();
-            if (tag != null && tag.length > 0 && delivery.remotelySettled()) {
-                checkinTag(tag);
-            }
-
-            if (ackType == -1) {
-                // we are going to settle, but redeliver.. we won't yet ack
-                // to ActiveMQ
-                delivery.settle();
-                onMessageDispatch((MessageDispatch) delivery.getContext());
-            } else {
-                MessageDispatch md = (MessageDispatch) delivery.getContext();
-                lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
-                MessageAck ack = new MessageAck();
-                ack.setConsumerId(consumerId);
-                ack.setFirstMessageId(md.getMessage().getMessageId());
-                ack.setLastMessageId(md.getMessage().getMessageId());
-                ack.setMessageCount(1);
-                ack.setAckType((byte) ackType);
-                ack.setDestination(md.getDestination());
-
-                DeliveryState remoteState = delivery.getRemoteState();
-                if (remoteState instanceof TransactionalState) {
-                    TransactionalState s = (TransactionalState) remoteState;
-                    long txid = toLong(s.getTxnId());
-                    LocalTransactionId localTxId = new LocalTransactionId(connectionId, txid);
-                    ack.setTransactionId(localTxId);
-
-                    // Store the message sent in this TX we might need to
-                    // re-send on rollback
-                    md.getMessage().setTransactionId(localTxId);
-                    dispatchedInTx.addFirst(md);
-                }
-
-                LOG.trace("Sending Ack to ActiveMQ: {}", ack);
-
-                sendToActiveMQ(ack, new ResponseHandler() {
-                    @Override
-                    public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
-                        if (response.isException()) {
-                            // The broker refused the ack: log it and close the link
-                            // instead of settling the delivery.
-                            Throwable exception = ((ExceptionResponse) response).getException();
-                            LOG.warn("Broker rejected ack for consumer " + consumerId, exception);
-                            sender.close();
-                        } else {
-                            delivery.settle();
-                        }
-                        pumpProtonToSocket();
-                    }
-                });
-            }
-        }
-
-        /**
-         * Reports the sender as drained once nothing is left to send. A
-         * browser is never reported drained before its end of browse signal
-         * has been seen.
-         */
-        @Override
-        public void drainCheck() {
-            final boolean stillBrowsing = info.isBrowser() && !endOfBrowse;
-            if (!stillBrowsing && outbound.isEmpty()) {
-                sender.drained();
-            }
-        }
-
-        @Override
-        public void onDelivery(Delivery delivery) throws Exception {
-            MessageDispatch md = (MessageDispatch) delivery.getContext();
-            DeliveryState state = delivery.getRemoteState();
-
-            if (state instanceof TransactionalState) {
-                TransactionalState txState = (TransactionalState) state;
-                LOG.trace("onDelivery: TX delivery state = {}", state);
-                if (txState.getOutcome() != null) {
-                    Outcome outcome = txState.getOutcome();
-                    if (outcome instanceof Accepted) {
-                        if (!delivery.remotelySettled()) {
-                            TransactionalState txAccepted = new TransactionalState();
-                            txAccepted.setOutcome(Accepted.getInstance());
-                            txAccepted.setTxnId(((TransactionalState) state).getTxnId());
-
-                            delivery.disposition(txAccepted);
-                        }
-                        settle(delivery, MessageAck.DELIVERED_ACK_TYPE);
-                    }
-                }
-            } else {
-                if (state instanceof Accepted) {
-                    LOG.trace("onDelivery: accepted state = {}", state);
-                    if (!delivery.remotelySettled()) {
-                        delivery.disposition(new Accepted());
-                    }
-                    settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE);
-                } else if (state instanceof Rejected) {
-                    // re-deliver /w incremented delivery counter.
-                    md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
-                    LOG.trace("onDelivery: Rejected state = {}, delivery count now {}", state, md.getRedeliveryCounter());
-                    settle(delivery, -1);
-                } else if (state instanceof Released) {
-                    LOG.trace("onDelivery: Released state = {}", state);
-                    // re-deliver && don't increment the counter.
-                    settle(delivery, -1);
-                } else if (state instanceof Modified) {
-                    Modified modified = (Modified) state;
-                    if (modified.getDeliveryFailed()) {
-                        // increment delivery counter..
-                        md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
-                    }
-                    LOG.trace("onDelivery: Modified state = {}, delivery count now {}", state, md.getRedeliveryCounter());
-                    byte ackType = -1;
-                    Boolean undeliverableHere = modified.getUndeliverableHere();
-                    if (undeliverableHere != null && undeliverableHere) {
-                        // receiver does not want the message..
-                        // perhaps we should DLQ it?
-                        ackType = MessageAck.POSION_ACK_TYPE;
-                    }
-                    settle(delivery, ackType);
-                }
-            }
-            pumpOutbound();
-        }
-
-        @Override
-        void doCommit() throws Exception {
-            if (!dispatchedInTx.isEmpty()) {
-                for (MessageDispatch md : dispatchedInTx) {
-                    MessageAck pendingTxAck = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
-                    pendingTxAck.setFirstMessageId(md.getMessage().getMessageId());
-                    pendingTxAck.setTransactionId(md.getMessage().getTransactionId());
-
-                    LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck);
-
-                    sendToActiveMQ(pendingTxAck, new ResponseHandler() {
-                        @Override
-                        public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
-                            if (response.isException()) {
-                                if (response.isException()) {
-                                    Throwable exception = ((ExceptionResponse) response).getException();
-                                    exception.printStackTrace();
-                                    sender.close();
-                                }
-                            }
-                            pumpProtonToSocket();
-                        }
-                    });
-                }
-
-                dispatchedInTx.clear();
-            }
-        }
-
-        @Override
-        void doRollback() throws Exception {
-            synchronized (outbound) {
-
-                LOG.trace("Rolling back {} messages for redelivery. ", dispatchedInTx.size());
-
-                for (MessageDispatch md : dispatchedInTx) {
-                    md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
-                    md.getMessage().setTransactionId(null);
-                    outbound.addFirst(md);
-                }
-
-                dispatchedInTx.clear();
-            }
-        }
-    }
-
-    private final ConcurrentHashMap<ConsumerId, ConsumerContext> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, ConsumerContext>();
-
-    @SuppressWarnings("unchecked")
-    void onSenderOpen(final Sender sender, final AmqpSessionContext sessionContext) {
-        org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) sender.getRemoteSource();
-
-        try {
-            final Map<Symbol, Object> supportedFilters = new HashMap<Symbol, Object>();
-            final ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
-            final ConsumerContext consumerContext = new ConsumerContext(id, sender);
-            sender.setContext(consumerContext);
-
-            boolean noLocal = false;
-            String selector = null;
-
-            if (source != null) {
-                Map.Entry<Symbol, DescribedType> filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS);
-                if (filter != null) {
-                    selector = filter.getValue().getDescribed().toString();
-                    // Validate the Selector.
-                    try {
-                        SelectorParser.parse(selector);
-                    } catch (InvalidSelectorException e) {
-                        sender.setSource(null);
-                        sender.setCondition(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
-                        sender.close();
-                        consumerContext.closed = true;
-                        return;
-                    }
-
-                    supportedFilters.put(filter.getKey(), filter.getValue());
-                }
-
-                filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS);
-                if (filter != null) {
-                    noLocal = true;
-                    supportedFilters.put(filter.getKey(), filter.getValue());
-                }
-            }
-
-            ActiveMQDestination destination;
-            if (source == null) {
-                // Attempt to recover previous subscription
-                destination = lookupSubscription(sender.getName());
-
-                if (destination != null) {
-                    source = new org.apache.qpid.proton.amqp.messaging.Source();
-                    source.setAddress(destination.getQualifiedName());
-                    source.setDurable(TerminusDurability.UNSETTLED_STATE);
-                    source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
-                    source.setDistributionMode(COPY);
-                } else {
-                    consumerContext.closed = true;
-                    sender.setSource(null);
-                    sender.setCondition(new ErrorCondition(AmqpError.NOT_FOUND, "Unknown subscription link: " + sender.getName()));
-                    sender.close();
-                    sender.free();
-                    pumpProtonToSocket();
-                    return;
-                }
-            } else if (source.getDynamic()) {
-                // lets create a temp dest.
-                destination = createTemporaryDestination(sender, source.getCapabilities());
-                source = new org.apache.qpid.proton.amqp.messaging.Source();
-                source.setAddress(destination.getQualifiedName());
-                source.setDynamic(true);
-                consumerContext.addCloseAction(new Runnable() {
-
-                    @Override
-                    public void run() {
-                        deleteTemporaryDestination((ActiveMQTempDestination) consumerContext.getDestination());
-                    }
-                });
-            } else {
-                destination = createDestination(source);
-            }
-
-            source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);
-            sender.setSource(source);
-
-            int senderCredit = sender.getRemoteCredit();
-
-            subscriptionsByConsumerId.put(id, consumerContext);
-            ConsumerInfo consumerInfo = new ConsumerInfo(id);
-            consumerInfo.setSelector(selector);
-            consumerInfo.setNoRangeAcks(true);
-            consumerInfo.setDestination(destination);
-            consumerInfo.setPrefetchSize(senderCredit >= 0 ? senderCredit : 0);
-            consumerInfo.setDispatchAsync(true);
-            consumerInfo.setNoLocal(noLocal);
-
-            if (source.getDistributionMode() == COPY && destination.isQueue()) {
-                consumerInfo.setBrowser(true);
-            }
-            if ((TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) ||
-                 TerminusDurability.CONFIGURATION.equals(source.getDurable())) && destination.isTopic()) {
-                consumerInfo.setSubscriptionName(sender.getName());
-            }
-
-            consumerContext.info = consumerInfo;
-            consumerContext.setDestination(destination);
-            consumerContext.credit = senderCredit;
-
-            sendToActiveMQ(consumerInfo, new ResponseHandler() {
-                @Override
-                public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
-                    if (response.isException()) {
-                        sender.setSource(null);
-                        Throwable exception = ((ExceptionResponse) response).getException();
-                        if (exception instanceof SecurityException) {
-                            sender.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
-                        } else if (exception instanceof InvalidSelectorException) {
-                            sender.setCondition(new ErrorCondition(AmqpError.INVALID_FIELD, exception.getMessage()));
-                        } else {
-                            sender.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
-                        }
-                        subscriptionsByConsumerId.remove(id);
-                        consumerContext.closed = true;
-                        sender.close();
-                        sender.free();
-                    } else {
-                        sessionContext.consumers.put(id, consumerContext);
-                        sender.open();
-                    }
-                    pumpProtonToSocket();
-                }
-            });
-        } catch (AmqpProtocolException e) {
-            sender.setSource(null);
-            sender.setCondition(new ErrorCondition(Symbol.getSymbol(e.getSymbolicName()), e.getMessage()));
-            sender.close();
-        }
-    }
-
-    private ActiveMQDestination lookupSubscription(String subscriptionName) throws AmqpProtocolException {
-        ActiveMQDestination result = null;
-        RegionBroker regionBroker;
-
-        try {
-            regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
-        } catch (Exception e) {
-            throw new AmqpProtocolException("Error finding subscription: " + subscriptionName + ": " + e.getMessage(), false, e);
-        }
-
-        final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
-        DurableTopicSubscription subscription = topicRegion.lookupSubscription(subscriptionName, connectionInfo.getClientId());
-        if (subscription != null) {
-            result = subscription.getActiveMQDestination();
-        }
-
-        return result;
-    }
-
-    private ActiveMQDestination createTemporaryDestination(final Link link, Symbol[] capabilities) {
-        ActiveMQDestination rc = null;
-        if (contains(capabilities, TEMP_TOPIC_CAPABILITY)) {
-            rc = new ActiveMQTempTopic(connectionId, nextTempDestinationId++);
-        } else if (contains(capabilities, TEMP_QUEUE_CAPABILITY)) {
-            rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
-        } else {
-            LOG.debug("Dynamic link request with no type capability, defaults to Temporary Queue");
-            rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
-        }
-
-        DestinationInfo info = new DestinationInfo();
-        info.setConnectionId(connectionId);
-        info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
-        info.setDestination(rc);
-
-        sendToActiveMQ(info, new ResponseHandler() {
-
-            @Override
-            public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
-                if (response.isException()) {
-                    link.setSource(null);
-
-                    Throwable exception = ((ExceptionResponse) response).getException();
-                    if (exception instanceof SecurityException) {
-                        link.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
-                    } else {
-                        link.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
-                    }
-
-                    link.close();
-                    link.free();
-                }
-            }
-        });
-
-        return rc;
-    }
-
-    private void deleteTemporaryDestination(ActiveMQTempDestination destination) {
-        DestinationInfo info = new DestinationInfo();
-        info.setConnectionId(connectionId);
-        info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
-        info.setDestination(destination);
-
-        sendToActiveMQ(info, new ResponseHandler() {
-
-            @Override
-            public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
-                if (response.isException()) {
-                    Throwable exception = ((ExceptionResponse) response).getException();
-                    LOG.debug("Error during temp destination removeal: {}", exception.getMessage());
-                }
-            }
-        });
-    }
-
-    // //////////////////////////////////////////////////////////////////////////
-    //
-    // Implementation methods
-    //
-    // //////////////////////////////////////////////////////////////////////////
-
-    private final AtomicInteger lastCommandId = new AtomicInteger();
-    private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
-
-    void sendToActiveMQ(Command command, ResponseHandler handler) {
-        command.setCommandId(lastCommandId.incrementAndGet());
-        if (handler != null) {
-            command.setResponseRequired(true);
-            resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
-        }
-        amqpTransport.sendToActiveMQ(command);
-    }
-
-    void handleException(Throwable exception) {
-        exception.printStackTrace();
-        LOG.debug("Exception detail", exception);
-        try {
-            amqpTransport.stop();
-        } catch (Throwable e) {
-            LOG.error("Failed to stop AMQP Transport ", e);
-        }
-    }
-
-    @Override
-    public void setProducerCredit(int producerCredit) {
-        this.producerCredit = producerCredit;
-    }
-
-    @SuppressWarnings("unused")
-    private List<SubscriptionInfo> lookupSubscriptions() throws AmqpProtocolException {
-        List<SubscriptionInfo> subscriptions = Collections.emptyList();
-        try {
-            subscriptions = PersistenceAdapterSupport.listSubscriptions(brokerService.getPersistenceAdapter(), connectionInfo.getClientId());
-        } catch (IOException e) {
-            throw new AmqpProtocolException("Error loading store subscriptions", true, e);
-        }
-
-        return subscriptions;
-    }
-
-    public boolean tryAuthenticate(ConnectionInfo info, X509Certificate[] peerCertificates) {
-        try {
-            if (getAuthenticator().authenticate(info.getUserName(), info.getPassword(), peerCertificates) != null) {
-                return true;
-            }
-
-            return false;
-        } catch (Throwable error) {
-            return false;
-        }
-    }
-
-    private AuthenticationBroker getAuthenticator() {
-        if (authenticator == null) {
-            try {
-                authenticator = (AuthenticationBroker) brokerService.getBroker().getAdaptor(AuthenticationBroker.class);
-            } catch (Exception e) {
-                LOG.debug("Failed to lookup AuthenticationBroker from Broker, will use a default Noop version.");
-            }
-
-            if (authenticator == null) {
-                authenticator = new DefaultAuthenticationBroker();
-            }
-        }
-
-        return authenticator;
-    }
-
-    private class DefaultAuthenticationBroker implements AuthenticationBroker {
-
-        @Override
-        public SecurityContext authenticate(String username, String password, X509Certificate[] peerCertificates) throws SecurityException {
-            return new SecurityContext(username) {
+    void updateTracer();
 
-                @Override
-                public Set<Principal> getPrincipals() {
-                    return null;
-                }
-            };
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolDiscriminator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolDiscriminator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolDiscriminator.java
new file mode 100644
index 0000000..85f44ec
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolDiscriminator.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.amqp.protocol.AmqpConnection;
+
+/**
+ * Used to assign the best implementation of an AmqpProtocolConverter to the
+ * AmqpTransport based on the AmqpHeader that the client sends us.
+ */
+public class AmqpProtocolDiscriminator implements AmqpProtocolConverter {
+
+    public static final int DEFAULT_PREFETCH = 1000;
+
+    private final AmqpTransport transport;
+    private final BrokerService brokerService;
+
+    interface Discriminator {
+        boolean matches(AmqpHeader header);
+
+        AmqpProtocolConverter create(AmqpTransport transport, BrokerService brokerService);
+    }
+
+    static final private ArrayList<Discriminator> DISCRIMINATORS = new ArrayList<Discriminator>();
+    static {
+        DISCRIMINATORS.add(new Discriminator() {
+
+            @Override
+            public AmqpProtocolConverter create(AmqpTransport transport, BrokerService brokerService) {
+                return new AmqpConnection(transport, brokerService);
+            }
+
+            @Override
+            public boolean matches(AmqpHeader header) {
+                switch (header.getProtocolId()) {
+                    case 0:
+                    case 3:
+                        if (header.getMajor() == 1 && header.getMinor() == 0 && header.getRevision() == 0) {
+                            return true;
+                        }
+                }
+                return false;
+            }
+        });
+    }
+
+    final private ArrayList<Command> pendingCommands = new ArrayList<Command>();
+
+    public AmqpProtocolDiscriminator(AmqpTransport transport, BrokerService brokerService) {
+        this.transport = transport;
+        this.brokerService = brokerService;
+    }
+
+    @Override
+    public void onAMQPData(Object command) throws Exception {
+        if (command.getClass() == AmqpHeader.class) {
+            AmqpHeader header = (AmqpHeader) command;
+
+            Discriminator match = null;
+            for (Discriminator discriminator : DISCRIMINATORS) {
+                if (discriminator.matches(header)) {
+                    match = discriminator;
+                }
+            }
+
+            // Lets use first in the list if none are a good match.
+            if (match == null) {
+                match = DISCRIMINATORS.get(0);
+            }
+
+            AmqpProtocolConverter next = match.create(transport, brokerService);
+            transport.setProtocolConverter(next);
+            for (Command send : pendingCommands) {
+                next.onActiveMQCommand(send);
+            }
+            pendingCommands.clear();
+            next.onAMQPData(command);
+        } else {
+            throw new IllegalStateException();
+        }
+    }
+
+    @Override
+    public void onAMQPException(IOException error) {
+    }
+
+    @Override
+    public void onActiveMQCommand(Command command) throws Exception {
+        pendingCommands.add(command);
+    }
+
+    @Override
+    public void updateTracer() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSslTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSslTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSslTransportFactory.java
new file mode 100644
index 0000000..5d04e8c
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSslTransportFactory.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.SslTransportFactory;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * An <a href="http://amqp.org/">AMQP</a> over SSL transport factory
+ */
+public class AmqpSslTransportFactory extends SslTransportFactory implements BrokerServiceAware {
+
+    private BrokerService brokerService = null;
+
+    @Override
+    protected String getDefaultWireFormatType() {
+        return "amqp";
+    }
+
+    @Override
+    @SuppressWarnings("rawtypes")
+    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+        transport = new AmqpTransportFilter(transport, format, brokerService);
+        IntrospectionSupport.setProperties(transport, options);
+        return super.compositeConfigure(transport, format, options);
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
+        transport = super.serverConfigure(transport, format, options);
+
+        // strip off the mutex transport.
+        if (transport instanceof MutexTransport) {
+            transport = ((MutexTransport) transport).getNext();
+        }
+
+        return transport;
+    }
+
+    @Override
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
+    }
+
+    @Override
+    protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
+        AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
+        AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
+        filter.setInactivityMonitor(monitor);
+        return monitor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
index 526a043..1c3d587 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
@@ -20,10 +20,12 @@ import java.nio.ByteBuffer;
 import java.util.AbstractMap;
 import java.util.Map;
 
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedLong;
+import org.apache.qpid.proton.amqp.transaction.Coordinator;
 import org.fusesource.hawtbuf.Buffer;
 
 /**
@@ -53,7 +55,7 @@ public class AmqpSupport {
     public static final Symbol COPY = Symbol.getSymbol("copy");
 
     // Lifetime policy symbols
-    public static final Symbol DYNAMIC_NODE_LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
+    public static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
 
     /**
      * Search for a given Symbol in a given array of Symbol object.
@@ -169,4 +171,39 @@ public class AmqpSupport {
         Buffer buffer = new Buffer(value.getArray(), value.getArrayOffset(), value.getLength());
         return buffer.bigEndianEditor().readLong();
     }
+
+    /**
+     * Given an AMQP endpoint, deduce the appropriate ActiveMQDestination type and create
+     * a new instance.  By default if the endpoint address does not carry the standard prefix
+     * value then we default to a Queue type destination.  If the endpoint is null or is an
+     * AMQP Coordinator type endpoint this method returns null to indicate no destination
+     * can be mapped.
+     *
+     * @param endpoint
+     *        the AMQP endpoint to construct an ActiveMQDestination from.
+     *
+     * @return a new ActiveMQDestination that best matches the address of the given endpoint
+     *
+     * @throws AmqpProtocolException if an error occurs while deducing the destination type.
+     */
+    public static ActiveMQDestination createDestination(Object endpoint) throws AmqpProtocolException {
+        if (endpoint == null) {
+            return null;
+        } else if (endpoint instanceof Coordinator) {
+            return null;
+        } else if (endpoint instanceof org.apache.qpid.proton.amqp.messaging.Terminus) {
+            org.apache.qpid.proton.amqp.messaging.Terminus terminus = (org.apache.qpid.proton.amqp.messaging.Terminus) endpoint;
+            if (terminus.getAddress() == null || terminus.getAddress().length() == 0) {
+                if (terminus instanceof org.apache.qpid.proton.amqp.messaging.Source) {
+                    throw new AmqpProtocolException("amqp:invalid-field", "source address not set");
+                } else {
+                    throw new AmqpProtocolException("amqp:invalid-field", "target address not set");
+                }
+            }
+
+            return ActiveMQDestination.createDestination(terminus.getAddress(), ActiveMQDestination.QUEUE_TYPE);
+        } else {
+            throw new RuntimeException("Unexpected terminus type: " + endpoint);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
index 698d7b7..1972e18 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
@@ -46,11 +46,12 @@ public interface AmqpTransport {
 
     public boolean isTrace();
 
-    public IAmqpProtocolConverter getProtocolConverter();
+    public AmqpProtocolConverter getProtocolConverter();
 
-    public void setProtocolConverter(IAmqpProtocolConverter protocolConverter);
+    public void setProtocolConverter(AmqpProtocolConverter protocolConverter);
 
     public void setInactivityMonitor(AmqpInactivityMonitor monitor);
 
     public AmqpInactivityMonitor getInactivityMonitor();
+
 }