You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/05/03 16:17:11 UTC

[1/3] activemq-artemis git commit: ARTEMIS-898 - Adding Plugin Support

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 303d97c76 -> 5391d42e4


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
new file mode 100644
index 0000000..9f6b4ea
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.plugin;
+
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SESSION_METADATA_ADDED;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DESTROY_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SESSION_METADATA_ADDED;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.config.BridgeConfiguration;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
+import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
+import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks which {@link ActiveMQServerPlugin} callbacks are invoked, and how often, while a JMS client connects,
+ * sends, receives and disconnects, while a queue is destroyed, and while a bridge is deployed.
+ * <p>
+ * Invocations are counted by a {@link MethodCalledVerifier} that is registered on the broker configuration in
+ * {@link #createDefaultConfig(boolean)}.
+ */
+public class CorePluginTest extends JMSTestBase {
+
+   public static final String INVM_CONNECTOR_FACTORY = InVMConnectorFactory.class.getCanonicalName();
+
+   // Callback name -> number of invocations; filled in by the verifier below.
+   private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
+   private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls);
+
+   private Queue queue;
+
+   /**
+    * Builds the default configuration and registers the counting plugin on it.
+    */
+   @Override
+   protected Configuration createDefaultConfig(boolean netty) throws Exception {
+      Configuration config = super.createDefaultConfig(netty);
+      config.registerBrokerPlugin(verifier);
+      return config;
+   }
+
+   /**
+    * Runs the base set-up and then creates the queue ("queue1") used by the JMS tests.
+    */
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      queue = createQueue("queue1");
+   }
+
+   /**
+    * Sends one message and receives it again, then checks the exact callback counts.
+    */
+   @Test
+   public void testSendReceive() throws Exception {
+      conn = cf.createConnection();
+      conn.start();
+      Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      MessageProducer producer = session.createProducer(queue);
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      TextMessage sent = session.createTextMessage("test");
+      producer.send(sent);
+      TextMessage received = (TextMessage) consumer.receive(1000);
+      assertNotNull(received);
+
+      conn.close();
+
+      // Nothing expired, no bridge was deployed and no queue was destroyed.
+      verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE,
+            BEFORE_DESTROY_QUEUE, AFTER_DESTROY_QUEUE);
+      // One connection, one consumer, one queue and one message.
+      verifier.validatePluginMethodsEquals(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION,
+            BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
+            BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE,
+            AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER);
+      // NOTE(review): two session callbacks are expected although only one JMS session is created here;
+      // presumably the connection opens a session of its own - confirm.
+      verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
+            AFTER_CLOSE_SESSION);
+   }
+
+   /**
+    * Destroys the test queue on the server and checks that the create and destroy callbacks each fired once.
+    */
+   @Test
+   public void testDestroyQueue() throws Exception {
+      conn = cf.createConnection();
+      conn.start();
+      Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      session.createProducer(queue);
+      conn.close();
+
+      server.destroyQueue(new SimpleString(queue.getQueueName()));
+
+      verifier.validatePluginMethodsEquals(1, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, BEFORE_DESTROY_QUEUE,
+            AFTER_DESTROY_QUEUE);
+   }
+
+   /**
+    * Sends a message with a time-to-live of 1 and waits before receiving, so that the consumer gets nothing;
+    * the deliver callbacks must not fire while the expiry callback must.
+    */
+   @Test
+   public void testMessageExpireServer() throws Exception {
+      server.registerBrokerPlugin(new ExpiredPluginVerifier());
+
+      conn = cf.createConnection();
+      conn.setClientID("test");
+      conn.start();
+      Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      MessageProducer producer = session.createProducer(queue);
+      producer.setTimeToLive(1);
+      MessageConsumer consumer = session.createConsumer(queue);
+      Thread.sleep(100);
+      TextMessage sent = session.createTextMessage("test");
+      producer.send(sent);
+      Thread.sleep(100);
+      assertNull(consumer.receive(100));
+
+      conn.close();
+
+      verifier.validatePluginMethodsEquals(0, BEFORE_DELIVER, AFTER_DELIVER, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
+      verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION,
+            BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
+            BEFORE_SESSION_METADATA_ADDED, AFTER_SESSION_METADATA_ADDED,
+            BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE,
+            AFTER_MESSAGE_ROUTE, MESSAGE_EXPIRED);
+      verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
+            AFTER_CLOSE_SESSION);
+   }
+
+   /**
+    * Sends a message with a time-to-live of 500 and waits 500 ms before receiving, so that the consumer gets
+    * nothing; here the deliver callbacks are expected to fire in addition to the expiry callback.
+    */
+   @Test
+   public void testMessageExpireClient() throws Exception {
+      server.registerBrokerPlugin(new ExpiredPluginVerifier());
+
+      conn = cf.createConnection();
+      conn.start();
+      Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      MessageProducer producer = session.createProducer(queue);
+      producer.setTimeToLive(500);
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      TextMessage sent = session.createTextMessage("test");
+      producer.send(sent);
+      Thread.sleep(500);
+      assertNull(consumer.receive(500));
+
+      conn.close();
+
+      verifier.validatePluginMethodsEquals(0, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
+      verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION,
+            BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
+            BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE,
+            AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER, MESSAGE_EXPIRED);
+      verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
+            AFTER_CLOSE_SESSION);
+   }
+
+   /**
+    * Starts two brokers, the first configured with a bridge to the second, and checks that the bridge deployment
+    * callbacks fired exactly once.
+    */
+   @Test
+   public void testSimpleBridge() throws Exception {
+      // The default broker is not used by this test; two dedicated brokers are created instead.
+      server.stop();
+
+      final String testAddress = "testAddress";
+      final String queueName0 = "queue0";
+      final String forwardAddress = "forwardAddress";
+
+      Map<String, Object> server0Params = new HashMap<>();
+      ActiveMQServer server0 = createClusteredServerWithParams(false, 0, false, server0Params);
+
+      Map<String, Object> server1Params = new HashMap<>();
+      server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+      ActiveMQServer server1 = createClusteredServerWithParams(false, 1, false, server1Params);
+
+      // server0 reaches server1 through an in-VM connector.
+      TransportConfiguration server1tc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, server1Params);
+
+      Map<String, TransportConfiguration> connectors = new HashMap<>();
+      connectors.put(server1tc.getName(), server1tc);
+      server0.getConfiguration().setConnectorConfigurations(connectors);
+      server0.registerBrokerPlugin(verifier);
+
+      List<String> connectorConfig = new ArrayList<>();
+      connectorConfig.add(server1tc.getName());
+      BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1")
+                                                                         .setQueueName(queueName0)
+                                                                         .setForwardingAddress(forwardAddress)
+                                                                         .setRetryInterval(1000)
+                                                                         .setReconnectAttemptsOnSameNode(-1)
+                                                                         .setUseDuplicateDetection(false)
+                                                                         .setStaticConnectors(connectorConfig);
+
+      List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
+      bridgeConfigs.add(bridgeConfiguration);
+      server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+      CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0);
+      List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
+      queueConfigs0.add(queueConfig0);
+      server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+      // Stop each broker that was started even when start-up of the other one or the assertion fails.
+      server1.start();
+      try {
+         server0.start();
+         try {
+            verifier.validatePluginMethodsEquals(1, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
+         } finally {
+            server0.stop();
+         }
+      } finally {
+         server1.stop();
+      }
+   }
+
+   /**
+    * Plugin that expects every acknowledgement it sees to carry {@link AckReason#EXPIRED}.
+    * <p>
+    * NOTE(review): the assertion runs on whichever thread invokes the callback; confirm that a failure there
+    * is actually reported as a test failure.
+    */
+   private static final class ExpiredPluginVerifier implements ActiveMQServerPlugin {
+
+      @Override
+      public void messageAcknowledged(MessageReference ref, AckReason reason) {
+         assertEquals(AckReason.EXPIRED, reason);
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
new file mode 100644
index 0000000..14aa4a1
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
@@ -0,0 +1,276 @@
+/**
+ *
+ */
+package org.apache.activemq.artemis.tests.integration.plugin;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.BridgeConfiguration;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
+import org.apache.activemq.artemis.core.security.SecurityAuth;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.QueueConfig;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.cluster.Bridge;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+/**
+ * Test plugin that counts how often each {@link ActiveMQServerPlugin} callback is invoked.
+ * <p>
+ * Every callback checks that its main argument(s) are non-null and then increments a counter stored under the
+ * callback's name in the map passed to the constructor. The {@code validatePluginMethods*} methods assert on those
+ * counters.
+ * <p>
+ * All access to the map made by this class is guarded by the map's own monitor, because callbacks may be invoked
+ * on broker threads while the test thread reads the counters. Code that touches the map directly must synchronize
+ * on it as well.
+ */
+public class MethodCalledVerifier implements ActiveMQServerPlugin {
+
+   public static final String AFTER_CREATE_CONNECTION = "afterCreateConnection";
+   public static final String AFTER_DESTROY_CONNECTION = "afterDestroyConnection";
+   public static final String BEFORE_CREATE_SESSION = "beforeCreateSession";
+   public static final String AFTER_CREATE_SESSION = "afterCreateSession";
+   public static final String BEFORE_CLOSE_SESSION = "beforeCloseSession";
+   public static final String AFTER_CLOSE_SESSION = "afterCloseSession";
+   public static final String BEFORE_SESSION_METADATA_ADDED = "beforeSessionMetadataAdded";
+   public static final String AFTER_SESSION_METADATA_ADDED = "afterSessionMetadataAdded";
+   public static final String BEFORE_CREATE_CONSUMER = "beforeCreateConsumer";
+   public static final String AFTER_CREATE_CONSUMER = "afterCreateConsumer";
+   public static final String BEFORE_CLOSE_CONSUMER = "beforeCloseConsumer";
+   public static final String AFTER_CLOSE_CONSUMER = "afterCloseConsumer";
+   public static final String BEFORE_CREATE_QUEUE = "beforeCreateQueue";
+   public static final String AFTER_CREATE_QUEUE = "afterCreateQueue";
+   public static final String BEFORE_DESTROY_QUEUE = "beforeDestroyQueue";
+   public static final String AFTER_DESTROY_QUEUE = "afterDestroyQueue";
+   public static final String MESSAGE_EXPIRED = "messageExpired";
+   public static final String MESSAGE_ACKED = "messageAcknowledged";
+   public static final String BEFORE_SEND = "beforeSend";
+   public static final String AFTER_SEND = "afterSend";
+   public static final String BEFORE_MESSAGE_ROUTE = "beforeMessageRoute";
+   public static final String AFTER_MESSAGE_ROUTE = "afterMessageRoute";
+   public static final String BEFORE_DELIVER = "beforeDeliver";
+   public static final String AFTER_DELIVER = "afterDeliver";
+   public static final String BEFORE_DEPLOY_BRIDGE = "beforeDeployBridge";
+   public static final String AFTER_DEPLOY_BRIDGE = "afterDeployBridge";
+
+   // Callback name -> invocation counter. Always accessed while holding the map's monitor.
+   private final Map<String, AtomicInteger> methodCalls;
+
+   /**
+    * @param methodCalls map in which the invocation counters are stored, keyed by callback name
+    */
+   public MethodCalledVerifier(Map<String, AtomicInteger> methodCalls) {
+      super();
+      this.methodCalls = methodCalls;
+   }
+
+   // ---- connection callbacks ----
+
+   @Override
+   public void afterCreateConnection(RemotingConnection connection) {
+      Preconditions.checkNotNull(connection);
+      methodCalled(AFTER_CREATE_CONNECTION);
+   }
+
+   @Override
+   public void afterDestroyConnection(RemotingConnection connection) {
+      Preconditions.checkNotNull(connection);
+      methodCalled(AFTER_DESTROY_CONNECTION);
+   }
+
+   // ---- session callbacks ----
+
+   @Override
+   public void beforeCreateSession(String name, String username, int minLargeMessageSize, RemotingConnection connection,
+                                   boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa,
+                                   String defaultAddress, SessionCallback callback, boolean autoCreateQueues,
+                                   OperationContext context, Map<SimpleString, RoutingType> prefixes) {
+      Preconditions.checkNotNull(connection);
+      methodCalled(BEFORE_CREATE_SESSION);
+   }
+
+   @Override
+   public void afterCreateSession(ServerSession session) {
+      Preconditions.checkNotNull(session);
+      methodCalled(AFTER_CREATE_SESSION);
+   }
+
+   @Override
+   public void beforeCloseSession(ServerSession session, boolean failed) {
+      Preconditions.checkNotNull(session);
+      methodCalled(BEFORE_CLOSE_SESSION);
+   }
+
+   @Override
+   public void afterCloseSession(ServerSession session, boolean failed) {
+      Preconditions.checkNotNull(session);
+      methodCalled(AFTER_CLOSE_SESSION);
+   }
+
+   @Override
+   public void beforeSessionMetadataAdded(ServerSession session, String key, String data) {
+      Preconditions.checkNotNull(key);
+      methodCalled(BEFORE_SESSION_METADATA_ADDED);
+   }
+
+   @Override
+   public void afterSessionMetadataAdded(ServerSession session, String key, String data) {
+      Preconditions.checkNotNull(key);
+      methodCalled(AFTER_SESSION_METADATA_ADDED);
+   }
+
+   // ---- consumer callbacks ----
+
+   @Override
+   public void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString,
+                                    boolean browseOnly, boolean supportLargeMessage) {
+      Preconditions.checkNotNull(queueName);
+      methodCalled(BEFORE_CREATE_CONSUMER);
+   }
+
+   @Override
+   public void afterCreateConsumer(ServerConsumer consumer) {
+      Preconditions.checkNotNull(consumer);
+      methodCalled(AFTER_CREATE_CONSUMER);
+   }
+
+   @Override
+   public void beforeCloseConsumer(ServerConsumer consumer, boolean failed) {
+      Preconditions.checkNotNull(consumer);
+      methodCalled(BEFORE_CLOSE_CONSUMER);
+   }
+
+   @Override
+   public void afterCloseConsumer(ServerConsumer consumer, boolean failed) {
+      Preconditions.checkNotNull(consumer);
+      methodCalled(AFTER_CLOSE_CONSUMER);
+   }
+
+   // ---- queue callbacks ----
+
+   @Override
+   public void beforeCreateQueue(QueueConfig queueConfig) {
+      Preconditions.checkNotNull(queueConfig);
+      methodCalled(BEFORE_CREATE_QUEUE);
+   }
+
+   @Override
+   public void afterCreateQueue(Queue queue) {
+      Preconditions.checkNotNull(queue);
+      methodCalled(AFTER_CREATE_QUEUE);
+   }
+
+   @Override
+   public void beforeDestroyQueue(SimpleString queueName, SecurityAuth session, boolean checkConsumerCount,
+         boolean removeConsumers, boolean autoDeleteAddress) {
+      Preconditions.checkNotNull(queueName);
+      methodCalled(BEFORE_DESTROY_QUEUE);
+   }
+
+   @Override
+   public void afterDestroyQueue(Queue queue, SimpleString address, SecurityAuth session, boolean checkConsumerCount,
+         boolean removeConsumers, boolean autoDeleteAddress) {
+      Preconditions.checkNotNull(queue);
+      methodCalled(AFTER_DESTROY_QUEUE);
+   }
+
+   // ---- message callbacks ----
+
+   @Override
+   public void messageExpired(MessageReference message, SimpleString messageExpiryAddress) {
+      Preconditions.checkNotNull(message);
+      methodCalled(MESSAGE_EXPIRED);
+   }
+
+   @Override
+   public void messageAcknowledged(MessageReference ref, AckReason reason) {
+      Preconditions.checkNotNull(ref);
+      Preconditions.checkNotNull(reason);
+      methodCalled(MESSAGE_ACKED);
+   }
+
+   @Override
+   public void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) {
+      Preconditions.checkNotNull(message);
+      methodCalled(BEFORE_SEND);
+   }
+
+   @Override
+   public void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
+                         RoutingStatus result) {
+      Preconditions.checkNotNull(message);
+      Preconditions.checkNotNull(result);
+      methodCalled(AFTER_SEND);
+   }
+
+   @Override
+   public void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) {
+      Preconditions.checkNotNull(message);
+      Preconditions.checkNotNull(context);
+      methodCalled(BEFORE_MESSAGE_ROUTE);
+   }
+
+   @Override
+   public void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates,
+                                 RoutingStatus result) {
+      Preconditions.checkNotNull(message);
+      Preconditions.checkNotNull(context);
+      Preconditions.checkNotNull(result);
+      methodCalled(AFTER_MESSAGE_ROUTE);
+   }
+
+   @Override
+   public void beforeDeliver(MessageReference reference) {
+      Preconditions.checkNotNull(reference);
+      methodCalled(BEFORE_DELIVER);
+   }
+
+   @Override
+   public void afterDeliver(MessageReference reference) {
+      Preconditions.checkNotNull(reference);
+      methodCalled(AFTER_DELIVER);
+   }
+
+   // ---- bridge callbacks ----
+
+   @Override
+   public void beforeDeployBridge(BridgeConfiguration config) {
+      Preconditions.checkNotNull(config);
+      methodCalled(BEFORE_DEPLOY_BRIDGE);
+   }
+
+   @Override
+   public void afterDeployBridge(Bridge bridge) {
+      Preconditions.checkNotNull(bridge);
+      methodCalled(AFTER_DEPLOY_BRIDGE);
+   }
+
+   /**
+    * Asserts that each named callback has been invoked exactly {@code count} times. A callback that was never
+    * invoked counts as zero.
+    *
+    * @param count expected number of invocations
+    * @param names callback names, normally the constants declared in this class
+    */
+   public void validatePluginMethodsEquals(int count, String... names) {
+      Arrays.asList(names).forEach(name -> assertEquals("validating method " + name, count, callCount(name)));
+   }
+
+   /**
+    * Asserts that each named callback has been invoked at least {@code count} times. A callback that was never
+    * invoked counts as zero.
+    *
+    * @param count minimum number of invocations
+    * @param names callback names, normally the constants declared in this class
+    */
+   public void validatePluginMethodsAtLeast(int count, String... names) {
+      Arrays.asList(names).forEach(name -> assertTrue("validating method " + name, count <= callCount(name)));
+   }
+
+   /**
+    * Returns how often the named callback has been recorded so far, or zero if it has no counter yet.
+    */
+   private int callCount(String name) {
+      synchronized (methodCalls) {
+         AtomicInteger counter = methodCalls.get(name);
+         return counter == null ? 0 : counter.get();
+      }
+   }
+
+   /**
+    * Records one invocation of the named callback, creating its counter on first use.
+    */
+   private void methodCalled(String name) {
+      // The supplied map may be a plain HashMap, so creating and bumping the counter is done under its monitor.
+      synchronized (methodCalls) {
+         methodCalls.computeIfAbsent(name, k -> new AtomicInteger()).incrementAndGet();
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java
new file mode 100644
index 0000000..5e7f127
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.plugin;
+
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
+import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTClientProvider;
+import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport;
+import org.apache.activemq.artemis.utils.ConcurrentHashSet;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that broker plugin callbacks are invoked when messages are published and received through MQTT clients.
+ * Invocations are counted by a {@link MethodCalledVerifier} registered in {@link #configureBroker()}.
+ */
+public class MqttPluginTest extends MQTTTestSupport {
+
+   // Callback name -> number of invocations; filled in by the verifier below.
+   private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
+   private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls);
+
+   /**
+    * Replaces the static {@code SESSIONS} and {@code CONNECTED_CLIENTS} collections with empty ones via
+    * reflection, then runs the base set-up.
+    */
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      Field sessions = MQTTSession.class.getDeclaredField("SESSIONS");
+      sessions.setAccessible(true);
+      sessions.set(null, new ConcurrentHashMap<>());
+
+      Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS");
+      connectedClients.setAccessible(true);
+      connectedClients.set(null, new ConcurrentHashSet<>());
+
+      super.setUp();
+   }
+
+   /**
+    * Configures the broker as the base class does and registers the counting plugin on it.
+    */
+   @Override
+   public void configureBroker() throws Exception {
+      super.configureBroker();
+      server.registerBrokerPlugin(verifier);
+   }
+
+   /**
+    * Publishes {@code NUM_MESSAGES} messages from one client while a second client receives them on a background
+    * thread, then checks which plugin callbacks were invoked.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testSendAndReceiveMQTT() throws Exception {
+      final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
+      initializeConnection(subscriptionProvider);
+
+      subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE);
+
+      // Counted down once per received message.
+      final CountDownLatch latch = new CountDownLatch(NUM_MESSAGES);
+
+      Thread receiver = new Thread(() -> {
+         for (int i = 0; i < NUM_MESSAGES; i++) {
+            try {
+               byte[] payload = subscriptionProvider.receive(10000);
+               assertNotNull("Should get a message", payload);
+               latch.countDown();
+            } catch (Exception e) {
+               // Stop receiving; the latch then never reaches zero and the assertion below fails the test.
+               e.printStackTrace();
+               break;
+            }
+         }
+      });
+      receiver.start();
+
+      final MQTTClientProvider publishProvider = getMQTTClientProvider();
+      initializeConnection(publishProvider);
+
+      for (int i = 0; i < NUM_MESSAGES; i++) {
+         String payload = "Message " + i;
+         // Encode explicitly as UTF-8 rather than with the platform default charset.
+         publishProvider.publish("foo/bah", payload.getBytes(java.nio.charset.StandardCharsets.UTF_8), AT_LEAST_ONCE);
+      }
+
+      assertTrue("Not all messages were received in time, remaining: " + latch.getCount(),
+            latch.await(10, TimeUnit.SECONDS));
+      subscriptionProvider.disconnect();
+      publishProvider.disconnect();
+
+      // Nothing expired and no bridge was deployed.
+      verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
+      verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, BEFORE_CREATE_SESSION,
+            AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER,
+            AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE,
+            MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER,
+            AFTER_DELIVER);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/OpenwirePluginTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/OpenwirePluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/OpenwirePluginTest.java
new file mode 100644
index 0000000..afb6841
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/OpenwirePluginTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.plugin;
+
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SESSION_METADATA_ADDED;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SESSION_METADATA_ADDED;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+
+public class OpenwirePluginTest extends BasicOpenWireTest {
+
+   private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
+   private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls);
+
+   @Override
+   protected ActiveMQServer createServer(boolean realFiles, Configuration configuration, long pageSize,
+                                         long maxAddressSize, Map<String, AddressSettings> settings) {
+      ActiveMQServer server = super.createServer(realFiles, configuration, pageSize, maxAddressSize, settings);
+      server.registerBrokerPlugin(verifier);
+      return server;
+   }
+
+   @Test
+   public void testAckedMessageAreConsumed() throws JMSException {
+      connection.start();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue queue = session.createQueue(queueName);
+      MessageProducer producer = session.createProducer(queue);
+      producer.send(session.createTextMessage("Hello"));
+
+      // Consume the message...
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message msg = consumer.receive(1000);
+      assertNotNull(msg);
+
+      // Reset the session.
+      session.close();
+      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      // Attempt to Consume the message...
+      consumer = session.createConsumer(queue);
+      msg = consumer.receive(1000);
+      assertNull(msg);
+
+      session.close();
+      connection.close();
+
+      verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE,
+            BEFORE_SESSION_METADATA_ADDED, AFTER_SESSION_METADATA_ADDED);
+      verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, BEFORE_CREATE_SESSION,
+            AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER,
+            AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE,
+            MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER,
+            AFTER_DELIVER);
+
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java
new file mode 100644
index 0000000..c771272
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.plugin;
+
+
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.jms.server.JMSServerManager;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
+import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV12;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class StompPluginTest extends StompTestBase {
+
+   private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
+   public static final String CLIENT_ID = "myclientid";
+
+   private StompClientConnectionV12 conn;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      conn = (StompClientConnectionV12)StompClientConnectionFactory.createClientConnection("1.2", hostname, port);
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      try {
+         boolean connected = conn != null && conn.isConnected();
+         log.debug("Connection 1.2 : " + connected);
+         if (connected) {
+            conn.disconnect();
+         }
+      } finally {
+         super.tearDown();
+      }
+   }
+
+   private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
+   private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls);
+
+   @Override
+   protected JMSServerManager createServer() throws Exception {
+      JMSServerManager server = super.createServer();
+      server.getActiveMQServer().registerBrokerPlugin(verifier);
+      return server;
+   }
+
+   @Test
+   public void testSendAndReceive() throws Exception {
+
+      // subscribe
+      StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port);
+      newConn.connect(defUser, defPass);
+      subscribe(newConn, "a-sub");
+
+      send(newConn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World 1!");
+      ClientStompFrame frame = newConn.receiveFrame();
+
+      System.out.println("received " + frame);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+
+      // unsub
+      unsubscribe(newConn, "a-sub");
+
+      newConn.disconnect();
+
+      verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
+      verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, BEFORE_CREATE_SESSION,
+            AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER,
+            AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE,
+            MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER,
+            AFTER_DELIVER);
+
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java
----------------------------------------------------------------------
diff --git a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java
index c0a4112..121a4b0 100644
--- a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java
+++ b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java
@@ -67,7 +67,10 @@ public class QueueImplTest extends ActiveMQTestBase {
 
    @Test
    public void testScheduledNoConsumer() throws Exception {
-      QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()));
+      QueueImpl queue =
+               new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true,
+                             false, scheduledExecutor, null, null, null,
+                             Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
 
       // Send one scheduled
 
@@ -132,7 +135,10 @@ public class QueueImplTest extends ActiveMQTestBase {
 
    @Test
    public void testScheduled() throws Exception {
-      QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()));
+      QueueImpl queue =
+               new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true,
+                             false, scheduledExecutor, null, null, null,
+                             Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
 
       FakeConsumer consumer = null;
 
@@ -230,7 +236,10 @@ public class QueueImplTest extends ActiveMQTestBase {
          public void disconnect() {
          }
       };
-      QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()));
+      QueueImpl queue =
+               new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false,
+                             scheduledExecutor, null, null, null,
+                             Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
       MessageReference messageReference = generateReference(queue, 1);
       queue.addConsumer(consumer);
       messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
index 78179a8..0a08eb6 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
@@ -1309,6 +1309,7 @@ public class QueueImplTest extends ActiveMQTestBase {
    }
 
    private QueueImpl getQueue(SimpleString name, boolean durable, boolean temporary, Filter filter) {
-      return new QueueImpl(1, QueueImplTest.address1, name, filter, null, durable, temporary, false, scheduledExecutor, new FakePostOffice(), null, null, executor);
+      return new QueueImpl(1, QueueImplTest.address1, name, filter, null, durable, temporary, false, scheduledExecutor,
+                           new FakePostOffice(), null, null, executor, null);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
index 06c7e1e..40c117a 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
@@ -40,7 +40,9 @@ public final class FakeQueueFactory implements QueueFactory {
 
    @Override
    public Queue createQueueWith(final QueueConfig config) {
-      return new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, null, null, executor);
+      return new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(),
+                           config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(),
+                           scheduledExecutor, postOffice, null, null, executor, null);
    }
 
    @Deprecated
@@ -54,7 +56,8 @@ public final class FakeQueueFactory implements QueueFactory {
                             final boolean durable,
                             final boolean temporary,
                             final boolean autoCreated) {
-      return new QueueImpl(persistenceID, address, name, filter, subscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, null, null, executor);
+      return new QueueImpl(persistenceID, address, name, filter, subscription, user, durable, temporary, autoCreated,
+                           scheduledExecutor, postOffice, null, null, executor, null);
    }
 
    @Override


[2/3] activemq-artemis git commit: ARTEMIS-898 - Adding Plugin Support

Posted by cl...@apache.org.
ARTEMIS-898 - Adding Plugin Support

Adding a new ActiveMQServerPlugin interface to support adding custom
behavior to the broker at certain events such as connection or session
creation.

https://issues.apache.org/jira/browse/ARTEMIS-898


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1e1ede84
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1e1ede84
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1e1ede84

Branch: refs/heads/master
Commit: 1e1ede84c0483099f27741bc046ef95c08e1d090
Parents: 303d97c
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Tue May 2 09:46:17 2017 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Wed May 3 11:21:32 2017 -0400

----------------------------------------------------------------------
 .../artemis/core/config/Configuration.java      |  21 ++
 .../core/config/impl/ConfigurationImpl.java     |  24 ++
 .../core/postoffice/impl/PostOfficeImpl.java    |   4 +
 .../server/impl/RemotingServiceImpl.java        |   7 +-
 .../artemis/core/server/ActiveMQServer.java     |  18 +-
 .../core/server/cluster/ClusterManager.java     |   3 +
 .../core/server/impl/ActiveMQServerImpl.java    |  57 +++-
 .../core/server/impl/LastValueQueue.java        |   6 +-
 .../core/server/impl/QueueFactoryImpl.java      |  19 +-
 .../artemis/core/server/impl/QueueImpl.java     |  31 +-
 .../core/server/impl/ServerConsumerImpl.java    |   7 +
 .../core/server/impl/ServerSessionImpl.java     |  33 +-
 .../server/plugin/ActiveMQPluginRunnable.java   |  24 ++
 .../server/plugin/ActiveMQServerPlugin.java     | 336 +++++++++++++++++++
 .../integration/amqp/AmqpClientTestSupport.java |   7 +
 .../integration/client/HangConsumerTest.java    |  37 +-
 .../client/InterruptedLargeMessageTest.java     |  18 +-
 .../jms/client/TopicCleanupTest.java            |  12 +-
 .../openwire/amq/JmsResourceProvider.java       |   2 +-
 .../integration/plugin/AmqpPluginTest.java      | 131 ++++++++
 .../integration/plugin/CorePluginTest.java      | 257 ++++++++++++++
 .../plugin/MethodCalledVerifier.java            | 276 +++++++++++++++
 .../integration/plugin/MqttPluginTest.java      | 132 ++++++++
 .../integration/plugin/OpenwirePluginTest.java  | 109 ++++++
 .../integration/plugin/StompPluginTest.java     | 126 +++++++
 .../timing/core/server/impl/QueueImplTest.java  |  15 +-
 .../unit/core/server/impl/QueueImplTest.java    |   3 +-
 .../server/impl/fakes/FakeQueueFactory.java     |   7 +-
 28 files changed, 1664 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 7dfb1a5..7da5b02 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.security.Role;
 import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
 import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
 
@@ -1081,4 +1082,24 @@ public interface Configuration {
    Configuration setNetworkCheckPing6Command(String command);
 
    String getInternalNamingPrefix();
+
+   /**
+    * @param plugins
+    */
+   void registerBrokerPlugins(List<ActiveMQServerPlugin> plugins);
+
+   /**
+    * @param plugin
+    */
+   void registerBrokerPlugin(ActiveMQServerPlugin plugin);
+
+   /**
+    * @param plugin
+    */
+   void unRegisterBrokerPlugin(ActiveMQServerPlugin plugin);
+
+   /**
+    * @return
+    */
+   List<ActiveMQServerPlugin> getBrokerPlugins();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 2a538ca..8edeb5b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
@@ -63,6 +64,7 @@ import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
 import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
 import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
 import org.apache.activemq.artemis.utils.Env;
@@ -232,6 +234,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
 
    private List<SecuritySettingPlugin> securitySettingPlugins = new ArrayList<>();
 
+   private final List<ActiveMQServerPlugin> brokerPlugins = new CopyOnWriteArrayList<>();
+
    private Map<String, Set<String>> securityRoleNameMappings = new HashMap<>();
 
    protected List<ConnectorServiceConfiguration> connectorServiceConfigurations = new ArrayList<>();
@@ -1321,6 +1325,26 @@ public class ConfigurationImpl implements Configuration, Serializable {
    }
 
    @Override
+   public void registerBrokerPlugins(final List<ActiveMQServerPlugin> plugins) {
+      brokerPlugins.addAll(plugins);
+   }
+
+   @Override
+   public void registerBrokerPlugin(final ActiveMQServerPlugin plugin) {
+      brokerPlugins.add(plugin);
+   }
+
+   @Override
+   public void unRegisterBrokerPlugin(final ActiveMQServerPlugin plugin) {
+      brokerPlugins.remove(plugin);
+   }
+
+   @Override
+   public List<ActiveMQServerPlugin> getBrokerPlugins() {
+      return brokerPlugins;
+   }
+
+   @Override
    public File getBrokerInstance() {
       if (artemisInstance != null) {
          return artemisInstance;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 2ef7657..a927768 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -783,7 +783,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          }
       } else {
          try {
+            server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeMessageRoute(message, context, direct, rejectDuplicates) : null);
             processRoute(message, context, direct);
+            final RoutingStatus finalResult = result;
+            server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterMessageRoute(message, context, direct,
+                  rejectDuplicates, finalResult) : null);
          } catch (ActiveMQAddressFullException e) {
             if (startedTX.get()) {
                context.getTransaction().rollback();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index e0e5b52..7c9c675 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -514,6 +514,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
       }
 
       ConnectionEntry entry = protocol.createConnectionEntry((Acceptor) component, connection);
+      server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCreateConnection(entry.connection) : null);
 
       if (logger.isTraceEnabled()) {
          logger.trace("Connection created " + connection);
@@ -534,8 +535,10 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
       ConnectionEntry conn = connections.get(connectionID);
 
       if (conn != null && !conn.connection.isSupportReconnect()) {
-         removeConnection(connectionID);
-
+         RemotingConnection removedConnection = removeConnection(connectionID);
+         if (removedConnection != null) {
+            server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDestroyConnection(removedConnection) : null);
+         }
          conn.connection.fail(new ActiveMQRemoteDisconnectException());
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index bfd9aec..e16557f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.artemis.core.server;
 
-import javax.management.MBeanServer;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -24,6 +23,8 @@ import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import javax.management.MBeanServer;
+
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.BridgeConfiguration;
@@ -47,6 +48,8 @@ import org.apache.activemq.artemis.core.server.impl.Activation;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.ConnectorsService;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
 import org.apache.activemq.artemis.core.server.reload.ReloadManager;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -186,6 +189,18 @@ public interface ActiveMQServer extends ServiceComponent {
     */
    void callPostQueueDeletionCallbacks(SimpleString address, SimpleString queueName) throws Exception;
 
+   void registerBrokerPlugin(ActiveMQServerPlugin plugin);
+
+   void unRegisterBrokerPlugin(ActiveMQServerPlugin plugin);
+
+   void registerBrokerPlugins(List<ActiveMQServerPlugin> plugins);
+
+   List<ActiveMQServerPlugin> getBrokerPlugins();
+
+   void callBrokerPlugins(ActiveMQPluginRunnable pluginRun);
+
+   boolean hasBrokerPlugins();
+
    void checkQueueCreationLimit(String username) throws Exception;
 
    ServerSession createSession(String name,
@@ -447,4 +462,5 @@ public interface ActiveMQServer extends ServiceComponent {
    void removeAddressInfo(SimpleString address, SecurityAuth auth) throws Exception;
 
    String getInternalNamingPrefix();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
index d2219c2..70edb68 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
@@ -405,6 +405,8 @@ public final class ClusterManager implements ActiveMQComponent {
          return;
       }
 
+      server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeDeployBridge(config) : null);
+
       Queue queue = (Queue) binding.getBindable();
 
       ServerLocatorInternal serverLocator;
@@ -478,6 +480,7 @@ public final class ClusterManager implements ActiveMQComponent {
 
       bridge.start();
 
+      server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDeployBridge(bridge) : null);
    }
 
    public static class IncomingInterceptorLookingForExceptionMessage implements Interceptor {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 8482cb3..06964ee 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
-import javax.management.MBeanServer;
-import javax.security.cert.X509Certificate;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
@@ -48,6 +46,9 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.management.MBeanServer;
+import javax.security.cert.X509Certificate;
+
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
 import org.apache.activemq.artemis.api.core.Pair;
@@ -144,6 +145,8 @@ import org.apache.activemq.artemis.core.server.group.impl.LocalGroupingHandler;
 import org.apache.activemq.artemis.core.server.group.impl.RemoteGroupingHandler;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
 import org.apache.activemq.artemis.core.server.reload.ReloadCallback;
 import org.apache.activemq.artemis.core.server.reload.ReloadManager;
 import org.apache.activemq.artemis.core.server.reload.ReloadManagerImpl;
@@ -1309,10 +1312,15 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
       checkSessionLimit(validatedUser);
 
+      callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateSession(name, username, minLargeMessageSize, connection,
+            autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, autoCreateQueues, context, prefixes) : null);
+
       final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues, prefixes);
 
       sessions.put(name, session);
 
+      callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateSession(session) : null);
+
       return session;
    }
 
