You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/09 20:43:11 UTC

[48/58] [abbrv] activemq-artemis git commit: Refactoring between Connection and protocol manager

Refactoring between Connection and protocol manager


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/168b98cb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/168b98cb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/168b98cb

Branch: refs/heads/refactor-openwire
Commit: 168b98cbd11d8fcc1367b683f038f3888256f727
Parents: 946b440
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Feb 24 22:30:28 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Mar 9 14:41:41 2016 -0500

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   | 331 ++++++++++++++-----
 .../openwire/OpenWireProtocolManager.java       | 325 +++---------------
 .../core/protocol/openwire/amq/AMQConsumer.java |  20 +-
 .../openwire/amq/AMQProducerBrokerExchange.java |  96 ------
 .../openwire/amq/AMQServerConsumer.java         |  12 +
 .../core/protocol/openwire/amq/AMQSession.java  |  21 +-
 .../artemis/core/server/ServerConsumer.java     |   6 +
 .../server/SlowConsumerDetectionListener.java   |  22 ++
 .../artemis/core/server/impl/QueueImpl.java     |   2 +
 .../core/server/impl/ServerConsumerImpl.java    |  18 +
 10 files changed, 370 insertions(+), 483 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/168b98cb/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 991f24b..6f2e3be 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -23,37 +23,45 @@ import javax.jms.ResourceAllocationException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerBrokerExchange;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerConsumer;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange;
-import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
+import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.security.SecurityAuth;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
 import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.utils.ConcurrentHashSet;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionControl;
@@ -102,33 +110,32 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
    private final OpenWireProtocolManager protocolManager;
 
-   private final List<CloseListener> closeListeners = new CopyOnWriteArrayList<>();
-
    private boolean destroyed = false;
 
    private final Object sendLock = new Object();
 
-   private final Acceptor acceptorUsed;
-
    private final OpenWireFormat wireFormat;
 
    private AMQConnectionContext context;
 
-   private Throwable stopError = null;
-
    private final AtomicBoolean stopping = new AtomicBoolean(false);
 
-   private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
-
-   protected final List<Command> dispatchQueue = new LinkedList<>();
-
    private boolean inServiceException;
 
    private final AtomicBoolean asyncException = new AtomicBoolean(false);
 
+   // Clebert: Artemis session has meta-data support, perhaps we could reuse it here
+   private Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<>();
+
+
    private final Map<ConsumerId, AMQConsumerBrokerExchange> consumerExchanges = new HashMap<>();
    private final Map<ProducerId, AMQProducerBrokerExchange> producerExchanges = new HashMap<>();
 
+   // Clebert TODO: Artemis already stores the Session. Why do we need a different one here
+   private Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<>();
+
+
+
    private ConnectionState state;
 
    private final Set<ActiveMQDestination> tempQueues = new ConcurrentHashSet<>();
@@ -139,14 +146,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
    private String defaultSocketURIString;
 
-   public OpenWireConnection(Acceptor acceptorUsed,
-                             Connection connection,
+   public OpenWireConnection(Connection connection,
                              Executor executor,
                              OpenWireProtocolManager openWireProtocolManager,
                              OpenWireFormat wf) {
       super(connection, executor);
       this.protocolManager = openWireProtocolManager;
-      this.acceptorUsed = acceptorUsed;
       this.wireFormat = wf;
       this.defaultSocketURIString = connection.getLocalAddress();
    }
@@ -322,8 +327,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       }
    }
 
-   // throw a WireFormatInfo to the peer
-   public void init() {
+   // send a WireFormatInfo to the peer
+   public void sendHandshake() {
       WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
       sendCommand(info);
    }
@@ -590,7 +595,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
          ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
       }
       try {
-         protocolManager.removeConnection(this, this.getConnectionInfo(), me);
+         protocolManager.removeConnection(this.getConnectionInfo(), me);
       }
       catch (InvalidClientIDException e) {
          ActiveMQServerLogger.LOGGER.warn("Couldn't close connection because invalid clientID", e);
@@ -681,40 +686,185 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       context.incRefCount();
    }
 
-   /** This will answer with commands to the client */
+   /**
+    * This will answer with commands to the client
+    */
    public boolean sendCommand(final Command command) {
       if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
          ActiveMQServerLogger.LOGGER.trace("sending " + command);
       }
