You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/03/31 17:24:59 UTC

[1/4] activemq-6 git commit: ACTIVEMQ6-89 Refactored stomp support contributed

Repository: activemq-6
Updated Branches:
  refs/heads/master 9da0a37b8 -> c65ca252f


ACTIVEMQ6-89 Refactored stomp support contributed

https://issues.apache.org/jira/browse/ACTIVEMQ6-89

I have done a lot of refactoring on this, so we can now have a different version of the interceptor for each protocol, based on a common base interface.
Just an abstract class over Stomp would be a bit hacky... this is a better approach.


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/519a47f0
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/519a47f0
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/519a47f0

Branch: refs/heads/master
Commit: 519a47f023ff24a474160dd6e61c2809dbdd5764
Parents: b2524b1
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Mar 16 15:11:25 2015 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Mar 31 11:23:05 2015 -0400

----------------------------------------------------------------------
 .../activemq/api/core/BaseInterceptor.java      |  35 +++++
 .../apache/activemq/api/core/Interceptor.java   |  13 +-
 .../protocol/proton/ProtonProtocolManager.java  |  26 +++-
 .../proton/ProtonProtocolManagerFactory.java    |  15 +-
 .../openwire/OpenWireProtocolManager.java       | 141 +++++++++++--------
 .../OpenWireProtocolManagerFactory.java         |  14 +-
 .../protocol/stomp/StompFrameInterceptor.java   |  33 +----
 .../protocol/stomp/StompProtocolManager.java    |  56 +++++---
 .../stomp/StompProtocolManagerFactory.java      |  16 ++-
 .../protocol/core/impl/CoreProtocolManager.java |  30 +++-
 .../core/impl/CoreProtocolManagerFactory.java   |  23 ++-
 .../server/impl/RemotingServiceImpl.java        |  55 ++++++--
 .../AbstractProtocolManagerFactory.java         |  57 ++++++++
 .../spi/core/protocol/ProtocolManager.java      |  16 ++-
 .../core/protocol/ProtocolManagerFactory.java   |  22 ++-
 docs/user-manual/en/intercepting-operations.md  |   4 +-
 .../tests/integration/stomp/ExtraStompTest.java |  98 +++++++++----
 .../tests/integration/stomp/StompTestBase.java  |   2 +-
 18 files changed, 463 insertions(+), 193 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/519a47f0/activemq-core-client/src/main/java/org/apache/activemq/api/core/BaseInterceptor.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/BaseInterceptor.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/BaseInterceptor.java
new file mode 100644
index 0000000..57c788b
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/BaseInterceptor.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.api.core;
+
+import org.apache.activemq.spi.core.protocol.RemotingConnection;
+
+public interface BaseInterceptor<P>
+{
+   /**
+    * Intercepts a packet which is received before it is sent to the channel
+    *
+    * @param packet     the packet being received
+    * @param connection the connection the packet was received on
+    * @return {@code true} to process the next interceptor and handle the packet,
+    * {@code false} to abort processing of the packet
+    * @throws ActiveMQException
+    */
+   boolean intercept(P packet, RemotingConnection connection) throws ActiveMQException;
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/519a47f0/activemq-core-client/src/main/java/org/apache/activemq/api/core/Interceptor.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/Interceptor.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/Interceptor.java
index a1fa94a..7cfee0b 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/Interceptor.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/Interceptor.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.api.core;
 
 import org.apache.activemq.core.protocol.core.Packet;
-import org.apache.activemq.spi.core.protocol.RemotingConnection;
 
 /**
  * This is class is a simple way to intercepting calls on ActiveMQ client and servers.
@@ -26,16 +25,6 @@ import org.apache.activemq.spi.core.protocol.RemotingConnection;
  * {@literal activemq-configuration.xml}.<br>
  * To add it to a client, use {@link org.apache.activemq.api.core.client.ServerLocator#addIncomingInterceptor(Interceptor)}
  */
-public interface Interceptor
+public interface Interceptor extends BaseInterceptor<Packet>
 {
-   /**
-    * Intercepts a packet which is received before it is sent to the channel
-    *
-    * @param packet     the packet being received
-    * @param connection the connection the packet was received on
-    * @return {@code true} to process the next interceptor and handle the packet,
-    * {@code false} to abort processing of the packet
-    * @throws ActiveMQException
-    */
-   boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException;
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/519a47f0/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManager.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManager.java
index b3f3211..dd88cab 100644
--- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManager.java
+++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManager.java
@@ -16,10 +16,13 @@
  */
 package org.apache.activemq.core.protocol.proton;
 
+import java.util.List;
 import java.util.concurrent.Executor;
 
 import io.netty.channel.ChannelPipeline;
 import org.apache.activemq.api.core.ActiveMQBuffer;
+import org.apache.activemq.api.core.BaseInterceptor;
+import org.apache.activemq.api.core.Interceptor;
 import org.apache.activemq.api.core.client.ActiveMQClient;
 import org.apache.activemq.core.protocol.proton.converter.ProtonMessageConverter;
 import org.apache.activemq.core.protocol.proton.plug.ActiveMQProtonConnectionCallback;
@@ -30,6 +33,7 @@ import org.apache.activemq.core.server.management.NotificationListener;
 import org.apache.activemq.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.spi.core.protocol.MessageConverter;
 import org.apache.activemq.spi.core.protocol.ProtocolManager;
+import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory;
 import org.apache.activemq.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.spi.core.remoting.Acceptor;
 import org.apache.activemq.spi.core.remoting.Connection;
@@ -39,14 +43,18 @@ import org.proton.plug.context.server.ProtonServerConnectionContextFactory;
 /**
  * A proton protocol manager, basically reads the Proton Input and maps proton resources to ActiveMQ resources
  */
-public class ProtonProtocolManager implements ProtocolManager, NotificationListener
+public class ProtonProtocolManager implements ProtocolManager<Interceptor>, NotificationListener
 {
    private final ActiveMQServer server;
 
    private MessageConverter protonConverter;
 
-   public ProtonProtocolManager(ActiveMQServer server)
+
+   private final ProtonProtocolManagerFactory factory;
+
+   public ProtonProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server)
    {
+      this.factory = factory;
       this.server = server;
       this.protonConverter = new ProtonMessageConverter(server.getStorageManager());
    }
@@ -70,6 +78,18 @@ public class ProtonProtocolManager implements ProtocolManager, NotificationListe
    }
 
    @Override
+   public ProtocolManagerFactory<Interceptor> getFactory()
+   {
+      return factory;
+   }
+
+   @Override
+   public void updateInterceptors(List<BaseInterceptor> incomingInterceptors, List<BaseInterceptor> outgoingInterceptors)
+   {
+      // no op
+   }
+
+   @Override
    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection)
    {
       ActiveMQProtonConnectionCallback connectionCallback = new ActiveMQProtonConnectionCallback(this, remotingConnection);
@@ -97,7 +117,7 @@ public class ProtonProtocolManager implements ProtocolManager, NotificationListe
    @Override
    public void handleBuffer(RemotingConnection connection, ActiveMQBuffer buffer)
    {
-      ActiveMQProtonRemotingConnection protonConnection = (ActiveMQProtonRemotingConnection)connection;
+      ActiveMQProtonRemotingConnection protonConnection = (ActiveMQProtonRemotingConnection) connection;
 
       protonConnection.bufferReceived(protonConnection.getID(), buffer);
    }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/519a47f0/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManagerFactory.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManagerFactory.java
index c83f10c..6fe2576 100644
--- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManagerFactory.java
+++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManagerFactory.java
@@ -16,14 +16,16 @@
  */
 package org.apache.activemq.core.protocol.proton;
 
+import org.apache.activemq.api.core.BaseInterceptor;
 import org.apache.activemq.api.core.Interceptor;
 import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.spi.core.protocol.AbstractProtocolManagerFactory;
 import org.apache.activemq.spi.core.protocol.ProtocolManager;
-import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory;
 
+import java.util.Collections;
 import java.util.List;
 
-public class ProtonProtocolManagerFactory implements ProtocolManagerFactory
+public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor>
 {
    private static final String AMQP_PROTOCOL_NAME = "AMQP";
 
@@ -32,7 +34,14 @@ public class ProtonProtocolManagerFactory implements ProtocolManagerFactory
    @Override
    public ProtocolManager createProtocolManager(ActiveMQServer server, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
    {
-      return new ProtonProtocolManager(server);
+      return new ProtonProtocolManager(this, server);
+   }
+
+   @Override
+   public List<Interceptor> filterInterceptors(List<BaseInterceptor> interceptors)
+   {
+      // no interceptors on Proton
+      return Collections.emptyList();
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/519a47f0/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java
index dfa4e4a..74d7d77 100644
--- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.core.protocol.openwire;
 
+import javax.jms.InvalidClientIDException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -27,12 +28,12 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 
-import javax.jms.InvalidClientIDException;
-
 import io.netty.channel.ChannelPipeline;
-
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.api.core.ActiveMQBuffer;
+import org.apache.activemq.api.core.BaseInterceptor;
+import org.apache.activemq.api.core.Interceptor;
+import org.apache.activemq.api.core.SimpleString;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -54,16 +55,6 @@ import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.core.server.ActiveMQServerLogger;
-import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.openwire.OpenWireFormatFactory;
-import org.apache.activemq.state.ConnectionState;
-import org.apache.activemq.state.ProducerState;
-import org.apache.activemq.state.SessionState;
-import org.apache.activemq.util.IdGenerator;
-import org.apache.activemq.util.InetAddressUtil;
-import org.apache.activemq.util.LongSequenceGenerator;
-import org.apache.activemq.api.core.SimpleString;
 import org.apache.activemq.core.journal.IOAsyncTask;
 import org.apache.activemq.core.protocol.openwire.amq.AMQConnectionContext;
 import org.apache.activemq.core.protocol.openwire.amq.AMQPersistenceAdapter;
@@ -74,16 +65,26 @@ import org.apache.activemq.core.protocol.openwire.amq.AMQTransportConnectionStat
 import org.apache.activemq.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.core.security.CheckType;
 import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.core.server.ActiveMQServerLogger;
 import org.apache.activemq.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.openwire.OpenWireFormatFactory;
 import org.apache.activemq.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.spi.core.protocol.MessageConverter;
 import org.apache.activemq.spi.core.protocol.ProtocolManager;
+import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory;
 import org.apache.activemq.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.spi.core.remoting.Acceptor;
 import org.apache.activemq.spi.core.remoting.Connection;
 import org.apache.activemq.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.state.ConnectionState;
+import org.apache.activemq.state.ProducerState;
+import org.apache.activemq.state.SessionState;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.InetAddressUtil;
+import org.apache.activemq.util.LongSequenceGenerator;
 
-public class OpenWireProtocolManager implements ProtocolManager
+public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
 {
    private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
    private static final IdGenerator ID_GENERATOR = new IdGenerator();
@@ -91,6 +92,8 @@ public class OpenWireProtocolManager implements ProtocolManager
    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
    private final ActiveMQServer server;
 
+   private final OpenWireProtocolManagerFactory factory;
+
    private OpenWireFormatFactory wireFactory;
 
    private boolean tightEncodingEnabled = true;
@@ -104,7 +107,7 @@ public class OpenWireProtocolManager implements ProtocolManager
 
    // from broker
    protected final Map<ConnectionId, ConnectionState> brokerConnectionStates = Collections
-         .synchronizedMap(new HashMap<ConnectionId, ConnectionState>());
+      .synchronizedMap(new HashMap<ConnectionId, ConnectionState>());
 
    private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<OpenWireConnection>();
 
@@ -118,8 +121,9 @@ public class OpenWireProtocolManager implements ProtocolManager
 
    private Map<TransactionId, AMQSession> transactions = new ConcurrentHashMap<TransactionId, AMQSession>();
 
-   public OpenWireProtocolManager(ActiveMQServer server)
+   public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server)
    {
+      this.factory = factory;
       this.server = server;
       this.wireFactory = new OpenWireFormatFactory();
       // preferred prop, should be done via config
@@ -128,17 +132,30 @@ public class OpenWireProtocolManager implements ProtocolManager
       advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
    }
 
+
+   public ProtocolManagerFactory<Interceptor> getFactory()
+   {
+      return factory;
+   }
+
+
+   @Override
+   public void updateInterceptors(List<BaseInterceptor> incomingInterceptors, List<BaseInterceptor> outgoingInterceptors)
+   {
+      // NO-OP
+   }
+
    @Override
    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed,
-         Connection connection)
+                                                Connection connection)
    {
       OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat();
       OpenWireConnection owConn = new OpenWireConnection(acceptorUsed,
-            connection, this, wf);
+                                                         connection, this, wf);
       owConn.init();
 
       return new ConnectionEntry(owConn, null, System.currentTimeMillis(),
-            1 * 60 * 1000);
+                                 1 * 60 * 1000);
    }
 
    @Override