@@ -1705,6 +1713,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          return;
       }
 
+      callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeDestroyQueue(queueName, session, checkConsumerCount,
+            removeConsumers, autoDeleteAddress) : null);
+
       addressSettingsRepository.clearCache();
 
       Binding binding = postOffice.getBinding(queueName);
@@ -1743,6 +1754,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       }
 
       callPostQueueDeletionCallbacks(address, queueName);
+
+      callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount,
+            removeConsumers, autoDeleteAddress) : null);
    }
 
    @Override
@@ -1808,6 +1822,38 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    }
 
    @Override
+   public void registerBrokerPlugins(final List<ActiveMQServerPlugin> plugins) {
+      configuration.registerBrokerPlugins(plugins);
+   }
+
+   @Override
+   public void registerBrokerPlugin(final ActiveMQServerPlugin plugin) {
+      configuration.registerBrokerPlugin(plugin);
+   }
+
+   @Override
+   public void unRegisterBrokerPlugin(final ActiveMQServerPlugin plugin) {
+      configuration.unRegisterBrokerPlugin(plugin);
+   }
+
+   @Override
+   public List<ActiveMQServerPlugin> getBrokerPlugins() {
+      return configuration.getBrokerPlugins();
+   }
+
+   @Override
+   public void callBrokerPlugins(final ActiveMQPluginRunnable pluginRun) {
+      if (pluginRun != null) {
+         getBrokerPlugins().forEach(pluginRun::run);
+      }
+   }
+
+   @Override
+   public boolean hasBrokerPlugins() {
+      return !getBrokerPlugins().isEmpty();
+   }
+
+   @Override
    public ExecutorFactory getExecutorFactory() {
       return executorFactory;
    }
