You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/09/27 13:54:39 UTC

[12/15] activemq-artemis git commit: ARTEMIS-751 Simplification of the AMQP implementation

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
deleted file mode 100644
index d5b2ff7..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.proton.plug;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
-import org.apache.activemq.artemis.core.protocol.proton.ActiveMQProtonRemotingConnection;
-import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager;
-import org.apache.activemq.artemis.core.protocol.proton.sasl.ActiveMQPlainSASL;
-import org.apache.activemq.artemis.core.remoting.CloseListener;
-import org.apache.activemq.artemis.core.remoting.FailureListener;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
-import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
-import org.apache.activemq.artemis.spi.core.remoting.Connection;
-import org.apache.activemq.artemis.utils.ReusableLatch;
-import org.apache.activemq.artemis.utils.UUIDGenerator;
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.transport.AmqpError;
-import org.jboss.logging.Logger;
-import org.proton.plug.AMQPConnectionCallback;
-import org.proton.plug.AMQPConnectionContext;
-import org.proton.plug.AMQPSessionCallback;
-import org.proton.plug.SASLResult;
-import org.proton.plug.ServerSASL;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-import org.proton.plug.handler.ExtCapability;
-import org.proton.plug.logger.ActiveMQAMQPProtocolMessageBundle;
-import org.proton.plug.sasl.AnonymousServerSASL;
-
-import static org.proton.plug.AmqpSupport.CONTAINER_ID;
-import static org.proton.plug.AmqpSupport.INVALID_FIELD;
-import static org.proton.plug.context.AbstractConnectionContext.CONNECTION_OPEN_FAILED;
-
-public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback, FailureListener, CloseListener {
-   private static final Logger logger = Logger.getLogger(ActiveMQProtonConnectionCallback.class);
-   private static final List<String> connectedContainers = Collections.synchronizedList(new ArrayList());
-
-   private ConcurrentMap<XidImpl, Transaction> transactions = new ConcurrentHashMap<>();
-
-   private static final Logger log = Logger.getLogger(ActiveMQProtonConnectionCallback.class);
-
-   private final ProtonProtocolManager manager;
-
-   private final Connection connection;
-
-   protected ActiveMQProtonRemotingConnection protonConnectionDelegate;
-
-   protected AMQPConnectionContext amqpConnection;
-
-   private final ReusableLatch latch = new ReusableLatch(0);
-
-   private final Executor closeExecutor;
-
-   private String remoteContainerId;
-
-   private AtomicBoolean registeredConnectionId = new AtomicBoolean(false);
-
-   private ActiveMQServer server;
-
-   public ActiveMQProtonConnectionCallback(ProtonProtocolManager manager,
-                                           Connection connection,
-                                           Executor closeExecutor,
-                                           ActiveMQServer server) {
-      this.manager = manager;
-      this.connection = connection;
-      this.closeExecutor = closeExecutor;
-      this.server = server;
-   }
-
-   @Override
-   public ServerSASL[] getSASLMechnisms() {
-
-      ServerSASL[] result;
-
-      if (isSupportsAnonymous()) {
-         result = new ServerSASL[]{new ActiveMQPlainSASL(manager.getServer().getSecurityStore()), new AnonymousServerSASL()};
-      }
-      else {
-         result = new ServerSASL[]{new ActiveMQPlainSASL(manager.getServer().getSecurityStore())};
-      }
-
-      return result;
-   }
-
-   @Override
-   public boolean isSupportsAnonymous() {
-      boolean supportsAnonymous = false;
-      try {
-         manager.getServer().getSecurityStore().authenticate(null, null, null);
-         supportsAnonymous = true;
-      }
-      catch (Exception e) {
-         // authentication failed so no anonymous support
-      }
-      return supportsAnonymous;
-   }
-
-   @Override
-   public void close() {
-      try {
-         if (registeredConnectionId.getAndSet(false)) {
-            server.removeClientConnection(remoteContainerId);
-         }
-         connection.close();
-         amqpConnection.close();
-      }
-      finally {
-         for (Transaction tx : transactions.values()) {
-            try {
-               tx.rollback();
-            }
-            catch (Exception e) {
-               logger.warn(e.getMessage(), e);
-            }
-         }
-      }
-   }
-
-   public Executor getExeuctor() {
-      if (protonConnectionDelegate != null) {
-         return protonConnectionDelegate.getExecutor();
-      }
-      else {
-         return null;
-      }
-   }
-
-   @Override
-   public void setConnection(AMQPConnectionContext connection) {
-      this.amqpConnection = connection;
-   }
-
-   @Override
-   public AMQPConnectionContext getConnection() {
-      return amqpConnection;
-   }
-
-   public ActiveMQProtonRemotingConnection getProtonConnectionDelegate() {
-      return protonConnectionDelegate;
-   }
-
-   public void setProtonConnectionDelegate(ActiveMQProtonRemotingConnection protonConnectionDelegate) {
-
-      this.protonConnectionDelegate = protonConnectionDelegate;
-   }
-
-   @Override
-   public void onTransport(ByteBuf byteBuf, AMQPConnectionContext amqpConnection) {
-      final int size = byteBuf.writerIndex();
-
-      latch.countUp();
-      connection.write(new ChannelBufferWrapper(byteBuf, true), false, false, new ChannelFutureListener() {
-         @Override
-         public void operationComplete(ChannelFuture future) throws Exception {
-            latch.countDown();
-         }
-      });
-
-      if (amqpConnection.isSyncOnFlush()) {
-         try {
-            latch.await(5, TimeUnit.SECONDS);
-         }
-         catch (Exception e) {
-            e.printStackTrace();
-         }
-      }
-
-      amqpConnection.outputDone(size);
-   }
-
-   @Override
-   public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
-      return new ProtonSessionIntegrationCallback(this, manager, connection, this.connection, closeExecutor);
-   }
-
-   @Override
-   public void sendSASLSupported() {
-      connection.write(ActiveMQBuffers.wrappedBuffer(new byte[]{'A', 'M', 'Q', 'P', 3, 1, 0, 0}));
-   }
-
-   @Override
-   public boolean validateConnection(org.apache.qpid.proton.engine.Connection connection, SASLResult saslResult) {
-      remoteContainerId = connection.getRemoteContainer();
-      boolean idOK = server.addClientConnection(remoteContainerId, ExtCapability.needUniqueConnection(connection));
-      if (!idOK) {
-         //https://issues.apache.org/jira/browse/ARTEMIS-728
-         Map<Symbol, Object> connProp = new HashMap<>();
-         connProp.put(CONNECTION_OPEN_FAILED, "true");
-         connection.setProperties(connProp);
-         connection.getCondition().setCondition(AmqpError.INVALID_FIELD);
-         Map<Symbol, Symbol> info = new HashMap<>();
-         info.put(INVALID_FIELD, CONTAINER_ID);
-         connection.getCondition().setInfo(info);
-         return false;
-      }
-      registeredConnectionId.set(true);
-      return true;
-   }
-
-   @Override
-   public void connectionClosed() {
-      close();
-   }
-
-   @Override
-   public void connectionFailed(ActiveMQException exception, boolean failedOver) {
-      close();
-   }
-
-   @Override
-   public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
-      close();
-   }
-
-   @Override
-   public Binary newTransaction() {
-      XidImpl xid = newXID();
-      Transaction transaction = new TransactionImpl(xid, server.getStorageManager(), -1);
-      transactions.put(xid, transaction);
-      return new Binary(xid.getGlobalTransactionId());
-   }
-
-   @Override
-   public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
-      XidImpl xid = newXID(txid.getArray());
-      Transaction tx = transactions.get(xid);
-
-      if (tx == null) {
-         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.txNotFound(xid.toString());
-      }
-
-      return tx;
-   }
-
-   @Override
-   public void removeTransaction(Binary txid) {
-      XidImpl xid = newXID(txid.getArray());
-      transactions.remove(xid);
-   }
-
-
-   protected XidImpl newXID() {
-      return newXID(UUIDGenerator.getInstance().generateStringUUID().getBytes());
-   }
-
-   protected XidImpl newXID(byte[] bytes) {
-      return new XidImpl("amqp".getBytes(), 1, bytes);
-   }
-
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
deleted file mode 100644
index da9dd9c..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ /dev/null
@@ -1,542 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.proton.plug;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
-import org.apache.activemq.artemis.core.io.IOCallback;
-import org.apache.activemq.artemis.core.paging.PagingStore;
-import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager;
-import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage;
-import org.apache.activemq.artemis.core.server.BindingQueryResult;
-import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.ServerSession;
-import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
-import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
-import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
-import org.apache.activemq.artemis.spi.core.remoting.Connection;
-import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
-import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.activemq.artemis.utils.SelectorTranslator;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
-import org.apache.activemq.artemis.utils.UUIDGenerator;
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.transport.AmqpError;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Link;
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.message.ProtonJMessage;
-import org.proton.plug.AMQPConnectionContext;
-import org.proton.plug.AMQPSessionCallback;
-import org.proton.plug.AMQPSessionContext;
-import org.proton.plug.SASLResult;
-import org.proton.plug.context.ProtonPlugSender;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-import org.proton.plug.exceptions.ActiveMQAMQPInternalErrorException;
-import org.proton.plug.exceptions.ActiveMQAMQPResourceLimitExceededException;
-import org.proton.plug.sasl.PlainSASLResult;
-
-public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, SessionCallback {
-
-   protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0);
-
-   private final ActiveMQProtonConnectionCallback protonSPI;
-
-   private final ProtonProtocolManager manager;
-
-   private final AMQPConnectionContext connection;
-
-   private final Connection transportConnection;
-
-   private ServerSession serverSession;
-
-   private AMQPSessionContext protonSession;
-
-   private final Executor closeExecutor;
-
-   private final AtomicBoolean draining = new AtomicBoolean(false);
-
-   public ProtonSessionIntegrationCallback(ActiveMQProtonConnectionCallback protonSPI,
-                                           ProtonProtocolManager manager,
-                                           AMQPConnectionContext connection,
-                                           Connection transportConnection,
-                                           Executor executor) {
-      this.protonSPI = protonSPI;
-      this.manager = manager;
-      this.connection = connection;
-      this.transportConnection = transportConnection;
-      this.closeExecutor = executor;
-   }
-
-   @Override
-   public boolean isWritable(ReadyListener callback) {
-      return transportConnection.isWritable(callback);
-   }
-
-   @Override
-   public void onFlowConsumer(Object consumer, int credits, final boolean drain) {
-      ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
-      if (drain) {
-         // If the draining is already running, then don't do anything
-         if (draining.compareAndSet(false, true)) {
-            final ProtonPlugSender plugSender = (ProtonPlugSender) serverConsumer.getProtocolContext();
-            serverConsumer.forceDelivery(1, new Runnable() {
-               @Override
-               public void run() {
-                  try {
-                     plugSender.getSender().drained();
-                  }
-                  finally {
-                     draining.set(false);
-                  }
-               }
-            });
-         }
-      }
-      else {
-         serverConsumer.receiveCredits(-1);
-      }
-   }
-
-   @Override
-   public void browserFinished(ServerConsumer consumer) {
-
-   }
-
-   @Override
-   public void init(AMQPSessionContext protonSession, SASLResult saslResult) throws Exception {
-
-      this.protonSession = protonSession;
-
-      String name = UUIDGenerator.getInstance().generateStringUUID();
-
-      String user = null;
-      String passcode = null;
-      if (saslResult != null) {
-         user = saslResult.getUser();
-         if (saslResult instanceof PlainSASLResult) {
-            passcode = ((PlainSASLResult) saslResult).getPassword();
-         }
-      }
-
-      serverSession = manager.getServer().createSession(name, user, passcode, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonSPI.getProtonConnectionDelegate(), // RemotingConnection remotingConnection,
-                                                        false, // boolean autoCommitSends
-                                                        false, // boolean autoCommitAcks,
-                                                        false, // boolean preAcknowledge,
-                                                        true, //boolean xa,
-                                                        (String) null, this, true);
-   }
-
-   @Override
-   public void afterDelivery() throws Exception {
-
-   }
-
-   @Override
-   public void start() {
-
-   }
-
-   @Override
-   public Object createSender(ProtonPlugSender protonSender,
-                              String queue,
-                              String filter,
-                              boolean browserOnly) throws Exception {
-      long consumerID = consumerIDGenerator.generateID();
-
-      filter = SelectorTranslator.convertToActiveMQFilterString(filter);
-
-      ServerConsumer consumer = serverSession.createConsumer(consumerID, SimpleString.toSimpleString(queue), SimpleString.toSimpleString(filter), browserOnly);
-
-      // AMQP handles its own flow control for when it's started
-      consumer.setStarted(true);
-
-      consumer.setProtocolContext(protonSender);
-
-      return consumer;
-   }
-
-   @Override
-   public void startSender(Object brokerConsumer) throws Exception {
-      ServerConsumer serverConsumer = (ServerConsumer) brokerConsumer;
-      // flow control is done at proton
-      serverConsumer.receiveCredits(-1);
-   }
-
-   @Override
-   public void createTemporaryQueue(String queueName) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), null, true, false);
-   }
-
-   @Override
-   public void createTemporaryQueue(String address, String queueName, String filter) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), true, false);
-   }
-
-   @Override
-   public void createDurableQueue(String address, String queueName, String filter) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true);
-   }
-
-   @Override
-   public QueueQueryResult queueQuery(String queueName, boolean autoCreate) throws Exception {
-      QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
-
-      if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateJmsQueues() && autoCreate) {
-         try {
-            serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), null, false, true);
-         }
-         catch (ActiveMQQueueExistsException e) {
-            // The queue may have been created by another thread in the mean time.  Catch and do nothing.
-         }
-         queueQueryResult = new QueueQueryResult(queueQueryResult.getName(), queueQueryResult.getAddress(), queueQueryResult.isDurable(), queueQueryResult.isTemporary(), queueQueryResult.getFilterString(), queueQueryResult.getConsumerCount(), queueQueryResult.getMessageCount(), queueQueryResult.isAutoCreateJmsQueues(), true);
-      }
-      return queueQueryResult;
-   }
-
-   @Override
-   public boolean bindingQuery(String address) throws Exception {
-      BindingQueryResult bindingQueryResult = serverSession.executeBindingQuery(SimpleString.toSimpleString(address));
-      if (!bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateJmsQueues()) {
-         try {
-            serverSession.createQueue(new SimpleString(address), new SimpleString(address), null, false, true);
-         }
-         catch (ActiveMQQueueExistsException e) {
-            // The queue may have been created by another thread in the mean time.  Catch and do nothing.
-         }
-         bindingQueryResult = serverSession.executeBindingQuery(SimpleString.toSimpleString(address));
-      }
-      return bindingQueryResult.isExists();
-   }
-
-   @Override
-   public void closeSender(final Object brokerConsumer) throws Exception {
-
-      final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
-      final CountDownLatch latch = new CountDownLatch(1);
-
-      Runnable runnable = new Runnable() {
-         @Override
-         public void run() {
-            try {
-               consumer.close(false);
-               latch.countDown();
-            }
-            catch (Exception e) {
-            }
-         }
-      };
-
-      // Due to the nature of proton this could be happening within flushes from the queue-delivery (depending on how it happened on the protocol)
-      // to avoid deadlocks the close has to be done outside of the main thread on an executor
-      // otherwise you could get a deadlock
-      Executor executor = protonSPI.getExeuctor();
-
-      if (executor != null) {
-         executor.execute(runnable);
-      }
-      else {
-         runnable.run();
-      }
-
-      try {
-         latch.await(10, TimeUnit.SECONDS);
-      }
-      catch (InterruptedException e) {
-         throw new ActiveMQAMQPInternalErrorException("Unable to close consumers for queue: " + consumer.getQueue());
-      }
-   }
-
-   @Override
-   public ProtonJMessage encodeMessage(Object message, int deliveryCount) throws Exception {
-      return (ProtonJMessage) manager.getConverter().outbound((ServerMessage) message, deliveryCount);
-   }
-
-   @Override
-   public String tempQueueName() {
-      return UUIDGenerator.getInstance().generateStringUUID();
-   }
-
-   @Override
-   public void close() throws Exception {
-      //need to check here as this can be called if init fails
-      if (serverSession != null) {
-         recoverContext();
-         try {
-            serverSession.close(false);
-         }
-         finally {
-            resetContext();
-         }
-      }
-   }
-
-   @Override
-   public void ack(Transaction transaction, Object brokerConsumer, Object message) throws Exception {
-      if (transaction == null) {
-         transaction = serverSession.getCurrentTransaction();
-      }
-      recoverContext();
-      try {
-         ((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, ((ServerMessage) message).getMessageID());
-      }
-      finally {
-         resetContext();
-      }
-   }
-
-   @Override
-   public void cancel(Object brokerConsumer, Object message, boolean updateCounts) throws Exception {
-      recoverContext();
-      try {
-         ((ServerConsumer) brokerConsumer).individualCancel(((ServerMessage) message).getMessageID(), updateCounts);
-      }
-      finally {
-         resetContext();
-      }
-   }
-
-   @Override
-   public void resumeDelivery(Object consumer) {
-      ((ServerConsumer) consumer).receiveCredits(-1);
-   }
-
-   @Override
-   public void serverSend(final Transaction transaction,
-                          final Receiver receiver,
-                          final Delivery delivery,
-                          String address,
-                          int messageFormat,
-                          ByteBuf messageEncoded) throws Exception {
-      EncodedMessage encodedMessage = new EncodedMessage(messageFormat, messageEncoded.array(), messageEncoded.arrayOffset(), messageEncoded.writerIndex());
-
-      ServerMessage message = manager.getConverter().inbound(encodedMessage);
-      //use the address on the receiver if not null, if null let's hope it was set correctly on the message
-      if (address != null) {
-         message.setAddress(new SimpleString(address));
-      }
-
-      recoverContext();
-
-      PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddress());
-      if (store.isRejectingMessages()) {
-         // We drop pre-settled messages (and abort any associated Tx)
-         if (delivery.remotelySettled()) {
-            if (transaction != null) {
-               String amqpAddress = delivery.getLink().getTarget().getAddress();
-               ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress);
-               transaction.markAsRollbackOnly(e);
-            }
-         }
-         else {
-            rejectMessage(delivery);
-         }
-      }
-      else {
-         serverSend(transaction, message, delivery, receiver);
-      }
-   }
-
-   private void rejectMessage(Delivery delivery) {
-      String address = delivery.getLink().getTarget().getAddress();
-      ErrorCondition ec = new ErrorCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address);
-      Rejected rejected = new Rejected();
-      rejected.setError(ec);
-      delivery.disposition(rejected);
-      connection.flush();
-   }
-
-   private void serverSend(final Transaction transaction, final ServerMessage message, final Delivery delivery, final Receiver receiver) throws Exception {
-      try {
-
-         message.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString(), receiver.getSession().getConnection().getRemoteContainer());
-         serverSession.send(transaction, message, false, false);
-
-         // FIXME Potential race here...
-         manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() {
-            @Override
-            public void done() {
-               synchronized (connection.getLock()) {
-                  delivery.disposition(Accepted.getInstance());
-                  delivery.settle();
-                  connection.flush();
-               }
-            }
-
-            @Override
-            public void onError(int errorCode, String errorMessage) {
-               synchronized (connection.getLock()) {
-                  receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
-                  connection.flush();
-               }
-            }
-         });
-      }
-      finally {
-         resetContext();
-      }
-   }
-
-   @Override
-   public String getPubSubPrefix() {
-      return manager.getPubSubPrefix();
-   }
-
-   @Override
-   public void offerProducerCredit(final String address, final int credits, final int threshold, final Receiver receiver) {
-      try {
-         final PagingStore store = manager.getServer().getPagingManager().getPageStore(new SimpleString(address));
-         store.checkMemory(new Runnable() {
-            @Override
-            public void run() {
-               if (receiver.getRemoteCredit() < threshold) {
-                  receiver.flow(credits);
-                  connection.flush();
-               }
-            }
-         });
-      }
-      catch (Exception e) {
-         throw new RuntimeException(e);
-      }
-   }
-
-   @Override
-   public void deleteQueue(String queueName) throws Exception {
-      manager.getServer().destroyQueue(new SimpleString(queueName));
-   }
-
-   private void resetContext() {
-      manager.getServer().getStorageManager().setContext(null);
-   }
-
-   private void recoverContext() {
-      manager.getServer().getStorageManager().setContext(serverSession.getSessionContext());
-   }
-
-   @Override
-   public void sendProducerCreditsMessage(int credits, SimpleString address) {
-   }
-
-   @Override
-   public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
-      return false;
-   }
-
-   @Override
-   public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
-   }
-
-   @Override
-   public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
-
-      message.removeProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString());
-
-      ProtonPlugSender plugSender = (ProtonPlugSender) consumer.getProtocolContext();
-
-      try {
-         return plugSender.deliverMessage(message, deliveryCount);
-      }
-      catch (Exception e) {
-         synchronized (connection.getLock()) {
-            plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage()));
-            connection.flush();
-         }
-         throw new IllegalStateException("Can't deliver message " + e, e);
-      }
-
-   }
-
-   @Override
-   public int sendLargeMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
-      return 0;
-   }
-
-   @Override
-   public int sendLargeMessageContinuation(ServerConsumer consumer,
-                                           byte[] body,
-                                           boolean continues,
-                                           boolean requiresResponse) {
-      return 0;
-   }
-
-   @Override
-   public void closed() {
-   }
-
-   @Override
-   public void disconnect(ServerConsumer consumer, String queueName) {
-      synchronized (connection.getLock()) {
-         ((Link) consumer.getProtocolContext()).close();
-         connection.flush();
-      }
-   }
-
-   @Override
-   public boolean hasCredits(ServerConsumer consumer) {
-      ProtonPlugSender plugSender = (ProtonPlugSender) consumer.getProtocolContext();
-
-      if (plugSender != null && plugSender.getSender().getCredit() > 0) {
-         return true;
-      }
-      else {
-         return false;
-      }
-   }
-
-   @Override
-   public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
-      return protonSPI.getTransaction(txid);
-   }
-
-   @Override
-   public Binary newTransaction() {
-      return protonSPI.newTransaction();
-   }
-
-
-   @Override
-   public void commitTX(Binary txid) throws Exception {
-      Transaction tx = protonSPI.getTransaction(txid);
-      tx.commit(true);
-      protonSPI.removeTransaction(txid);
-   }
-
-   @Override
-   public void rollbackTX(Binary txid, boolean lastMessageReceived) throws Exception {
-      Transaction tx = protonSPI.getTransaction(txid);
-      tx.rollback();
-      protonSPI.removeTransaction(txid);
-
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/sasl/ActiveMQPlainSASL.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/sasl/ActiveMQPlainSASL.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/sasl/ActiveMQPlainSASL.java
deleted file mode 100644
index bf4f043..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/sasl/ActiveMQPlainSASL.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.proton.sasl;
-
-import org.apache.activemq.artemis.core.security.SecurityStore;
-import org.proton.plug.sasl.ServerSASLPlain;
-
-public class ActiveMQPlainSASL extends ServerSASLPlain {
-
-   private final SecurityStore securityStore;
-
-   public ActiveMQPlainSASL(SecurityStore securityStore) {
-      this.securityStore = securityStore;
-   }
-
-   @Override
-   protected boolean authenticate(String user, String password) {
-      if (securityStore.isSecurityEnabled()) {
-         try {
-            securityStore.authenticate(user, password, null);
-            return true;
-         }
-         catch (Exception e) {
-            return false;
-         }
-      }
-      else {
-         return true;
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
new file mode 100644
index 0000000..fd79547
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.protocol.amqp.sasl.AnonymousServerSASL;
+import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASL;
+import org.apache.activemq.artemis.core.remoting.CloseListener;
+import org.apache.activemq.artemis.core.remoting.FailureListener;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
+import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.jboss.logging.Logger;
+import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
+import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
+import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability;
+import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
+
+public class AMQPConnectionCallback implements FailureListener, CloseListener {
+   private static final Logger logger = Logger.getLogger(AMQPConnectionCallback.class);
+
+   private ConcurrentMap<XidImpl, Transaction> transactions = new ConcurrentHashMap<>();
+
+   private final ProtonProtocolManager manager;
+
+   private final Connection connection;
+
+   protected ActiveMQProtonRemotingConnection protonConnectionDelegate;
+
+   protected AMQPConnectionContext amqpConnection;
+
+   private final ReusableLatch latch = new ReusableLatch(0);
+
+   private final Executor closeExecutor;
+
+   private String remoteContainerId;
+
+   private AtomicBoolean registeredConnectionId = new AtomicBoolean(false);
+
+   private ActiveMQServer server;
+
+   public AMQPConnectionCallback(ProtonProtocolManager manager,
+                                 Connection connection,
+                                 Executor closeExecutor,
+                                 ActiveMQServer server) {
+      this.manager = manager;
+      this.connection = connection;
+      this.closeExecutor = closeExecutor;
+      this.server = server;
+   }
+
+   public ServerSASL[] getSASLMechnisms() {
+
+      ServerSASL[] result;
+
+      if (isSupportsAnonymous()) {
+         result = new ServerSASL[]{new PlainSASL(manager.getServer().getSecurityStore()), new AnonymousServerSASL()};
+      }
+      else {
+         result = new ServerSASL[]{new PlainSASL(manager.getServer().getSecurityStore())};
+      }
+
+      return result;
+   }
+
+   public boolean isSupportsAnonymous() {
+      boolean supportsAnonymous = false;
+      try {
+         manager.getServer().getSecurityStore().authenticate(null, null, null);
+         supportsAnonymous = true;
+      }
+      catch (Exception e) {
+         // authentication failed so no anonymous support
+      }
+      return supportsAnonymous;
+   }
+
+   public void close() {
+      try {
+         if (registeredConnectionId.getAndSet(false)) {
+            server.removeClientConnection(remoteContainerId);
+         }
+         connection.close();
+         amqpConnection.close();
+      }
+      finally {
+         for (Transaction tx : transactions.values()) {
+            try {
+               tx.rollback();
+            }
+            catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
+      }
+   }
+
+   // Returns the executor of the remoting-connection delegate, or null while no delegate has been set.
+   public Executor getExeuctor() {
+      if (protonConnectionDelegate == null) {
+         return null;
+      }
+      // a delegate is present: hand out its executor
+      return protonConnectionDelegate.getExecutor();
+   }
+
+   public void setConnection(AMQPConnectionContext connection) {
+      this.amqpConnection = connection;
+   }
+
+   public AMQPConnectionContext getConnection() {
+      return amqpConnection;
+   }
+
+   public ActiveMQProtonRemotingConnection getProtonConnectionDelegate() {
+      return protonConnectionDelegate;
+   }
+
+   public void setProtonConnectionDelegate(ActiveMQProtonRemotingConnection protonConnectionDelegate) {
+
+      this.protonConnectionDelegate = protonConnectionDelegate;
+   }
+
+   // Hands byteBuf to the transport connection, then reports its writer index to the context via outputDone.
+   public void onTransport(ByteBuf byteBuf, AMQPConnectionContext amqpConnection) {
+      final int size = byteBuf.writerIndex();
+      latch.countUp(); // counted down again by the write-completion listener below
+      connection.write(new ChannelBufferWrapper(byteBuf, true), false, false, new ChannelFutureListener() {
+         @Override
+         public void operationComplete(ChannelFuture future) throws Exception {
+            latch.countDown();
+         }
+      });
+      // With sync-on-flush enabled, wait on the latch for at most 5 seconds before continuing.
+      if (amqpConnection.isSyncOnFlush()) {
+         try {
+            latch.await(5, TimeUnit.SECONDS);
+         }
+         catch (Exception e) {
+            // Log through the class logger (as close() does) rather than printing to stderr.
+            logger.warn(e.getMessage(), e);
+         }
+      }
+      amqpConnection.outputDone(size);
+   }
+
+   public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
+      return new AMQPSessionCallback(this, manager, connection, this.connection, closeExecutor);
+   }
+
+   public void sendSASLSupported() {
+      connection.write(ActiveMQBuffers.wrappedBuffer(new byte[]{'A', 'M', 'Q', 'P', 3, 1, 0, 0}));
+   }
+
+   public boolean validateConnection(org.apache.qpid.proton.engine.Connection connection, SASLResult saslResult) {
+      remoteContainerId = connection.getRemoteContainer();
+      boolean idOK = server.addClientConnection(remoteContainerId, ExtCapability.needUniqueConnection(connection));
+      if (!idOK) {
+         //https://issues.apache.org/jira/browse/ARTEMIS-728
+         Map<Symbol, Object> connProp = new HashMap<>();
+         connProp.put(AmqpSupport.CONNECTION_OPEN_FAILED, "true");
+         connection.setProperties(connProp);
+         connection.getCondition().setCondition(AmqpError.INVALID_FIELD);
+         Map<Symbol, Symbol> info = new HashMap<>();
+         info.put(AmqpSupport.INVALID_FIELD, AmqpSupport.CONTAINER_ID);
+         connection.getCondition().setInfo(info);
+         return false;
+      }
+      registeredConnectionId.set(true);
+      return true;
+   }
+
+   @Override
+   public void connectionClosed() {
+      close();
+   }
+
+   @Override
+   public void connectionFailed(ActiveMQException exception, boolean failedOver) {
+      close();
+   }
+
+   @Override
+   public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
+      close();
+   }
+
+   public Binary newTransaction() {
+      XidImpl xid = newXID();
+      Transaction transaction = new TransactionImpl(xid, server.getStorageManager(), -1);
+      transactions.put(xid, transaction);
+      return new Binary(xid.getGlobalTransactionId());
+   }
+
+   public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
+      XidImpl xid = newXID(txid.getArray());
+      Transaction tx = transactions.get(xid);
+
+      if (tx == null) {
+         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.txNotFound(xid.toString());
+      }
+
+      return tx;
+   }
+
+   public void removeTransaction(Binary txid) {
+      XidImpl xid = newXID(txid.getArray());
+      transactions.remove(xid);
+   }
+
+
+   protected XidImpl newXID() {
+      return newXID(UUIDGenerator.getInstance().generateStringUUID().getBytes());
+   }
+
+   protected XidImpl newXID(byte[] bytes) {
+      return new XidImpl("amqp".getBytes(), 1, bytes);
+   }
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
new file mode 100644
index 0000000..9bdf4e1
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
+import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage;
+import org.apache.activemq.artemis.core.server.BindingQueryResult;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
+import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+import org.apache.activemq.artemis.utils.IDGenerator;
+import org.apache.activemq.artemis.utils.SelectorTranslator;
+import org.apache.activemq.artemis.utils.SimpleIDGenerator;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.message.ProtonJMessage;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
+import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
+import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
+
+public class AMQPSessionCallback implements SessionCallback {
+
+   protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0);
+
+   private final AMQPConnectionCallback protonSPI;
+
+   private final ProtonProtocolManager manager;
+
+   private final AMQPConnectionContext connection;
+
+   private final Connection transportConnection;
+
+   private ServerSession serverSession;
+
+   private AMQPSessionContext protonSession;
+
+   private final Executor closeExecutor;
+
+   private final AtomicBoolean draining = new AtomicBoolean(false);
+
+   public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
+                              ProtonProtocolManager manager,
+                              AMQPConnectionContext connection,
+                              Connection transportConnection,
+                              Executor executor) {
+      this.protonSPI = protonSPI;
+      this.manager = manager;
+      this.connection = connection;
+      this.transportConnection = transportConnection;
+      this.closeExecutor = executor;
+   }
+
+   @Override
+   public boolean isWritable(ReadyListener callback) {
+      return transportConnection.isWritable(callback);
+   }
+
+   public void onFlowConsumer(Object consumer, int credits, final boolean drain) {
+      ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
+      if (drain) {
+         // If the draining is already running, then don't do anything
+         if (draining.compareAndSet(false, true)) {
+            final ProtonServerSenderContext plugSender = (ProtonServerSenderContext) serverConsumer.getProtocolContext();
+            serverConsumer.forceDelivery(1, new Runnable() {
+               @Override
+               public void run() {
+                  try {
+                     plugSender.getSender().drained();
+                  }
+                  finally {
+                     draining.set(false);
+                  }
+               }
+            });
+         }
+      }
+      else {
+         serverConsumer.receiveCredits(-1);
+      }
+   }
+
+   @Override
+   public void browserFinished(ServerConsumer consumer) {
+
+   }
+
+   public void init(AMQPSessionContext protonSession, SASLResult saslResult) throws Exception {
+
+      this.protonSession = protonSession;
+
+      String name = UUIDGenerator.getInstance().generateStringUUID();
+
+      String user = null;
+      String passcode = null;
+      if (saslResult != null) {
+         user = saslResult.getUser();
+         if (saslResult instanceof PlainSASLResult) {
+            passcode = ((PlainSASLResult) saslResult).getPassword();
+         }
+      }
+
+      serverSession = manager.getServer().createSession(name, user, passcode, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonSPI.getProtonConnectionDelegate(), // RemotingConnection remotingConnection,
+                                                        false, // boolean autoCommitSends
+                                                        false, // boolean autoCommitAcks,
+                                                        false, // boolean preAcknowledge,
+                                                        true, //boolean xa,
+                                                        (String) null, this, true);
+   }
+
+   @Override
+   public void afterDelivery() throws Exception {
+
+   }
+
+   public void start() {
+
+   }
+
+   public Object createSender(ProtonServerSenderContext protonSender,
+                              String queue,
+                              String filter,
+                              boolean browserOnly) throws Exception {
+      long consumerID = consumerIDGenerator.generateID();
+
+      filter = SelectorTranslator.convertToActiveMQFilterString(filter);
+
+      ServerConsumer consumer = serverSession.createConsumer(consumerID, SimpleString.toSimpleString(queue), SimpleString.toSimpleString(filter), browserOnly);
+
+      // AMQP handles its own flow control for when it's started
+      consumer.setStarted(true);
+
+      consumer.setProtocolContext(protonSender);
+
+      return consumer;
+   }
+
+   public void startSender(Object brokerConsumer) throws Exception {
+      ServerConsumer serverConsumer = (ServerConsumer) brokerConsumer;
+      // flow control is done at proton
+      serverConsumer.receiveCredits(-1);
+   }
+
+   public void createTemporaryQueue(String queueName) throws Exception {
+      serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), null, true, false);
+   }
+
+   public void createTemporaryQueue(String address, String queueName, String filter) throws Exception {
+      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), true, false);
+   }
+
+   public void createDurableQueue(String address, String queueName, String filter) throws Exception {
+      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true);
+   }
+
+   public QueueQueryResult queueQuery(String queueName, boolean autoCreate) throws Exception {
+      QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
+
+      if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateJmsQueues() && autoCreate) {
+         try {
+            serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), null, false, true);
+         }
+         catch (ActiveMQQueueExistsException e) {
+            // The queue may have been created by another thread in the mean time.  Catch and do nothing.
+         }
+         queueQueryResult = new QueueQueryResult(queueQueryResult.getName(), queueQueryResult.getAddress(), queueQueryResult.isDurable(), queueQueryResult.isTemporary(), queueQueryResult.getFilterString(), queueQueryResult.getConsumerCount(), queueQueryResult.getMessageCount(), queueQueryResult.isAutoCreateJmsQueues(), true);
+      }
+      return queueQueryResult;
+   }
+
+   public boolean bindingQuery(String address) throws Exception {
+      BindingQueryResult bindingQueryResult = serverSession.executeBindingQuery(SimpleString.toSimpleString(address));
+      if (!bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateJmsQueues()) {
+         try {
+            serverSession.createQueue(new SimpleString(address), new SimpleString(address), null, false, true);
+         }
+         catch (ActiveMQQueueExistsException e) {
+            // The queue may have been created by another thread in the mean time.  Catch and do nothing.
+         }
+         bindingQueryResult = serverSession.executeBindingQuery(SimpleString.toSimpleString(address));
+      }
+      return bindingQueryResult.isExists();
+   }
+
+   // Closes the given broker consumer, off-thread when an executor is available, waiting up to 10 seconds.
+   public void closeSender(final Object brokerConsumer) throws Exception {
+      final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
+      final CountDownLatch latch = new CountDownLatch(1);
+      Runnable runnable = new Runnable() {
+         @Override
+         public void run() {
+            try {
+               consumer.close(false);
+            }
+            catch (Exception e) {
+               org.jboss.logging.Logger.getLogger(AMQPSessionCallback.class).warn(e.getMessage(), e);
+            }
+            finally {
+               // release the waiter even when the close failed, so the caller does not sit out the timeout
+               latch.countDown();
+            }
+         }
+      };
+      // Due to the nature of proton this could be happening within flushes from the queue-delivery (depending on how it
+      // happened on the protocol); to avoid deadlocks the close has to be done outside of the main thread on an executor
+      Executor executor = protonSPI.getExeuctor();
+      if (executor != null) {
+         executor.execute(runnable);
+      }
+      else {
+         runnable.run();
+      }
+      try {
+         latch.await(10, TimeUnit.SECONDS);
+      }
+      catch (InterruptedException e) {
+         Thread.currentThread().interrupt(); // keep the interrupt visible to callers further up the stack
+         throw new ActiveMQAMQPInternalErrorException("Unable to close consumers for queue: " + consumer.getQueue());
+      }
+   }
+
+   public ProtonJMessage encodeMessage(Object message, int deliveryCount) throws Exception {
+      return (ProtonJMessage) manager.getConverter().outbound((ServerMessage) message, deliveryCount);
+   }
+
+   public String tempQueueName() {
+      return UUIDGenerator.getInstance().generateStringUUID();
+   }
+
+   public void close() throws Exception {
+      //need to check here as this can be called if init fails
+      if (serverSession != null) {
+         recoverContext();
+         try {
+            serverSession.close(false);
+         }
+         finally {
+            resetContext();
+         }
+      }
+   }
+
+   public void ack(Transaction transaction, Object brokerConsumer, Object message) throws Exception {
+      if (transaction == null) {
+         transaction = serverSession.getCurrentTransaction();
+      }
+      recoverContext();
+      try {
+         ((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, ((ServerMessage) message).getMessageID());
+      }
+      finally {
+         resetContext();
+      }
+   }
+
+   public void cancel(Object brokerConsumer, Object message, boolean updateCounts) throws Exception {
+      recoverContext();
+      try {
+         ((ServerConsumer) brokerConsumer).individualCancel(((ServerMessage) message).getMessageID(), updateCounts);
+      }
+      finally {
+         resetContext();
+      }
+   }
+
+   public void resumeDelivery(Object consumer) {
+      ((ServerConsumer) consumer).receiveCredits(-1);
+   }
+
+   // Converts the encoded AMQP message to a core message and sends it, unless the target address is rejecting messages.
+   public void serverSend(final Transaction transaction, final Receiver receiver, final Delivery delivery,
+                          String address, int messageFormat, ByteBuf messageEncoded) throws Exception {
+      EncodedMessage encodedMessage = new EncodedMessage(messageFormat, messageEncoded.array(), messageEncoded.arrayOffset(), messageEncoded.writerIndex());
+      ServerMessage message = manager.getConverter().inbound(encodedMessage);
+      //use the address on the receiver if not null, if null let's hope it was set correctly on the message
+      if (address != null) {
+         message.setAddress(new SimpleString(address));
+      }
+      recoverContext();
+      try {
+         PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddress());
+         if (store.isRejectingMessages()) {
+            // We drop pre-settled messages (and abort any associated Tx)
+            if (delivery.remotelySettled()) {
+               if (transaction != null) {
+                  String amqpAddress = delivery.getLink().getTarget().getAddress();
+                  ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress);
+                  transaction.markAsRollbackOnly(e);
+               }
+            }
+            else {
+               rejectMessage(delivery);
+            }
+         }
+         else {
+            serverSend(transaction, message, delivery, receiver);
+         }
+      }
+      finally {
+         // the reject/drop branches never reach the private serverSend, so the context is cleared here on every path
+         resetContext();
+      }
+   }
+
+   private void rejectMessage(Delivery delivery) {
+      String address = delivery.getLink().getTarget().getAddress();
+      ErrorCondition ec = new ErrorCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address);
+      Rejected rejected = new Rejected();
+      rejected.setError(ec);
+      delivery.disposition(rejected);
+      connection.flush();
+   }
+
+   private void serverSend(final Transaction transaction, final ServerMessage message, final Delivery delivery, final Receiver receiver) throws Exception {
+      try {
+
+         message.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString(), receiver.getSession().getConnection().getRemoteContainer());
+         serverSession.send(transaction, message, false, false);
+
+         // FIXME Potential race here...
+         manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() {
+            @Override
+            public void done() {
+               synchronized (connection.getLock()) {
+                  delivery.disposition(Accepted.getInstance());
+                  delivery.settle();
+                  connection.flush();
+               }
+            }
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+               synchronized (connection.getLock()) {
+                  receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
+                  connection.flush();
+               }
+            }
+         });
+      }
+      finally {
+         resetContext();
+      }
+   }
+
+   public String getPubSubPrefix() {
+      return manager.getPubSubPrefix();
+   }
+
+   public void offerProducerCredit(final String address, final int credits, final int threshold, final Receiver receiver) {
+      try {
+         final PagingStore store = manager.getServer().getPagingManager().getPageStore(new SimpleString(address));
+         store.checkMemory(new Runnable() {
+            @Override
+            public void run() {
+               if (receiver.getRemoteCredit() < threshold) {
+                  receiver.flow(credits);
+                  connection.flush();
+               }
+            }
+         });
+      }
+      catch (Exception e) {
+         throw new RuntimeException(e);
+      }
+   }
+
+   public void deleteQueue(String queueName) throws Exception {
+      manager.getServer().destroyQueue(new SimpleString(queueName));
+   }
+
+   private void resetContext() {
+      manager.getServer().getStorageManager().setContext(null);
+   }
+
+   private void recoverContext() {
+      manager.getServer().getStorageManager().setContext(serverSession.getSessionContext());
+   }
+
+   @Override
+   public void sendProducerCreditsMessage(int credits, SimpleString address) {
+   }
+
+   @Override
+   public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
+      return false;
+   }
+
+   @Override
+   public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
+   }
+
+   @Override
+   public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
+
+      message.removeProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString());
+
+      ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext();
+
+      try {
+         return plugSender.deliverMessage(message, deliveryCount);
+      }
+      catch (Exception e) {
+         synchronized (connection.getLock()) {
+            plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage()));
+            connection.flush();
+         }
+         throw new IllegalStateException("Can't deliver message " + e, e);
+      }
+
+   }
+
+   @Override
+   public int sendLargeMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
+      return 0;
+   }
+
+   @Override
+   public int sendLargeMessageContinuation(ServerConsumer consumer,
+                                           byte[] body,
+                                           boolean continues,
+                                           boolean requiresResponse) {
+      return 0;
+   }
+
+   @Override
+   public void closed() {
+   }
+
+   @Override
+   public void disconnect(ServerConsumer consumer, String queueName) {
+      synchronized (connection.getLock()) {
+         Link link = ((ProtonServerSenderContext) consumer.getProtocolContext()).getSender(); // context is set by createSender()
+         link.close();
+         connection.flush();
+      }
+   }
+   // Reports whether the sender stored as this consumer's protocol context has credit greater than zero.
+   @Override
+   public boolean hasCredits(ServerConsumer consumer) {
+      final ProtonServerSenderContext senderContext = (ProtonServerSenderContext) consumer.getProtocolContext();
+      if (senderContext == null) {
+         // no protocol context attached, so there is no sender to ask
+         return false;
+      }
+      // otherwise the answer is whatever credit the sender reports
+      return senderContext.getSender().getCredit() > 0;
+   }
+
+   public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
+      return protonSPI.getTransaction(txid);
+   }
+
+   public Binary newTransaction() {
+      return protonSPI.newTransaction();
+   }
+
+
+   public void commitTX(Binary txid) throws Exception {
+      Transaction tx = protonSPI.getTransaction(txid);
+      tx.commit(true);
+      protonSPI.removeTransaction(txid);
+   }
+
+   public void rollbackTX(Binary txid, boolean lastMessageReceived) throws Exception {
+      Transaction tx = protonSPI.getTransaction(txid);
+      tx.rollback();
+      protonSPI.removeTransaction(txid);
+
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
new file mode 100644
index 0000000..8fd3169
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import java.util.concurrent.Executor;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
+import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
+
+/**
+ * This is a Server's Connection representation used by ActiveMQ Artemis.
+ */
+public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection {
+
+   private final AMQPConnectionContext amqpConnection;
+
+   private boolean destroyed = false;
+
+   private final ProtonProtocolManager manager;
+
+   public ActiveMQProtonRemotingConnection(ProtonProtocolManager manager,
+                                           AMQPConnectionContext amqpConnection,
+                                           Connection transportConnection,
+                                           Executor executor) {
+      super(transportConnection, executor);
+      this.manager = manager;
+      this.amqpConnection = amqpConnection;
+   }
+
+   public Executor getExecutor() {
+      return this.executor;
+   }
+
+   public ProtonProtocolManager getManager() {
+      return manager;
+   }
+
+   /*
+    * This can be called concurrently by more than one thread so needs to be locked
+    */
+   @Override
+   public void fail(final ActiveMQException me, String scaleDownTargetNodeID) {
+      if (destroyed) {
+         return;
+      }
+
+      destroyed = true;
+
+      ActiveMQClientLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
+
+      // Then call the listeners
+      callFailureListeners(me, scaleDownTargetNodeID);
+
+      callClosingListeners();
+
+      internalClose();
+   }
+
+   @Override
+   public void destroy() {
+      synchronized (this) {
+         if (destroyed) {
+            return;
+         }
+
+         destroyed = true;
+      }
+
+      callClosingListeners();
+
+      internalClose();
+
+   }
+
+   @Override
+   public boolean isClient() {
+      return false;
+   }
+
+   @Override
+   public boolean isDestroyed() {
+      return destroyed;
+   }
+
+   @Override
+   public void disconnect(boolean criticalError) {
+      getTransportConnection().close();
+   }
+
+   /**
+    * Disconnect the connection, closing all channels
+    */
+   @Override
+   public void disconnect(String scaleDownNodeID, boolean criticalError) {
+      getTransportConnection().close();
+   }
+
+   @Override
+   public boolean checkDataReceived() {
+      return amqpConnection.checkDataReceived();
+   }
+
+   @Override
+   public void flush() {
+      amqpConnection.flush();
+   }
+
+   @Override
+   public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
+      amqpConnection.inputBuffer(buffer.byteBuf());
+      super.bufferReceived(connectionID, buffer);
+   }
+
+   private void internalClose() {
+      // We close the underlying transport connection
+      getTransportConnection().close();
+   }
+
+   @Override
+   public void killMessage(SimpleString nodeID) {
+      //unsupported
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
new file mode 100644
index 0000000..fe7b976
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import io.netty.channel.ChannelPipeline;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.BaseInterceptor;
+import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
+import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.management.Notification;
+import org.apache.activemq.artemis.core.server.management.NotificationListener;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
+import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
+
+/**
+ * A proton protocol manager, basically reads the Proton Input and maps proton resources to ActiveMQ Artemis resources
+ */
+public class ProtonProtocolManager implements ProtocolManager<Interceptor>, NotificationListener {
+
+   private static final List<String> websocketRegistryNames = Arrays.asList("amqp");
+
+   private final ActiveMQServer server;
+
+   private final MessageConverter protonConverter;
+
+   private final ProtonProtocolManagerFactory factory;
+
+   /*
+    * Used when you want to treat senders as a subscription on an address rather than consuming from the actual
+    * queue for the address. This can be changed on the acceptor.
+    */
+   private String pubSubPrefix = ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX;
+
+   private int maxFrameSize = AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE;
+
+   /**
+    * @param factory the factory that created this manager; returned by {@link #getFactory()}
+    * @param server  the broker this manager works against; its storage manager backs the message converter
+    */
+   public ProtonProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server) {
+      this.factory = factory;
+      this.server = server;
+      this.protonConverter = new ProtonMessageConverter(server.getStorageManager());
+   }
+
+   /**
+    * @return the server passed to the constructor
+    */
+   public ActiveMQServer getServer() {
+      return server;
+   }
+
+   /**
+    * @return the message converter created in the constructor
+    */
+   @Override
+   public MessageConverter getConverter() {
+      return protonConverter;
+   }
+
+   /**
+    * Notifications are ignored by this manager.
+    */
+   @Override
+   public void onNotification(Notification notification) {
+
+   }
+
+   /**
+    * @return the factory passed to the constructor
+    */
+   @Override
+   public ProtocolManagerFactory<Interceptor> getFactory() {
+      return factory;
+   }
+
+   /**
+    * Interceptors are not used by this manager; the call is ignored.
+    */
+   @Override
+   public void updateInterceptors(List<BaseInterceptor> incomingInterceptors,
+                                  List<BaseInterceptor> outgoingInterceptors) {
+      // no op
+   }
+
+   /**
+    * @return always {@code false}
+    */
+   @Override
+   public boolean acceptsNoHandshake() {
+      return false;
+   }
+
+   /**
+    * Builds the AMQP connection context and the remoting connection for a newly accepted
+    * transport connection and wraps them in a {@link ConnectionEntry}.
+    * <p>
+    * The TTL is {@code ActiveMQClient.DEFAULT_CONNECTION_TTL} unless the server configuration
+    * carries a connection TTL override other than {@code -1}.
+    *
+    * @param acceptorUsed       not read by this implementation
+    * @param remotingConnection the transport connection that was accepted
+    * @return the entry holding the new remoting connection, its executor, the creation time and the TTL
+    */
+   @Override
+   public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) {
+      AMQPConnectionCallback connectionCallback = new AMQPConnectionCallback(this, remotingConnection, server.getExecutorFactory().getExecutor(), server);
+      long ttl = ActiveMQClient.DEFAULT_CONNECTION_TTL;
+
+      if (server.getConfiguration().getConnectionTTLOverride() != -1) {
+         ttl = server.getConfiguration().getConnectionTTLOverride();
+      }
+
+      // The connection context takes an int; clamp instead of letting the narrowing cast wrap around
+      // for a TTL above Integer.MAX_VALUE.
+      int contextTtl = (int) Math.min(ttl, Integer.MAX_VALUE);
+
+      String id = server.getConfiguration().getName();
+      AMQPConnectionContext amqpConnection =
+         new AMQPConnectionContext(connectionCallback, id, contextTtl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool());
+
+      Executor executor = server.getExecutorFactory().getExecutor();
+
+      ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(this, amqpConnection, remotingConnection, executor);
+
+      connectionCallback.setProtonConnectionDelegate(delegate);
+
+      return new ConnectionEntry(delegate, executor, System.currentTimeMillis(), ttl);
+   }
+
+   /**
+    * Nothing to remove for this protocol; the call is ignored.
+    */
+   @Override
+   public void removeHandler(String name) {
+
+   }
+
+   /**
+    * Passes the buffer to the connection, which must be an {@link ActiveMQProtonRemotingConnection}.
+    *
+    * @param connection the connection the data arrived on
+    * @param buffer     the received data
+    */
+   @Override
+   public void handleBuffer(RemotingConnection connection, ActiveMQBuffer buffer) {
+      ActiveMQProtonRemotingConnection protonConnection = (ActiveMQProtonRemotingConnection) connection;
+
+      protonConnection.bufferReceived(protonConnection.getID(), buffer);
+   }
+
+   /**
+    * No channel handlers are added for this protocol.
+    */
+   @Override
+   public void addChannelHandlers(ChannelPipeline pipeline) {
+
+   }
+
+   /**
+    * @param array the first bytes read from a connection
+    * @return {@code true} when the array holds at least four bytes and starts with the ASCII letters {@code AMQP}
+    */
+   @Override
+   public boolean isProtocol(byte[] array) {
+      return array.length >= 4 && array[0] == (byte) 'A' && array[1] == (byte) 'M' && array[2] == (byte) 'Q' && array[3] == (byte) 'P';
+   }
+
+   /**
+    * No handshake work is done here.
+    */
+   @Override
+   public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
+   }
+
+   /**
+    * @return the single-element list {@code ["amqp"]}
+    */
+   @Override
+   public List<String> websocketSubprotocolIdentifiers() {
+      return websocketRegistryNames;
+   }
+
+   /**
+    * @return the current pub/sub address prefix
+    */
+   public String getPubSubPrefix() {
+      return pubSubPrefix;
+   }
+
+   /**
+    * @param pubSubPrefix the pub/sub address prefix to use from now on
+    */
+   public void setPubSubPrefix(String pubSubPrefix) {
+      this.pubSubPrefix = pubSubPrefix;
+   }
+
+
+   /**
+    * @return the max frame size handed to connection contexts created by this manager
+    */
+   public int getMaxFrameSize() {
+      return maxFrameSize;
+   }
+
+   /**
+    * @param maxFrameSize the max frame size to hand to connection contexts created after this call
+    */
+   public void setMaxFrameSize(int maxFrameSize) {
+      this.maxFrameSize = maxFrameSize;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
new file mode 100644
index 0000000..7255ca0
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import org.apache.activemq.artemis.api.core.BaseInterceptor;
+import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
+import org.apache.activemq.artemis.utils.uri.BeanSupport;
+import org.osgi.service.component.annotations.Component;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+@Component(service = ProtocolManagerFactory.class)
+public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor> {
+
+   private static final String AMQP_PROTOCOL_NAME = "AMQP";
+
+   private static final String MODULE_NAME = "artemis-amqp-protocol";
+
+   // final: the reference is a constant and must not be re-pointed at another array
+   private static final String[] SUPPORTED_PROTOCOLS = {AMQP_PROTOCOL_NAME};
+
+   /**
+    * Creates a {@link ProtonProtocolManager} for the server and hands it, together with the
+    * given parameters, to {@code BeanSupport.setData}, returning that call's result.
+    *
+    * @param server               the broker the manager will work against
+    * @param parameters           the parameters passed on to {@code BeanSupport.setData}
+    * @param incomingInterceptors not read by this implementation
+    * @param outgoingInterceptors not read by this implementation
+    * @return the protocol manager
+    * @throws Exception if {@code BeanSupport.setData} fails
+    */
+   @Override
+   public ProtocolManager createProtocolManager(ActiveMQServer server,
+                                                final Map<String, Object> parameters,
+                                                List<BaseInterceptor> incomingInterceptors,
+                                                List<BaseInterceptor> outgoingInterceptors) throws Exception {
+      return BeanSupport.setData(new ProtonProtocolManager(this, server), parameters);
+   }
+
+   /**
+    * @param interceptors ignored
+    * @return always an empty list
+    */
+   @Override
+   public List<Interceptor> filterInterceptors(List<BaseInterceptor> interceptors) {
+      // no interceptors on Proton
+      return Collections.emptyList();
+   }
+
+   /**
+    * @return the supported protocol names, a single entry {@code "AMQP"}
+    */
+   @Override
+   public String[] getProtocols() {
+      return SUPPORTED_PROTOCOLS;
+   }
+
+   /**
+    * @return {@code "artemis-amqp-protocol"}
+    */
+   @Override
+   public String getModuleName() {
+      return MODULE_NAME;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/package-info.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/package-info.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/package-info.java
new file mode 100644
index 0000000..c8a3c6a
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the classes used to interact with the broker.
+ * The ProtocolManager lives here, along with classes that interact with the PostOffice
+ * and other internal components directly.
+ */
+package org.apache.activemq.artemis.protocol.amqp.broker;
\ No newline at end of file