@@ -171,7 +188,7 @@ public class OpenWireProtocolManager implements ProtocolManager
       if (array.length < 8)
       {
          throw new IllegalArgumentException("Protocol header length changed "
-               + array.length);
+                                               + array.length);
       }
 
       int start = this.prefixPacketSize ? 4 : 0;
@@ -207,7 +224,7 @@ public class OpenWireProtocolManager implements ProtocolManager
    }
 
    public void handleCommand(OpenWireConnection openWireConnection,
-         Object command)
+                             Object command)
    {
       Command amqCmd = (Command) command;
       byte type = amqCmd.getDataStructureType();
@@ -221,14 +238,14 @@ public class OpenWireProtocolManager implements ProtocolManager
    }
 
    public void sendReply(final OpenWireConnection connection,
-         final Command command)
+                         final Command command)
    {
       server.getStorageManager().afterCompleteOperations(new IOAsyncTask()
       {
          public void onError(final int errorCode, final String errorMessage)
          {
             ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode,
-                  errorMessage);
+                                                                  errorMessage);
          }
 
          public void done()
@@ -285,7 +302,7 @@ public class OpenWireProtocolManager implements ProtocolManager
       if (clientId == null)
       {
          throw new InvalidClientIDException(
-               "No clientID specified for connection request");
+            "No clientID specified for connection request");
       }
       synchronized (clientIdSet)
       {
@@ -308,8 +325,8 @@ public class OpenWireProtocolManager implements ProtocolManager
             else
             {
                throw new InvalidClientIDException("Broker: " + getBrokerName()
-                     + " - Client: " + clientId + " already connected from "
-                     + oldContext.getConnection().getRemoteAddress());
+                                                     + " - Client: " + clientId + " already connected from "
+                                                     + oldContext.getConnection().getRemoteAddress());
             }
          }
          else
@@ -329,11 +346,11 @@ public class OpenWireProtocolManager implements ProtocolManager
 
       // init the conn
       addSessions(context.getConnection(), context.getConnectionState()
-            .getSessionIds());
+         .getSessionIds());
    }
 
    private void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic,
-         Command copy) throws Exception
+                             Command copy) throws Exception
    {
       this.fireAdvisory(context, topic, copy, null);
    }
@@ -351,26 +368,26 @@ public class OpenWireProtocolManager implements ProtocolManager
     * See AdvisoryBroker.fireAdvisory()
     */
    private void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic,
-         Command command, ConsumerId targetConsumerId) throws Exception
+                             Command command, ConsumerId targetConsumerId) throws Exception
    {
       ActiveMQMessage advisoryMessage = new ActiveMQMessage();
       advisoryMessage.setStringProperty(
-            AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
+         AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
       String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
       advisoryMessage.setStringProperty(
-            AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
+         AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
 
       String url = "tcp://localhost:61616";
 
       advisoryMessage.setStringProperty(
-            AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
+         AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
 
       // set the data structure
       advisoryMessage.setDataStructure(command);
       advisoryMessage.setPersistent(false);
       advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
       advisoryMessage.setMessageId(new MessageId(advisoryProducerId,
-            messageIdGenerator.getNextSequenceId()));
+                                                 messageIdGenerator.getNextSequenceId()));
       advisoryMessage.setTargetConsumerId(targetConsumerId);
       advisoryMessage.setDestination(topic);
       advisoryMessage.setResponseRequired(false);
@@ -402,7 +419,7 @@ public class OpenWireProtocolManager implements ProtocolManager
          try
          {
             brokerName = InetAddressUtil.getLocalHostName().toLowerCase(
-                  Locale.ENGLISH);
+               Locale.ENGLISH);
          }
          catch (Exception e)
          {
@@ -445,34 +462,34 @@ public class OpenWireProtocolManager implements ProtocolManager
       SessionId sessionId = info.getProducerId().getParentId();
       ConnectionId connectionId = sessionId.getParentId();
       AMQTransportConnectionState cs = theConn
-            .lookupConnectionState(connectionId);
+         .lookupConnectionState(connectionId);
       if (cs == null)
       {
          throw new IllegalStateException(
-               "Cannot add a producer to a connection that had not been registered: "
-                     + connectionId);
+            "Cannot add a producer to a connection that had not been registered: "
+               + connectionId);
       }
       SessionState ss = cs.getSessionState(sessionId);
       if (ss == null)
       {
          throw new IllegalStateException(
-               "Cannot add a producer to a session that had not been registered: "
-                     + sessionId);
+            "Cannot add a producer to a session that had not been registered: "
+               + sessionId);
       }
       // Avoid replaying dup commands
       if (!ss.getProducerIds().contains(info.getProducerId()))
       {
          ActiveMQDestination destination = info.getDestination();
          if (destination != null
-               && !AdvisorySupport.isAdvisoryTopic(destination))
+            && !AdvisorySupport.isAdvisoryTopic(destination))
          {
             if (theConn.getProducerCount(connectionId) >= theConn
-                  .getMaximumProducersAllowedPerConnection())
+               .getMaximumProducersAllowedPerConnection())
             {
                throw new IllegalStateException(
-                     "Can't add producer on connection " + connectionId
-                           + ": at maximum limit: "
-                           + theConn.getMaximumProducersAllowedPerConnection());
+                  "Can't add producer on connection " + connectionId
+                     + ": at maximum limit: "
+                     + theConn.getMaximumProducersAllowedPerConnection());
             }
          }
 