@@ -2103,7 +2149,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
       securityStore = new SecurityStoreImpl(securityRepository, securityManager, configuration.getSecurityInvalidationInterval(), configuration.isSecurityEnabled(), configuration.getClusterUser(), configuration.getClusterPassword(), managementService);
 
-      queueFactory = new QueueFactoryImpl(executorFactory, scheduledPool, addressSettingsRepository, storageManager);
+      queueFactory = new QueueFactoryImpl(executorFactory, scheduledPool, addressSettingsRepository, storageManager, this);
 
       pagingManager = createPagingManager();
 
@@ -2508,6 +2554,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
       final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).routingType(routingType).maxConsumers(maxConsumers).purgeOnNoConsumers(purgeOnNoConsumers).build();
 
+      callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateQueue(queueConfig) : null);
+
       final Queue queue = queueFactory.createQueueWith(queueConfig);
 
       if (transientQueue) {
@@ -2550,6 +2598,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
       callPostQueueCreationCallbacks(queue.getName());
 
+      callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateQueue(queue) : null);
+
       return queue;
    }
 
@@ -2763,4 +2813,5 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          deployAddressesFromConfiguration(config);
       }
    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index ceec92c..8370839 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -63,8 +64,9 @@ public class LastValueQueue extends QueueImpl {
                          final PostOffice postOffice,
                          final StorageManager storageManager,
                          final HierarchicalRepository<AddressSettings> addressSettingsRepository,
-                         final Executor executor) {
-      super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+                         final Executor executor,
+                         final ActiveMQServer server) {
+      super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
index 9258a07..3d8ceb1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueConfig;
 import org.apache.activemq.artemis.core.server.QueueFactory;
@@ -49,17 +50,19 @@ public class QueueFactoryImpl implements QueueFactory {
 
    protected final ExecutorFactory executorFactory;
 
+   protected final ActiveMQServer server;
+
    public QueueFactoryImpl(final ExecutorFactory executorFactory,
                            final ScheduledExecutorService scheduledExecutor,
                            final HierarchicalRepository<AddressSettings> addressSettingsRepository,
-                           final StorageManager storageManager) {
-      this.addressSettingsRepository = addressSettingsRepository;
+                           final StorageManager storageManager,
+                           final ActiveMQServer server) {
 
+      this.addressSettingsRepository = addressSettingsRepository;
       this.scheduledExecutor = scheduledExecutor;
-
       this.storageManager = storageManager;
-
       this.executorFactory = executorFactory;
+      this.server = server;
    }
 
    @Override
@@ -72,9 +75,9 @@ public class QueueFactoryImpl implements QueueFactory {
       final AddressSettings addressSettings = addressSettingsRepository.getMatch(config.address().toString());
       final Queue queue;
       if (addressSettings.isLastValueQueue()) {
-         queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+         queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server);
       } else {
-         queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+         queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server);
       }
       return queue;
    }