-      synchronized (this) {
-         if (isDestroyed()) {
-            return false;
+
+      if (isDestroyed()) {
+         return false;
+      }
+
+      try {
+         physicalSend(command);
+      }
+      catch (Exception e) {
+         return false;
+      }
+      catch (Throwable t) {
+         return false;
+      }
+      return true;
+   }
+
+   public void addDestination(DestinationInfo info) throws Exception {
+      ActiveMQDestination dest = info.getDestination();
+      if (dest.isQueue()) {
+         SimpleString qName = OpenWireUtil.toCoreAddress(dest);
+         QueueBinding binding = (QueueBinding) protocolManager.getServer().getPostOffice().getBinding(qName);
+         if (binding == null) {
+            if (getState().getInfo() != null) {
+
+               CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE;
+               protocolManager.getServer().getSecurityStore().check(qName, checkType, this);
+
+               protocolManager.getServer().checkQueueCreationLimit(getUsername());
+            }
+            ConnectionInfo connInfo = getState().getInfo();
+            protocolManager.getServer().createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, dest.isTemporary());
          }
 
-         try {
-            physicalSend(command);
+         if (dest.isTemporary()) {
+            registerTempQueue(dest);
          }
-         catch (Exception e) {
-            return false;
+      }
+
+      if (!AdvisorySupport.isAdvisoryTopic(dest)) {
+         AMQConnectionContext context = getContext();
+         DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, dest);
+
+         ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest);
+         protocolManager.fireAdvisory(context, topic, advInfo);
+      }
+   }
+
+
+   public void updateConsumer(ConsumerControl consumerControl) {
+      SessionId sessionId = consumerControl.getConsumerId().getParentId();
+      AMQSession amqSession = sessions.get(sessionId);
+      amqSession.updateConsumerPrefetchSize(consumerControl.getConsumerId(), consumerControl.getPrefetch());
+   }
+
+   public void addConsumer(ConsumerInfo info) throws Exception {
+      // Todo: add a destination interceptors holder here (amq supports this)
+      SessionId sessionId = info.getConsumerId().getParentId();
+      ConnectionId connectionId = sessionId.getParentId();
+      ConnectionState cs = getState();
+      if (cs == null) {
+         throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: " + connectionId);
+      }
+      SessionState ss = cs.getSessionState(sessionId);
+      if (ss == null) {
+         throw new IllegalStateException(protocolManager.getServer() + " Cannot add a consumer to a session that had not been registered: " + sessionId);
+      }
+      // Avoid replaying dup commands
+      if (!ss.getConsumerIds().contains(info.getConsumerId())) {
+
+         AMQSession amqSession = sessions.get(sessionId);
+         if (amqSession == null) {
+            throw new IllegalStateException("Session not exist! : " + sessionId);
+         }
+
+         amqSession.createConsumer(info, amqSession, new SlowConsumerDetection());
+
+         ss.addConsumer(info);
+      }
+   }
+
+   class SlowConsumerDetection implements SlowConsumerDetectionListener {
+
+      @Override
+      public void onSlowConsumer(ServerConsumer consumer) {
+         if (consumer instanceof AMQServerConsumer) {
+            AMQServerConsumer serverConsumer = (AMQServerConsumer)consumer;
+            ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(serverConsumer.getAmqConsumer().getDestination());
+            ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+            try {
+               advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, serverConsumer.getAmqConsumer().getId().toString());
+               protocolManager.fireAdvisory(context, topic, advisoryMessage, serverConsumer.getAmqConsumer().getId());
+            }
+            catch (Exception e) {
+               // TODO-NOW: LOGGING
+               e.printStackTrace();
+            }
          }
-         catch (Throwable t) {
-            return false;
+      }
+   }
+
+   public void addSessions(Set<SessionId> sessionSet) {
+      Iterator<SessionId> iter = sessionSet.iterator();
+      while (iter.hasNext()) {
+         SessionId sid = iter.next();
+         addSession(getState().getSessionState(sid).getInfo(), true);
+      }
+   }
+
+   public AMQSession addSession(SessionInfo ss) {
+      return addSession(ss, false);
+   }
+
+   public AMQSession addSession(SessionInfo ss, boolean internal) {
+      AMQSession amqSession = new AMQSession(getState().getInfo(), ss, protocolManager.getServer(), this, protocolManager.getScheduledPool(), protocolManager);
+      amqSession.initialize();
+      amqSession.setInternal(internal);
+      sessions.put(ss.getSessionId(), amqSession);
+      sessionIdMap.put(amqSession.getCoreSession().getName(), ss.getSessionId());
+      return amqSession;
+   }
+
+   public void removeSession(AMQConnectionContext context, SessionInfo info) throws Exception {
+      AMQSession session = sessions.remove(info.getSessionId());
+      if (session != null) {
+         session.close();
+      }
+   }
+
+   public AMQSession getSession(SessionId sessionId) {
+      return sessions.get(sessionId);
+   }
+
+   public void removeDestination(ActiveMQDestination dest) throws Exception {
+      if (dest.isQueue()) {
+         SimpleString qName = new SimpleString("jms.queue." + dest.getPhysicalName());
+         protocolManager.getServer().destroyQueue(qName);
+      }
+      else {
+         Bindings bindings = protocolManager.getServer().getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("jms.topic." + dest.getPhysicalName()));
+         Iterator<Binding> iterator = bindings.getBindings().iterator();
+
+         while (iterator.hasNext()) {
+            Queue b = (Queue) iterator.next().getBindable();
+            if (b.getConsumerCount() > 0) {
+               throw new Exception("Destination still has an active subscription: " + dest.getPhysicalName());
+            }
+            if (b.isDurable()) {
+               throw new Exception("Destination still has durable subscription: " + dest.getPhysicalName());
+            }
+            b.deleteQueue();
          }
-         return true;
+      }
+
+      if (!AdvisorySupport.isAdvisoryTopic(dest)) {
+         AMQConnectionContext context = getContext();
+         DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.REMOVE_OPERATION_TYPE, dest);
+
+         ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest);
+         protocolManager.fireAdvisory(context, topic, advInfo);
       }
    }
 
    // This will listen for commands throught the protocolmanager
    public class CommandProcessor implements CommandVisitor {
 
-
       public AMQConnectionContext getContext() {
          return OpenWireConnection.this.getContext();
       }
 
       @Override
       public Response processAddConnection(ConnectionInfo info) throws Exception {
-         //let protoclmanager handle connection add/remove
          try {
             protocolManager.addConnection(OpenWireConnection.this, info);
          }
@@ -739,7 +889,36 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       public Response processAddProducer(ProducerInfo info) throws Exception {
          Response resp = null;
          try {
-            protocolManager.addProducer(OpenWireConnection.this, info);
+            SessionId sessionId = info.getProducerId().getParentId();
+            ConnectionId connectionId = sessionId.getParentId();
+            ConnectionState cs = getState();
+            if (cs == null) {
+               throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " + connectionId);
+            }
+            SessionState ss = cs.getSessionState(sessionId);
+            if (ss == null) {
+               throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId);
+            }
+            // Avoid replaying dup commands
+            if (!ss.getProducerIds().contains(info.getProducerId())) {
+
+               AMQSession amqSession = sessions.get(sessionId);
+               if (amqSession == null) {
+                  throw new IllegalStateException("Session not exist! : " + sessionId);
+               }
+
+               ActiveMQDestination destination = info.getDestination();
+               if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
+                  if (destination.isQueue()) {
+                     OpenWireUtil.validateDestination(destination, amqSession);
+                  }
+                  DestinationInfo destInfo = new DestinationInfo(getContext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
+                  OpenWireConnection.this.addDestination(destInfo);
+               }
+
+               ss.addProducer(info);
+
+            }
          }
          catch (Exception e) {
             if (e instanceof ActiveMQSecurityException) {
@@ -759,7 +938,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       public Response processAddConsumer(ConsumerInfo info) {
          Response resp = null;
          try {
-            protocolManager.addConsumer(OpenWireConnection.this, info);
+            addConsumer(info);
          }
          catch (Exception e) {
             e.printStackTrace();
@@ -776,13 +955,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       @Override
       public Response processRemoveDestination(DestinationInfo info) throws Exception {
          ActiveMQDestination dest = info.getDestination();
-         protocolManager.removeDestination(OpenWireConnection.this, dest);
+         removeDestination(dest);
          return null;
       }
 
       @Override
       public Response processRemoveProducer(ProducerId id) throws Exception {
-         protocolManager.removeProducer(id);
+
+         // TODO-now: proper implement this method
          return null;
       }
 
@@ -807,7 +987,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
             }
          }
          state.removeSession(id);
-         protocolManager.removeSession(context, session.getInfo());
+         removeSession(context, session.getInfo());
          return null;
       }
 
@@ -843,7 +1023,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       public Response processAddDestination(DestinationInfo dest) throws Exception {
          Response resp = null;
          try {
-            protocolManager.addDestination(OpenWireConnection.this, dest);
+            addDestination(dest);
          }
          catch (Exception e) {
             if (e instanceof ActiveMQSecurityException) {
@@ -860,14 +1040,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       public Response processAddSession(SessionInfo info) throws Exception {
          // Avoid replaying dup commands
          if (!state.getSessionIds().contains(info.getSessionId())) {
-            protocolManager.addSession(OpenWireConnection.this, info);
-            try {
-               state.addSession(info);
-            }
-            catch (IllegalStateException e) {
-               e.printStackTrace();
-               protocolManager.removeSession(context, info);
-            }
+            addSession(info);
+            state.addSession(info);
          }
          return null;
       }
@@ -923,7 +1097,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
          //amq5 clients send this command to restore prefetchSize
          //after successful reconnect
          try {
-            protocolManager.updateConsumer(OpenWireConnection.this, consumerControl);
+            updateConsumer(consumerControl);
          }
          catch (Exception e) {
             //log error
@@ -976,33 +1150,31 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
             final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
             boolean sendProducerAck = !messageSend.isResponseRequired() && producerInfo.getWindowSize() > 0 && !pcontext.isInRecoveryMode();
 
-            AMQSession session = protocolManager.getSession(producerId.getParentId());
+            AMQSession session = getSession(producerId.getParentId());
 
-            // TODO: canDispatch is always returning true;
-            if (producerExchange.canDispatch(messageSend)) {
-               SendingResult result = session.send(producerExchange, messageSend, sendProducerAck);
-               if (result.isBlockNextSend()) {
-                  if (!context.isNetworkConnection() && result.isSendFailIfNoSpace()) {
-                     // TODO see logging
-                     throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + producerId + ") to prevent flooding " + result.getBlockingAddress() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
-                  }
+            SendingResult result = session.send(producerExchange, messageSend, sendProducerAck);
+            if (result.isBlockNextSend()) {
+               if (!context.isNetworkConnection() && result.isSendFailIfNoSpace()) {
+                  // TODO see logging
+                  throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + producerId + ") to prevent flooding " + result.getBlockingAddress() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
+               }
 
-                  if (producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired()) {
-                     //in that case don't send the response
-                     //this will force the client to wait until
-                     //the response is got.
-                     context.setDontSendReponse(true);
-                  }
-                  else {
-                     //hang the connection until the space is available
-                     session.blockingWaitForSpace(producerExchange, result);
-                  }
+               if (producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired()) {
+                  //in that case don't send the response
+                  //this will force the client to wait until
+                  //the response is got.
+                  context.setDontSendReponse(true);
                }
-               else if (sendProducerAck) {
-                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
-                  OpenWireConnection.this.dispatchAsync(ack);
+               else {
+                  //hang the connection until the space is available
+                  session.blockingWaitForSpace(producerExchange, result);
                }
             }
+            else if (sendProducerAck) {
+               // TODO-now: send through OperationContext
+               ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
+               OpenWireConnection.this.dispatchAsync(ack);
+            }
          }
          catch (Throwable e) {
             if (e instanceof ActiveMQSecurityException) {
@@ -1056,15 +1228,26 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       @Override
       public Response processRecoverTransactions(TransactionInfo info) throws Exception {
          Set<SessionId> sIds = state.getSessionIds();
-         TransactionId[] recovered = protocolManager.recoverTransactions(sIds);
-         return new DataArrayResponse(recovered);
+
+
+         List<TransactionId> recovered = new ArrayList<>();
+         if (sIds != null) {
+            for (SessionId sid : sIds) {
+               AMQSession s = sessions.get(sid);
+               if (s != null) {
+                  s.recover(recovered);
+               }
+            }
+         }
+
+         return new DataArrayResponse(recovered.toArray(new TransactionId[0]));
       }
 
       @Override
       public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
          //we let protocol manager to handle connection add/remove
          try {
-            protocolManager.removeConnection(OpenWireConnection.this, state.getInfo(), null);
+            protocolManager.removeConnection(state.getInfo(), null);
          }
          catch (Throwable e) {
             // log

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/168b98cb/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index add1455..bdf27f8 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -17,15 +17,12 @@
 package org.apache.activemq.artemis.core.protocol.openwire;
 
 import javax.jms.InvalidClientIDException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -40,26 +37,14 @@ import org.apache.activemq.artemis.api.core.Interceptor;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
 import org.apache.activemq.artemis.api.core.client.TopologyMember;
-import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
-import org.apache.activemq.artemis.api.core.management.ManagementHelper;
-import org.apache.activemq.artemis.core.io.IOCallback;
-import org.apache.activemq.artemis.core.postoffice.Binding;
-import org.apache.activemq.artemis.core.postoffice.Bindings;
-import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
-import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
-import org.apache.activemq.artemis.core.server.management.Notification;
-import org.apache.activemq.artemis.core.server.management.NotificationListener;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@@ -69,7 +54,6 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
 import org.apache.activemq.artemis.utils.DataConstants;
-import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.BrokerId;
@@ -78,31 +62,24 @@ import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionControl;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerControl;
 import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
-import org.apache.activemq.command.SessionId;
-import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.openwire.OpenWireFormatFactory;
-import org.apache.activemq.state.ConnectionState;
 import org.apache.activemq.state.ProducerState;
-import org.apache.activemq.state.SessionState;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.InetAddressUtil;
 import org.apache.activemq.util.LongSequenceGenerator;
 
-public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, NotificationListener, ClusterTopologyListener {
+public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, ClusterTopologyListener {
 
    private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
    private static final IdGenerator ID_GENERATOR = new IdGenerator();
@@ -127,21 +104,16 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
    private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<>();
 
    protected final ConcurrentMap<ConnectionId, ConnectionInfo> connectionInfos = new ConcurrentHashMap<>();
+
    // Clebert TODO: use ConcurrentHashMap, or maybe use the schema that's already available on Artemis upstream (unique-client-id)
    private final Map<String, AMQConnectionContext> clientIdSet = new HashMap<String, AMQConnectionContext>();
 
    private String brokerName;
 
-   // Clebert TODO: Artemis already stores the Session. Why do we need a different one here
-   private Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<>();
-
    // Clebert: Artemis already has a Resource Manager. Need to remove this..
    //          The TransactionID extends XATransactionID, so all we need is to convert the XID here
    private Map<TransactionId, AMQSession> transactions = new ConcurrentHashMap<>();
 
-   // Clebert: Artemis session has meta-data support, perhaps we could reuse it here
-   private Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<>();
-
    private final Map<String, TopologyMember> topologyMap = new ConcurrentHashMap<>();
 
    private final LinkedList<TopologyMember> members = new LinkedList<>();
@@ -163,9 +135,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
       ManagementService service = server.getManagementService();
       scheduledPool = server.getScheduledPool();
-      if (service != null) {
-         service.addNotificationListener(this);
-      }
 
       final ClusterManager clusterManager = this.server.getClusterManager();
       ClusterConnection cc = clusterManager.getDefaultConnection(null);
@@ -187,6 +156,35 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       }
    }
 
+
+   public void removeConnection(ConnectionInfo info,
+                                Throwable error) throws InvalidClientIDException {
+      synchronized (clientIdSet) {
+         String clientId = info.getClientId();
+         if (clientId != null) {
+            AMQConnectionContext context = this.clientIdSet.get(clientId);
+            if (context != null && context.decRefCount() == 0) {
+               //connection is still there and need to close
+               context.getConnection().disconnect(error != null);
+               this.connections.remove(this);//what's that for?
+               this.clientIdSet.remove(clientId);
+            }
+         }
+         else {
+            throw new InvalidClientIDException("No clientID specified for connection disconnect request");
+         }
+      }
+   }
+
+
+   public ScheduledExecutorService getScheduledPool() {
+      return scheduledPool;
+   }
+
+   public ActiveMQServer getServer() {
+      return server;
+   }
+
    private void updateClientClusterInfo() {
 
       synchronized (members) {
@@ -219,8 +217,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
    @Override
    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) {
       OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat();
-      OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, connection,  server.getExecutorFactory().getExecutor(), this, wf);
-      owConn.init();
+      OpenWireConnection owConn = new OpenWireConnection(connection, server.getExecutorFactory().getExecutor(), this, wf);
+      owConn.sendHandshake();
 
       // TODO CLEBERT What is this constant here? we should get it from TTL initial pings
       return new ConnectionEntry(owConn, null, System.currentTimeMillis(), 1 * 60 * 1000);
@@ -233,7 +231,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
 
    @Override
    public void removeHandler(String name) {
-      // TODO Auto-generated method stub
    }
 
    @Override
@@ -276,8 +273,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
 
    @Override
    public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
-      // TODO Auto-generated method stub
-
    }
 
    public void addConnection(OpenWireConnection connection, ConnectionInfo info) throws Exception {
@@ -322,11 +317,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
          fireAdvisory(context, topic, copy);
 
          // init the conn
-         addSessions(context.getConnection(), context.getConnectionState().getSessionIds());
+         context.getConnection().addSessions( context.getConnectionState().getSessionIds());
       }
    }
 
-   private void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, Command copy) throws Exception {
+   public void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, Command copy) throws Exception {
       this.fireAdvisory(context, topic, copy, null);
    }
 
@@ -341,7 +336,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
    /*
     * See AdvisoryBroker.fireAdvisory()
     */
-   private void fireAdvisory(AMQConnectionContext context,
+   public void fireAdvisory(AMQConnectionContext context,
                              ActiveMQTopic topic,
                              Command command,
                              ConsumerId targetConsumerId) throws Exception {
@@ -448,198 +443,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
    public boolean isStopping() {
       return false;
    }
-
-   public void addProducer(OpenWireConnection theConn, ProducerInfo info) throws Exception {
-      SessionId sessionId = info.getProducerId().getParentId();
-      ConnectionId connectionId = sessionId.getParentId();
-      ConnectionState cs = theConn.getState();
-      if (cs == null) {
-         throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " + connectionId);
-      }
-      SessionState ss = cs.getSessionState(sessionId);
-      if (ss == null) {
-         throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId);
-      }
-      // Avoid replaying dup commands
-      if (!ss.getProducerIds().contains(info.getProducerId())) {
-
-         AMQSession amqSession = sessions.get(sessionId);
-         if (amqSession == null) {
-            throw new IllegalStateException("Session not exist! : " + sessionId);
-         }
-
-         ActiveMQDestination destination = info.getDestination();
-         if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
-            if (destination.isQueue()) {
-               OpenWireUtil.validateDestination(destination, amqSession);
-            }
-            DestinationInfo destInfo = new DestinationInfo(theConn.getContext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
-            this.addDestination(theConn, destInfo);
-         }
-
-         amqSession.createProducer(info);
-
-         try {
-            ss.addProducer(info);
-         }
-         catch (IllegalStateException e) {
-            amqSession.removeProducer(info);
-         }
-
-      }
-
-   }
-
-   public void updateConsumer(OpenWireConnection theConn, ConsumerControl consumerControl) {
-      SessionId sessionId = consumerControl.getConsumerId().getParentId();
-      AMQSession amqSession = sessions.get(sessionId);
-      amqSession.updateConsumerPrefetchSize(consumerControl.getConsumerId(), consumerControl.getPrefetch());
-   }
-
-   public void addConsumer(OpenWireConnection theConn, ConsumerInfo info) throws Exception {
-      // Todo: add a destination interceptors holder here (amq supports this)
-      SessionId sessionId = info.getConsumerId().getParentId();
-      ConnectionId connectionId = sessionId.getParentId();
-      ConnectionState cs = theConn.getState();
-      if (cs == null) {
-         throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: " + connectionId);
-      }
-      SessionState ss = cs.getSessionState(sessionId);
-      if (ss == null) {
-         throw new IllegalStateException(this.server + " Cannot add a consumer to a session that had not been registered: " + sessionId);
-      }
-      // Avoid replaying dup commands
-      if (!ss.getConsumerIds().contains(info.getConsumerId())) {
-
-         AMQSession amqSession = sessions.get(sessionId);
-         if (amqSession == null) {
-            throw new IllegalStateException("Session not exist! : " + sessionId);
-         }
-
-         amqSession.createConsumer(info, amqSession);
-
-         ss.addConsumer(info);
-      }
-   }
-
-   public void addSessions(OpenWireConnection theConn, Set<SessionId> sessionSet) {
-      Iterator<SessionId> iter = sessionSet.iterator();
-      while (iter.hasNext()) {
-         SessionId sid = iter.next();
-         addSession(theConn, theConn.getState().getSessionState(sid).getInfo(), true);
-      }
-   }
-
-   public AMQSession addSession(OpenWireConnection theConn, SessionInfo ss) {
-      return addSession(theConn, ss, false);
-   }
-
-   public AMQSession addSession(OpenWireConnection theConn, SessionInfo ss, boolean internal) {
-      AMQSession amqSession = new AMQSession(theConn.getState().getInfo(), ss, server, theConn, scheduledPool, this);
-      amqSession.initialize();
-      amqSession.setInternal(internal);
-      sessions.put(ss.getSessionId(), amqSession);
-      sessionIdMap.put(amqSession.getCoreSession().getName(), ss.getSessionId());
-      return amqSession;
-   }
-
-   public void removeConnection(OpenWireConnection connection,
-                                ConnectionInfo info,
-                                Throwable error) throws InvalidClientIDException {
-      synchronized (clientIdSet) {
-         String clientId = info.getClientId();
-         if (clientId != null) {
-            AMQConnectionContext context = this.clientIdSet.get(clientId);
-            if (context != null && context.decRefCount() == 0) {
-               //connection is still there and need to close
-               this.clientIdSet.remove(clientId);
-               connection.disconnect(error != null);
-               this.connections.remove(connection);//what's that for?
-            }
-         }
-         else {
-            throw new InvalidClientIDException("No clientID specified for connection disconnect request");
-         }
-      }
-   }
-
-   public void removeSession(AMQConnectionContext context, SessionInfo info) throws Exception {
-      AMQSession session = sessions.remove(info.getSessionId());
-      if (session != null) {
-         session.close();
-      }
-   }
-
-   public void removeProducer(ProducerId id) {
-      SessionId sessionId = id.getParentId();
-      AMQSession session = sessions.get(sessionId);
-      session.removeProducer(id);
-   }
-
-   public AMQSession getSession(SessionId sessionId) {
-      return sessions.get(sessionId);
-   }
-
-   public void removeDestination(OpenWireConnection connection, ActiveMQDestination dest) throws Exception {
-      if (dest.isQueue()) {
-         SimpleString qName = new SimpleString("jms.queue." + dest.getPhysicalName());
-         this.server.destroyQueue(qName);
-      }
-      else {
-         Bindings bindings = this.server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("jms.topic." + dest.getPhysicalName()));
-         Iterator<Binding> iterator = bindings.getBindings().iterator();
-
-         while (iterator.hasNext()) {
-            Queue b = (Queue) iterator.next().getBindable();
-            if (b.getConsumerCount() > 0) {
-               throw new Exception("Destination still has an active subscription: " + dest.getPhysicalName());
-            }
-            if (b.isDurable()) {
-               throw new Exception("Destination still has durable subscription: " + dest.getPhysicalName());
-            }
-            b.deleteQueue();
-         }
-      }
-
-      if (!AdvisorySupport.isAdvisoryTopic(dest)) {
-         AMQConnectionContext context = connection.getContext();
-         DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.REMOVE_OPERATION_TYPE, dest);
-
-         ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest);
-         fireAdvisory(context, topic, advInfo);
-      }
-   }
-
-   public void addDestination(OpenWireConnection connection, DestinationInfo info) throws Exception {
-      ActiveMQDestination dest = info.getDestination();
-      if (dest.isQueue()) {
-         SimpleString qName = OpenWireUtil.toCoreAddress(dest);
-         QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(qName);
-         if (binding == null) {
-            if (connection.getState().getInfo() != null) {
-
-               CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE;
-               server.getSecurityStore().check(qName, checkType, connection);
-
-               server.checkQueueCreationLimit(connection.getUsername());
-            }
-            ConnectionInfo connInfo = connection.getState().getInfo();
-            this.server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, dest.isTemporary());
-         }
-         if (dest.isTemporary()) {
-            connection.registerTempQueue(dest);
-         }
-      }
-
-      if (!AdvisorySupport.isAdvisoryTopic(dest)) {
-         AMQConnectionContext context = connection.getContext();
-         DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, dest);
-
-         ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest);
-         fireAdvisory(context, topic, advInfo);
-      }
-   }
-
    public void endTransaction(TransactionInfo info) throws Exception {
       AMQSession txSession = transactions.get(info.getTransactionId());
 
@@ -682,19 +485,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       transactions.remove(info.getTransactionId());
    }
 
-   public TransactionId[] recoverTransactions(Set<SessionId> sIds) {
-      List<TransactionId> recovered = new ArrayList<>();
-      if (sIds != null) {
-         for (SessionId sid : sIds) {
-            AMQSession s = this.sessions.get(sid);
-            if (s != null) {
-               s.recover(recovered);
-            }
-         }
-      }
-      return recovered.toArray(new TransactionId[0]);
-   }
-
    public boolean validateUser(String login, String passcode) {
       boolean validated = true;
 
@@ -717,50 +507,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
 
    /**
     * TODO: remove this, use the regular ResourceManager from the Server's
-    * */
+    */
    public void registerTx(TransactionId txId, AMQSession amqSession) {
       transactions.put(txId, amqSession);
    }
 
-   //advisory support
-   @Override
-   public void onNotification(Notification notif) {
-      try {
-         if (notif.getType() instanceof CoreNotificationType) {
-            CoreNotificationType type = (CoreNotificationType) notif.getType();
-            switch (type) {
-               case CONSUMER_SLOW:
-                  fireSlowConsumer(notif);
-                  break;
-               default:
-                  break;
-            }
-         }
-      }
-      catch (Exception e) {
-         ActiveMQServerLogger.LOGGER.error("Failed to send notification " + notif, e);
-      }
-   }
-
-   private void fireSlowConsumer(Notification notif) throws Exception {
-      SimpleString coreSessionId = notif.getProperties().getSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME);
-      Long coreConsumerId = notif.getProperties().getLongProperty(ManagementHelper.HDR_CONSUMER_NAME);
-      SessionId sessionId = sessionIdMap.get(coreSessionId.toString());
-      AMQSession session = sessions.get(sessionId);
-      AMQConsumer consumer = session.getConsumer(coreConsumerId);
-      ActiveMQDestination destination = consumer.getDestination();
-
-      if (!AdvisorySupport.isAdvisoryTopic(destination)) {
-         ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination);
-         ConnectionId connId = sessionId.getParentId();
-         OpenWireConnection cc = this.brokerConnectionStates.get(connId);
-         ActiveMQMessage advisoryMessage = new ActiveMQMessage();
-         advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, consumer.getId().toString());
-
-         fireAdvisory(cc.getContext(), topic, advisoryMessage, consumer.getId());
-      }
-   }
-
    public void removeSubscription(RemoveSubscriptionInfo subInfo) throws Exception {
       SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName()));
       server.destroyQueue(subQueueName);
@@ -795,7 +546,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       return this.updateClusterClients;
    }
 
-   public  void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) {
+   public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) {
       this.updateClusterClientsOnRemove = updateClusterClientsOnRemove;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/168b98cb/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index b0f007a..221679f 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -27,7 +27,14 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
+import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.MessageAck;
@@ -36,14 +43,9 @@ import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
-import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 
 public class AMQConsumer implements BrowserListener {
+
    private AMQSession session;
    private org.apache.activemq.command.ActiveMQDestination actualDest;
    private ConsumerInfo info;
@@ -72,7 +74,7 @@ public class AMQConsumer implements BrowserListener {
       }
    }
 
-   public void init() throws Exception {
+   public void init(SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
       AMQServerSession coreSession = session.getCoreSession();
 
       SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector());
@@ -127,7 +129,9 @@ public class AMQConsumer implements BrowserListener {
             coreSession.createQueue(address, subQueueName, selector, true, false);
          }
 
-         coreSession.createConsumer(nativeId, subQueueName, null, info.isBrowser(), false, -1);
+         AMQServerConsumer serverConsumer = (AMQServerConsumer) coreSession.createConsumer(nativeId, subQueueName, null, info.isBrowser(), false, -1);
+         serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
+         serverConsumer.setAmqConsumer(this);
       }
       else {
          SimpleString queueName = new SimpleString("jms.queue." + this.actualDest.getPhysicalName());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/168b98cb/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
index e9c4044..b5d8dbd 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
@@ -19,8 +19,6 @@ package org.apache.activemq.artemis.core.protocol.openwire.amq;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageId;
 import org.apache.activemq.state.ProducerState;
 
 public class AMQProducerBrokerExchange {
@@ -28,7 +26,6 @@ public class AMQProducerBrokerExchange {
    private AMQConnectionContext connectionContext;
    private ProducerState producerState;
    private boolean mutable = true;
-   private AtomicLong lastSendSequenceNumber = new AtomicLong(-1);
    private final FlowControlInfo flowControlInfo = new FlowControlInfo();
 
    public AMQProducerBrokerExchange() {
@@ -57,13 +54,6 @@ public class AMQProducerBrokerExchange {
    }
 
    /**
-    * @return the mutable
-    */
-   public boolean isMutable() {
-      return this.mutable;
-   }
-
-   /**
     * @param mutable the mutable to set
     */
    public void setMutable(boolean mutable) {
@@ -84,75 +74,13 @@ public class AMQProducerBrokerExchange {
       this.producerState = producerState;
    }
 
-   /**
-    * Enforce duplicate suppression using info from persistence adapter
-    *
-    * @return false if message should be ignored as a duplicate
-    */
-   public boolean canDispatch(Message messageSend) {
-      // TODO: auditProduceSequenceIds is never true
-      boolean canDispatch = true;
-      //TODO: DEAD CODE
-//      if (auditProducerSequenceIds && messageSend.isPersistent()) {
-//         final long producerSequenceId = messageSend.getMessageId().getProducerSequenceId();
-//         if (isNetworkProducer) {
-//            // messages are multiplexed on this producer so we need to query the
-//            // persistenceAdapter
-//            long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId());
-//            if (producerSequenceId <= lastStoredForMessageProducer) {
-//               canDispatch = false;
-//            }
-//         }
-//         else if (producerSequenceId <= lastSendSequenceNumber.get()) {
-//            canDispatch = false;
-//            // TODO: WHAT IS THIS?
-//            if (messageSend.isInTransaction()) {
-//
-//
-//            }
-//            else {
-//            }
-//         }
-//         else {
-//            // track current so we can suppress duplicates later in the stream
-//            lastSendSequenceNumber.set(producerSequenceId);
-//         }
-//      }
-      return canDispatch;
-   }
-
-   private long getStoredSequenceIdForMessage(MessageId messageId) {
-      return -1;
-   }
-
    public void setLastStoredSequenceId(long l) {
    }
 
-   public void incrementSend() {
-      flowControlInfo.incrementSend();
-   }
-
    public void blockingOnFlowControl(boolean blockingOnFlowControl) {
       flowControlInfo.setBlockingOnFlowControl(blockingOnFlowControl);
    }
 
-   public boolean isBlockedForFlowControl() {
-      return flowControlInfo.isBlockingOnFlowControl();
-   }
-
-   public void resetFlowControl() {
-      flowControlInfo.reset();
-   }
-
-   public long getTotalTimeBlocked() {
-      return flowControlInfo.getTotalTimeBlocked();
-   }
-
-   public int getPercentageBlocked() {
-      double value = flowControlInfo.getSendsBlocked() / flowControlInfo.getTotalSends();
-      return (int) value * 100;
-   }
-
    public static class FlowControlInfo {
 
       private AtomicBoolean blockingOnFlowControl = new AtomicBoolean();
@@ -160,10 +88,6 @@ public class AMQProducerBrokerExchange {
       private AtomicLong sendsBlocked = new AtomicLong();
       private AtomicLong totalTimeBlocked = new AtomicLong();
 
-      public boolean isBlockingOnFlowControl() {
-         return blockingOnFlowControl.get();
-      }
-
       public void setBlockingOnFlowControl(boolean blockingOnFlowControl) {
          this.blockingOnFlowControl.set(blockingOnFlowControl);
          if (blockingOnFlowControl) {
@@ -171,30 +95,10 @@ public class AMQProducerBrokerExchange {
          }
       }
 
-      public long getTotalSends() {
-         return totalSends.get();
-      }
-
-      public void incrementSend() {
-         this.totalSends.incrementAndGet();
-      }
-
-      public long getSendsBlocked() {
-         return sendsBlocked.get();
-      }
-
       public void incrementSendBlocked() {
          this.sendsBlocked.incrementAndGet();
       }
 
-      public long getTotalTimeBlocked() {
-         return totalTimeBlocked.get();
-      }
-
-      public void incrementTimeBlocked(long time) {
-         this.totalTimeBlocked.addAndGet(time);
-      }
-
       public void reset() {
          blockingOnFlowControl.set(false);
          totalSends.set(0);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/168b98cb/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
index b0ec7ed..3f94351 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
@@ -33,6 +33,18 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 
 public class AMQServerConsumer extends ServerConsumerImpl {
 
+   // TODO-NOW: remove this once unified
+   AMQConsumer amqConsumer;
+
+   public AMQConsumer getAmqConsumer() {
+      return amqConsumer;
+   }
+
+   /** TODO-NOW: remove this once unified */
+   public void setAmqConsumer(AMQConsumer amqConsumer) {
+      this.amqConsumer = amqConsumer;
+   }
+
    public AMQServerConsumer(long consumerID,
                             AMQServerSession serverSession,
                             QueueBinding binding,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/168b98cb/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 83709a1..e59295d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionInfo;
@@ -71,8 +72,6 @@ public class AMQSession implements SessionCallback {
 
    private Map<Long, AMQConsumer> consumers = new ConcurrentHashMap<>();
 
-   private Map<Long, AMQProducer> producers = new HashMap<>();
-
    private AtomicBoolean started = new AtomicBoolean(false);
 
    private TransactionId txId = null;
@@ -121,7 +120,7 @@ public class AMQSession implements SessionCallback {
 
    }
 
-   public void createConsumer(ConsumerInfo info, AMQSession amqSession) throws Exception {
+   public void createConsumer(ConsumerInfo info, AMQSession amqSession, SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
       //check destination
       ActiveMQDestination dest = info.getDestination();
       ActiveMQDestination[] dests = null;
@@ -139,7 +138,7 @@ public class AMQSession implements SessionCallback {
          }
          AMQConsumer consumer = new AMQConsumer(this, d, info, scheduledPool);
 
-         consumer.init();
+         consumer.init(slowConsumerDetectionListener);
          consumerMap.put(d, consumer);
          consumers.put(consumer.getNativeId(), consumer);
       }
@@ -228,20 +227,6 @@ public class AMQSession implements SessionCallback {
       consumers.remove(consumerId);
    }
 
-   public void createProducer(ProducerInfo info) throws Exception {
-      AMQProducer producer = new AMQProducer(this, info);
-      producer.init();
-      producers.put(info.getProducerId().getValue(), producer);
-   }
-
-   public void removeProducer(ProducerInfo info) {
-      removeProducer(info.getProducerId());
-   }
-
-   public void removeProducer(ProducerId id) {
-      producers.remove(id.getValue());
-   }
-
    public SendingResult send(AMQProducerBrokerExchange producerExchange,
                              Message messageSend,
                              boolean sendProducerAck) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/168b98cb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
index 6045e2c..d75efdd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
@@ -25,6 +25,12 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
  */
 public interface ServerConsumer extends Consumer {
 
+   void setlowConsumerDetection(SlowConsumerDetectionListener listener);
+
+   SlowConsumerDetectionListener getSlowConsumerDetecion();
+
+   void fireSlowConsumer();
+
    /**
     * @param protocolContext
     * @see #getProtocolContext()

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/168b98cb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/SlowConsumerDetectionListener.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/SlowConsumerDetectionListener.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/SlowConsumerDetectionListener.java
new file mode 100644
index 0000000..0c60f25
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/SlowConsumerDetectionListener.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server;
+
+public interface SlowConsumerDetectionListener {
+   void onSlowConsumer(ServerConsumer consumer);
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/168b98cb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 8bf5d08..86ca36c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2930,6 +2930,8 @@ public class QueueImpl implements Queue {
                      }
                   }
 
+                  serverConsumer.fireSlowConsumer();
+
                   if (connection != null) {
                      ActiveMQServerLogger.LOGGER.slowConsumerDetected(serverConsumer.getSessionID(), serverConsumer.getID(), getName().toString(), connection.getRemoteAddress(), threshold, consumerRate);
                      if (policy.equals(SlowConsumerPolicy.KILL)) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/168b98cb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 25c657e..ed6a173 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -48,6 +48,7 @@ import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -86,6 +87,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
    private Object protocolContext;
 
+   private SlowConsumerDetectionListener slowConsumerListener;
+
    /**
     * We get a readLock when a message is handled, and return the readLock when the message is finally delivered
     * When stopping the consumer we need to get a writeLock to make sure we had all delivery finished
@@ -220,6 +223,21 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    // ----------------------------------------------------------------------
 
    @Override
+   public void setlowConsumerDetection(SlowConsumerDetectionListener listener) {
+      this.slowConsumerListener = listener;
+   }
+
+   @Override
+   public SlowConsumerDetectionListener getSlowConsumerDetecion() {
+      return slowConsumerListener;
+   }
+
+   @Override
+   public void fireSlowConsumer() {
+      slowConsumerListener.onSlowConsumer(this);
+   }
+
+   @Override
    public Object getProtocolContext() {
       return protocolContext;
    }