@@ -503,35 +520,35 @@ public class OpenWireProtocolManager implements ProtocolManager
       SessionId sessionId = info.getConsumerId().getParentId();
       ConnectionId connectionId = sessionId.getParentId();
       AMQTransportConnectionState cs = theConn
-            .lookupConnectionState(connectionId);
+         .lookupConnectionState(connectionId);
       if (cs == null)
       {
          throw new IllegalStateException(
-               "Cannot add a consumer to a connection that had not been registered: "
-                     + connectionId);
+            "Cannot add a consumer to a connection that had not been registered: "
+               + connectionId);
       }
       SessionState ss = cs.getSessionState(sessionId);
       if (ss == null)
       {
          throw new IllegalStateException(
-               this.server
-                     + " Cannot add a consumer to a session that had not been registered: "
-                     + sessionId);
+            this.server
+               + " Cannot add a consumer to a session that had not been registered: "
+               + sessionId);
       }
       // Avoid replaying dup commands
       if (!ss.getConsumerIds().contains(info.getConsumerId()))
       {
          ActiveMQDestination destination = info.getDestination();
          if (destination != null
-               && !AdvisorySupport.isAdvisoryTopic(destination))
+            && !AdvisorySupport.isAdvisoryTopic(destination))
          {
             if (theConn.getConsumerCount(connectionId) >= theConn
-                  .getMaximumConsumersAllowedPerConnection())
+               .getMaximumConsumersAllowedPerConnection())
             {
                throw new IllegalStateException(
-                     "Can't add consumer on connection " + connectionId
-                           + ": at maximum limit: "
-                           + theConn.getMaximumConsumersAllowedPerConnection());
+                  "Can't add consumer on connection " + connectionId
+                     + ": at maximum limit: "
+                     + theConn.getMaximumConsumersAllowedPerConnection());
             }
          }
 
@@ -562,7 +579,7 @@ public class OpenWireProtocolManager implements ProtocolManager
       {
          SessionId sid = iter.next();
          addSession(theConn, theConn.getState().getSessionState(sid).getInfo(),
-               true);
+                    true);
       }
    }
 
@@ -572,10 +589,10 @@ public class OpenWireProtocolManager implements ProtocolManager
    }
 
    public AMQSession addSession(OpenWireConnection theConn, SessionInfo ss,
-         boolean internal)
+                                boolean internal)
    {
       AMQSession amqSession = new AMQSession(theConn.getState().getInfo(), ss,
-            server, theConn, this);
+                                             server, theConn, this);
       amqSession.initialize();
       amqSession.setInternal(internal);
       sessions.put(ss.getSessionId(), amqSession);
@@ -583,7 +600,7 @@ public class OpenWireProtocolManager implements ProtocolManager
    }
 
    public void removeConnection(AMQConnectionContext context,
-         ConnectionInfo info, Throwable error)
+                                ConnectionInfo info, Throwable error)
    {
       // todo roll back tx
       this.connections.remove(context.getConnection());
@@ -630,13 +647,13 @@ public class OpenWireProtocolManager implements ProtocolManager
    }
 
    public void addDestination(OpenWireConnection connection,
-         DestinationInfo info) throws Exception
+                              DestinationInfo info) throws Exception
    {
       ActiveMQDestination dest = info.getDestination();
       if (dest.isQueue())
       {
          SimpleString qName = new SimpleString("jms.queue."
-               + dest.getPhysicalName());
+                                                  + dest.getPhysicalName());
          ConnectionState state = connection.brokerConnectionStates.get(info.getConnectionId());
          ConnectionInfo connInfo = state.getInfo();
          if (connInfo != null)
@@ -646,7 +663,7 @@ public class OpenWireProtocolManager implements ProtocolManager
 
             AMQServerSession fakeSession = new AMQServerSession(user, pass);
             CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE;
-            ((ActiveMQServerImpl)server).getSecurityStore().check(qName, checkType, fakeSession);
+            ((ActiveMQServerImpl) server).getSecurityStore().check(qName, checkType, fakeSession);
          }
          this.server.createQueue(qName, qName, null, false, true);
          if (dest.isTemporary())

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/519a47f0/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManagerFactory.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManagerFactory.java
index 42f8f4d..5593ee6 100644
--- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManagerFactory.java
+++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManagerFactory.java
@@ -16,14 +16,16 @@
  */
 package org.apache.activemq.core.protocol.openwire;
 
+import java.util.Collections;
 import java.util.List;
 
+import org.apache.activemq.api.core.BaseInterceptor;
 import org.apache.activemq.api.core.Interceptor;
 import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.spi.core.protocol.AbstractProtocolManagerFactory;
 import org.apache.activemq.spi.core.protocol.ProtocolManager;
-import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory;
 