@@ -98,9 +101,9 @@ public class QueueFactoryImpl implements QueueFactory {
 
       Queue queue;
       if (addressSettings.isLastValueQueue()) {
-         queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(),  scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+         queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(),  scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server);
       } else {
-         queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+         queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server);
       }
 
       return queue;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index a62ae79..c2cfdef 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -44,6 +44,7 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
@@ -69,7 +70,6 @@ import org.apache.activemq.artemis.core.server.HandleStatus;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
 import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
 import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor;
@@ -198,6 +198,8 @@ public class QueueImpl implements Queue {
 
    private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
 
+   private final ActiveMQServer server;
+
    private final ScheduledExecutorService scheduledExecutor;
 
    private final SimpleString address;
@@ -330,8 +332,9 @@ public class QueueImpl implements Queue {
                     final PostOffice postOffice,
                     final StorageManager storageManager,
                     final HierarchicalRepository<AddressSettings> addressSettingsRepository,
-                    final Executor executor) {
-      this(id, address, name, filter, null, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+                    final Executor executor,
+                    final ActiveMQServer server) {
+      this(id, address, name, filter, null, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server);
    }
 
    public QueueImpl(final long id,
@@ -347,8 +350,9 @@ public class QueueImpl implements Queue {
                     final PostOffice postOffice,
                     final StorageManager storageManager,
                     final HierarchicalRepository<AddressSettings> addressSettingsRepository,
-                    final Executor executor) {
-      this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+                    final Executor executor,
+                    final ActiveMQServer server) {
+      this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server);
    }
 
    public QueueImpl(final long id,
@@ -367,7 +371,8 @@ public class QueueImpl implements Queue {
                     final PostOffice postOffice,
                     final StorageManager storageManager,
                     final HierarchicalRepository<AddressSettings> addressSettingsRepository,
-                    final Executor executor) {
+                    final Executor executor,
+                    final ActiveMQServer server) {
 
       this.id = id;
 
@@ -401,6 +406,8 @@ public class QueueImpl implements Queue {
 
       this.scheduledExecutor = scheduledExecutor;
 
+      this.server = server;
+
       scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
 
       if (addressSettingsRepository != null) {
@@ -1078,6 +1085,9 @@ public class QueueImpl implements Queue {
          messagesAcknowledged++;
       }
 
+      if (server != null) {
+         server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.messageAcknowledged(ref, reason) : null);
+      }
    }
 
    @Override
@@ -1112,6 +1122,10 @@ public class QueueImpl implements Queue {
       } else {
          messagesAcknowledged++;
       }
+
+      if (server != null) {
+         server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.messageAcknowledged(ref, reason) : null);
+      }
    }
 
    @Override
@@ -1195,6 +1209,11 @@ public class QueueImpl implements Queue {
          }
          acknowledge(ref, AckReason.EXPIRED);
       }
+
+      if (server != null) {
+         final SimpleString expiryAddress = messageExpiryAddress;
+         server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.messageExpired(ref, expiryAddress) : null);
+      }
    }
 
    private SimpleString expiryAddressFromMessageAddress(MessageReference ref) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 9e33602..af8524d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -416,6 +416,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       try {
          Message message = reference.getMessage();
 
+         server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeDeliver(reference) : null);
+
          if (message.isLargeMessage() && supportLargeMessage) {
             if (largeMessageDeliverer == null) {
                // This can't really happen as handle had already crated the deliverer
@@ -432,6 +434,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       } finally {
          lockDelivery.readLock().unlock();
          callback.afterDelivery();
+         server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDeliver(reference) : null);
       }
 
    }
