You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/09/27 13:54:36 UTC

[09/15] activemq-artemis git commit: ARTEMIS-751 Simplification of the AMQP implementation

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
new file mode 100644
index 0000000..848f0d2
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+
+import java.util.AbstractMap;
+import java.util.Map;
+
+/**
+ * Set of useful methods and definitions used in the AMQP protocol handling
+ */
+public class AmqpSupport {
+
+   // Identification values used to locate JMS selector filters.
+   public static final UnsignedLong JMS_SELECTOR_CODE = UnsignedLong.valueOf(0x0000468C00000004L);
+   public static final Symbol JMS_SELECTOR_NAME = Symbol.valueOf("apache.org:selector-filter:string");
+   public static final Object[] JMS_SELECTOR_FILTER_IDS = new Object[]{JMS_SELECTOR_CODE, JMS_SELECTOR_NAME};
+
+   // Identification values used to locate no-local filters.
+   public static final UnsignedLong NO_LOCAL_CODE = UnsignedLong.valueOf(0x0000468C00000003L);
+   public static final Symbol NO_LOCAL_NAME = Symbol.valueOf("apache.org:no-local-filter:list");
+   public static final Object[] NO_LOCAL_FILTER_IDS = new Object[]{NO_LOCAL_CODE, NO_LOCAL_NAME};
+
+   // Capabilities used to identify destination type in some requests.
+   public static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue");
+   public static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic");
+
+   // Symbols naming an invalid field; presumably used in error information sent to the
+   // remote peer (not referenced in this class -- confirm against callers).
+   public static final Symbol INVALID_FIELD = Symbol.valueOf("invalid-field");
+   public static final Symbol CONTAINER_ID = Symbol.valueOf("container-id");
+
+   // Symbols used to announce connection information to remote peer.
+   public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
+   public static final Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY");
+   public static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
+   public static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
+   public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
+   public static final Symbol PRODUCT = Symbol.valueOf("product");
+   public static final Symbol VERSION = Symbol.valueOf("version");
+   public static final Symbol PLATFORM = Symbol.valueOf("platform");
+
+   // Symbols used in configuration of newly opened links.
+   public static final Symbol COPY = Symbol.getSymbol("copy");
+
+   // Lifetime policy symbols
+   public static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
+
+   public static final Symbol SOLE_CONNECTION_CAPABILITY = Symbol.valueOf("sole-connection-for-container");
+
+   /**
+    * Search for a given Symbol in a given array of Symbol objects.
+    * <p>
+    * Null slots in the array are skipped rather than dereferenced, so a sparsely
+    * populated array never causes a NullPointerException.
+    *
+    * @param symbols
+    *        the set of Symbols to search; may be null or empty.
+    * @param key
+    *        the value to try and find in the Symbol array.
+    *
+    * @return true if the key is found in the given Symbol array, false otherwise
+    *         (including when the array is null or empty).
+    */
+   public static boolean contains(Symbol[] symbols, Symbol key) {
+      if (symbols == null || symbols.length == 0) {
+         return false;
+      }
+
+      for (Symbol symbol : symbols) {
+         // guard against null slots: the original dereferenced every element unconditionally
+         if (symbol != null && symbol.equals(key)) {
+            return true;
+         }
+      }
+
+      return false;
+   }
+
+   /**
+    * Search for a particular filter using a set of known identification values
+    * in the Map of filters.
+    * <p>
+    * Only map values that are {@link DescribedType} instances are considered; a
+    * value matches when its descriptor equals any of the supplied ids. Described
+    * types without a descriptor are ignored.
+    *
+    * @param filters
+    *        The filters map that should be searched; may be null or empty.
+    * @param filterIds
+    *        The aliases for the target filter to be located; must contain at least one id.
+    *
+    * @return the filter if found in the mapping or null if not found.
+    *
+    * @throws IllegalArgumentException
+    *         if filterIds is null or empty.
+    */
+   public static Map.Entry<Symbol, DescribedType> findFilter(Map<Symbol, Object> filters, Object[] filterIds) {
+
+      if (filterIds == null || filterIds.length == 0) {
+         // There are no ids to list on this path, so say which invalid form was passed instead.
+         throw new IllegalArgumentException("Invalid Filter Ids array passed: " + (filterIds == null ? "null" : "empty"));
+      }
+
+      if (filters == null || filters.isEmpty()) {
+         return null;
+      }
+
+      for (Map.Entry<Symbol, Object> filter : filters.entrySet()) {
+         if (filter.getValue() instanceof DescribedType) {
+            DescribedType describedType = (DescribedType) filter.getValue();
+            Object descriptor = describedType.getDescriptor();
+            if (descriptor == null) {
+               // nothing to compare against; avoids a NullPointerException on descriptor.equals(..)
+               continue;
+            }
+
+            for (Object filterId : filterIds) {
+               if (descriptor.equals(filterId)) {
+                  return new AbstractMap.SimpleImmutableEntry<>(filter.getKey(), describedType);
+               }
+            }
+         }
+      }
+
+      return null;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonDeliveryHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonDeliveryHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonDeliveryHandler.java
new file mode 100644
index 0000000..43f1913
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonDeliveryHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Delivery;
+
+/**
+ * An interface to handle deliveries, either messages, acks or transaction calls
+ */
+public interface ProtonDeliveryHandler {
+
+   /**
+    * Callback carrying a credit count and a drain flag for the handled link.
+    *
+    * @param currentCredits the credit count passed to the handler
+    * @param drain          the drain flag passed to the handler
+    */
+   void onFlow(int currentCredits, boolean drain);
+
+   /**
+    * Callback invoked with a delivery to be processed by the handler.
+    *
+    * @param delivery the delivery to process
+    * @throws ActiveMQAMQPException if the handler fails to process the delivery
+    */
+   void onMessage(Delivery delivery) throws ActiveMQAMQPException;
+
+   /*
+   * We have to distinguish between a remote close on the link and a close via a connection or session, as the
+   * latter means that a link reattach can happen and we need to keep the underlying resource (queue/subscription)
+   * around for pub/sub.
+   * */
+   void close(boolean remoteLinkClose) throws ActiveMQAMQPException;
+
+   /**
+    * Closes the handler, supplying an error condition that describes why.
+    *
+    * @param condition the error condition associated with the close
+    * @throws ActiveMQAMQPException if closing fails
+    */
+   void close(ErrorCondition condition) throws ActiveMQAMQPException;
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonInitializable.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonInitializable.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonInitializable.java
new file mode 100644
index 0000000..3870810
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonInitializable.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+/**
+ * Base class that tracks whether {@link #initialise()} has been invoked.
+ */
+public class ProtonInitializable {
+
+   // false until initialise() has been called at least once
+   private boolean initialized;
+
+   /**
+    * Marks this object as initialised; repeated calls leave it initialised.
+    *
+    * @throws Exception never thrown here; declared so overriding methods may throw
+    */
+   public void initialise() throws Exception {
+      initialized = true;
+   }
+
+   /**
+    * @return true once {@link #initialise()} has been called
+    */
+   public boolean isInitialized() {
+      return initialized;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
new file mode 100644
index 0000000..4b97831
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
+import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.jboss.logging.Logger;
+
+/**
+ * Server side handler for an AMQP receiver link: resolves the address messages are routed to,
+ * hands completed deliveries to the session callback and manages the credit granted to the link.
+ */
+public class ProtonServerReceiverContext extends ProtonInitializable implements ProtonDeliveryHandler {
+
+   private static final Logger log = Logger.getLogger(ProtonServerReceiverContext.class);
+
+   /*
+    The maximum number of credits we will allocate to clients.
+    This number is also used by the broker when refreshing client credits.
+     */
+   private static final int MAX_CREDIT_ALLOCATION = 100;
+
+   // Used by the broker to decide when to refresh clients credit.  This is not used when client requests credit.
+   private static final int MIN_CREDIT_REFRESH = 30;
+
+   // Initial capacity requested for the pooled buffer that a completed delivery is read into.
+   private static final int INITIAL_BUFFER_SIZE = 10 * 1024;
+
+   protected final AMQPConnectionContext connection;
+
+   protected final AMQPSessionContext protonSession;
+
+   protected final Receiver receiver;
+
+   // Resolved in initialise(); stays null when the remote target is null.
+   protected String address;
+
+   protected final AMQPSessionCallback sessionSPI;
+
+   public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI,
+                                      AMQPConnectionContext connection,
+                                      AMQPSessionContext protonSession,
+                                      Receiver receiver) {
+      this.connection = connection;
+      this.protonSession = protonSession;
+      this.receiver = receiver;
+      this.sessionSPI = sessionSPI;
+   }
+
+   /**
+    * Grants credit in response to a flow callback. The requested credit is capped at
+    * {@code MAX_CREDIT_ALLOCATION}, which is also passed as the threshold. The drain flag is not used.
+    */
+   @Override
+   public void onFlow(int credits, boolean drain) {
+      flow(Math.min(credits, MAX_CREDIT_ALLOCATION), MAX_CREDIT_ALLOCATION);
+   }
+
+   /**
+    * Resolves the address for this link from the remote target and grants the initial credit.
+    * <p>
+    * A dynamic target gets a freshly created temporary queue; otherwise the target address must
+    * be set and must pass the session callback's binding query.
+    *
+    * @throws Exception if the target address is missing, the address does not exist, or the
+    *                   session callback fails (wrapped in ActiveMQAMQPInternalErrorException)
+    */
+   @Override
+   public void initialise() throws Exception {
+      super.initialise();
+      org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
+
+      if (target != null) {
+         if (target.getDynamic()) {
+            //if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and
+            // will be deleted on closing of the session
+            address = sessionSPI.tempQueueName();
+
+            try {
+               sessionSPI.createTemporaryQueue(address);
+            }
+            catch (Exception e) {
+               throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
+            }
+            target.setAddress(address);
+         }
+         else {
+            //if not dynamic then we use the targets address as the address to forward the messages to, however there has to
+            //be a queue bound to it so we need to check this.
+            address = target.getAddress();
+            if (address == null) {
+               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.targetAddressNotSet();
+            }
+
+            try {
+               if (!sessionSPI.bindingQuery(address)) {
+                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
+               }
+            }
+            catch (ActiveMQAMQPNotFoundException e) {
+               // propagate "not found" unchanged rather than wrapping it as an internal error
+               throw e;
+            }
+            catch (Exception e) {
+               throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
+            }
+         }
+      }
+      flow(MAX_CREDIT_ALLOCATION, MIN_CREDIT_REFRESH);
+   }
+
+   /*
+   * called when Proton receives a message to be delivered via a Delivery.
+   *
+   * This may be called more than once per delivery; nothing is done until the delivery is readable and
+   * no longer partial.
+   *
+   * Any failure is logged and reported to the sender by applying a Rejected disposition to the delivery.
+   * */
+   @Override
+   public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
+      // named "link" so it does not shadow the receiver field
+      Receiver link;
+      try {
+         link = ((Receiver) delivery.getLink());
+
+         if (!delivery.isReadable()) {
+            return;
+         }
+
+         if (delivery.isPartial()) {
+            return;
+         }
+
+         ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(INITIAL_BUFFER_SIZE);
+         try {
+            synchronized (connection.getLock()) {
+               DeliveryUtil.readDelivery(link, buffer);
+
+               link.advance();
+
+               Transaction tx = null;
+               if (delivery.getRemoteState() instanceof TransactionalState) {
+
+                  TransactionalState txState = (TransactionalState) delivery.getRemoteState();
+                  tx = this.sessionSPI.getTransaction(txState.getTxnId());
+               }
+               sessionSPI.serverSend(tx, link, delivery, address, delivery.getMessageFormat(), buffer);
+
+               flow(MAX_CREDIT_ALLOCATION, MIN_CREDIT_REFRESH);
+            }
+         }
+         finally {
+            // always hand the pooled buffer back, also when sending fails
+            buffer.release();
+         }
+      }
+      catch (Exception e) {
+         log.warn(e.getMessage(), e);
+         Rejected rejected = new Rejected();
+         ErrorCondition condition = new ErrorCondition();
+         condition.setCondition(Symbol.valueOf("failed"));
+         condition.setDescription(e.getMessage());
+         rejected.setError(condition);
+         delivery.disposition(rejected);
+      }
+   }
+
+   /**
+    * Removes this receiver from its session context. The flag is not used by this implementation.
+    */
+   @Override
+   public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
+      protonSession.removeReceiver(receiver);
+   }
+
+   /**
+    * Sets the given condition on the receiver and then closes as for a non-remote close.
+    */
+   @Override
+   public void close(ErrorCondition condition) throws ActiveMQAMQPException {
+      receiver.setCondition(condition);
+      close(false);
+   }
+
+   /**
+    * Grants credit to the link.
+    *
+    * @param credits   number of credits to offer
+    * @param threshold threshold handed to the session callback together with the credits
+    */
+   public void flow(int credits, int threshold) {
+      // Use the SessionSPI to allocate producer credits, or default, always allocate credit.
+      if (sessionSPI != null) {
+         sessionSPI.offerProducerCredit(address, credits, threshold, receiver);
+      }
+      else {
+         synchronized (connection.getLock()) {
+            receiver.flow(credits);
+            connection.flush();
+         }
+      }
+
+   }
+
+   /**
+    * Calls drain on the receiver under the connection lock and then flushes the connection.
+    */
+   public void drain(int credits) {
+      synchronized (connection.getLock()) {
+         receiver.drain(credits);
+      }
+      connection.flush();
+   }
+
+   /**
+    * @return the value reported by the receiver's {@code drained()} call
+    */
+   public int drained() {
+      return receiver.drained();
+   }
+
+   /**
+    * @return the value reported by the receiver's {@code draining()} call
+    */
+   public boolean isDraining() {
+      return receiver.draining();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
new file mode 100644
index 0000000..0a071fd
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+import java.util.Map;
+import java.util.Objects;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
+import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
+import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
+import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
+import org.apache.activemq.artemis.selector.filter.FilterException;
+import org.apache.activemq.artemis.selector.impl.SelectorParser;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Outcome;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.message.ProtonJMessage;
+import org.jboss.logging.Logger;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+
+public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler {
+
+   private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class);
+
+   // NOTE(review): SELECTOR is not referenced in the part of the class visible here -- confirm it is still needed.
+   private static final Symbol SELECTOR = Symbol.getSymbol("jms-selector");
+   // Distribution mode symbol: compared against the source's distribution mode in initialise() to decide
+   // whether the consumer is browse-only, and set on a recovered subscription source.
+   private static final Symbol COPY = Symbol.valueOf("copy");
+   // Capability symbol checked in initialise() to treat the source as a subscription.
+   private static final Symbol TOPIC = Symbol.valueOf("topic");
+
+   // Consumer handle returned by sessionSPI.createSender(..) in initialise(); null until then.
+   private Object brokerConsumer;
+
+   protected final AMQPSessionContext protonSession;
+   protected final Sender sender;
+   protected final AMQPConnectionContext connection;
+   // Set to true by close(ErrorCondition).
+   protected boolean closed = false;
+   protected final AMQPSessionCallback sessionSPI;
+   // Updated with the current credit count on every onFlow(..) call; starts at zero.
+   protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0);
+
+
+   /**
+    * Creates a sender context bound to the given connection, link and session.
+    *
+    * @param connection    the connection context that owns the link
+    * @param sender        the Proton sender link handled by this context
+    * @param protonSession the session context the sender belongs to
+    * @param server        the session callback used to talk to the broker
+    */
+   public ProtonServerSenderContext(AMQPConnectionContext connection,
+                                    Sender sender,
+                                    AMQPSessionContext protonSession,
+                                    AMQPSessionCallback server) {
+      // the superclass constructor is invoked implicitly
+      this.sessionSPI = server;
+      this.protonSession = protonSession;
+      this.sender = sender;
+      this.connection = connection;
+   }
+
+   /**
+    * @return the broker consumer created in {@code initialise()}, or null if none has been created
+    */
+   public Object getBrokerConsumer() {
+      return this.brokerConsumer;
+   }
+
+   /**
+    * Records the link's current credit and forwards the flow event to the session callback.
+    *
+    * @param currentCredits the credit count to store in the credits semaphore
+    * @param drain          the drain flag passed through to the session callback
+    */
+   @Override
+   public void onFlow(int currentCredits, boolean drain) {
+      creditsSemaphore.setCredits(currentCredits);
+      sessionSPI.onFlowConsumer(brokerConsumer, currentCredits, drain);
+   }
+
+   /**
+    * @return the Proton sender link this context was created for
+    */
+   public Sender getSender() {
+      return this.sender;
+   }
+
+   /**
+    * Starts the session and, if a broker consumer exists, asks the session callback to start it.
+    *
+    * @throws ActiveMQAMQPException if starting the broker consumer fails
+    */
+   public void start() throws ActiveMQAMQPException {
+      sessionSPI.start();
+
+      //todo add flow control
+      if (brokerConsumer == null) {
+         // nothing to start: this could be null if a link reattach has happened
+         return;
+      }
+
+      try {
+         // do whatever is needed to make the broker start sending messages to the consumer
+         sessionSPI.startSender(brokerConsumer);
+      }
+      catch (Exception e) {
+         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorStartingConsumer(e.getMessage());
+      }
+   }
+
+   /**
+    * Creates the actual underlying ActiveMQ Artemis server consumer for this sender link.
+    * <p>
+    * Steps, in order:
+    * <ol>
+    * <li>extract and validate an optional JMS selector filter from the remote source; an invalid
+    * selector closes the link with an {@code INVALID_FIELD} condition and returns;</li>
+    * <li>decide whether the link is a subscription (topic capability or pub/sub address prefix) and,
+    * if a no-local filter is present, append it to the selector;</li>
+    * <li>resolve the queue: recover an existing subscription when the remote source is null, create a
+    * temporary queue for a dynamic source, create or reuse a durable or volatile subscription queue
+    * for pub/sub, or use the source address directly;</li>
+    * <li>create the broker consumer through the session callback (not done on the recovery path).</li>
+    * </ol>
+    *
+    * @throws Exception if a queue cannot be created or found, or the consumer cannot be created;
+    *                   an ActiveMQAMQPNotFoundException is thrown when the remote source is null
+    *                   and no queue named {@code clientId:linkName} exists
+    */
+   @Override
+   public void initialise() throws Exception {
+      super.initialise();
+
+      Source source = (Source) sender.getRemoteSource();
+
+      String queue;
+
+      String selector = null;
+
+      /*
+      * even though the filter is a map it will only return a single filter unless a no-local is also provided
+      * */
+      if (source != null) {
+         Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS);
+         if (filter != null) {
+            selector = filter.getValue().getDescribed().toString();
+            // Validate the Selector.
+            try {
+               SelectorParser.parse(selector);
+            }
+            catch (FilterException e) {
+               // an unparsable selector closes the link and stops initialisation
+               close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
+               return;
+            }
+         }
+      }
+
+      /*
+      * if we have a capability for a topic (qpid-jms) or we are configured on this address to act like a topic then act
+      * like a subscription.
+      * */
+      boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source);
+
+      if (isPubSub) {
+         // a no-local filter is turned into a selector clause that excludes the remote container's own connection id
+         if (AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS) != null) {
+            String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
+            String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
+            if (selector != null) {
+               selector += " AND " + noLocalFilter;
+            }
+            else {
+               selector = noLocalFilter;
+            }
+         }
+      }
+
+      if (source == null) {
+         // Attempt to recover a previous subscription; this happens when a link reattach happens on a subscription queue
+         String clientId = connection.getRemoteContainer();
+         String pubId = sender.getName();
+         queue = clientId + ":" + pubId;
+         boolean exists = sessionSPI.queueQuery(queue, false).isExists();
+
+         /*
+         * If it exists then we know it is a subscription so we set the capabilities on the source so we can delete on a
+         * link remote close.
+         * */
+         if (exists) {
+            source = new org.apache.qpid.proton.amqp.messaging.Source();
+            source.setAddress(queue);
+            source.setDurable(TerminusDurability.UNSETTLED_STATE);
+            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+            source.setDistributionMode(COPY);
+            source.setCapabilities(TOPIC);
+            sender.setSource(source);
+            // note: no broker consumer is created on this path; brokerConsumer is only assigned in the else branch below
+         }
+         else {
+            throw new ActiveMQAMQPNotFoundException("Unknown subscription link: " + sender.getName());
+         }
+      }
+      else {
+         if (source.getDynamic()) {
+            //if dynamic we have to create the node (queue) and set the address on the source, the node is temporary and
+            // will be deleted on closing of the session
+            queue = java.util.UUID.randomUUID().toString();
+            try {
+               sessionSPI.createTemporaryQueue(queue);
+               //protonSession.getServerSession().createQueue(queue, queue, null, true, false);
+            }
+            catch (Exception e) {
+               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
+            }
+            source.setAddress(queue);
+         }
+         else {
+            //if not dynamic then we use the source address as the address to consume the messages from, however there has to
+            //be a queue bound to it so we need to check this.
+            if (isPubSub) {
+               // if we are a subscription and durable create a durable queue using the container id and link name
+               if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) ||
+                                TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
+                  String clientId = connection.getRemoteContainer();
+                  String pubId = sender.getName();
+                  queue = clientId + ":" + pubId;
+                  QueueQueryResult result = sessionSPI.queueQuery(queue, false);
+
+                  if (result.isExists()) {
+                     // If a client reattaches to a durable subscription with a different no-local filter value, selector
+                     // or address then we must recreate the queue (JMS semantics).
+
+                     if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) ||
+                        (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
+                        // recreation is only allowed while nobody is consuming from the existing queue
+                        if (result.getConsumerCount() == 0) {
+                           sessionSPI.deleteQueue(queue);
+                           sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
+                        }
+                        else {
+                           throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist");
+                        }
+                     }
+                  }
+                  else {
+                     sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
+                  }
+                  source.setAddress(queue);
+               }
+               //otherwise we are a volatile subscription
+               else {
+                  queue = java.util.UUID.randomUUID().toString();
+                  try {
+                     sessionSPI.createTemporaryQueue(source.getAddress(), queue, selector);
+                  }
+                  catch (Exception e) {
+                     throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
+                  }
+                  source.setAddress(queue);
+               }
+            }
+            else {
+               queue = source.getAddress();
+            }
+            if (queue == null) {
+               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet();
+            }
+
+            try {
+               if (!sessionSPI.queueQuery(queue, !isPubSub).isExists()) {
+                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
+               }
+            }
+            catch (ActiveMQAMQPNotFoundException e) {
+               // propagate "not found" unchanged rather than wrapping it as an internal error
+               throw e;
+            }
+            catch (Exception e) {
+               throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
+            }
+         }
+
+         // a non-subscription link whose distribution mode is "copy" is treated as browse-only
+         boolean browseOnly = !isPubSub && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
+         try {
+            // for subscriptions the selector was already applied when the queue was created, so none is passed here
+            brokerConsumer = sessionSPI.createSender(this, queue, isPubSub ? null : selector, browseOnly);
+         }
+         catch (Exception e) {
+            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
+         }
+      }
+   }
+
+   private boolean isPubSub(Source source) {
+      String pubSubPrefix = sessionSPI.getPubSubPrefix();
+      return source != null && pubSubPrefix != null && source.getAddress() != null && source.getAddress().startsWith(pubSubPrefix);
+   }
+
+   /*
+   * Close this sender link locally: mark the context closed, unregister the sender from the proton
+   * session, close the proton sender under the connection lock, flush the connection and finally
+   * close the broker side consumer.
+   *
+   * The condition argument is not used by this implementation.
+   *
+   * Throws ActiveMQAMQPInternalErrorException (with the original failure as its cause) when the
+   * broker consumer cannot be closed.
+   * */
+   @Override
+   public void close(ErrorCondition condition) throws ActiveMQAMQPException {
+      closed = true;
+      protonSession.removeSender(sender);
+      synchronized (connection.getLock()) {
+         sender.close();
+      }
+      connection.flush();
+
+      try {
+         sessionSPI.closeSender(brokerConsumer);
+      }
+      catch (Exception e) {
+         log.warn(e.getMessage(), e);
+         // keep the original failure as the cause so the stack trace is not lost to callers
+         throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
+      }
+   }
+
+   /*
+   * Close the broker side consumer of this sender link.
+   *
+   * When remoteLinkClose is true (a link close rather than a connection close or detach) and the source
+   * carries the TOPIC capability, the queue backing the subscription is deleted as well: either the
+   * queue named by the source address (when it exists and the source is dynamic), or otherwise the
+   * queue named "<remote container id>:<link name>" when that one exists.
+   *
+   * Throws ActiveMQAMQPInternalErrorException (with the original failure as its cause) when anything
+   * in the sequence above fails.
+   * */
+   @Override
+   public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
+      try {
+         sessionSPI.closeSender(brokerConsumer);
+         //if this is a link close rather than a connection close or detach, we need to delete any durable resources for
+         // say pub subs
+         if (remoteLinkClose) {
+            Source source = (Source) sender.getSource();
+            if (source != null && source.getAddress() != null && hasCapabilities(TOPIC, source)) {
+               String queueName = source.getAddress();
+               QueueQueryResult result = sessionSPI.queueQuery(queueName, false);
+               if (result.isExists() && source.getDynamic()) {
+                  sessionSPI.deleteQueue(queueName);
+               }
+               else {
+                  // queue name built from the remote container id and the link name
+                  String clientId = connection.getRemoteContainer();
+                  String pubId = sender.getName();
+                  String queue = clientId + ":" + pubId;
+                  result = sessionSPI.queueQuery(queue, false);
+                  if (result.isExists()) {
+                     if (result.getConsumerCount() > 0) {
+                        // unexpected: the queue is still in use; report it through the logger instead of stdout
+                        log.warn("Deleting subscription queue " + queue + " although it still has " + result.getConsumerCount() + " consumer(s)");
+                     }
+                     sessionSPI.deleteQueue(queue);
+                  }
+               }
+            }
+         }
+      }
+      catch (Exception e) {
+         log.warn(e.getMessage(), e);
+         // keep the original failure as the cause so the stack trace is not lost to callers
+         throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
+      }
+   }
+
+   /**
+    * Handles a disposition update from the remote peer for a delivery previously sent on this link.
+    * <p>
+    * The delivery context is the broker message stored by {@code performSend}. Depending on the remote
+    * state the message is acknowledged (optionally inside a transaction) or cancelled; afterwards the
+    * delivery is settled locally and one credit is offered on the sender. Nothing happens when the
+    * remote state is {@code null}.
+    *
+    * @param delivery the delivery whose remote state changed
+    * @throws ActiveMQAMQPException when acknowledging or cancelling the message fails
+    */
+   @Override
+   public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
+      Object message = delivery.getContext();
+
+      // the remote peer asked for deliveries to be sent already settled
+      boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
+
+      DeliveryState remoteState = delivery.getRemoteState();
+
+      if (remoteState != null) {
+         // If we are transactional then we need ack if the msg has been accepted
+         if (remoteState instanceof TransactionalState) {
+
+            TransactionalState txState = (TransactionalState) remoteState;
+            Transaction tx = this.sessionSPI.getTransaction(txState.getTxnId());
+            // only an Accepted outcome is acted upon here; other transactional outcomes fall through untouched
+            if (txState.getOutcome() != null) {
+               Outcome outcome = txState.getOutcome();
+               if (outcome instanceof Accepted) {
+                  if (!delivery.remotelySettled()) {
+                     // echo a transactional Accepted disposition carrying the same transaction id
+                     TransactionalState txAccepted = new TransactionalState();
+                     txAccepted.setOutcome(Accepted.getInstance());
+                     txAccepted.setTxnId(txState.getTxnId());
+
+                     delivery.disposition(txAccepted);
+                  }
+                  //we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order
+                  // from dealer, a perf hit but a must
+                  try {
+                     sessionSPI.ack(tx, brokerConsumer, message);
+                  }
+                  catch (Exception e) {
+                     throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
+                  }
+               }
+            }
+         }
+         else if (remoteState instanceof Accepted) {
+            //we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order
+            // from dealer, a perf hit but a must
+            try {
+               // null transaction: non-transactional acknowledgement
+               sessionSPI.ack(null, brokerConsumer, message);
+            }
+            catch (Exception e) {
+               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
+            }
+         }
+         else if (remoteState instanceof Released) {
+            try {
+               // NOTE(review): the boolean presumably marks whether the delivery counts as failed - confirm against AMQPSessionCallback.cancel
+               sessionSPI.cancel(brokerConsumer, message, false);
+            }
+            catch (Exception e) {
+               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
+            }
+         }
+         else if (remoteState instanceof Rejected || remoteState instanceof Modified) {
+            try {
+               // same call as for Released, but with the flag set to true
+               sessionSPI.cancel(brokerConsumer, message, true);
+            }
+            catch (Exception e) {
+               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
+            }
+         }
+         //todo add tag caching
+         // pre-settled deliveries were sent with an empty tag (see performSend), so there is no tag to hand back
+         if (!preSettle) {
+            protonSession.replaceTag(delivery.getTag());
+         }
+
+         // proton engine objects are only touched while holding the connection lock
+         synchronized (connection.getLock()) {
+            delivery.settle();
+            sender.offer(1);
+         }
+
+      }
+      else {
+         //todo not sure if we need to do anything here
+      }
+   }
+
+   /**
+    * Asks the session SPI to resume delivery on the broker consumer backing this sender.
+    */
+   public synchronized void checkState() {
+      sessionSPI.resumeDelivery(brokerConsumer);
+   }
+
+   /**
+    * handle an out going message from ActiveMQ Artemis, send via the Proton Sender
+    *
+    * @param message       the broker message to deliver; it is also stored as the context of the proton delivery
+    * @param deliveryCount the delivery count handed to the session SPI when encoding the message
+    * @return the encoded size in bytes as reported by {@code performSend}, or {@code 0} when this
+    *         sender is already closed and nothing was sent
+    * @throws Exception an {@code ActiveMQAMQPInternalErrorException} wrapping the cause when encoding fails
+    */
+   public int deliverMessage(Object message, int deliveryCount) throws Exception {
+      if (closed) {
+         // report through the class logger rather than writing to stderr
+         log.warn("Message can't be delivered as it's closed");
+         return 0;
+      }
+
+      //encode the message
+      ProtonJMessage serverMessage;
+      try {
+         // This can be done a lot better here
+         serverMessage = sessionSPI.encodeMessage(message, deliveryCount);
+      }
+      catch (Throwable e) {
+         log.warn(e.getMessage(), e);
+         throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
+      }
+
+      return performSend(serverMessage, message);
+   }
+
+   private static boolean hasCapabilities(Symbol symbol, Source source) {
+      if (source != null) {
+         if (source.getCapabilities() != null) {
+            for (Symbol cap : source.getCapabilities()) {
+               if (symbol.equals(cap)) {
+                  return true;
+               }
+            }
+         }
+      }
+      return false;
+   }
+   /**
+    * Encodes the given proton message and writes it to the proton sender as a new delivery.
+    * <p>
+    * A permit is taken from {@code creditsSemaphore} first, blocking when none is immediately available.
+    * The message is encoded into a pooled heap buffer which is always released before returning.
+    *
+    * @param serverMessage the already converted proton message to encode and send
+    * @param context       object stored as the delivery context (read back in {@code onMessage})
+    * @return the number of bytes the encoded message occupies
+    * @throws IllegalStateException when the thread is interrupted while waiting for a permit
+    */
+   protected int performSend(ProtonJMessage serverMessage, Object context) {
+      // fast path first; only block when no permit is immediately available
+      if (!creditsSemaphore.tryAcquire()) {
+         try {
+            creditsSemaphore.acquire();
+         }
+         catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            // the interrupt flag is restored and the send is aborted
+            throw new IllegalStateException(e.getMessage(), e);
+         }
+      }
+
+      //presettle means we can ack the message on the dealer side before we send it, i.e. for browsers
+      boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
+
+      //we only need a tag if we are going to ack later
+      byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
+
+      // initial capacity of 1024 bytes
+      ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+      try {
+         serverMessage.encode(new NettyWritable(nettyBuffer));
+
+         // bytes written by the encoder; this is the value returned to the caller
+         int size = nettyBuffer.writerIndex();
+
+         // proton engine objects are only touched while holding the connection lock
+         synchronized (connection.getLock()) {
+            final Delivery delivery;
+            delivery = sender.delivery(tag, 0, tag.length);
+            delivery.setContext(context);
+
+            // this will avoid a copy.. patch provided by Norman using buffer.array()
+            sender.send(nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes());
+
+            if (preSettle) {
+               delivery.settle();
+            }
+            else {
+               sender.advance();
+            }
+         }
+
+         connection.flush();
+
+         return size;
+      }
+      finally {
+         // return the pooled buffer whether or not the send succeeded
+         nettyBuffer.release();
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java
new file mode 100644
index 0000000..6d4e73a
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
+import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.transaction.Declare;
+import org.apache.qpid.proton.amqp.transaction.Declared;
+import org.apache.qpid.proton.amqp.transaction.Discharge;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.message.impl.MessageImpl;
+import org.jboss.logging.Logger;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
+
+/**
+ * handles an amqp Coordinator to deal with transaction boundaries etc
+ */
+public class ProtonTransactionHandler implements ProtonDeliveryHandler {
+
+   private static final Logger log = Logger.getLogger(ProtonTransactionHandler.class);
+
+   final AMQPSessionCallback sessionSPI;
+
+   public ProtonTransactionHandler(AMQPSessionCallback sessionSPI) {
+      this.sessionSPI = sessionSPI;
+   }
+
+   @Override
+   public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
+      ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+
+      final Receiver receiver;
+      try {
+         receiver = ((Receiver) delivery.getLink());
+
+         if (!delivery.isReadable()) {
+            return;
+         }
+
+         DeliveryUtil.readDelivery(receiver, buffer);
+
+         receiver.advance();
+
+         MessageImpl msg = DeliveryUtil.decodeMessageImpl(buffer);
+
+         Object action = ((AmqpValue) msg.getBody()).getValue();
+
+         if (action instanceof Declare) {
+            Binary txID = sessionSPI.newTransaction();
+            Declared declared = new Declared();
+            declared.setTxnId(txID);
+            delivery.disposition(declared);
+            delivery.settle();
+         }
+         else if (action instanceof Discharge) {
+            Discharge discharge = (Discharge) action;
+
+            Binary txID = discharge.getTxnId();
+            if (discharge.getFail()) {
+               try {
+                  sessionSPI.rollbackTX(txID, true);
+                  delivery.disposition(new Accepted());
+               }
+               catch (Exception e) {
+                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorRollingbackCoordinator(e.getMessage());
+               }
+            }
+            else {
+               try {
+                  sessionSPI.commitTX(txID);
+                  delivery.disposition(new Accepted());
+               }
+               catch (ActiveMQAMQPException amqpE) {
+                  throw amqpE;
+               }
+               catch (Exception e) {
+                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage());
+               }
+            }
+         }
+      }
+      catch (ActiveMQAMQPException amqpE) {
+         delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
+      }
+      catch (Exception e) {
+         log.warn(e.getMessage(), e);
+         delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
+      }
+      finally {
+         delivery.settle();
+         buffer.release();
+      }
+   }
+
+   private Rejected createRejected(Symbol amqpError, String message) {
+      Rejected rejected = new Rejected();
+      ErrorCondition condition = new ErrorCondition();
+      condition.setCondition(amqpError);
+      condition.setDescription(message);
+      rejected.setError(condition);
+      return rejected;
+   }
+
+   @Override
+   public void onFlow(int credits, boolean drain) {
+   }
+
+   @Override
+   public void close(boolean linkRemoteClose) throws ActiveMQAMQPException {
+      // no op
+   }
+
+   @Override
+   public void close(ErrorCondition condition) throws ActiveMQAMQPException {
+      // no op
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
new file mode 100644
index 0000000..91c9a67
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton.handler;
+
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.Transport;
+
+/**
+ * Callback interface for Proton engine events.
+ * <p>
+ * {@code Events.dispatch} maps each Proton event type onto one of the connection, session, link,
+ * delivery or transport callbacks declared here. {@link #onAuthInit} is not part of that mapping.
+ */
+public interface EventHandler {
+
+   // NOTE(review): presumably invoked once the first packet reveals whether the peer uses SASL - confirm against ProtonHandler
+   void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl);
+
+   // ---- connection life cycle ----
+
+   void onInit(Connection connection) throws Exception;
+
+   void onLocalOpen(Connection connection) throws Exception;
+
+   void onRemoteOpen(Connection connection) throws Exception;
+
+   void onLocalClose(Connection connection) throws Exception;
+
+   void onRemoteClose(Connection connection) throws Exception;
+
+   void onFinal(Connection connection) throws Exception;
+
+   // ---- session life cycle ----
+
+   void onInit(Session session) throws Exception;
+
+   void onLocalOpen(Session session) throws Exception;
+
+   void onRemoteOpen(Session session) throws Exception;
+
+   void onLocalClose(Session session) throws Exception;
+
+   void onRemoteClose(Session session) throws Exception;
+
+   void onFinal(Session session) throws Exception;
+
+   // ---- link life cycle, flow and detach ----
+
+   void onInit(Link link) throws Exception;
+
+   void onLocalOpen(Link link) throws Exception;
+
+   void onRemoteOpen(Link link) throws Exception;
+
+   void onLocalClose(Link link) throws Exception;
+
+   void onRemoteClose(Link link) throws Exception;
+
+   void onFlow(Link link) throws Exception;
+
+   void onFinal(Link link) throws Exception;
+
+   void onRemoteDetach(Link link) throws Exception;
+
+   void onDetach(Link link) throws Exception;
+
+   // ---- delivery and transport ----
+
+   void onDelivery(Delivery delivery) throws Exception;
+
+   void onTransport(Transport transport) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java
new file mode 100644
index 0000000..6552f64
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton.handler;
+
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Transport;
+
+/**
+ * Static helpers that route Proton engine events to the matching {@link EventHandler} callback.
+ */
+public final class Events {
+
+   /**
+    * Invokes {@link EventHandler#onTransport} for the given transport directly, without an {@link Event}.
+    */
+   public static void dispatchTransport(Transport transport, EventHandler handler) throws Exception {
+      handler.onTransport(transport);
+   }
+
+   /**
+    * Calls the handler method that corresponds to the type of the given event, passing the connection,
+    * session, link, transport or delivery carried by the event. Event types without a case below are
+    * ignored.
+    *
+    * @param event   the Proton event to route
+    * @param handler the handler receiving the callback
+    * @throws Exception whatever the invoked callback throws
+    */
+   public static void dispatch(Event event, EventHandler handler) throws Exception {
+      switch (event.getType()) {
+         // connection events
+         case CONNECTION_INIT:
+            handler.onInit(event.getConnection());
+            break;
+         case CONNECTION_LOCAL_OPEN:
+            handler.onLocalOpen(event.getConnection());
+            break;
+         case CONNECTION_REMOTE_OPEN:
+            handler.onRemoteOpen(event.getConnection());
+            break;
+         case CONNECTION_LOCAL_CLOSE:
+            handler.onLocalClose(event.getConnection());
+            break;
+         case CONNECTION_REMOTE_CLOSE:
+            handler.onRemoteClose(event.getConnection());
+            break;
+         case CONNECTION_FINAL:
+            handler.onFinal(event.getConnection());
+            break;
+         // session events
+         case SESSION_INIT:
+            handler.onInit(event.getSession());
+            break;
+         case SESSION_LOCAL_OPEN:
+            handler.onLocalOpen(event.getSession());
+            break;
+         case SESSION_REMOTE_OPEN:
+            handler.onRemoteOpen(event.getSession());
+            break;
+         case SESSION_LOCAL_CLOSE:
+            handler.onLocalClose(event.getSession());
+            break;
+         case SESSION_REMOTE_CLOSE:
+            handler.onRemoteClose(event.getSession());
+            break;
+         case SESSION_FINAL:
+            handler.onFinal(event.getSession());
+            break;
+         // link events
+         case LINK_INIT:
+            handler.onInit(event.getLink());
+            break;
+         case LINK_LOCAL_OPEN:
+            handler.onLocalOpen(event.getLink());
+            break;
+         case LINK_REMOTE_OPEN:
+            handler.onRemoteOpen(event.getLink());
+            break;
+         case LINK_LOCAL_CLOSE:
+            handler.onLocalClose(event.getLink());
+            break;
+         case LINK_REMOTE_CLOSE:
+            handler.onRemoteClose(event.getLink());
+            break;
+         case LINK_FLOW:
+            handler.onFlow(event.getLink());
+            break;
+         case LINK_FINAL:
+            handler.onFinal(event.getLink());
+            break;
+         case LINK_LOCAL_DETACH:
+            handler.onDetach(event.getLink());
+            break;
+         case LINK_REMOTE_DETACH:
+            handler.onRemoteDetach(event.getLink());
+            break;
+         // transport and delivery events
+         case TRANSPORT:
+            handler.onTransport(event.getTransport());
+            break;
+         case DELIVERY:
+            handler.onDelivery(event.getDelivery());
+            break;
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java
new file mode 100644
index 0000000..b2a6230
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton.handler;
+
+import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.engine.Connection;
+
+/**
+ * Extension capabilities offered on AMQP connections, plus a helper to detect whether the remote peer
+ * desires the sole-connection capability.
+ */
+public class ExtCapability {
+
+   /** The capability symbols returned by {@link #getCapabilities()}. */
+   public static final Symbol[] capabilities = new Symbol[] {
+      AmqpSupport.SOLE_CONNECTION_CAPABILITY, AmqpSupport.DELAYED_DELIVERY
+   };
+
+   /**
+    * @return the shared {@link #capabilities} array itself (not a copy)
+    */
+   public static Symbol[] getCapabilities() {
+      return capabilities;
+   }
+
+   /**
+    * @param connection the connection whose remote desired capabilities are inspected
+    * @return {@code true} when the remote peer lists the sole-connection capability among its desired
+    *         capabilities, {@code false} when it does not or lists none at all
+    */
+   public static boolean needUniqueConnection(Connection connection) {
+      final Symbol[] desired = connection.getRemoteDesiredCapabilities();
+      if (desired == null) {
+         return false;
+      }
+      for (int i = 0; i < desired.length; i++) {
+         if (desired[i].compareTo(AmqpSupport.SOLE_CONNECTION_CAPABILITY) == 0) {
+            return true;
+         }
+      }
+      return false;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
new file mode 100644
index 0000000..2efaa1b
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton.handler;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable;
+import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
+import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Transport;
+import org.jboss.logging.Logger;
+
+public class ProtonHandler extends ProtonInitializable {
+
+   private static final Logger log = Logger.getLogger(ProtonHandler.class);
+
+   private static final byte SASL = 0x03;
+
+   private static final byte BARE = 0x00;
+
+   private final Transport transport = Proton.transport();
+
+   private final Connection connection = Proton.connection();
+
+   private final Collector collector = Proton.collector();
+
+   private final Executor dispatchExecutor;
+
+   private final Runnable dispatchRunnable = new Runnable() {
+      @Override
+      public void run() {
+         dispatch();
+      }
+   };
+
+   private ArrayList<EventHandler> handlers = new ArrayList<>();
+
+   private Sasl serverSasl;
+
+   private Sasl clientSasl;
+
+   private final Object lock = new Object();
+
+   private final long creationTime;
+
+   private Map<String, ServerSASL> saslHandlers;
+
+   private SASLResult saslResult;
+
+   protected volatile boolean dataReceived;
+
+   protected boolean receivedFirstPacket = false;
+
+   private int offset = 0;
+
+   /**
+    * Creates the handler, records the creation time, binds the Proton transport to the connection and
+    * registers the event collector on the connection.
+    *
+    * @param dispatchExecutor executor stored for later use by this handler
+    */
+   public ProtonHandler(Executor dispatchExecutor) {
+      this.dispatchExecutor = dispatchExecutor;
+      this.creationTime = System.currentTimeMillis();
+      transport.bind(connection);
+      connection.collect(collector);
+   }
+
+   /**
+    * Ticks the Proton transport with the current time in milliseconds (derived from
+    * {@link System#nanoTime()}).
+    * <p>
+    * On the first tick the transport's result is returned as is. On later ticks the transport is only
+    * ticked while the connection is not locally closed; if the transport reports closed after the tick,
+    * or ticking fails, the failure is logged, the transport is closed and an empty error condition is
+    * set on the connection.
+    *
+    * @param firstTick {@code true} for the initial tick
+    * @return the value returned by {@code Transport.tick}, or {@code 0} when the connection is locally
+    *         closed or the tick failed
+    */
+   public long tick(boolean firstTick) {
+      if (!firstTick) {
+         try {
+            if (connection.getLocalState() != EndpointState.CLOSED) {
+               long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
+               if (transport.isClosed()) {
+                  throw new IllegalStateException("Channel was inactive for too long");
+               }
+               return rescheduleAt;
+            }
+         }
+         catch (Exception e) {
+            // do not drop the reason silently: the connection is about to be torn down
+            log.warn(e.getMessage(), e);
+            transport.close();
+            connection.setCondition(new ErrorCondition());
+         }
+         return 0;
+      }
+      return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
+   }
+
+   /**
+    * @return the transport's current input capacity, read while holding the handler lock
+    */
+   public int capacity() {
+      synchronized (lock) {
+         return transport.capacity();
+      }
+   }
+
+   /**
+    * @return the monitor object this handler synchronizes on when it touches the transport
+    */
+   public Object getLock() {
+      return lock;
+   }
+
+   /**
+    * @return the Proton transport owned by this handler
+    */
+   public Transport getTransport() {
+      return transport;
+   }
+
+   /**
+    * @return the Proton connection owned by this handler
+    */
+   public Connection getConnection() {
+      return connection;
+   }
+
+   /**
+    * Appends an event handler to this handler's list.
+    *
+    * @param handler the handler to add
+    * @return this instance, to allow call chaining
+    */
+   public ProtonHandler addEventHandler(EventHandler handler) {
+      handlers.add(handler);
+      return this;
+   }
+
+   /**
+    * Puts the transport's SASL layer into server mode and advertises the given mechanisms.
+    * <p>
+    * The handlers are indexed by their name in {@code saslHandlers}, and the names are passed to the SASL
+    * layer in the order given.
+    *
+    * @param handlers the server side SASL mechanisms to offer
+    */
+   public void createServerSASL(ServerSASL[] handlers) {
+      serverSasl = transport.sasl();
+      saslHandlers = new HashMap<>();
+      final String[] mechanisms = new String[handlers.length];
+      for (int i = 0; i < handlers.length; i++) {
+         final ServerSASL mechanism = handlers[i];
+         final String name = mechanism.getName();
+         saslHandlers.put(name, mechanism);
+         mechanisms[i] = name;
+      }
+      serverSasl.server();
+      serverSasl.setMechanisms(mechanisms);
+   }
+
+   /**
+    * @return the stored SASL result; {@code null} until one has been assigned
+    */
+   public SASLResult getSASLResult() {
+      return saslResult;
+   }
+
+   /**
+    * Feeds bytes received from the network into the Proton transport.
+    * <p>
+    * Marks that data has been received, then, under the handler lock, copies as many bytes as the
+    * transport has capacity for into the transport's tail buffer and flushes, repeating until the input
+    * is drained. When the transport reports no capacity the remaining bytes are left unread and the
+    * loop stops.
+    *
+    * @param buffer the incoming bytes; its reader index is advanced by the amount consumed
+    */
+   public void inputBuffer(ByteBuf buffer) {
+      dataReceived = true;
+      synchronized (lock) {
+         while (buffer.readableBytes() > 0) {
+            int capacity = transport.capacity();
+
+            if (!receivedFirstPacket) {
+               try {
+                  // byte at absolute index 4; presumably the protocol id of the AMQP protocol header - confirm
+                  byte auth = buffer.getByte(4);
+                  if (auth == SASL || auth == BARE) {
+                     dispatchAuth(auth == SASL);
+                     /*
+                     * there is a chance that if SASL Handshake has been carried out that the capacity may change.
+                     * */
+                     capacity = transport.capacity();
+                  }
+               }
+               catch (Throwable e) {
+                  // best effort: a failure here is only logged and the data is still handed to the transport
+                  log.debug(e.getMessage(), e);
+               }
+
+               receivedFirstPacket = true;
+            }
+
+            if (capacity > 0) {
+               ByteBuffer tail = transport.tail();
+               // never copy more than the transport can take or the input still holds
+               int min = Math.min(capacity, buffer.readableBytes());
+               tail.limit(min);
+               buffer.readBytes(tail);
+
+               flush();
+            }
+            else {
+               // zero capacity: nothing can be accepted right now; negative: reported below as transport closed
+               if (capacity == 0) {
+                  log.debugf("abandoning: readableBytes=%d", buffer.readableBytes());
+               }
+               else {
+                  log.debugf("transport closed, discarding: readableBytes=%d, capacity=%d", buffer.readableBytes(), transport.capacity());
+               }
+               break;
+            }
+         }
+      }
+   }
+
+   /**
+    * Reports whether data arrived since the previous call and resets the flag.
+    *
+    * @return the value the flag had before it was cleared
+    */
+   public boolean checkDataReceived() {
+      final boolean receivedSinceLastCheck = dataReceived;
+      dataReceived = false;
+      return receivedSinceLastCheck;
+   }
+
+   /**
+    * @return the creation time recorded for this handler
+    */
+   public long getCreationTime() {
+      return this.creationTime;
+   }
+
+   /**
+    * Acknowledges that {@code bytes} bytes previously handed out by {@link #outputBuffer()} were written.
+    * <p>
+    * The bytes are popped from the transport and the read offset is moved back accordingly, then the
+    * handler is flushed.
+    *
+    * @param bytes the number of bytes that were written
+    * @throws IllegalStateException if more bytes are acknowledged than were handed out
+    */
+   public void outputDone(int bytes) {
+      synchronized (lock) {
+         transport.pop(bytes);
+
+         final int remaining = offset - bytes;
+         offset = remaining;
+
+         if (remaining < 0) {
+            throw new IllegalStateException("You called outputDone for more bytes than you actually received. numberOfBytes=" + bytes +
+                                               ", outcome result=" + remaining);
+         }
+      }
+
+      flush();
+   }
+
+   /**
+    * Copies the transport output that has not been handed out yet into a pooled buffer.
+    *
+    * @return a buffer with the bytes between the current offset and the pending size, or {@code null}
+    *         when the transport reports a negative pending size or there is nothing new to send
+    * @throws IllegalStateException if the offset is already beyond the pending size
+    */
+   public ByteBuf outputBuffer() {
+
+      synchronized (lock) {
+         final int pendingBytes = transport.pending();
+
+         if (pendingBytes < 0) {
+            return null;//throw new IllegalStateException("xxx need to close the connection");
+         }
+
+         final int unread = pendingBytes - offset;
+
+         if (unread < 0) {
+            throw new IllegalStateException("negative size: " + pendingBytes);
+         }
+
+         if (unread == 0) {
+            return null;
+         }
+
+         // For returning PooledBytes
+         final ByteBuf out = PooledByteBufAllocator.DEFAULT.buffer(unread);
+         final ByteBuffer source = transport.head();
+         source.position(offset);
+         // offset + unread is exactly the pending size
+         source.limit(pendingBytes);
+         out.writeBytes(source);
+         // everything up to the pending size has now been handed out
+         offset = pendingBytes;
+         return out;
+      }
+   }
+
+   /**
+    * Lets the transport process its data and checks the SASL state while holding the lock, then
+    * submits the dispatch runnable to the dispatch executor outside of the lock.
+    */
+   public void flush() {
+      synchronized (this.lock) {
+         this.transport.process();
+         this.checkServerSASL();
+      }
+
+      this.dispatchExecutor.execute(this.dispatchRunnable);
+   }
+
+   /**
+    * Closes the Proton connection under the handler lock and then flushes the handler.
+    */
+   public void close() {
+      synchronized (this.lock) {
+         this.connection.close();
+      }
+      this.flush();
+   }
+
+   /**
+    * Completes the server side SASL negotiation once the client has announced a mechanism.
+    * <p>
+    * Nothing happens unless a server SASL layer is active and the remote side sent at least one mechanism.
+    * The first remote mechanism is looked up among the registered handlers; when found, the pending SASL
+    * bytes are passed to it and the outcome is reported as {@code PN_SASL_OK} or {@code PN_SASL_AUTH}.
+    * When no handler matches, {@code PN_SASL_SYS} is reported.
+    */
+   protected void checkServerSASL() {
+      if (serverSasl != null && serverSasl.getRemoteMechanisms().length > 0) {
+         // TODO: should we look at the first only?
+         ServerSASL mechanism = saslHandlers.get(serverSasl.getRemoteMechanisms()[0]);
+         if (mechanism != null) {
+
+            // read everything the SASL layer currently has pending
+            byte[] dataSASL = new byte[serverSasl.pending()];
+            serverSasl.recv(dataSASL, 0, dataSASL.length);
+
+            if (log.isTraceEnabled()) {
+               log.trace("Working on sasl::" + ByteUtil.bytesToHex(dataSASL, 2));
+            }
+
+            // kept in a field so it can be retrieved later through getSASLResult()
+            saslResult = mechanism.processSASL(dataSASL);
+
+            if (saslResult != null && saslResult.isSuccess()) {
+               serverSasl.done(Sasl.SaslOutcome.PN_SASL_OK);
+               serverSasl = null;
+               // the mechanisms are no longer needed after a successful negotiation
+               saslHandlers.clear();
+               saslHandlers = null;
+            }
+            else {
+               serverSasl.done(Sasl.SaslOutcome.PN_SASL_AUTH);
+            }
+            // NOTE(review): redundant on the success path, where the field was already cleared above
+            serverSasl = null;
+         }
+         else {
+            // no auth available, system error
+            // NOTE(review): serverSasl is not cleared here, so this branch may run again on a later call - confirm
+            serverSasl.done(Sasl.SaslOutcome.PN_SASL_SYS);
+         }
+      }
+   }
+
+   private Event popEvent() {
+      synchronized (lock) {
+         Event ev = collector.peek();
+         if (ev != null) {
+            // pop will invalidate the event
+            // for that reason we make a new one
+            // Events are reused inside the collector, so we need to make a new one here
+            ev = ev.copy();
+            collector.pop();
+         }
+         return ev;
+      }
+   }
+
+   /**
+    * Tells every registered handler that authentication is starting.
+    *
+    * @param sasl {@code true} when the client asked for SASL, {@code false} otherwise
+    */
+   private void dispatchAuth(boolean sasl) {
+      for (EventHandler listener : handlers) {
+         listener.onAuthInit(this, getConnection(), sasl);
+      }
+   }
+
+   /**
+    * Delivers every collected event to every registered handler and then calls
+    * {@code onTransport} on each of them.
+    * <p>
+    * A handler failure is logged as a warning and flagged on the connection with an empty
+    * {@code ErrorCondition}; the remaining handlers and events are still processed.
+    */
+   private void dispatch() {
+      // The lock is only taken inside popEvent() and never across the callbacks:
+      // holding it for the entire event processing could cause a distributed deadlock
+      // (for instance onTransport) while a client is also trying to write here.
+      for (Event event = popEvent(); event != null; event = popEvent()) {
+         for (EventHandler listener : handlers) {
+            if (log.isTraceEnabled()) {
+               log.trace("Handling " + event + " towards " + listener);
+            }
+            try {
+               Events.dispatch(event, listener);
+            }
+            catch (Exception e) {
+               log.warn(e.getMessage(), e);
+               connection.setCondition(new ErrorCondition());
+            }
+         }
+      }
+
+      for (EventHandler listener : handlers) {
+         try {
+            listener.onTransport(transport);
+         }
+         catch (Exception e) {
+            log.warn(e.getMessage(), e);
+            connection.setCondition(new ErrorCondition());
+         }
+      }
+
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/package-info.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/package-info.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/package-info.java
new file mode 100644
index 0000000..8476f5b
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package includes the classes used to interact with Proton, including the Context classes that will be
+ * translated through the model event.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/AnonymousServerSASL.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/AnonymousServerSASL.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/AnonymousServerSASL.java
new file mode 100644
index 0000000..013b73b
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/AnonymousServerSASL.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.sasl;
+
+/**
+ * SASL mechanism named {@code ANONYMOUS}: every exchange is reported as successful, with no user
+ * and no password attached to the result.
+ */
+public class AnonymousServerSASL implements ServerSASL {
+
+   private static final String MECHANISM_NAME = "ANONYMOUS";
+
+   /**
+    * @return the mechanism name, {@code ANONYMOUS}
+    */
+   @Override
+   public String getName() {
+      return MECHANISM_NAME;
+   }
+
+   /**
+    * Accepts the exchange without looking at the payload.
+    *
+    * @param bytes the SASL payload, ignored
+    * @return a successful result carrying neither user nor password
+    */
+   @Override
+   public SASLResult processSASL(byte[] bytes) {
+      final SASLResult accepted = new PlainSASLResult(true, null, null);
+      return accepted;
+   }
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/PlainSASL.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/PlainSASL.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/PlainSASL.java
new file mode 100644
index 0000000..cb82eba
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/PlainSASL.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.sasl;
+
+import org.apache.activemq.artemis.core.security.SecurityStore;
+
+public class PlainSASL extends ServerSASLPlain {
+
+   private final SecurityStore securityStore;
+
+   public PlainSASL(SecurityStore securityStore) {
+      this.securityStore = securityStore;
+   }
+
+   @Override
+   protected boolean authenticate(String user, String password) {
+      if (securityStore.isSecurityEnabled()) {
+         try {
+            securityStore.authenticate(user, password, null);
+            return true;
+         }
+         catch (Exception e) {
+            return false;
+         }
+      }
+      else {
+         return true;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/PlainSASLResult.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/PlainSASLResult.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/PlainSASLResult.java
new file mode 100644
index 0000000..f138ae3
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/PlainSASLResult.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.sasl;
+
+/**
+ * Immutable {@link SASLResult} carrying the outcome of an exchange together with the user name and
+ * password that were presented.
+ */
+public class PlainSASLResult implements SASLResult {
+
+   // assigned once in the constructor and never modified afterwards
+   private final boolean success;
+   private final String user;
+   private final String password;
+
+   /**
+    * @param success  whether the authentication succeeded
+    * @param user     the user name presented by the client, may be {@code null}
+    * @param password the password presented by the client, may be {@code null}
+    */
+   public PlainSASLResult(boolean success, String user, String password) {
+      this.success = success;
+      this.user = user;
+      this.password = password;
+   }
+
+   /**
+    * @return the user name given at construction time, may be {@code null}
+    */
+   @Override
+   public String getUser() {
+      return user;
+   }
+
+   /**
+    * @return the password given at construction time, may be {@code null}
+    */
+   public String getPassword() {
+      return password;
+   }
+
+   /**
+    * @return {@code true} when the authentication succeeded
+    */
+   @Override
+   public boolean isSuccess() {
+      return success;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/SASLResult.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/SASLResult.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/SASLResult.java
new file mode 100644
index 0000000..f8c4297
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/SASLResult.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.sasl;
+
+/**
+ * Outcome of a SASL exchange processed by a {@link ServerSASL} mechanism.
+ */
+public interface SASLResult {
+
+   /**
+    * @return the user name associated with this result; may be {@code null}
+    */
+   String getUser();
+
+   /**
+    * @return {@code true} when the authentication succeeded
+    */
+   boolean isSuccess();
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ServerSASL.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ServerSASL.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ServerSASL.java
new file mode 100644
index 0000000..43d57d0
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ServerSASL.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.sasl;
+
+/**
+ * A SASL mechanism offered by the server side of a connection.
+ */
+public interface ServerSASL {
+
+   /**
+    * @return the name under which this mechanism is advertised
+    */
+   String getName();
+
+   /**
+    * Processes the SASL payload sent by the client.
+    *
+    * @param bytes the raw SASL payload
+    * @return the outcome of the exchange
+    */
+   SASLResult processSASL(byte[] bytes);
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ServerSASLPlain.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ServerSASLPlain.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ServerSASLPlain.java
new file mode 100644
index 0000000..da26d2e
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/ServerSASLPlain.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.sasl;
+
+/**
+ * Server side of the SASL PLAIN mechanism.
+ * <p>
+ * The response is split on the NUL character into user name and password; the actual credential check
+ * is delegated to {@link #authenticate(String, String)}, which subclasses may override.
+ */
+public class ServerSASLPlain implements ServerSASL {
+
+   public static final String NAME = "PLAIN";
+
+   /**
+    * @return the mechanism name, {@code PLAIN}
+    */
+   @Override
+   public String getName() {
+      return NAME;
+   }
+
+   /**
+    * Extracts the credentials from a PLAIN response and authenticates them.
+    * <p>
+    * The payload is decoded as UTF-8 and split on NUL. A leading empty element (empty authorization
+    * identity) is skipped. Elements missing from the response - for instance an empty payload, or a
+    * response carrying no password - are reported as {@code null} rather than failing with an
+    * {@link ArrayIndexOutOfBoundsException}.
+    *
+    * @param data the raw PLAIN response
+    * @return the result holding the outcome of {@link #authenticate(String, String)} and the parsed credentials
+    */
+   @Override
+   public SASLResult processSASL(byte[] data) {
+
+      String username = null;
+      String password = null;
+      // PLAIN messages are UTF-8 encoded; do not depend on the platform default charset
+      String message = new String(data, java.nio.charset.StandardCharsets.UTF_8);
+      // note: split() drops trailing empty strings, so an empty password yields no element at all
+      String[] credentials = message.split(Character.toString((char) 0));
+
+      if (credentials.length > 0) {
+         // an empty first element means the authorization identity was left out
+         int offSet = credentials[0].length() == 0 ? 1 : 0;
+
+         // strict comparisons: an index equal to the length is already out of bounds
+         if (credentials.length > offSet) {
+            username = credentials[offSet];
+         }
+         if (credentials.length > offSet + 1) {
+            password = credentials[offSet + 1];
+         }
+      }
+
+      boolean success = authenticate(username, password);
+
+      return new PlainSASLResult(success, username, password);
+   }
+
+   /**
+    * Hook for subclasses to perform the authentication here
+    *
+    * @param user     the parsed user name, {@code null} when the response did not contain one
+    * @param password the parsed password, {@code null} when the response did not contain one
+    * @return {@code true} when the credentials are accepted; this default implementation accepts everything
+    */
+   protected boolean authenticate(String user, String password) {
+      return true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/CodecCache.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/CodecCache.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/CodecCache.java
new file mode 100644
index 0000000..53d0bc1
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/CodecCache.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.util;
+
+import org.apache.qpid.proton.codec.AMQPDefinedTypes;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.EncoderImpl;
+
+/**
+ * Gives each thread its own Proton decoder and encoder with all AMQP defined types registered.
+ */
+public class CodecCache {
+
+   /**
+    * A decoder together with the encoder that was created on top of it.
+    */
+   private static class EncoderDecoderPair {
+
+      final DecoderImpl decoder;
+      final EncoderImpl encoder;
+
+      EncoderDecoderPair() {
+         decoder = new DecoderImpl();
+         encoder = new EncoderImpl(decoder);
+         AMQPDefinedTypes.registerAllTypes(decoder, encoder);
+      }
+   }
+
+   // one pair per thread, created the first time the thread asks for a codec
+   private static final ThreadLocal<EncoderDecoderPair> THREAD_CODECS = new ThreadLocal<EncoderDecoderPair>() {
+      @Override
+      protected EncoderDecoderPair initialValue() {
+         return new EncoderDecoderPair();
+      }
+   };
+
+   /**
+    * @return the decoder bound to the calling thread
+    */
+   public static DecoderImpl getDecoder() {
+      final EncoderDecoderPair pair = THREAD_CODECS.get();
+      return pair.decoder;
+   }
+
+   /**
+    * @return the encoder bound to the calling thread
+    */
+   public static EncoderImpl getEncoder() {
+      final EncoderDecoderPair pair = THREAD_CODECS.get();
+      return pair.encoder;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/CreditsSemaphore.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/CreditsSemaphore.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/CreditsSemaphore.java
new file mode 100644
index 0000000..3eda199
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/CreditsSemaphore.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.util;
+
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+public class CreditsSemaphore {
+
+   @SuppressWarnings("serial")
+   private static class Sync extends AbstractQueuedSynchronizer {
+
+      private Sync(int initial) {
+         setState(initial);
+      }
+
+      public int getCredits() {
+         return getState();
+      }
+
+      @Override
+      public int tryAcquireShared(final int numberOfAqcquires) {
+         for (;;) {
+            int actualSize = getState();
+            int newValue = actualSize - numberOfAqcquires;
+
+            if (newValue < 0) {
+               if (actualSize == getState()) {
+                  return -1;
+               }
+            }
+            else if (compareAndSetState(actualSize, newValue)) {
+               return newValue;
+            }
+         }
+      }
+
+      @Override
+      public boolean tryReleaseShared(final int numberOfReleases) {
+         for (;;) {
+            int actualSize = getState();
+            int newValue = actualSize + numberOfReleases;
+
+            if (compareAndSetState(actualSize, newValue)) {
+               return true;
+            }
+
+         }
+      }
+
+      public void setCredits(final int credits) {
+         for (;;) {
+            int actualState = getState();
+            if (compareAndSetState(actualState, credits)) {
+               // This is to wake up any pending threads that could be waiting on queued
+               releaseShared(0);
+               return;
+            }
+         }
+      }
+   }
+
+   private final Sync sync;
+
+   public CreditsSemaphore(int initialCredits) {
+      sync = new Sync(initialCredits);
+   }
+
+   public void acquire() throws InterruptedException {
+      sync.acquireSharedInterruptibly(1);
+   }
+
+   public boolean tryAcquire() {
+      return sync.tryAcquireShared(1) >= 0;
+   }
+
+   public void release() throws InterruptedException {
+      sync.releaseShared(1);
+   }
+
+   public void release(int credits) throws InterruptedException {
+      sync.releaseShared(credits);
+   }
+
+   public void setCredits(int credits) {
+      sync.setCredits(credits);
+   }
+
+   public int getCredits() {
+      return sync.getCredits();
+   }
+
+   public boolean hasQueuedThreads() {
+      return sync.hasQueuedThreads();
+   }
+
+}
\ No newline at end of file