-public class OpenWireProtocolManagerFactory implements ProtocolManagerFactory
+public class OpenWireProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor>
 {
    public static final String OPENWIRE_PROTOCOL_NAME = "OPENWIRE";
 
@@ -31,7 +33,13 @@ public class OpenWireProtocolManagerFactory implements ProtocolManagerFactory
 
    public ProtocolManager createProtocolManager(final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
    {
-      return new OpenWireProtocolManager(server);
+      return new OpenWireProtocolManager(this, server);
+   }
+
+   @Override
+   public List<Interceptor> filterInterceptors(List<BaseInterceptor> interceptors)
+   {
+      return Collections.emptyList();
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/519a47f0/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompFrameInterceptor.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompFrameInterceptor.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompFrameInterceptor.java
index 9af6e27..619c29f 100644
--- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompFrameInterceptor.java
+++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompFrameInterceptor.java
@@ -16,10 +16,7 @@
  */
 package org.apache.activemq.core.protocol.stomp;
 
-import org.apache.activemq.api.core.ActiveMQException;
-import org.apache.activemq.api.core.Interceptor;
-import org.apache.activemq.core.protocol.core.Packet;
-import org.apache.activemq.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.api.core.BaseInterceptor;
 
 /**
  * This class is a simple way to intercepting client calls on ActiveMQ using STOMP protocol.
@@ -27,32 +24,6 @@ import org.apache.activemq.spi.core.protocol.RemotingConnection;
  * To add an interceptor to ActiveMQ server, you have to modify the server configuration file
  * {@literal activemq-configuration.xml}.<br>
  */
-public abstract class StompFrameInterceptor implements Interceptor
+public interface StompFrameInterceptor extends BaseInterceptor<StompFrame>
 {
-
-   /**
-    * Intercepts a packet which is received before it is sent to the channel.
-    * By default does not do anything and returns true allowing other interceptors perform logic.
-    *
-    * @param packet     the packet being received
-    * @param connection the connection the packet was received on
-    * @return {@code true} to process the next interceptor and handle the packet,
-    * {@code false} to abort processing of the packet
-    * @throws ActiveMQException
-    */
-   @Override
-   public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException
-   {
-      return true;
-   }
-
-  /**
-   * Intercepts a stomp frame sent by a client.
-   *
-   * @param stompFrame     the stomp frame being received
-   * @param connection the connection the stomp frame was received on
-   * @return {@code true} to process the next interceptor and handle the stomp frame,
-   * {@code false} to abort processing of the stomp frame
-   */
-   public abstract boolean intercept(StompFrame stompFrame, RemotingConnection connection);
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/519a47f0/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
index 9fb0362..3780180 100644
--- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
+++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
@@ -26,10 +26,9 @@ import java.util.Set;
 import java.util.concurrent.Executor;
 
 import io.netty.channel.ChannelPipeline;
-
 import org.apache.activemq.api.core.ActiveMQBuffer;
 import org.apache.activemq.api.core.ActiveMQExceptionType;
-import org.apache.activemq.api.core.Interceptor;
+import org.apache.activemq.api.core.BaseInterceptor;
 import org.apache.activemq.api.core.SimpleString;
 import org.apache.activemq.api.core.client.ActiveMQClient;
 import org.apache.activemq.api.core.management.CoreNotificationType;
@@ -49,6 +48,7 @@ import org.apache.activemq.core.server.management.NotificationListener;
 import org.apache.activemq.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.spi.core.protocol.MessageConverter;
 import org.apache.activemq.spi.core.protocol.ProtocolManager;
+import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory;
 import org.apache.activemq.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.spi.core.remoting.Acceptor;
 import org.apache.activemq.spi.core.remoting.Connection;
@@ -62,7 +62,7 @@ import static org.apache.activemq.core.protocol.stomp.ActiveMQStompProtocolMessa
 /**
  * StompProtocolManager
  */
-class StompProtocolManager implements ProtocolManager, NotificationListener
+class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, NotificationListener
 {
    // Constants -----------------------------------------------------
 
@@ -70,6 +70,8 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
 
    private final ActiveMQServer server;
 
+   private final StompProtocolManagerFactory factory;
+
    private final Executor executor;
 
    private final Map<String, StompSession> transactedSessions = new HashMap<String, StompSession>();
@@ -79,15 +81,16 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
 
    private final Set<String> destinations = new ConcurrentHashSet<String>();
 
-   private final List<Interceptor> incomingInterceptors;
-   private final List<Interceptor> outgoingInterceptors;
+   private final List<StompFrameInterceptor> incomingInterceptors;
+   private final List<StompFrameInterceptor> outgoingInterceptors;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public StompProtocolManager(final ActiveMQServer server, final List<Interceptor> incomingInterceptors, final List<Interceptor> outgoingInterceptors)
+   public StompProtocolManager(final StompProtocolManagerFactory factory, final ActiveMQServer server, final List<StompFrameInterceptor> incomingInterceptors, final List<StompFrameInterceptor> outgoingInterceptors)
    {
+      this.factory = factory;
       this.server = server;
       this.executor = server.getExecutorFactory().getExecutor();
       ManagementService service = server.getManagementService();
@@ -102,6 +105,22 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
    }
 
    @Override
+   public ProtocolManagerFactory<StompFrameInterceptor> getFactory()
+   {
+      return factory;
+   }
+
+   @Override
+   public void updateInterceptors(List<BaseInterceptor> incoming, List<BaseInterceptor> outgoing)
+   {
+      this.incomingInterceptors.clear();
+      this.incomingInterceptors.addAll(getFactory().filterInterceptors(incoming));
+
+      this.outgoingInterceptors.clear();
+      this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing));
+   }
+
+   @Override
    public MessageConverter getConverter()
    {
       return null;
@@ -345,7 +364,7 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
             ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage);
 
             ActiveMQStompException e = new ActiveMQStompException("Error sending reply",
-                                                                ActiveMQExceptionType.createException(errorCode, errorMessage));
+                                                                  ActiveMQExceptionType.createException(errorCode, errorMessage));
 
             StompFrame error = e.getFrame();
             send(connection, error);
@@ -419,7 +438,7 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
       if (stompSession.containsSubscription(subscriptionID))
       {
          throw new ActiveMQStompException("There already is a subscription for: " + subscriptionID +
-                                            ". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
+                                             ". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
       }
       long consumerID = server.getStorageManager().generateID();
       String clientID = (connection.getClientID() != null) ? connection.getClientID() : null;
@@ -514,26 +533,23 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
       return server;
    }
 
-   private void invokeInterceptors(List<Interceptor> interceptors, final StompFrame frame, final StompConnection connection)
+   private void invokeInterceptors(List<StompFrameInterceptor> interceptors, final StompFrame frame, final StompConnection connection)
    {
       if (interceptors != null && !interceptors.isEmpty())
       {
-         for (Interceptor interceptor : interceptors)
+         for (StompFrameInterceptor interceptor : interceptors)
          {
-            if (interceptor instanceof StompFrameInterceptor)
+            try
             {
-               try
+               if (!interceptor.intercept(frame, connection))
                {
-                  if (!((StompFrameInterceptor)interceptor).intercept(frame, connection))
-                  {
-                     break;
-                  }
-               }
-               catch (Exception e)
-               {
-                  ActiveMQServerLogger.LOGGER.error(e);
+                  break;
                }
             }
+            catch (Exception e)
+            {
+               ActiveMQServerLogger.LOGGER.error(e);
+            }
          }
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/519a47f0/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java
index 8cf5e6e..1edf561 100644
--- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java
+++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java
@@ -18,20 +18,26 @@ package org.apache.activemq.core.protocol.stomp;
 
 import java.util.List;
 
-import org.apache.activemq.api.core.Interceptor;
+import org.apache.activemq.api.core.BaseInterceptor;
 import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.spi.core.protocol.AbstractProtocolManagerFactory;
 import org.apache.activemq.spi.core.protocol.ProtocolManager;
-import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory;
 
-public class StompProtocolManagerFactory implements ProtocolManagerFactory
+public class StompProtocolManagerFactory extends AbstractProtocolManagerFactory<StompFrameInterceptor>
 {
    public static final String STOMP_PROTOCOL_NAME = "STOMP";
 
    private static String[] SUPPORTED_PROTOCOLS = {STOMP_PROTOCOL_NAME};
 
-   public ProtocolManager createProtocolManager(final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
+   public ProtocolManager createProtocolManager(final ActiveMQServer server, final List<StompFrameInterceptor> incomingInterceptors, List<StompFrameInterceptor> outgoingInterceptors)
    {
-      return new StompProtocolManager(server, incomingInterceptors, outgoingInterceptors);
+      return new StompProtocolManager(this, server, incomingInterceptors, outgoingInterceptors);
+   }
+
+   @Override
+   public List<StompFrameInterceptor> filterInterceptors(List<BaseInterceptor> interceptors)
+   {
+      return filterInterceptors(StompFrameInterceptor.class, interceptors);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/519a47f0/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManager.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManager.java b/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManager.java
index 0785e3a..0768ccf 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManager.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManager.java
@@ -25,11 +25,12 @@ import java.util.concurrent.RejectedExecutionException;
 
 import io.netty.channel.ChannelPipeline;
 import org.apache.activemq.api.core.ActiveMQBuffer;
+import org.apache.activemq.api.core.BaseInterceptor;
 import org.apache.activemq.api.core.Interceptor;
 import org.apache.activemq.api.core.Pair;
 import org.apache.activemq.api.core.TransportConfiguration;
-import org.apache.activemq.api.core.client.ClusterTopologyListener;
 import org.apache.activemq.api.core.client.ActiveMQClient;
+import org.apache.activemq.api.core.client.ClusterTopologyListener;
 import org.apache.activemq.api.core.client.TopologyMember;
 import org.apache.activemq.core.config.Configuration;
 import org.apache.activemq.core.protocol.ServerPacketDecoder;
@@ -53,11 +54,12 @@ import org.apache.activemq.core.server.ActiveMQServerLogger;
 import org.apache.activemq.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.spi.core.protocol.MessageConverter;
 import org.apache.activemq.spi.core.protocol.ProtocolManager;
+import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory;
 import org.apache.activemq.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.spi.core.remoting.Acceptor;
 import org.apache.activemq.spi.core.remoting.Connection;
 
-class CoreProtocolManager implements ProtocolManager
+class CoreProtocolManager implements ProtocolManager<Interceptor>
 {
    private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
 
@@ -67,8 +69,12 @@ class CoreProtocolManager implements ProtocolManager
 
    private final List<Interceptor> outgoingInterceptors;
 
-   CoreProtocolManager(final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
+   private final CoreProtocolManagerFactory protocolManagerFactory;
+
+   CoreProtocolManager(final CoreProtocolManagerFactory factory, final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
    {
+      this.protocolManagerFactory = factory;
+
       this.server = server;
 
       this.incomingInterceptors = incomingInterceptors;
@@ -76,8 +82,26 @@ class CoreProtocolManager implements ProtocolManager
       this.outgoingInterceptors = outgoingInterceptors;
    }
 
+
+   @Override
+   public ProtocolManagerFactory<Interceptor> getFactory()
+   {
+      return protocolManagerFactory;
+   }
+
+   @Override
+   public void updateInterceptors(List<BaseInterceptor> incoming, List<BaseInterceptor> outgoing)
+   {
+      this.incomingInterceptors.clear();
+      this.incomingInterceptors.addAll(getFactory().filterInterceptors(incoming));
+
+      this.outgoingInterceptors.clear();
+      this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing));
+   }
+
    /**
     * no need to implement this now
+    *
     * @return
     */
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/519a47f0/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManagerFactory.java b/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManagerFactory.java
index e9d0ac5..147abe0 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManagerFactory.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManagerFactory.java
@@ -18,19 +18,36 @@ package org.apache.activemq.core.protocol.core.impl;
 
 import java.util.List;
 
+import org.apache.activemq.api.core.BaseInterceptor;
 import org.apache.activemq.api.core.Interceptor;
 import org.apache.activemq.api.core.client.ActiveMQClient;
 import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.spi.core.protocol.AbstractProtocolManagerFactory;
 import org.apache.activemq.spi.core.protocol.ProtocolManager;
-import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory;
 
-public class CoreProtocolManagerFactory implements ProtocolManagerFactory
+public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor>
 {
    private static String[] SUPPORTED_PROTOCOLS = {ActiveMQClient.DEFAULT_CORE_PROTOCOL};
 
+   /**
+    * {@inheritDoc}
+    * @param server
+    * @param incomingInterceptors
+    * @param outgoingInterceptors
+    * @return
+    */
    public ProtocolManager createProtocolManager(final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
    {
-      return new CoreProtocolManager(server, incomingInterceptors, outgoingInterceptors);
+      return new CoreProtocolManager(this, server, incomingInterceptors, outgoingInterceptors);
+   }
+
+   @Override
+   public List<Interceptor> filterInterceptors(List<BaseInterceptor> interceptors)
+   {
+      // This delegates to the filterInterceptors(Class, List) helper method:
+      // it wouldn't be possible to write a generic method without this class parameter
+      // and I didn't want to bloat the callers for this
+      return filterInterceptors(Interceptor.class, interceptors);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/519a47f0/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java
index 10b0106..147b39a 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.activemq.api.core.ActiveMQBuffer;
 import org.apache.activemq.api.core.ActiveMQException;
 import org.apache.activemq.api.core.ActiveMQInterruptedException;
+import org.apache.activemq.api.core.BaseInterceptor;
 import org.apache.activemq.api.core.Interceptor;
 import org.apache.activemq.api.core.TransportConfiguration;
 import org.apache.activemq.core.config.Configuration;
@@ -53,8 +54,8 @@ import org.apache.activemq.core.server.ActiveMQServer;
 import org.apache.activemq.core.server.ActiveMQServerLogger;
 import org.apache.activemq.core.server.cluster.ClusterConnection;
 import org.apache.activemq.core.server.cluster.ClusterManager;
-import org.apache.activemq.core.server.impl.ServiceRegistry;
 import org.apache.activemq.core.server.impl.ServerSessionImpl;
+import org.apache.activemq.core.server.impl.ServiceRegistry;
 import org.apache.activemq.core.server.management.ManagementService;
 import org.apache.activemq.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.spi.core.protocol.ProtocolManager;
@@ -65,9 +66,9 @@ import org.apache.activemq.spi.core.remoting.AcceptorFactory;
 import org.apache.activemq.spi.core.remoting.BufferHandler;
 import org.apache.activemq.spi.core.remoting.Connection;
 import org.apache.activemq.spi.core.remoting.ConnectionLifeCycleListener;
+import org.apache.activemq.utils.ActiveMQThreadFactory;
 import org.apache.activemq.utils.ClassloadingUtil;
 import org.apache.activemq.utils.ConfigurationHelper;
-import org.apache.activemq.utils.ActiveMQThreadFactory;
 import org.apache.activemq.utils.ReusableLatch;
 
 public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycleListener
@@ -84,9 +85,9 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
 
    private final Set<TransportConfiguration> acceptorsConfig;
 
-   private final List<Interceptor> incomingInterceptors = new CopyOnWriteArrayList<Interceptor>();
+   private final List<BaseInterceptor> incomingInterceptors = new CopyOnWriteArrayList<>();
 
-   private final List<Interceptor> outgoingInterceptors = new CopyOnWriteArrayList<Interceptor>();
+   private final List<BaseInterceptor> outgoingInterceptors = new CopyOnWriteArrayList<>();
 
    private final Map<String, Acceptor> acceptors = new HashMap<String, Acceptor>();
 
@@ -147,7 +148,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
 
       ActiveMQServerLogger.LOGGER.addingProtocolSupport(coreProtocolManagerFactory.getProtocols()[0]);
       this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0],
-                           coreProtocolManagerFactory.createProtocolManager(server, incomingInterceptors, outgoingInterceptors));
+                           coreProtocolManagerFactory.createProtocolManager(server, coreProtocolManagerFactory.filterInterceptors(incomingInterceptors),
+                                                                            coreProtocolManagerFactory.filterInterceptors(outgoingInterceptors)));
 
       if (config.isResolveProtocols())
       {
@@ -160,7 +162,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
                for (String protocol : protocols)
                {
                   ActiveMQServerLogger.LOGGER.addingProtocolSupport(protocol);
-                  protocolMap.put(protocol, next.createProtocolManager(server, incomingInterceptors, outgoingInterceptors));
+                  protocolMap.put(protocol, next.createProtocolManager(server, next.filterInterceptors(incomingInterceptors),
+                                                                       next.filterInterceptors(outgoingInterceptors)));
                }
             }
          }
@@ -190,11 +193,11 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
       outgoingInterceptors.addAll(serviceRegistry.getOutgoingInterceptors());
    }
 
-   private void addReflectivelyInstantiatedInterceptors(List<String> classNames, List<Interceptor> interceptors)
+   private void addReflectivelyInstantiatedInterceptors(List<String> classNames, List<BaseInterceptor> interceptors)
    {
       for (String className : classNames)
       {
-         Interceptor interceptor = ((Interceptor) safeInitNewInstance(className));
+         BaseInterceptor interceptor = ((BaseInterceptor) safeInitNewInstance(className));
          interceptors.add(interceptor);
       }
    }
@@ -221,8 +224,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
       // to support many hundreds of connections, but the main thread pool must be kept small for better performance
 
       ThreadFactory tFactory = new ActiveMQThreadFactory("ActiveMQ-remoting-threads-" + server.toString() +
-                                                           "-" +
-                                                           System.identityHashCode(this), false, tccl);
+                                                            "-" +
+                                                            System.identityHashCode(this), false, tccl);
 
       threadPool = Executors.newCachedThreadPool(tFactory);
 
@@ -620,24 +623,43 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
    public void addIncomingInterceptor(final Interceptor interceptor)
    {
       incomingInterceptors.add(interceptor);
+
+      updateProtocols();
    }
 
    @Override
    public boolean removeIncomingInterceptor(final Interceptor interceptor)
    {
-      return incomingInterceptors.remove(interceptor);
+      if (incomingInterceptors.remove(interceptor))
+      {
+         updateProtocols();
+         return true;
+      }
+      else
+      {
+         return false;
+      }
    }
 
    @Override
    public void addOutgoingInterceptor(final Interceptor interceptor)
    {
       outgoingInterceptors.add(interceptor);
+      updateProtocols();
    }
 
    @Override
    public boolean removeOutgoingInterceptor(final Interceptor interceptor)
    {
-      return outgoingInterceptors.remove(interceptor);
+      if (outgoingInterceptors.remove(interceptor))
+      {
+         updateProtocols();
+         return true;
+      }
+      else
+      {
+         return false;
+      }
    }
 
    private ClusterConnection lookupClusterConnection(TransportConfiguration acceptorConfig)
@@ -803,4 +825,13 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
       });
    }
 
+   protected void updateProtocols()
+   {
+      for (ProtocolManager<?> protocolManager : this.protocolMap.values())
+      {
+         protocolManager.updateInterceptors(incomingInterceptors, outgoingInterceptors);
+      }
+
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/519a47f0/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/AbstractProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/AbstractProtocolManagerFactory.java b/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/AbstractProtocolManagerFactory.java
new file mode 100644
index 0000000..df2d555
--- /dev/null
+++ b/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/AbstractProtocolManagerFactory.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.spi.core.protocol;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.activemq.api.core.BaseInterceptor;
+
+public abstract class AbstractProtocolManagerFactory<P extends BaseInterceptor> implements ProtocolManagerFactory<P>
+{
+
+   /**
+    * This method exists because Java generics do not retain the type of P at runtime,
+    * so it's not possible to write a generic method without having the Class type.
+    * This serves as a tool for subclasses to filter properly.
+    *
+    * @param type
+    * @param listIn
+    * @return
+    */
+   protected List<P> filterInterceptors(Class<P> type, List<? extends BaseInterceptor> listIn)
+   {
+      if (listIn == null)
+      {
+         return Collections.emptyList();
+      }
+      else
+      {
+         CopyOnWriteArrayList<P> listOut = new CopyOnWriteArrayList();
+         for (BaseInterceptor<?> in : listIn)
+         {
+            if (type.isInstance(in))
+            {
+               listOut.add((P) in);
+            }
+         }
+         return listOut;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/519a47f0/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManager.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManager.java b/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManager.java
index 254ac00..c07d1b7 100644
--- a/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManager.java
+++ b/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManager.java
@@ -16,14 +16,27 @@
  */
 package org.apache.activemq.spi.core.protocol;
 
+import java.util.List;
+
 import io.netty.channel.ChannelPipeline;
 import org.apache.activemq.api.core.ActiveMQBuffer;
+import org.apache.activemq.api.core.BaseInterceptor;
 import org.apache.activemq.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.spi.core.remoting.Acceptor;
 import org.apache.activemq.spi.core.remoting.Connection;
 
-public interface ProtocolManager
+public interface ProtocolManager<P extends BaseInterceptor>
 {
+   ProtocolManagerFactory<P> getFactory();
+
+   /**
+    * This method will receive all the interceptors on the system and you should filter out the ones that do not belong to this protocol.
+    *
+    * @param incomingInterceptors
+    * @param outgoingInterceptors
+    */
+   void updateInterceptors(List<BaseInterceptor> incomingInterceptors, List<BaseInterceptor> outgoingInterceptors);
+
    ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection);
 
    void removeHandler(final String name);
@@ -37,6 +50,7 @@ public interface ProtocolManager
    /**
     * Gets the Message Converter towards ActiveMQ.
     * Notice this being null means no need to convert
+    *
     * @return
     */
    MessageConverter getConverter();

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/519a47f0/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManagerFactory.java b/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManagerFactory.java
index 25f3d62..69f2426 100644
--- a/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManagerFactory.java
+++ b/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManagerFactory.java
@@ -18,12 +18,28 @@ package org.apache.activemq.spi.core.protocol;
 
 import java.util.List;
 
-import org.apache.activemq.api.core.Interceptor;
+import org.apache.activemq.api.core.BaseInterceptor;
 import org.apache.activemq.core.server.ActiveMQServer;
 
-public interface ProtocolManagerFactory
+public interface ProtocolManagerFactory<P extends BaseInterceptor>
 {
-   ProtocolManager createProtocolManager(ActiveMQServer server, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors);
+   /**
+    * When you create the ProtocolManager, you should filter out any interceptors that won't belong
+    * to this Protocol.
+    * For example, don't send any core Interceptors {@link org.apache.activemq.api.core.Interceptor} to Stomp.
+    * @param server
+    * @param incomingInterceptors
+    * @param outgoingInterceptors
+    * @return
+    */
+   ProtocolManager createProtocolManager(ActiveMQServer server, List<P> incomingInterceptors, List<P> outgoingInterceptors);
+
+   /**
+    * This should receive the entire list and return only the interceptors this factory can deal with.
+    * @param interceptors
+    * @return
+    */
+   List<P> filterInterceptors(List<BaseInterceptor> interceptors);
 
    String[] getProtocols();
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/519a47f0/docs/user-manual/en/intercepting-operations.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/intercepting-operations.md b/docs/user-manual/en/intercepting-operations.md
index 145aba2..ad03688 100644
--- a/docs/user-manual/en/intercepting-operations.md
+++ b/docs/user-manual/en/intercepting-operations.md
@@ -20,12 +20,12 @@ public interface Interceptor
 }
 ```
 
-For stomp protocol an interceptor must extend the `StompFrameInterceptor class`:
+For the STOMP protocol an interceptor must implement the `StompFrameInterceptor` interface:
 
 ``` java
 package org.apache.activemq.core.protocol.stomp;
 
-public abstract class StompFrameInterceptor
+public interface StompFrameInterceptor
 {
    public abstract boolean intercept(StompFrame stompFrame, RemotingConnection connection);
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/519a47f0/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java
index 8df0879..17a1943 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java
@@ -26,9 +26,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.activemq.api.core.Interceptor;
 import org.apache.activemq.api.core.TransportConfiguration;
 import org.apache.activemq.api.core.client.ActiveMQClient;
 import org.apache.activemq.core.config.Configuration;
+import org.apache.activemq.core.protocol.core.Packet;
 import org.apache.activemq.core.protocol.stomp.Stomp;
 import org.apache.activemq.core.protocol.stomp.StompFrame;
 import org.apache.activemq.core.protocol.stomp.StompFrameInterceptor;
@@ -820,11 +822,25 @@ public class ExtraStompTest extends StompTestBase
       return server;
    }
 
-   static List<StompFrame> incomingInterceptedFrames = new ArrayList<StompFrame>();
-   static List<StompFrame> outgoingInterceptedFrames = new ArrayList<StompFrame>();
 
-   public static class MyIncomingStompFrameInterceptor extends StompFrameInterceptor
+   public static class MyCoreInterceptor implements Interceptor
    {
+      static List<Packet> incomingInterceptedFrames = new ArrayList<Packet>();
+
+      @Override
+      public boolean intercept(Packet packet, RemotingConnection connection)
+      {
+         incomingInterceptedFrames.add(packet);
+         return true;
+      }
+
+   }
+
+
+   public static class MyIncomingStompFrameInterceptor implements StompFrameInterceptor
+   {
+      static List<StompFrame> incomingInterceptedFrames = new ArrayList<StompFrame>();
+
       @Override
       public boolean intercept(StompFrame stompFrame, RemotingConnection connection)
       {
@@ -834,8 +850,12 @@ public class ExtraStompTest extends StompTestBase
       }
    }
 
-   public static class MyOutgoingStompFrameInterceptor extends StompFrameInterceptor
+
+   public static class MyOutgoingStompFrameInterceptor implements StompFrameInterceptor
    {
+
+      static List<StompFrame> outgoingInterceptedFrames = new ArrayList<StompFrame>();
+
       @Override
       public boolean intercept(StompFrame stompFrame, RemotingConnection connection)
       {
@@ -848,17 +868,23 @@ public class ExtraStompTest extends StompTestBase
    @Test
    public void stompFrameInterceptor() throws Exception
    {
+      MyIncomingStompFrameInterceptor.incomingInterceptedFrames.clear();
+      MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.clear();
       try
       {
          List<String> incomingInterceptorList = new ArrayList<String>();
          incomingInterceptorList.add("org.apache.activemq.tests.integration.stomp.ExtraStompTest$MyIncomingStompFrameInterceptor");
+         incomingInterceptorList.add("org.apache.activemq.tests.integration.stomp.ExtraStompTest$MyCoreInterceptor");
          List<String> outgoingInterceptorList = new ArrayList<String>();
          outgoingInterceptorList.add("org.apache.activemq.tests.integration.stomp.ExtraStompTest$MyOutgoingStompFrameInterceptor");
 
          server = createServerWithStompInterceptor(incomingInterceptorList, outgoingInterceptorList);
          server.start();
 
-         setUpAfterServer();
+         setUpAfterServer(); // This will make some calls through core
+
+         // So we clear them here
+         MyCoreInterceptor.incomingInterceptedFrames.clear();
 
          String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
          sendFrame(frame);
@@ -868,16 +894,20 @@ public class ExtraStompTest extends StompTestBase
          frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL;
          sendFrame(frame);
 
+         assertEquals(0, MyCoreInterceptor.incomingInterceptedFrames.size());
          sendMessage(getName());
 
+         // Something was supposed to be called on sendMessages
+         assertTrue("core interceptor is not working", MyCoreInterceptor.incomingInterceptedFrames.size() > 0);
+
          receiveFrame(10000);
 
          frame = "SEND\n" + "destination:" +
-                 getQueuePrefix() +
-                 getQueueName() +
-                 "\n\n" +
-                 "Hello World" +
-                 Stomp.NULL;
+            getQueuePrefix() +
+            getQueueName() +
+            "\n\n" +
+            "Hello World" +
+            Stomp.NULL;
          sendFrame(frame);
 
          receiveFrame(10000);
@@ -904,22 +934,32 @@ public class ExtraStompTest extends StompTestBase
       outgoingCommands.add("MESSAGE");
       outgoingCommands.add("MESSAGE");
 
-      Assert.assertEquals(4, incomingInterceptedFrames.size());
-      Assert.assertEquals(3, outgoingInterceptedFrames.size());
+      long timeout = System.currentTimeMillis() + 1000;
+
+      // Things are async, so give the frames some time to arrive before we actually assert
+      while (MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size() < 4 &&
+         MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size() < 3 &&
+         timeout > System.currentTimeMillis())
+      {
+         Thread.sleep(10);
+      }
+
+      Assert.assertEquals(4, MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size());
+      Assert.assertEquals(3, MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size());
 
-      for (int i = 0; i < incomingInterceptedFrames.size(); i++)
+      for (int i = 0; i < MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size(); i++)
       {
-         Assert.assertEquals(incomingCommands.get(i), incomingInterceptedFrames.get(i).getCommand());
-         Assert.assertEquals("incomingInterceptedVal", incomingInterceptedFrames.get(i).getHeader("incomingInterceptedProp"));
+         Assert.assertEquals(incomingCommands.get(i), MyIncomingStompFrameInterceptor.incomingInterceptedFrames.get(i).getCommand());
+         Assert.assertEquals("incomingInterceptedVal", MyIncomingStompFrameInterceptor.incomingInterceptedFrames.get(i).getHeader("incomingInterceptedProp"));
       }
 
-      for (int i = 0; i < outgoingInterceptedFrames.size(); i++)
+      for (int i = 0; i < MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size(); i++)
       {
-         Assert.assertEquals(outgoingCommands.get(i), outgoingInterceptedFrames.get(i).getCommand());
+         Assert.assertEquals(outgoingCommands.get(i), MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(i).getCommand());
       }
 
-      Assert.assertEquals("incomingInterceptedVal", outgoingInterceptedFrames.get(2).getHeader("incomingInterceptedProp"));
-      Assert.assertEquals("outgoingInterceptedVal", outgoingInterceptedFrames.get(2).getHeader("outgoingInterceptedProp"));
+      Assert.assertEquals("incomingInterceptedVal", MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(2).getHeader("incomingInterceptedProp"));
+      Assert.assertEquals("outgoingInterceptedVal", MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(2).getHeader("outgoingInterceptedProp"));
    }
 
    protected JMSServerManager createServerWithStompInterceptor(List<String> stompIncomingInterceptor, List<String> stompOutgoingInterceptor) throws Exception
@@ -932,22 +972,22 @@ public class ExtraStompTest extends StompTestBase
       TransportConfiguration stompTransport = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
 
       Configuration config = createBasicConfig()
-              .setPersistenceEnabled(false)
-              .addAcceptorConfiguration(stompTransport)
-              .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY))
-              .setIncomingInterceptorClassNames(stompIncomingInterceptor)
-              .setOutgoingInterceptorClassNames(stompOutgoingInterceptor);
+         .setPersistenceEnabled(false)
+         .addAcceptorConfiguration(stompTransport)
+         .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY))
+         .setIncomingInterceptorClassNames(stompIncomingInterceptor)
+         .setOutgoingInterceptorClassNames(stompOutgoingInterceptor);
 
       ActiveMQServer hornetQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));
 
       JMSConfiguration jmsConfig = new JMSConfigurationImpl();
       jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl()
-              .setName(getQueueName())
-              .setDurable(false)
-              .setBindings(getQueueName()));
+                                                .setName(getQueueName())
+                                                .setDurable(false)
+                                                .setBindings(getQueueName()));
       jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl()
-              .setName(getTopicName())
-              .setBindings(getTopicName()));
+                                                .setName(getTopicName())
+                                                .setBindings(getTopicName()));
       server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
       server.setRegistry(new JndiBindingRegistry(new InVMNamingContext()));
       return server;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/519a47f0/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java