@@ -447,6 +450,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
          logger.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace"));
       }
 
+      server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCloseConsumer(this, failed) : null);
+
       setStarted(false);
 
       LargeMessageDeliverer del = largeMessageDeliverer;
@@ -501,6 +506,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
          managementService.sendNotification(notification);
       }
+
+      server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCloseConsumer(this, failed) : null);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index edd7afc..7245843 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -16,10 +16,8 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
-import javax.json.JsonArrayBuilder;
-import javax.json.JsonObjectBuilder;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.Xid;
+import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -31,6 +29,11 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import javax.json.JsonArrayBuilder;
+import javax.json.JsonObjectBuilder;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.Xid;
+
 import org.apache.activemq.artemis.Closeable;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
@@ -89,8 +92,6 @@ import org.apache.activemq.artemis.utils.PrefixUtil;
 import org.apache.activemq.artemis.utils.TypedProperties;
 import org.jboss.logging.Logger;
 
-import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe;
-
 /**
  * Server side Session implementation
  */
@@ -345,6 +346,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    protected void doClose(final boolean failed) throws Exception {
       synchronized (this) {
+         if (!closed) {
+            server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCloseSession(this, failed) : null);
+         }
          this.setStarted(false);
          if (closed)
             return;
@@ -395,6 +399,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          }
 
          closed = true;
+
+         server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCloseSession(this, failed) : null);
       }
    }
 
@@ -444,9 +450,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
       Filter filter = FilterImpl.createFilter(filterString);
 
+      server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCreateConsumer(consumerID, unPrefixedQueueName,
+            filterString, browseOnly, supportLargeMessage) : null);
+
       ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
       consumers.put(consumer.getID(), consumer);
 
+      server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCreateConsumer(consumer) : null);
+
       if (!browseOnly) {
          TypedProperties props = new TypedProperties();
 
@@ -1290,6 +1301,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                                           final boolean direct,
                                           boolean noAutoCreateQueue) throws Exception {
 
+      server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeSend(tx, message, direct, noAutoCreateQueue) : null);
+
       // If the protocol doesn't support flow control, we have no choice other than fail the communication
       if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
          ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit();
@@ -1333,10 +1346,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       if (message.getAddressSimpleString().equals(managementAddress)) {
          // It's a management message
 
-         handleManagementMessage(tx, message, direct);
+         result = handleManagementMessage(tx, message, direct);
       } else {
          result = doSend(tx, message, address, direct, noAutoCreateQueue);
       }
+
+      final RoutingStatus finalResult = result;
+      server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterSend(tx, message, direct, noAutoCreateQueue, finalResult) : null);
+
       return result;
    }
 
@@ -1367,10 +1384,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    @Override
    public void addMetaData(String key, String data) {
+      server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeSessionMetadataAdded(this, key, data) : null);
       if (metaData == null) {
          metaData = new HashMap<>();
       }
       metaData.put(key, data);