index 2387c71..937b85d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java
@@ -182,10 +182,10 @@ public abstract class StompTestBase extends UnitTestCase
       createBootstrap();
 
       connection = connectionFactory.createConnection();
+      connection.start();
       session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       queue = session.createQueue(getQueueName());
       topic = session.createTopic(getTopicName());
-      connection.start();
 
    }
 


[2/4] activemq-6 git commit: ACTIVEMQ6-89 Added possibility to intercept stomp frames

Posted by cl...@apache.org.
ACTIVEMQ6-89 Added possibility to intercept stomp frames

https://issues.apache.org/jira/browse/ACTIVEMQ6-89

This was originally contributed at #182. We have squashed the commits and rebased them here
This closes #182


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/b2524b1b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/b2524b1b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/b2524b1b

Branch: refs/heads/master
Commit: b2524b1be49198b1a96b78e70d7777b5214b8a8a
Parents: 9da0a37
Author: nberdikov <nb...@groupon.com>
Authored: Fri Mar 13 10:29:33 2015 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Mar 31 11:23:05 2015 -0400

----------------------------------------------------------------------
 .../protocol/stomp/StompFrameInterceptor.java   |  58 ++++++++
 .../protocol/stomp/StompProtocolManager.java    |  35 ++++-
 .../stomp/StompProtocolManagerFactory.java      |   2 +-
 docs/user-manual/en/intercepting-operations.md  |  11 ++
 .../tests/integration/stomp/ExtraStompTest.java | 138 +++++++++++++++++++
 5 files changed, 242 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/b2524b1b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompFrameInterceptor.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompFrameInterceptor.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompFrameInterceptor.java