+      server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterSessionMetadataAdded(this, key, data) : null);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java
new file mode 100644
index 0000000..bc85475
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.plugin;
+
+/**
+ * Callback that performs a single action against one {@link ActiveMQServerPlugin}.
+ * <p>
+ * {@code ActiveMQServer#callBrokerPlugins(ActiveMQPluginRunnable)} invokes {@link #run(ActiveMQServerPlugin)}
+ * once for every registered plugin, so call sites normally supply the action as a lambda, for example
+ * {@code plugin -> plugin.afterCreateQueue(queue)}.
+ */
+@FunctionalInterface
+public interface ActiveMQPluginRunnable {
+
+   /**
+    * Applies this action to the given plugin.
+    *
+    * @param plugin the registered plugin to invoke
+    */
+   void run(ActiveMQServerPlugin plugin);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
new file mode 100644
index 0000000..95296f0
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.plugin;
+
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.BridgeConfiguration;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
+import org.apache.activemq.artemis.core.security.SecurityAuth;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.QueueConfig;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.cluster.Bridge;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+
+
+public interface ActiveMQServerPlugin {
+   // Every hook below is a no-op default method; implementations override only the callbacks they need.
+
+   /**
+    * A connection has been created.
+    *
+    * @param connection The newly created connection
+    */
+   default void afterCreateConnection(RemotingConnection connection) {
+
+   }
+
+   /**
+    * A connection has been destroyed.
+    *
+    * @param connection The connection that has been destroyed
+    */
+   default void afterDestroyConnection(RemotingConnection connection) {
+
+   }
+
+   /**
+    * Before a session is created.
+    * NOTE(review): the parameters presumably mirror the arguments used to create the session - confirm at the call site.
+    * @param name
+    * @param username
+    * @param minLargeMessageSize
+    * @param connection
+    * @param autoCommitSends
+    * @param autoCommitAcks
+    * @param preAcknowledge
+    * @param xa
+    * @param defaultAddress
+    * @param callback
+    * @param autoCreateQueues
+    * @param context
+    * @param prefixes
+    */
+   default void beforeCreateSession(String name, String username, int minLargeMessageSize,
+         RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge,
+         boolean xa, String defaultAddress, SessionCallback callback, boolean autoCreateQueues, OperationContext context,
+         Map<SimpleString, RoutingType> prefixes) {
+
+   }
+
+   /**
+    * After a session has been created.
+    *
+    * @param session The newly created session
+    */
+   default void afterCreateSession(ServerSession session) {
+
+   }
+
+   /**
+    * Before a session is closed
+    *
+    * @param session The session about to be closed
+    * @param failed NOTE(review): presumably true when the close was caused by a failure - confirm
+    */
+   default void beforeCloseSession(ServerSession session, boolean failed) {
+
+   }
+
+   /**
+    * After a session is closed
+    *
+    * @param session The session that was closed
+    * @param failed NOTE(review): presumably true when the close was caused by a failure - confirm
+    */
+   default void afterCloseSession(ServerSession session, boolean failed) {
+
+   }
+
+   /**
+    * Before session metadata is added to the session
+    *
+    * @param session The session the metadata is about to be added to
+    * @param key The metadata key
+    * @param data The metadata value (presumably stored under {@code key} - confirm)
+    */
+   default void beforeSessionMetadataAdded(ServerSession session, String key, String data) {
+
+   }
+
+   /**
+    * After session metadata is added to the session
+    *
+    * @param session The session the metadata was added to
+    * @param key The metadata key
+    * @param data The metadata value (presumably stored under {@code key} - confirm)
+    */
+   default void afterSessionMetadataAdded(ServerSession session, String key, String data) {
+
+   }
+
+   /**
+    * Before a consumer is created
+    *
+    * @param consumerID
+    * @param queueName
+    * @param filterString
+    * @param browseOnly
+    * @param supportLargeMessage
+    */
+   default void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString,
+         boolean browseOnly, boolean supportLargeMessage) {
+
+   }
+
+   /**
+    * After a consumer has been created
+    *
+    * @param consumer the created consumer
+    */
+   default void afterCreateConsumer(ServerConsumer consumer) {
+
+   }
+
+   /**
+    * Before a consumer is closed
+    *
+    * @param consumer The consumer about to be closed
+    * @param failed NOTE(review): presumably true when the close was caused by a failure - confirm
+    */
+   default void beforeCloseConsumer(ServerConsumer consumer, boolean failed) {
+
+   }
+
+   /**
+    * After a consumer is closed
+    *
+    * @param consumer The consumer that was closed
+    * @param failed NOTE(review): presumably true when the close was caused by a failure - confirm
+    */
+   default void afterCloseConsumer(ServerConsumer consumer, boolean failed) {
+
+   }
+
+   /**
+    * Before a queue is created
+    *
+    * @param queueConfig The configuration of the queue about to be created
+    */
+   default void beforeCreateQueue(QueueConfig queueConfig) {
+
+   }
+
+   /**
+    * After a queue has been created
+    *
+    * @param queue The newly created queue
+    */
+   default void afterCreateQueue(Queue queue) {
+
+   }
+
+   /**
+    * Before a queue is destroyed
+    *
+    * @param queueName The name of the queue about to be destroyed
+    * @param session
+    * @param checkConsumerCount
+    * @param removeConsumers
+    * @param autoDeleteAddress
+    */
+   default void beforeDestroyQueue(SimpleString queueName, final SecurityAuth session, boolean checkConsumerCount,
+         boolean removeConsumers, boolean autoDeleteAddress) {
+
+   }
+
+   /**
+    * After a queue has been destroyed
+    *
+    * @param queue The queue that has been destroyed
+    * @param address
+    * @param session
+    * @param checkConsumerCount
+    * @param removeConsumers
+    * @param autoDeleteAddress
+    */
+   default void afterDestroyQueue(Queue queue, SimpleString address, final SecurityAuth session, boolean checkConsumerCount,
+         boolean removeConsumers, boolean autoDeleteAddress) {
+
+   }
+
+   /**
+    * Before a message is sent
+    *
+    * @param tx
+    * @param message The message about to be sent
+    * @param direct
+    * @param noAutoCreateQueue
+    */
+   default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) {
+
+   }
+
+   /**
+    * After a message is sent
+    *
+    * @param tx
+    * @param message The message that was sent
+    * @param direct
+    * @param noAutoCreateQueue
+    * @param result The resulting routing status
+    */
+   default void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
+         RoutingStatus result) {
+
+   }
+
+   /**
+    * Before a message is routed
+    *
+    * @param message The message about to be routed
+    * @param context The routing context
+    * @param direct
+    * @param rejectDuplicates
+    */
+   default void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) {
+
+   }
+
+   /**
+    * After a message is routed
+    *
+    * @param message The message that was routed
+    * @param context The routing context
+    * @param direct
+    * @param rejectDuplicates
+    * @param result The resulting routing status
+    */
+   default void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates,
+         RoutingStatus result) {
+
+   }
+
+   /**
+    * Before a message is delivered to a client consumer
+    *
+    * @param reference The reference to the message about to be delivered
+    */
+   default void beforeDeliver(MessageReference reference) {
+
+   }
+
+   /**
+    * After a message is delivered to a client consumer
+    *
+    * @param reference The reference to the message that was delivered
+    */
+   default void afterDeliver(MessageReference reference) {
+
+   }
+
+   /**
+    * A message has been expired
+    *
+    * @param message The expired message
+    * @param messageExpiryAddress The message expiry address, if one exists
+    */
+   default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) {
+
+   }
+
+   /**
+    * A message has been acknowledged
+    *
+    * @param ref The acked message
+    * @param reason The ack reason
+    */
+   default void messageAcknowledged(MessageReference ref, AckReason reason) {
+
+   }
+
+   /**
+    * Before a bridge is deployed
+    *
+    * @param config The bridge configuration
+    */
+   default void beforeDeployBridge(BridgeConfiguration config) {
+
+   }
+
+   /**
+    * After a bridge has been deployed
+    *
+    * @param bridge The newly deployed bridge
+    */
+   default void afterDeployBridge(Bridge bridge) {
+
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index 8d27895..60b9b74 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -166,6 +166,9 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
       // Add optional security for tests that need it
       configureBrokerSecurity(server);
 
+      // Add extra configuration
+      addConfiguration(server);
+
       server.start();
 
       // Prepare all addresses and queues for client tests.
@@ -174,6 +177,10 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
       return server;
    }
 
+   protected void addConfiguration(ActiveMQServer server) {
+
+   }
+
    protected TransportConfiguration addAcceptorConfiguration(ActiveMQServer server, int port) {
       HashMap<String, Object> params = new HashMap<>();
       params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 201a96b..da695ca 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.artemis.tests.integration.client;
 
-import javax.management.MBeanServer;
 import java.lang.management.ManagementFactory;
 import java.util.LinkedList;
 import java.util.Map;
@@ -26,10 +25,12 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
+import javax.management.MBeanServer;
+
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Interceptor;
-
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -53,7 +54,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueConfig;
@@ -234,8 +234,10 @@ public class HangConsumerTest extends ActiveMQTestBase {
                              final PostOffice postOffice,
                              final StorageManager storageManager,
                              final HierarchicalRepository<AddressSettings> addressSettingsRepository,
-                             final Executor executor) {
-            super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, deliveryMode, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+                             final Executor executor, final ActiveMQServer server) {
+            super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, deliveryMode,
+                  maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager,
+                  addressSettingsRepository, executor, server);
          }
 
          @Override
@@ -256,13 +258,18 @@ public class HangConsumerTest extends ActiveMQTestBase {
          LocalFactory(final ExecutorFactory executorFactory,
                       final ScheduledExecutorService scheduledExecutor,
                       final HierarchicalRepository<AddressSettings> addressSettingsRepository,
-                      final StorageManager storageManager) {
-            super(executorFactory, scheduledExecutor, addressSettingsRepository, storageManager);
+                      final StorageManager storageManager, final ActiveMQServer server) {
+            super(executorFactory, scheduledExecutor, addressSettingsRepository, storageManager, server);
          }
 
          @Override
          public Queue createQueueWith(final QueueConfig config) {
-            queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+            queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(),
+                                            config.user(), config.pageSubscription(), config.isDurable(),
+                                            config.isTemporary(), config.isAutoCreated(), config.deliveryMode(),
+                                            config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor,
+                                            postOffice, storageManager, addressSettingsRepository,
+                                            executorFactory.getExecutor(), server);
             return queue;
          }
 
@@ -277,13 +284,18 @@ public class HangConsumerTest extends ActiveMQTestBase {
                                   final boolean durable,
                                   final boolean temporary,
                                   final boolean autoCreated) {
-            queue = new MyQueueWithBlocking(persistenceID, address, name, filter, user, pageSubscription, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+            queue = new MyQueueWithBlocking(persistenceID, address, name, filter, user, pageSubscription, durable,
+                                            temporary, autoCreated, RoutingType.MULTICAST, null, null,
+                                            scheduledExecutor, postOffice, storageManager, addressSettingsRepository,
+                                            executorFactory.getExecutor(), server);
             return queue;
          }
 
       }
 
-      LocalFactory queueFactory = new LocalFactory(server.getExecutorFactory(), server.getScheduledPool(), server.getAddressSettingsRepository(), server.getStorageManager());
+      LocalFactory queueFactory =
+               new LocalFactory(server.getExecutorFactory(), server.getScheduledPool(),
+                                server.getAddressSettingsRepository(), server.getStorageManager(), server);
 
       queueFactory.setPostOffice(server.getPostOffice());
 
@@ -359,7 +371,10 @@ public class HangConsumerTest extends ActiveMQTestBase {
       long txID = server.getStorageManager().generateID();
 
       // Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally
-      LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false, false, null, null, null, null, null), server.getNodeID());
+      LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE,
+                                                           new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false,
+                                                                         false, null, null, null, null, null, null),
+                                                           server.getNodeID());
       server.getStorageManager().addQueueBinding(txID, newBinding);
       server.getStorageManager().commitBindings(txID);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
index 540baf6..44015e1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
@@ -16,12 +16,6 @@
  */
 package org.apache.activemq.artemis.tests.integration.client;
 
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.concurrent.CountDownLatch;
@@ -29,9 +23,17 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Interceptor;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@@ -52,7 +54,6 @@ import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueConfig;
 import org.apache.activemq.artemis.core.server.QueueFactory;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@@ -518,7 +519,8 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
                         StorageManager storageManager,
                         HierarchicalRepository<AddressSettings> addressSettingsRepository,
                         Executor executor) {
-            super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+            super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor,
+                  postOffice, storageManager, addressSettingsRepository, executor, null);
          }
 
          @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
index f8094a1..63743ed 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
@@ -16,14 +16,15 @@
  */
 package org.apache.activemq.artemis.tests.integration.jms.client;
 
+import java.util.List;
+import java.util.Map;
+
 import javax.jms.Connection;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
-import java.util.List;
-import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
@@ -81,7 +82,12 @@ public class TopicCleanupTest extends JMSTestBase {
          for (int i = 0; i < 100; i++) {
             long txid = storage.generateID();
 
-            final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("topic"), SimpleString.toSimpleString("topic"), FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), null, true, false, false, server.getScheduledPool(), server.getPostOffice(), storage, server.getAddressSettingsRepository(), server.getExecutorFactory().getExecutor());
+            final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("topic"),
+                                              SimpleString.toSimpleString("topic"),
+                                              FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), null,
+                                              true, false, false, server.getScheduledPool(), server.getPostOffice(),
+                                              storage, server.getAddressSettingsRepository(),
+                                              server.getExecutorFactory().getExecutor(), server);
 
             LocalQueueBinding binding = new LocalQueueBinding(queue.getAddress(), queue, server.getNodeID());
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java
index 056891a..bd8cfd8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java
@@ -45,7 +45,7 @@ public class JmsResourceProvider {
    /**
     * Creates a connection.
     *
-    * @see org.apache.activemq.test.JmsResourceProvider#createConnection(javax.jms.ConnectionFactory)
+    * @see org.apache.activemq.test.JmsResourceProvider#createConnection(javax.jms.ConnectionFactory)
     */
    public Connection createConnection(ConnectionFactory cf) throws JMSException {
       Connection connection = cf.createConnection();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java
new file mode 100644
index 0000000..d918b27
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.plugin;
+
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests that broker plugin callbacks are invoked for an AMQP send, receive and acknowledge cycle.
+ */
+public class AmqpPluginTest extends AmqpClientTestSupport {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(AmqpPluginTest.class);
+
+   private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
+   private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls);
+
+   @Override
+   protected void addConfiguration(ActiveMQServer server) {
+      super.addConfiguration(server);
+      server.registerBrokerPlugin(verifier);
+   }
+
+   @Test(timeout = 60000)
+   public void testQueueReceiverReadAndAckMessage() throws Exception {
+      sendMessages(getQueueName(), 1);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(getQueueName());
+
+      Queue queueView = getProxyToQueue(getQueueName());
+      assertEquals(1, queueView.getMessageCount());
+
+      receiver.flow(1);
+      AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(message);
+      message.accept();
+      receiver.close();
+      connection.close();
+
+      verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
+      verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION,
+            BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION,
+            BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
+            BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND,
+            AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER);
+   }
+
+   @Override
+   public void sendMessages(String destinationName, int count) throws Exception {
+      sendMessages(destinationName, count, null);
+   }
+
+   @Override
+   public void sendMessages(String destinationName, int count, RoutingType routingType) throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(destinationName);
+
+         for (int i = 0; i < count; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setMessageId("MessageID:" + i);
+            if (routingType != null) {
+               message.setMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE.toString(), routingType.getType());
+            }
+            sender.send(message);
+         }
+      } finally {
+         connection.close();
+      }
+   }
+}


[3/3] activemq-artemis git commit: This closes #1242

Posted by cl...@apache.org.
This closes #1242


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5391d42e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5391d42e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5391d42e

Branch: refs/heads/master
Commit: 5391d42e474a323b84bcd0e2dd7162bbeaffeefe
Parents: 303d97c 1e1ede8
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed May 3 12:17:01 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed May 3 12:17:01 2017 -0400

----------------------------------------------------------------------
 .../artemis/core/config/Configuration.java      |  21 ++
 .../core/config/impl/ConfigurationImpl.java     |  24 ++
 .../core/postoffice/impl/PostOfficeImpl.java    |   4 +
 .../server/impl/RemotingServiceImpl.java        |   7 +-
 .../artemis/core/server/ActiveMQServer.java     |  18 +-
 .../core/server/cluster/ClusterManager.java     |   3 +
 .../core/server/impl/ActiveMQServerImpl.java    |  57 +++-
 .../core/server/impl/LastValueQueue.java        |   6 +-
 .../core/server/impl/QueueFactoryImpl.java      |  19 +-
 .../artemis/core/server/impl/QueueImpl.java     |  31 +-
 .../core/server/impl/ServerConsumerImpl.java    |   7 +
 .../core/server/impl/ServerSessionImpl.java     |  33 +-
 .../server/plugin/ActiveMQPluginRunnable.java   |  24 ++
 .../server/plugin/ActiveMQServerPlugin.java     | 336 +++++++++++++++++++
 .../integration/amqp/AmqpClientTestSupport.java |   7 +
 .../integration/client/HangConsumerTest.java    |  37 +-
 .../client/InterruptedLargeMessageTest.java     |  18 +-
 .../jms/client/TopicCleanupTest.java            |  12 +-
 .../openwire/amq/JmsResourceProvider.java       |   2 +-
 .../integration/plugin/AmqpPluginTest.java      | 131 ++++++++
 .../integration/plugin/CorePluginTest.java      | 257 ++++++++++++++
 .../plugin/MethodCalledVerifier.java            | 276 +++++++++++++++
 .../integration/plugin/MqttPluginTest.java      | 132 ++++++++
 .../integration/plugin/OpenwirePluginTest.java  | 109 ++++++
 .../integration/plugin/StompPluginTest.java     | 126 +++++++
 .../timing/core/server/impl/QueueImplTest.java  |  15 +-
 .../unit/core/server/impl/QueueImplTest.java    |   3 +-
 .../server/impl/fakes/FakeQueueFactory.java     |   7 +-
 28 files changed, 1664 insertions(+), 58 deletions(-)
----------------------------------------------------------------------