new file mode 100644
index 0000000..9af6e27
--- /dev/null
+++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompFrameInterceptor.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.core.protocol.stomp;
+
+import org.apache.activemq.api.core.ActiveMQException;
+import org.apache.activemq.api.core.Interceptor;
+import org.apache.activemq.core.protocol.core.Packet;
+import org.apache.activemq.spi.core.protocol.RemotingConnection;
+
+/**
+ * This class is a simple way to intercept client calls on ActiveMQ using the STOMP protocol.
+ * <p>
+ * To add an interceptor to ActiveMQ server, you have to modify the server configuration file
+ * {@literal activemq-configuration.xml}.<br>
+ */
+public abstract class StompFrameInterceptor implements Interceptor
+{
+
+   /**
+    * Intercepts a packet which is received before it is sent to the channel.
+    * By default does not do anything and returns true allowing other interceptors perform logic.
+    *
+    * @param packet     the packet being received
+    * @param connection the connection the packet was received on
+    * @return {@code true} to process the next interceptor and handle the packet,
+    * {@code false} to abort processing of the packet
+    * @throws ActiveMQException
+    */
+   @Override
+   public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException
+   {
+      return true;
+   }
+
+  /**
+   * Intercepts a stomp frame sent by a client.
+   *
+   * @param stompFrame     the stomp frame being received
+   * @param connection the connection the stomp frame was received on
+   * @return {@code true} to process the next interceptor and handle the stomp frame,
+   * {@code false} to abort processing of the stomp frame
+   */
+   public abstract boolean intercept(StompFrame stompFrame, RemotingConnection connection);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/b2524b1b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
index 1b71987..9fb0362 100644
--- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
+++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
@@ -79,11 +79,14 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
 
    private final Set<String> destinations = new ConcurrentHashSet<String>();
 
+   private final List<Interceptor> incomingInterceptors;
+   private final List<Interceptor> outgoingInterceptors;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public StompProtocolManager(final ActiveMQServer server, final List<Interceptor> interceptors)
+   public StompProtocolManager(final ActiveMQServer server, final List<Interceptor> incomingInterceptors, final List<Interceptor> outgoingInterceptors)
    {
       this.server = server;
       this.executor = server.getExecutorFactory().getExecutor();
@@ -94,6 +97,8 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
          destinations.add(service.getManagementAddress().toString());
          service.addNotificationListener(this);
       }
+      this.incomingInterceptors = incomingInterceptors;
+      this.outgoingInterceptors = outgoingInterceptors;
    }
 
    @Override
@@ -166,6 +171,7 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
 
          try
          {
+            invokeInterceptors(this.incomingInterceptors, request, conn);
             conn.handleFrame(request);
          }
          finally
@@ -201,6 +207,9 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
       {
          ActiveMQServerLogger.LOGGER.trace("sent " + frame);
       }
+
+      invokeInterceptors(this.outgoingInterceptors, frame, connection);
+
       synchronized (connection)
       {
          if (connection.isDestroyed())
@@ -504,4 +513,28 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
    {
       return server;
    }
+
+   private void invokeInterceptors(List<Interceptor> interceptors, final StompFrame frame, final StompConnection connection)
+   {
+      if (interceptors != null && !interceptors.isEmpty())
+      {
+         for (Interceptor interceptor : interceptors)
+         {
+            if (interceptor instanceof StompFrameInterceptor)
+            {
+               try
+               {
+                  if (!((StompFrameInterceptor)interceptor).intercept(frame, connection))
+                  {
+                     break;
+                  }
+               }
+               catch (Exception e)
+               {
+                  ActiveMQServerLogger.LOGGER.error(e);
+               }
+            }
+         }
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/b2524b1b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java
index 72e7734..8cf5e6e 100644
--- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java
+++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java
@@ -31,7 +31,7 @@ public class StompProtocolManagerFactory implements ProtocolManagerFactory
 
    public ProtocolManager createProtocolManager(final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
    {
-      return new StompProtocolManager(server, incomingInterceptors);
+      return new StompProtocolManager(server, incomingInterceptors, outgoingInterceptors);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/b2524b1b/docs/user-manual/en/intercepting-operations.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/intercepting-operations.md b/docs/user-manual/en/intercepting-operations.md
index 4258de2..145aba2 100644
--- a/docs/user-manual/en/intercepting-operations.md
+++ b/docs/user-manual/en/intercepting-operations.md
@@ -20,6 +20,17 @@ public interface Interceptor
 }
 ```
 
+For the STOMP protocol an interceptor must extend the `StompFrameInterceptor` class:
+
+``` java
+package org.apache.activemq.core.protocol.stomp;
+
+public abstract class StompFrameInterceptor
+{
+   public abstract boolean intercept(StompFrame stompFrame, RemotingConnection connection);
+}
+```
+
 The returned boolean value is important:
 
 -   if `true` is returned, the process continues normally

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/b2524b1b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java
index 976f74f..8df0879 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java
@@ -20,14 +20,18 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.QueueBrowser;
 import javax.jms.TextMessage;
+import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.activemq.api.core.TransportConfiguration;
 import org.apache.activemq.api.core.client.ActiveMQClient;
 import org.apache.activemq.core.config.Configuration;
 import org.apache.activemq.core.protocol.stomp.Stomp;
+import org.apache.activemq.core.protocol.stomp.StompFrame;
+import org.apache.activemq.core.protocol.stomp.StompFrameInterceptor;
 import org.apache.activemq.core.protocol.stomp.StompProtocolManagerFactory;
 import org.apache.activemq.core.registry.JndiBindingRegistry;
 import org.apache.activemq.core.remoting.impl.invm.InVMAcceptorFactory;
@@ -41,6 +45,7 @@ import org.apache.activemq.jms.server.config.impl.JMSConfigurationImpl;
 import org.apache.activemq.jms.server.config.impl.JMSQueueConfigurationImpl;
 import org.apache.activemq.jms.server.config.impl.TopicConfigurationImpl;
 import org.apache.activemq.jms.server.impl.JMSServerManagerImpl;
+import org.apache.activemq.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.tests.integration.largemessage.LargeMessageTestBase;
 import org.apache.activemq.tests.integration.largemessage.LargeMessageTestBase.TestLargeMessageInputStream;
 import org.apache.activemq.tests.integration.stomp.util.ClientStompFrame;
@@ -815,4 +820,137 @@ public class ExtraStompTest extends StompTestBase
       return server;
    }
 
+   static List<StompFrame> incomingInterceptedFrames = new ArrayList<StompFrame>();
+   static List<StompFrame> outgoingInterceptedFrames = new ArrayList<StompFrame>();
+
+   public static class MyIncomingStompFrameInterceptor extends StompFrameInterceptor
+   {
+      @Override
+      public boolean intercept(StompFrame stompFrame, RemotingConnection connection)
+      {
+         incomingInterceptedFrames.add(stompFrame);
+         stompFrame.addHeader("incomingInterceptedProp", "incomingInterceptedVal");
+         return true;
+      }
+   }
+
+   public static class MyOutgoingStompFrameInterceptor extends StompFrameInterceptor
+   {
+      @Override
+      public boolean intercept(StompFrame stompFrame, RemotingConnection connection)
+      {
+         outgoingInterceptedFrames.add(stompFrame);
+         stompFrame.addHeader("outgoingInterceptedProp", "outgoingInterceptedVal");
+         return true;
+      }
+   }
+
+   @Test
+   public void stompFrameInterceptor() throws Exception
+   {
+      try
+      {
+         List<String> incomingInterceptorList = new ArrayList<String>();
+         incomingInterceptorList.add("org.apache.activemq.tests.integration.stomp.ExtraStompTest$MyIncomingStompFrameInterceptor");
+         List<String> outgoingInterceptorList = new ArrayList<String>();
+         outgoingInterceptorList.add("org.apache.activemq.tests.integration.stomp.ExtraStompTest$MyOutgoingStompFrameInterceptor");
+
+         server = createServerWithStompInterceptor(incomingInterceptorList, outgoingInterceptorList);
+         server.start();
+
+         setUpAfterServer();
+
+         String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+         sendFrame(frame);
+
+         frame = receiveFrame(100000);
+
+         frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL;
+         sendFrame(frame);
+
+         sendMessage(getName());
+
+         receiveFrame(10000);
+
+         frame = "SEND\n" + "destination:" +
+                 getQueuePrefix() +
+                 getQueueName() +
+                 "\n\n" +
+                 "Hello World" +
+                 Stomp.NULL;
+         sendFrame(frame);
+
+         receiveFrame(10000);
+
+         frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+
+         sendFrame(frame);
+
+      }
+      finally
+      {
+         cleanUp();
+         server.stop();
+      }
+
+      List<String> incomingCommands = new ArrayList<String>(4);
+      incomingCommands.add("CONNECT");
+      incomingCommands.add("SUBSCRIBE");
+      incomingCommands.add("SEND");
+      incomingCommands.add("DISCONNECT");
+
+      List<String> outgoingCommands = new ArrayList<String>(3);
+      outgoingCommands.add("CONNECTED");
+      outgoingCommands.add("MESSAGE");
+      outgoingCommands.add("MESSAGE");
+
+      Assert.assertEquals(4, incomingInterceptedFrames.size());
+      Assert.assertEquals(3, outgoingInterceptedFrames.size());
+
+      for (int i = 0; i < incomingInterceptedFrames.size(); i++)
+      {
+         Assert.assertEquals(incomingCommands.get(i), incomingInterceptedFrames.get(i).getCommand());
+         Assert.assertEquals("incomingInterceptedVal", incomingInterceptedFrames.get(i).getHeader("incomingInterceptedProp"));
+      }
+
+      for (int i = 0; i < outgoingInterceptedFrames.size(); i++)
+      {
+         Assert.assertEquals(outgoingCommands.get(i), outgoingInterceptedFrames.get(i).getCommand());
+      }
+
+      Assert.assertEquals("incomingInterceptedVal", outgoingInterceptedFrames.get(2).getHeader("incomingInterceptedProp"));
+      Assert.assertEquals("outgoingInterceptedVal", outgoingInterceptedFrames.get(2).getHeader("outgoingInterceptedProp"));
+   }
+
+   protected JMSServerManager createServerWithStompInterceptor(List<String> stompIncomingInterceptor, List<String> stompOutgoingInterceptor) throws Exception
+   {
+
+      Map<String, Object> params = new HashMap<String, Object>();
+      params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME);
+      params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
+      params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1");
+      TransportConfiguration stompTransport = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
+
+      Configuration config = createBasicConfig()
+              .setPersistenceEnabled(false)
+              .addAcceptorConfiguration(stompTransport)
+              .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY))
+              .setIncomingInterceptorClassNames(stompIncomingInterceptor)
+              .setOutgoingInterceptorClassNames(stompOutgoingInterceptor);
+
+      ActiveMQServer hornetQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));
+
+      JMSConfiguration jmsConfig = new JMSConfigurationImpl();
+      jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl()
+              .setName(getQueueName())
+              .setDurable(false)
+              .setBindings(getQueueName()));
+      jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl()
+              .setName(getTopicName())
+              .setBindings(getTopicName()));
+      server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
+      server.setRegistry(new JndiBindingRegistry(new InVMNamingContext()));
+      return server;
+   }
+
 }


[3/4] activemq-6 git commit: Improving a test that failed due to what seemed a race

Posted by cl...@apache.org.
Improving a test that failed due to what seemed a race


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/2f819a63
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/2f819a63
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/2f819a63

Branch: refs/heads/master
Commit: 2f819a63e7afa734d096156277358933c3311c45
Parents: 519a47f
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Mar 16 19:17:52 2015 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Mar 31 11:23:06 2015 -0400

----------------------------------------------------------------------
 .../activemq/tests/unit/core/server/impl/QueueImplTest.java | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/2f819a63/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/QueueImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/QueueImplTest.java
index 8286ab6..3be1f61 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/QueueImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/QueueImplTest.java
@@ -625,8 +625,13 @@ public class QueueImplTest extends UnitTestCase
       queue.resume();
 
       // Need to make sure the consumers will receive the messages before we do these assertions
-      long timeout = System.currentTimeMillis() + 1000;
-      while (cons1.getReferences().size() != numMessages / 2 && cons2.getReferences().size() != numMessages / 2 && timeout > System.currentTimeMillis())
+      long timeout = System.currentTimeMillis() + 5000;
+      while (cons1.getReferences().size() != numMessages / 2 && timeout > System.currentTimeMillis())
+      {
+         Thread.sleep(1);
+      }
+
+      while (cons2.getReferences().size() != numMessages / 2 && timeout > System.currentTimeMillis())
       {
          Thread.sleep(1);
       }


[4/4] activemq-6 git commit: This closes #191 - Stomp interceptors

Posted by cl...@apache.org.
This closes #191 - Stomp interceptors


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/c65ca252
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/c65ca252
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/c65ca252

Branch: refs/heads/master
Commit: c65ca252f89ed8630c87a7b823b608b790347630
Parents: 9da0a37 2f819a6
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Mar 31 11:24:30 2015 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Mar 31 11:24:30 2015 -0400

----------------------------------------------------------------------
 .../activemq/api/core/BaseInterceptor.java      |  35 ++++
 .../apache/activemq/api/core/Interceptor.java   |  13 +-
 .../protocol/proton/ProtonProtocolManager.java  |  26 ++-
 .../proton/ProtonProtocolManagerFactory.java    |  15 +-
 .../openwire/OpenWireProtocolManager.java       | 141 ++++++++-------
 .../OpenWireProtocolManagerFactory.java         |  14 +-
 .../protocol/stomp/StompFrameInterceptor.java   |  29 +++
 .../protocol/stomp/StompProtocolManager.java    |  61 ++++++-
 .../stomp/StompProtocolManagerFactory.java      |  16 +-
 .../protocol/core/impl/CoreProtocolManager.java |  30 +++-
 .../core/impl/CoreProtocolManagerFactory.java   |  23 ++-
 .../server/impl/RemotingServiceImpl.java        |  55 ++++--
 .../AbstractProtocolManagerFactory.java         |  57 ++++++
 .../spi/core/protocol/ProtocolManager.java      |  16 +-
 .../core/protocol/ProtocolManagerFactory.java   |  22 ++-
 docs/user-manual/en/intercepting-operations.md  |  11 ++
 .../tests/integration/stomp/ExtraStompTest.java | 178 +++++++++++++++++++
 .../tests/integration/stomp/StompTestBase.java  |   2 +-
 .../unit/core/server/impl/QueueImplTest.java    |   9 +-
 19 files changed, 634 insertions(+), 119 deletions(-)
----------------------------------------------------------------------