You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/05/03 16:17:11 UTC
[1/3] activemq-artemis git commit: ARTEMIS-898 - Adding Plugin Support
Repository: activemq-artemis
Updated Branches:
refs/heads/master 303d97c76 -> 5391d42e4
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
new file mode 100644
index 0000000..9f6b4ea
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.plugin;
+
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SESSION_METADATA_ADDED;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DESTROY_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SESSION_METADATA_ADDED;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.config.BridgeConfiguration;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
+import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
+import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CorePluginTest extends JMSTestBase {
+
+ private Queue queue;
+
+ private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
+ private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls);
+ public static final String INVM_CONNECTOR_FACTORY = InVMConnectorFactory.class.getCanonicalName();
+
+ @Override
+ protected Configuration createDefaultConfig(boolean netty) throws Exception {
+ Configuration config = super.createDefaultConfig(netty);
+ config.registerBrokerPlugin(verifier);
+ return config;
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ queue = createQueue("queue1");
+ }
+
+
+ @Test
+ public void testSendReceive() throws Exception {
+ conn = cf.createConnection();
+ conn.start();
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue);
+ MessageConsumer cons = sess.createConsumer(queue);
+
+ TextMessage msg1 = sess.createTextMessage("test");
+ prod.send(msg1);
+ TextMessage received1 = (TextMessage)cons.receive(1000);
+ assertNotNull(received1);
+
+ conn.close();
+
+ verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE,
+ BEFORE_DESTROY_QUEUE, AFTER_DESTROY_QUEUE);
+ verifier.validatePluginMethodsEquals(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION,
+ BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
+ BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE,
+ AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER);
+ verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
+ AFTER_CLOSE_SESSION);
+ }
+
+ @Test
+ public void testDestroyQueue() throws Exception {
+ conn = cf.createConnection();
+ conn.start();
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ sess.createProducer(queue);
+ conn.close();
+
+ server.destroyQueue(new SimpleString(queue.getQueueName()));
+
+ verifier.validatePluginMethodsEquals(1, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, BEFORE_DESTROY_QUEUE,
+ AFTER_DESTROY_QUEUE);
+ }
+
+ @Test
+ public void testMessageExpireServer() throws Exception {
+ server.registerBrokerPlugin(new ExpiredPluginVerifier());
+
+ conn = cf.createConnection();
+ conn.setClientID("test");
+ conn.start();
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue);
+ prod.setTimeToLive(1);
+ MessageConsumer cons = sess.createConsumer(queue);
+ Thread.sleep(100);
+ TextMessage msg1 = sess.createTextMessage("test");
+ prod.send(msg1);
+ Thread.sleep(100);
+ assertNull(cons.receive(100));
+
+ conn.close();
+
+ verifier.validatePluginMethodsEquals(0, BEFORE_DELIVER, AFTER_DELIVER, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
+ verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION,
+ BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
+ BEFORE_SESSION_METADATA_ADDED, AFTER_SESSION_METADATA_ADDED,
+ BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE,
+ AFTER_MESSAGE_ROUTE, MESSAGE_EXPIRED);
+ verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
+ AFTER_CLOSE_SESSION);
+
+ }
+
+ @Test
+ public void testMessageExpireClient() throws Exception {
+ server.registerBrokerPlugin(new ExpiredPluginVerifier());
+
+ conn = cf.createConnection();
+ conn.start();
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue);
+ prod.setTimeToLive(500);
+ MessageConsumer cons = sess.createConsumer(queue);
+
+ TextMessage msg1 = sess.createTextMessage("test");
+ prod.send(msg1);
+ Thread.sleep(500);
+ assertNull(cons.receive(500));
+
+ conn.close();
+
+ verifier.validatePluginMethodsEquals(0, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
+ verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION,
+ BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
+ BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE,
+ AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER, MESSAGE_EXPIRED);
+ verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
+ AFTER_CLOSE_SESSION);
+
+ }
+
+ @Test
+ public void testSimpleBridge() throws Exception {
+ server.stop();
+ ActiveMQServer server0;
+ ActiveMQServer server1;
+
+ Map<String, Object> server0Params = new HashMap<>();
+ server0 = createClusteredServerWithParams(false, 0, false, server0Params);
+
+ Map<String, Object> server1Params = new HashMap<>();
+ server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+ server1 = createClusteredServerWithParams(false, 1, false, server1Params);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+
+ TransportConfiguration server1tc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, server1Params);
+
+ HashMap<String, TransportConfiguration> connectors = new HashMap<>();
+ connectors.put(server1tc.getName(), server1tc);
+ server0.getConfiguration().setConnectorConfigurations(connectors);
+ server0.registerBrokerPlugin(verifier);
+
+ ArrayList<String> connectorConfig = new ArrayList<>();
+ connectorConfig.add(server1tc.getName());
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1")
+ .setQueueName(queueName0)
+ .setForwardingAddress(forwardAddress)
+ .setRetryInterval(1000)
+ .setReconnectAttemptsOnSameNode(-1)
+ .setUseDuplicateDetection(false)
+ .setStaticConnectors(connectorConfig);
+
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0);
+ List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ server1.start();
+ server0.start();
+
+ verifier.validatePluginMethodsEquals(1, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
+
+ server0.stop();
+ server1.stop();
+
+ }
+
+ private class ExpiredPluginVerifier implements ActiveMQServerPlugin {
+
+ @Override
+ public void messageAcknowledged(MessageReference ref, AckReason reason) {
+ assertEquals(AckReason.EXPIRED, reason);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
new file mode 100644
index 0000000..14aa4a1
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
@@ -0,0 +1,276 @@
+/**
+ *
+ */
+package org.apache.activemq.artemis.tests.integration.plugin;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.BridgeConfiguration;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
+import org.apache.activemq.artemis.core.security.SecurityAuth;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.QueueConfig;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.cluster.Bridge;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+public class MethodCalledVerifier implements ActiveMQServerPlugin {
+
+ private final Map<String, AtomicInteger> methodCalls;
+
+ public static final String AFTER_CREATE_CONNECTION = "afterCreateConnection";
+ public static final String AFTER_DESTROY_CONNECTION = "afterDestroyConnection";
+ public static final String BEFORE_CREATE_SESSION = "beforeCreateSession";
+ public static final String AFTER_CREATE_SESSION = "afterCreateSession";
+ public static final String BEFORE_CLOSE_SESSION = "beforeCloseSession";
+ public static final String AFTER_CLOSE_SESSION = "afterCloseSession";
+ public static final String BEFORE_SESSION_METADATA_ADDED = "beforeSessionMetadataAdded";
+ public static final String AFTER_SESSION_METADATA_ADDED = "afterSessionMetadataAdded";
+ public static final String BEFORE_CREATE_CONSUMER = "beforeCreateConsumer";
+ public static final String AFTER_CREATE_CONSUMER = "afterCreateConsumer";
+ public static final String BEFORE_CLOSE_CONSUMER = "beforeCloseConsumer";
+ public static final String AFTER_CLOSE_CONSUMER = "afterCloseConsumer";
+ public static final String BEFORE_CREATE_QUEUE = "beforeCreateQueue";
+ public static final String AFTER_CREATE_QUEUE = "afterCreateQueue";
+ public static final String BEFORE_DESTROY_QUEUE = "beforeDestroyQueue";
+ public static final String AFTER_DESTROY_QUEUE = "afterDestroyQueue";
+ public static final String MESSAGE_EXPIRED = "messageExpired";
+ public static final String MESSAGE_ACKED = "messageAcknowledged";
+ public static final String BEFORE_SEND = "beforeSend";
+ public static final String AFTER_SEND = "afterSend";
+ public static final String BEFORE_MESSAGE_ROUTE = "beforeMessageRoute";
+ public static final String AFTER_MESSAGE_ROUTE = "afterMessageRoute";
+ public static final String BEFORE_DELIVER = "beforeDeliver";
+ public static final String AFTER_DELIVER = "afterDeliver";
+ public static final String BEFORE_DEPLOY_BRIDGE = "beforeDeployBridge";
+ public static final String AFTER_DEPLOY_BRIDGE = "afterDeployBridge";
+
+ /**
+ * @param methods
+ */
+ public MethodCalledVerifier(Map<String, AtomicInteger> methodCalls) {
+ super();
+ this.methodCalls = methodCalls;
+ }
+
+ @Override
+ public void afterCreateConnection(RemotingConnection connection) {
+ Preconditions.checkNotNull(connection);
+ methodCalled(AFTER_CREATE_CONNECTION);
+ }
+
+ @Override
+ public void afterDestroyConnection(RemotingConnection connection) {
+ Preconditions.checkNotNull(connection);
+ methodCalled(AFTER_DESTROY_CONNECTION);
+ }
+
+ @Override
+ public void beforeCreateSession(String name, String username, int minLargeMessageSize, RemotingConnection connection,
+ boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa,
+ String defaultAddress, SessionCallback callback, boolean autoCreateQueues,
+ OperationContext context, Map<SimpleString, RoutingType> prefixes) {
+ Preconditions.checkNotNull(connection);
+ methodCalled(BEFORE_CREATE_SESSION);
+ }
+
+ @Override
+ public void afterCreateSession(ServerSession session) {
+ Preconditions.checkNotNull(session);
+ methodCalled(AFTER_CREATE_SESSION);
+ }
+
+ @Override
+ public void beforeCloseSession(ServerSession session, boolean failed) {
+ Preconditions.checkNotNull(session);
+ methodCalled(BEFORE_CLOSE_SESSION);
+ }
+
+ @Override
+ public void afterCloseSession(ServerSession session, boolean failed) {
+ Preconditions.checkNotNull(session);
+ methodCalled(AFTER_CLOSE_SESSION);
+ }
+
+ @Override
+ public void beforeSessionMetadataAdded(ServerSession session, String key, String data) {
+ Preconditions.checkNotNull(key);
+ methodCalled(BEFORE_SESSION_METADATA_ADDED);
+ }
+
+ @Override
+ public void afterSessionMetadataAdded(ServerSession session, String key, String data) {
+ Preconditions.checkNotNull(key);
+ methodCalled(AFTER_SESSION_METADATA_ADDED);
+ }
+
+ @Override
+ public void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString,
+ boolean browseOnly, boolean supportLargeMessage) {
+ Preconditions.checkNotNull(queueName);
+ methodCalled(BEFORE_CREATE_CONSUMER);
+ }
+
+ @Override
+ public void afterCreateConsumer(ServerConsumer consumer) {
+ Preconditions.checkNotNull(consumer);
+ methodCalled(AFTER_CREATE_CONSUMER);
+ }
+
+ @Override
+ public void beforeCloseConsumer(ServerConsumer consumer, boolean failed) {
+ Preconditions.checkNotNull(consumer);
+ methodCalled(BEFORE_CLOSE_CONSUMER);
+ }
+
+ @Override
+ public void afterCloseConsumer(ServerConsumer consumer, boolean failed) {
+ Preconditions.checkNotNull(consumer);
+ methodCalled(AFTER_CLOSE_CONSUMER);
+ }
+
+ @Override
+ public void beforeCreateQueue(QueueConfig queueConfig) {
+ Preconditions.checkNotNull(queueConfig);
+ methodCalled(BEFORE_CREATE_QUEUE);
+ }
+
+ @Override
+ public void afterCreateQueue(org.apache.activemq.artemis.core.server.Queue queue) {
+ Preconditions.checkNotNull(queue);
+ methodCalled(AFTER_CREATE_QUEUE);
+ }
+
+ @Override
+ public void beforeDestroyQueue(SimpleString queueName, SecurityAuth session, boolean checkConsumerCount,
+ boolean removeConsumers, boolean autoDeleteAddress) {
+ Preconditions.checkNotNull(queueName);
+ methodCalled(BEFORE_DESTROY_QUEUE);
+ }
+
+ @Override
+ public void afterDestroyQueue(Queue queue, SimpleString address, SecurityAuth session, boolean checkConsumerCount,
+ boolean removeConsumers, boolean autoDeleteAddress) {
+ Preconditions.checkNotNull(queue);
+ methodCalled(AFTER_DESTROY_QUEUE);
+ }
+
+ @Override
+ public void messageExpired(MessageReference message, SimpleString messageExpiryAddress) {
+ Preconditions.checkNotNull(message);
+ methodCalled(MESSAGE_EXPIRED);
+ }
+
+ @Override
+ public void messageAcknowledged(MessageReference ref, AckReason reason) {
+ Preconditions.checkNotNull(ref);
+ Preconditions.checkNotNull(reason);
+ methodCalled(MESSAGE_ACKED);
+ }
+
+ @Override
+ public void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) {
+ Preconditions.checkNotNull(message);
+ methodCalled(BEFORE_SEND);
+ }
+
+ @Override
+ public void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
+ RoutingStatus result) {
+ Preconditions.checkNotNull(message);
+ Preconditions.checkNotNull(result);
+ methodCalled(AFTER_SEND);
+ }
+
+ @Override
+ public void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) {
+ Preconditions.checkNotNull(message);
+ Preconditions.checkNotNull(context);
+ methodCalled(BEFORE_MESSAGE_ROUTE);
+ }
+
+ @Override
+ public void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates,
+ RoutingStatus result) {
+ Preconditions.checkNotNull(message);
+ Preconditions.checkNotNull(context);
+ Preconditions.checkNotNull(result);
+ methodCalled(AFTER_MESSAGE_ROUTE);
+ }
+
+ @Override
+ public void beforeDeliver(MessageReference reference) {
+ Preconditions.checkNotNull(reference);
+ methodCalled(BEFORE_DELIVER);
+ }
+
+ @Override
+ public void afterDeliver(MessageReference reference) {
+ Preconditions.checkNotNull(reference);
+ methodCalled(AFTER_DELIVER);
+ }
+
+ @Override
+ public void beforeDeployBridge(BridgeConfiguration config) {
+ Preconditions.checkNotNull(config);
+ methodCalled(BEFORE_DEPLOY_BRIDGE);
+ }
+
+ @Override
+ public void afterDeployBridge(Bridge bridge) {
+ Preconditions.checkNotNull(bridge);
+ methodCalled(AFTER_DEPLOY_BRIDGE);
+ }
+
+ public void validatePluginMethodsEquals(int count, String... names) {
+ Arrays.asList(names).forEach(name -> {
+ assertEquals("validating method " + name, count, methodCalls.getOrDefault(name, new AtomicInteger()).get());
+ });
+ }
+
+ public void validatePluginMethodsAtLeast(int count, String... names) {
+ Arrays.asList(names).forEach(name -> {
+ assertTrue("validating method " + name, count <= methodCalls.getOrDefault(name, new AtomicInteger()).get());
+ });
+ }
+
+ private void methodCalled(String name) {
+ methodCalls.computeIfAbsent(name, k -> new AtomicInteger()).incrementAndGet();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java
new file mode 100644
index 0000000..5e7f127
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.plugin;
+
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
+import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTClientProvider;
+import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport;
+import org.apache.activemq.artemis.utils.ConcurrentHashSet;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MqttPluginTest extends MQTTTestSupport {
+
+
+ private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
+ private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls);
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ Field sessions = MQTTSession.class.getDeclaredField("SESSIONS");
+ sessions.setAccessible(true);
+ sessions.set(null, new ConcurrentHashMap<>());
+
+ Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS");
+ connectedClients.setAccessible(true);
+ connectedClients.set(null, new ConcurrentHashSet<>());
+ super.setUp();
+
+ }
+
+ @Override
+ public void configureBroker() throws Exception {
+ super.configureBroker();
+ server.registerBrokerPlugin(verifier);
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testSendAndReceiveMQTT() throws Exception {
+ final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
+ initializeConnection(subscriptionProvider);
+
+ subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE);
+
+ final CountDownLatch latch = new CountDownLatch(NUM_MESSAGES);
+
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ for (int i = 0; i < NUM_MESSAGES; i++) {
+ try {
+ byte[] payload = subscriptionProvider.receive(10000);
+ assertNotNull("Should get a message", payload);
+ latch.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ break;
+ }
+
+ }
+ }
+ });
+ thread.start();
+
+ final MQTTClientProvider publishProvider = getMQTTClientProvider();
+ initializeConnection(publishProvider);
+
+ for (int i = 0; i < NUM_MESSAGES; i++) {
+ String payload = "Message " + i;
+ publishProvider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
+ }
+
+ latch.await(10, TimeUnit.SECONDS);
+ assertEquals(0, latch.getCount());
+ subscriptionProvider.disconnect();
+ publishProvider.disconnect();
+
+ verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
+ verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, BEFORE_CREATE_SESSION,
+ AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER,
+ AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE,
+ MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER,
+ AFTER_DELIVER);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/OpenwirePluginTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/OpenwirePluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/OpenwirePluginTest.java
new file mode 100644
index 0000000..afb6841
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/OpenwirePluginTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.plugin;
+
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SESSION_METADATA_ADDED;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SESSION_METADATA_ADDED;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+
+public class OpenwirePluginTest extends BasicOpenWireTest {
+
+ private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
+ private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls);
+
+ @Override
+ protected ActiveMQServer createServer(boolean realFiles, Configuration configuration, long pageSize,
+ long maxAddressSize, Map<String, AddressSettings> settings) {
+ ActiveMQServer server = super.createServer(realFiles, configuration, pageSize, maxAddressSize, settings);
+ server.registerBrokerPlugin(verifier);
+ return server;
+ }
+
+ @Test
+ public void testAckedMessageAreConsumed() throws JMSException {
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(queueName);
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("Hello"));
+
+ // Consume the message...
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message msg = consumer.receive(1000);
+ assertNotNull(msg);
+
+ // Reset the session.
+ session.close();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Attempt to Consume the message...
+ consumer = session.createConsumer(queue);
+ msg = consumer.receive(1000);
+ assertNull(msg);
+
+ session.close();
+ connection.close();
+
+ verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE,
+ BEFORE_SESSION_METADATA_ADDED, AFTER_SESSION_METADATA_ADDED);
+ verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, BEFORE_CREATE_SESSION,
+ AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER,
+ AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE,
+ MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER,
+ AFTER_DELIVER);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java
new file mode 100644
index 0000000..c771272
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.plugin;
+
+
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.jms.server.JMSServerManager;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
+import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV12;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class StompPluginTest extends StompTestBase {
+
+ private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
+ public static final String CLIENT_ID = "myclientid";
+
+ private StompClientConnectionV12 conn;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ conn = (StompClientConnectionV12)StompClientConnectionFactory.createClientConnection("1.2", hostname, port);
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ try {
+ boolean connected = conn != null && conn.isConnected();
+ log.debug("Connection 1.2 : " + connected);
+ if (connected) {
+ conn.disconnect();
+ }
+ } finally {
+ super.tearDown();
+ }
+ }
+
+ private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
+ private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls);
+
+ @Override
+ protected JMSServerManager createServer() throws Exception {
+ JMSServerManager server = super.createServer();
+ server.getActiveMQServer().registerBrokerPlugin(verifier);
+ return server;
+ }
+
+ @Test
+ public void testSendAndReceive() throws Exception {
+
+ // subscribehoward county escaped
+ StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port);
+ newConn.connect(defUser, defPass);
+ subscribe(newConn, "a-sub");
+
+ send(newConn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World 1!");
+ ClientStompFrame frame = newConn.receiveFrame();
+
+ System.out.println("received " + frame);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+
+ // unsub
+ unsubscribe(newConn, "a-sub");
+
+ newConn.disconnect();
+
+ verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
+ verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, BEFORE_CREATE_SESSION,
+ AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER,
+ AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE,
+ MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER,
+ AFTER_DELIVER);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java
----------------------------------------------------------------------
diff --git a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java
index c0a4112..121a4b0 100644
--- a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java
+++ b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java
@@ -67,7 +67,10 @@ public class QueueImplTest extends ActiveMQTestBase {
@Test
public void testScheduledNoConsumer() throws Exception {
- QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()));
+ QueueImpl queue =
+ new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true,
+ false, scheduledExecutor, null, null, null,
+ Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
// Send one scheduled
@@ -132,7 +135,10 @@ public class QueueImplTest extends ActiveMQTestBase {
@Test
public void testScheduled() throws Exception {
- QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()));
+ QueueImpl queue =
+ new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true,
+ false, scheduledExecutor, null, null, null,
+ Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
FakeConsumer consumer = null;
@@ -230,7 +236,10 @@ public class QueueImplTest extends ActiveMQTestBase {
public void disconnect() {
}
};
- QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()));
+ QueueImpl queue =
+ new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false,
+ scheduledExecutor, null, null, null,
+ Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
MessageReference messageReference = generateReference(queue, 1);
queue.addConsumer(consumer);
messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
index 78179a8..0a08eb6 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
@@ -1309,6 +1309,7 @@ public class QueueImplTest extends ActiveMQTestBase {
}
private QueueImpl getQueue(SimpleString name, boolean durable, boolean temporary, Filter filter) {
- return new QueueImpl(1, QueueImplTest.address1, name, filter, null, durable, temporary, false, scheduledExecutor, new FakePostOffice(), null, null, executor);
+ return new QueueImpl(1, QueueImplTest.address1, name, filter, null, durable, temporary, false, scheduledExecutor,
+ new FakePostOffice(), null, null, executor, null);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
index 06c7e1e..40c117a 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
@@ -40,7 +40,9 @@ public final class FakeQueueFactory implements QueueFactory {
@Override
public Queue createQueueWith(final QueueConfig config) {
- return new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, null, null, executor);
+ return new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(),
+ config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(),
+ scheduledExecutor, postOffice, null, null, executor, null);
}
@Deprecated
@@ -54,7 +56,8 @@ public final class FakeQueueFactory implements QueueFactory {
final boolean durable,
final boolean temporary,
final boolean autoCreated) {
- return new QueueImpl(persistenceID, address, name, filter, subscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, null, null, executor);
+ return new QueueImpl(persistenceID, address, name, filter, subscription, user, durable, temporary, autoCreated,
+ scheduledExecutor, postOffice, null, null, executor, null);
}
@Override
[2/3] activemq-artemis git commit: ARTEMIS-898 - Adding Plugin Support
Posted by cl...@apache.org.
ARTEMIS-898 - Adding Plugin Support
Adding a new ActievMQServerPlugin interface to support adding custom
behavior to the broker at certain events such as connection or session
creation.
https://issues.apache.org/jira/browse/ARTEMIS-898
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1e1ede84
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1e1ede84
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1e1ede84
Branch: refs/heads/master
Commit: 1e1ede84c0483099f27741bc046ef95c08e1d090
Parents: 303d97c
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Tue May 2 09:46:17 2017 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Wed May 3 11:21:32 2017 -0400
----------------------------------------------------------------------
.../artemis/core/config/Configuration.java | 21 ++
.../core/config/impl/ConfigurationImpl.java | 24 ++
.../core/postoffice/impl/PostOfficeImpl.java | 4 +
.../server/impl/RemotingServiceImpl.java | 7 +-
.../artemis/core/server/ActiveMQServer.java | 18 +-
.../core/server/cluster/ClusterManager.java | 3 +
.../core/server/impl/ActiveMQServerImpl.java | 57 +++-
.../core/server/impl/LastValueQueue.java | 6 +-
.../core/server/impl/QueueFactoryImpl.java | 19 +-
.../artemis/core/server/impl/QueueImpl.java | 31 +-
.../core/server/impl/ServerConsumerImpl.java | 7 +
.../core/server/impl/ServerSessionImpl.java | 33 +-
.../server/plugin/ActiveMQPluginRunnable.java | 24 ++
.../server/plugin/ActiveMQServerPlugin.java | 336 +++++++++++++++++++
.../integration/amqp/AmqpClientTestSupport.java | 7 +
.../integration/client/HangConsumerTest.java | 37 +-
.../client/InterruptedLargeMessageTest.java | 18 +-
.../jms/client/TopicCleanupTest.java | 12 +-
.../openwire/amq/JmsResourceProvider.java | 2 +-
.../integration/plugin/AmqpPluginTest.java | 131 ++++++++
.../integration/plugin/CorePluginTest.java | 257 ++++++++++++++
.../plugin/MethodCalledVerifier.java | 276 +++++++++++++++
.../integration/plugin/MqttPluginTest.java | 132 ++++++++
.../integration/plugin/OpenwirePluginTest.java | 109 ++++++
.../integration/plugin/StompPluginTest.java | 126 +++++++
.../timing/core/server/impl/QueueImplTest.java | 15 +-
.../unit/core/server/impl/QueueImplTest.java | 3 +-
.../server/impl/fakes/FakeQueueFactory.java | 7 +-
28 files changed, 1664 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 7dfb1a5..7da5b02 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
@@ -1081,4 +1082,24 @@ public interface Configuration {
Configuration setNetworkCheckPing6Command(String command);
String getInternalNamingPrefix();
+
+ /**
+ * @param plugins
+ */
+ void registerBrokerPlugins(List<ActiveMQServerPlugin> plugins);
+
+ /**
+ * @param plugin
+ */
+ void registerBrokerPlugin(ActiveMQServerPlugin plugin);
+
+ /**
+ * @param plugin
+ */
+ void unRegisterBrokerPlugin(ActiveMQServerPlugin plugin);
+
+ /**
+ * @return
+ */
+ List<ActiveMQServerPlugin> getBrokerPlugins();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 2a538ca..8edeb5b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -38,6 +38,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
@@ -63,6 +64,7 @@ import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
import org.apache.activemq.artemis.utils.Env;
@@ -232,6 +234,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
private List<SecuritySettingPlugin> securitySettingPlugins = new ArrayList<>();
+ private final List<ActiveMQServerPlugin> brokerPlugins = new CopyOnWriteArrayList<>();
+
private Map<String, Set<String>> securityRoleNameMappings = new HashMap<>();
protected List<ConnectorServiceConfiguration> connectorServiceConfigurations = new ArrayList<>();
@@ -1321,6 +1325,26 @@ public class ConfigurationImpl implements Configuration, Serializable {
}
@Override
+ public void registerBrokerPlugins(final List<ActiveMQServerPlugin> plugins) {
+ brokerPlugins.addAll(plugins);
+ }
+
+ @Override
+ public void registerBrokerPlugin(final ActiveMQServerPlugin plugin) {
+ brokerPlugins.add(plugin);
+ }
+
+ @Override
+ public void unRegisterBrokerPlugin(final ActiveMQServerPlugin plugin) {
+ brokerPlugins.remove(plugin);
+ }
+
+ @Override
+ public List<ActiveMQServerPlugin> getBrokerPlugins() {
+ return brokerPlugins;
+ }
+
+ @Override
public File getBrokerInstance() {
if (artemisInstance != null) {
return artemisInstance;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 2ef7657..a927768 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -783,7 +783,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
} else {
try {
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeMessageRoute(message, context, direct, rejectDuplicates) : null);
processRoute(message, context, direct);
+ final RoutingStatus finalResult = result;
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterMessageRoute(message, context, direct,
+ rejectDuplicates, finalResult) : null);
} catch (ActiveMQAddressFullException e) {
if (startedTX.get()) {
context.getTransaction().rollback();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index e0e5b52..7c9c675 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -514,6 +514,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
}
ConnectionEntry entry = protocol.createConnectionEntry((Acceptor) component, connection);
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCreateConnection(entry.connection) : null);
if (logger.isTraceEnabled()) {
logger.trace("Connection created " + connection);
@@ -534,8 +535,10 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
ConnectionEntry conn = connections.get(connectionID);
if (conn != null && !conn.connection.isSupportReconnect()) {
- removeConnection(connectionID);
-
+ RemotingConnection removedConnection = removeConnection(connectionID);
+ if (removedConnection != null) {
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDestroyConnection(removedConnection) : null);
+ }
conn.connection.fail(new ActiveMQRemoteDisconnectException());
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index bfd9aec..e16557f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.server;
-import javax.management.MBeanServer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -24,6 +23,8 @@ import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import javax.management.MBeanServer;
+
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
@@ -47,6 +48,8 @@ import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ConnectorsService;
import org.apache.activemq.artemis.core.server.management.ManagementService;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -186,6 +189,18 @@ public interface ActiveMQServer extends ServiceComponent {
*/
void callPostQueueDeletionCallbacks(SimpleString address, SimpleString queueName) throws Exception;
+ void registerBrokerPlugin(ActiveMQServerPlugin plugin);
+
+ void unRegisterBrokerPlugin(ActiveMQServerPlugin plugin);
+
+ void registerBrokerPlugins(List<ActiveMQServerPlugin> plugins);
+
+ List<ActiveMQServerPlugin> getBrokerPlugins();
+
+ void callBrokerPlugins(ActiveMQPluginRunnable pluginRun);
+
+ boolean hasBrokerPlugins();
+
void checkQueueCreationLimit(String username) throws Exception;
ServerSession createSession(String name,
@@ -447,4 +462,5 @@ public interface ActiveMQServer extends ServiceComponent {
void removeAddressInfo(SimpleString address, SecurityAuth auth) throws Exception;
String getInternalNamingPrefix();
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
index d2219c2..70edb68 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
@@ -405,6 +405,8 @@ public final class ClusterManager implements ActiveMQComponent {
return;
}
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeDeployBridge(config) : null);
+
Queue queue = (Queue) binding.getBindable();
ServerLocatorInternal serverLocator;
@@ -478,6 +480,7 @@ public final class ClusterManager implements ActiveMQComponent {
bridge.start();
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDeployBridge(bridge) : null);
}
public static class IncomingInterceptorLookingForExceptionMessage implements Interceptor {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 8482cb3..06964ee 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -16,8 +16,6 @@
*/
package org.apache.activemq.artemis.core.server.impl;
-import javax.management.MBeanServer;
-import javax.security.cert.X509Certificate;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
@@ -48,6 +46,9 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.management.MBeanServer;
+import javax.security.cert.X509Certificate;
+
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
import org.apache.activemq.artemis.api.core.Pair;
@@ -144,6 +145,8 @@ import org.apache.activemq.artemis.core.server.group.impl.LocalGroupingHandler;
import org.apache.activemq.artemis.core.server.group.impl.RemoteGroupingHandler;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.server.reload.ReloadCallback;
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
import org.apache.activemq.artemis.core.server.reload.ReloadManagerImpl;
@@ -1309,10 +1312,15 @@ public class ActiveMQServerImpl implements ActiveMQServer {
checkSessionLimit(validatedUser);
+ callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateSession(name, username, minLargeMessageSize, connection,
+ autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, autoCreateQueues, context, prefixes) : null);
+
final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues, prefixes);
sessions.put(name, session);
+ callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateSession(session) : null);
+
return session;
}
@@ -1705,6 +1713,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return;
}
+ callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeDestroyQueue(queueName, session, checkConsumerCount,
+ removeConsumers, autoDeleteAddress) : null);
+
addressSettingsRepository.clearCache();
Binding binding = postOffice.getBinding(queueName);
@@ -1743,6 +1754,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
callPostQueueDeletionCallbacks(address, queueName);
+
+ callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount,
+ removeConsumers, autoDeleteAddress) : null);
}
@Override
@@ -1808,6 +1822,38 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
+ public void registerBrokerPlugins(final List<ActiveMQServerPlugin> plugins) {
+ configuration.registerBrokerPlugins(plugins);
+ }
+
+ @Override
+ public void registerBrokerPlugin(final ActiveMQServerPlugin plugin) {
+ configuration.registerBrokerPlugin(plugin);
+ }
+
+ @Override
+ public void unRegisterBrokerPlugin(final ActiveMQServerPlugin plugin) {
+ configuration.unRegisterBrokerPlugin(plugin);
+ }
+
+ @Override
+ public List<ActiveMQServerPlugin> getBrokerPlugins() {
+ return configuration.getBrokerPlugins();
+ }
+
+ @Override
+ public void callBrokerPlugins(final ActiveMQPluginRunnable pluginRun) {
+ if (pluginRun != null) {
+ getBrokerPlugins().forEach(plugin -> pluginRun.run(plugin));
+ }
+ }
+
+ @Override
+ public boolean hasBrokerPlugins() {
+ return !getBrokerPlugins().isEmpty();
+ }
+
+ @Override
public ExecutorFactory getExecutorFactory() {
return executorFactory;
}
@@ -2103,7 +2149,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
securityStore = new SecurityStoreImpl(securityRepository, securityManager, configuration.getSecurityInvalidationInterval(), configuration.isSecurityEnabled(), configuration.getClusterUser(), configuration.getClusterPassword(), managementService);
- queueFactory = new QueueFactoryImpl(executorFactory, scheduledPool, addressSettingsRepository, storageManager);
+ queueFactory = new QueueFactoryImpl(executorFactory, scheduledPool, addressSettingsRepository, storageManager, this);
pagingManager = createPagingManager();
@@ -2508,6 +2554,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).routingType(routingType).maxConsumers(maxConsumers).purgeOnNoConsumers(purgeOnNoConsumers).build();
+ callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateQueue(queueConfig) : null);
+
final Queue queue = queueFactory.createQueueWith(queueConfig);
if (transientQueue) {
@@ -2550,6 +2598,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
callPostQueueCreationCallbacks(queue.getName());
+ callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateQueue(queue) : null);
+
return queue;
}
@@ -2763,4 +2813,5 @@ public class ActiveMQServerImpl implements ActiveMQServer {
deployAddressesFromConfiguration(config);
}
}
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index ceec92c..8370839 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.MessageReference;
@@ -63,8 +64,9 @@ public class LastValueQueue extends QueueImpl {
final PostOffice postOffice,
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
- final Executor executor) {
- super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+ final Executor executor,
+ final ActiveMQServer server) {
+ super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
index 9258a07..3d8ceb1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory;
@@ -49,17 +50,19 @@ public class QueueFactoryImpl implements QueueFactory {
protected final ExecutorFactory executorFactory;
+ protected final ActiveMQServer server;
+
public QueueFactoryImpl(final ExecutorFactory executorFactory,
final ScheduledExecutorService scheduledExecutor,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
- final StorageManager storageManager) {
- this.addressSettingsRepository = addressSettingsRepository;
+ final StorageManager storageManager,
+ final ActiveMQServer server) {
+ this.addressSettingsRepository = addressSettingsRepository;
this.scheduledExecutor = scheduledExecutor;
-
this.storageManager = storageManager;
-
this.executorFactory = executorFactory;
+ this.server = server;
}
@Override
@@ -72,9 +75,9 @@ public class QueueFactoryImpl implements QueueFactory {
final AddressSettings addressSettings = addressSettingsRepository.getMatch(config.address().toString());
final Queue queue;
if (addressSettings.isLastValueQueue()) {
- queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+ queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server);
} else {
- queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+ queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server);
}
return queue;
}
@@ -98,9 +101,9 @@ public class QueueFactoryImpl implements QueueFactory {
Queue queue;
if (addressSettings.isLastValueQueue()) {
- queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+ queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server);
} else {
- queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+ queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server);
}
return queue;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index a62ae79..c2cfdef 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -44,6 +44,7 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
@@ -69,7 +70,6 @@ import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor;
@@ -198,6 +198,8 @@ public class QueueImpl implements Queue {
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
+ private final ActiveMQServer server;
+
private final ScheduledExecutorService scheduledExecutor;
private final SimpleString address;
@@ -330,8 +332,9 @@ public class QueueImpl implements Queue {
final PostOffice postOffice,
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
- final Executor executor) {
- this(id, address, name, filter, null, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+ final Executor executor,
+ final ActiveMQServer server) {
+ this(id, address, name, filter, null, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server);
}
public QueueImpl(final long id,
@@ -347,8 +350,9 @@ public class QueueImpl implements Queue {
final PostOffice postOffice,
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
- final Executor executor) {
- this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+ final Executor executor,
+ final ActiveMQServer server) {
+ this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server);
}
public QueueImpl(final long id,
@@ -367,7 +371,8 @@ public class QueueImpl implements Queue {
final PostOffice postOffice,
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
- final Executor executor) {
+ final Executor executor,
+ final ActiveMQServer server) {
this.id = id;
@@ -401,6 +406,8 @@ public class QueueImpl implements Queue {
this.scheduledExecutor = scheduledExecutor;
+ this.server = server;
+
scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
if (addressSettingsRepository != null) {
@@ -1078,6 +1085,9 @@ public class QueueImpl implements Queue {
messagesAcknowledged++;
}
+ if (server != null) {
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.messageAcknowledged(ref, reason) : null);
+ }
}
@Override
@@ -1112,6 +1122,10 @@ public class QueueImpl implements Queue {
} else {
messagesAcknowledged++;
}
+
+ if (server != null) {
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.messageAcknowledged(ref, reason) : null);
+ }
}
@Override
@@ -1195,6 +1209,11 @@ public class QueueImpl implements Queue {
}
acknowledge(ref, AckReason.EXPIRED);
}
+
+ if (server != null) {
+ final SimpleString expiryAddress = messageExpiryAddress;
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.messageExpired(ref, expiryAddress) : null);
+ }
}
private SimpleString expiryAddressFromMessageAddress(MessageReference ref) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 9e33602..af8524d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -416,6 +416,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
try {
Message message = reference.getMessage();
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeDeliver(reference) : null);
+
if (message.isLargeMessage() && supportLargeMessage) {
if (largeMessageDeliverer == null) {
// This can't really happen as handle had already crated the deliverer
@@ -432,6 +434,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} finally {
lockDelivery.readLock().unlock();
callback.afterDelivery();
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDeliver(reference) : null);
}
}
@@ -447,6 +450,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
logger.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace"));
}
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCloseConsumer(this, failed) : null);
+
setStarted(false);
LargeMessageDeliverer del = largeMessageDeliverer;
@@ -501,6 +506,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
managementService.sendNotification(notification);
}
+
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCloseConsumer(this, failed) : null);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index edd7afc..7245843 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -16,10 +16,8 @@
*/
package org.apache.activemq.artemis.core.server.impl;
-import javax.json.JsonArrayBuilder;
-import javax.json.JsonObjectBuilder;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.Xid;
+import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -31,6 +29,11 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import javax.json.JsonArrayBuilder;
+import javax.json.JsonObjectBuilder;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.Xid;
+
import org.apache.activemq.artemis.Closeable;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
@@ -89,8 +92,6 @@ import org.apache.activemq.artemis.utils.PrefixUtil;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.jboss.logging.Logger;
-import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe;
-
/**
* Server side Session implementation
*/
@@ -345,6 +346,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
protected void doClose(final boolean failed) throws Exception {
synchronized (this) {
+ if (!closed) {
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCloseSession(this, failed) : null);
+ }
this.setStarted(false);
if (closed)
return;
@@ -395,6 +399,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
closed = true;
+
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCloseSession(this, failed) : null);
}
}
@@ -444,9 +450,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
Filter filter = FilterImpl.createFilter(filterString);
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCreateConsumer(consumerID, unPrefixedQueueName,
+ filterString, browseOnly, supportLargeMessage) : null);
+
ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
consumers.put(consumer.getID(), consumer);
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCreateConsumer(consumer) : null);
+
if (!browseOnly) {
TypedProperties props = new TypedProperties();
@@ -1290,6 +1301,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final boolean direct,
boolean noAutoCreateQueue) throws Exception {
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeSend(tx, message, direct, noAutoCreateQueue) : null);
+
// If the protocol doesn't support flow control, we have no choice other than fail the communication
if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit();
@@ -1333,10 +1346,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
if (message.getAddressSimpleString().equals(managementAddress)) {
// It's a management message
- handleManagementMessage(tx, message, direct);
+ result = handleManagementMessage(tx, message, direct);
} else {
result = doSend(tx, message, address, direct, noAutoCreateQueue);
}
+
+ final RoutingStatus finalResult = result;
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterSend(tx, message, direct, noAutoCreateQueue, finalResult) : null);
+
return result;
}
@@ -1367,10 +1384,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public void addMetaData(String key, String data) {
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeSessionMetadataAdded(this, key, data) : null);
if (metaData == null) {
metaData = new HashMap<>();
}
metaData.put(key, data);
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterSessionMetadataAdded(this, key, data) : null);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java
new file mode 100644
index 0000000..bc85475
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.plugin;
+
+public interface ActiveMQPluginRunnable {
+
+ void run(ActiveMQServerPlugin plugin);
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
new file mode 100644
index 0000000..95296f0
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.plugin;
+
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.BridgeConfiguration;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
+import org.apache.activemq.artemis.core.security.SecurityAuth;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.QueueConfig;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.cluster.Bridge;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+
+
+public interface ActiveMQServerPlugin {
+
+
+ /**
+ * A connection has been created.
+ *
+ * @param connection The newly created connection
+ */
+ default void afterCreateConnection(RemotingConnection connection) {
+
+ }
+
+ /**
+ * A connection has been destroyed.
+ *
+ * @param connection
+ */
+ default void afterDestroyConnection(RemotingConnection connection) {
+
+ }
+
+ /**
+ * Before a session is created.
+ *
+ * @param name
+ * @param username
+ * @param minLargeMessageSize
+ * @param connection
+ * @param autoCommitSends
+ * @param autoCommitAcks
+ * @param preAcknowledge
+ * @param xa
+ * @param defaultAddress
+ * @param callback
+ * @param autoCreateQueues
+ * @param context
+ * @param prefixes
+ */
+ default void beforeCreateSession(String name, String username, int minLargeMessageSize,
+ RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge,
+ boolean xa, String defaultAddress, SessionCallback callback, boolean autoCreateQueues, OperationContext context,
+ Map<SimpleString, RoutingType> prefixes) {
+
+ }
+
+ /**
+ * After a session has been created.
+ *
+ * @param session The newly created session
+ */
+ default void afterCreateSession(ServerSession session) {
+
+ }
+
+ /**
+ * Before a session is closed
+ *
+ * @param session
+ * @param failed
+ */
+ default void beforeCloseSession(ServerSession session, boolean failed) {
+
+ }
+
+ /**
+ * After a session is closed
+ *
+ * @param session
+ * @param failed
+ */
+ default void afterCloseSession(ServerSession session, boolean failed) {
+
+ }
+
+ /**
+ * Before session metadata is added to the session
+ *
+ * @param session
+ * @param key
+ * @param data
+ */
+ default void beforeSessionMetadataAdded(ServerSession session, String key, String data) {
+
+ }
+
+ /**
+ * After session metadata is added to the session
+ *
+ * @param session
+ * @param key
+ * @param data
+ */
+ default void afterSessionMetadataAdded(ServerSession session, String key, String data) {
+
+ }
+
+ /**
+ * Before a consumer is created
+ *
+ * @param consumerID
+ * @param queueName
+ * @param filterString
+ * @param browseOnly
+ * @param supportLargeMessage
+ */
+ default void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString,
+ boolean browseOnly, boolean supportLargeMessage) {
+
+ }
+
+ /**
+ * After a consumer has been created
+ *
+ * @param consumer the created consumer
+ */
+ default void afterCreateConsumer(ServerConsumer consumer) {
+
+ }
+
+ /**
+ * Before a consumer is closed
+ *
+ * @param consumer
+ * @param failed
+ */
+ default void beforeCloseConsumer(ServerConsumer consumer, boolean failed) {
+
+ }
+
+ /**
+ * After a consumer is closed
+ *
+ * @param consumer
+ * @param failed
+ */
+ default void afterCloseConsumer(ServerConsumer consumer, boolean failed) {
+
+ }
+
+ /**
+ * Before a queue is created
+ *
+ * @param queueConfig
+ */
+ default void beforeCreateQueue(QueueConfig queueConfig) {
+
+ }
+
+ /**
+ * After a queue has been created
+ *
+ * @param queue The newly created queue
+ */
+ default void afterCreateQueue(Queue queue) {
+
+ }
+
+ /**
+ * Before a queue is destroyed
+ *
+ * @param queueName
+ * @param session
+ * @param checkConsumerCount
+ * @param removeConsumers
+ * @param autoDeleteAddress
+ */
+ default void beforeDestroyQueue(SimpleString queueName, final SecurityAuth session, boolean checkConsumerCount,
+ boolean removeConsumers, boolean autoDeleteAddress) {
+
+ }
+
+ /**
+ * After a queue has been destroyed
+ *
+ * @param queue
+ * @param address
+ * @param session
+ * @param checkConsumerCount
+ * @param removeConsumers
+ * @param autoDeleteAddress
+ */
+ default void afterDestroyQueue(Queue queue, SimpleString address, final SecurityAuth session, boolean checkConsumerCount,
+ boolean removeConsumers, boolean autoDeleteAddress) {
+
+ }
+
+ /**
+ * Before a message is sent
+ *
+ * @param tx
+ * @param message
+ * @param direct
+ * @param noAutoCreateQueue
+ */
+ default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) {
+
+ }
+
+ /**
+ * After a message is sent
+ *
+ * @param tx
+ * @param message
+ * @param direct
+ * @param noAutoCreateQueue
+ * @param result
+ */
+ default void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
+ RoutingStatus result) {
+
+ }
+
+ /**
+ * Before a message is routed
+ *
+ * @param message
+ * @param context
+ * @param direct
+ * @param rejectDuplicates
+ */
+ default void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) {
+
+ }
+
+ /**
+ * After a message is routed
+ *
+ * @param message
+ * @param context
+ * @param direct
+ * @param rejectDuplicates
+ * @param result
+ */
+ default void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates,
+ RoutingStatus result) {
+
+ }
+
+ /**
+ * Before a message is delivered to a client consumer
+ *
+ * @param reference
+ */
+ default void beforeDeliver(MessageReference reference) {
+
+ }
+
+ /**
+ * After a message is delivered to a client consumer
+ *
+ * @param reference
+ */
+ default void afterDeliver(MessageReference reference) {
+
+ }
+
+ /**
+ * A message has been expired
+ *
+ * @param message The expired message
+ * @param messageExpiryAddress The message expiry address if exists
+ */
+ default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) {
+
+ }
+
+ /**
+ * A message has been acknowledged
+ *
+ * @param ref The acked message
+ * @param reason The ack reason
+ */
+ default void messageAcknowledged(MessageReference ref, AckReason reason) {
+
+ }
+
+ /**
+ * Before a bridge is deployed
+ *
+ * @param config The bridge configuration
+ */
+ default void beforeDeployBridge(BridgeConfiguration config) {
+
+ }
+
+ /**
+ * After a bridge has been deployed
+ *
+ * @param bridge The newly deployed bridge
+ */
+ default void afterDeployBridge(Bridge bridge) {
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index 8d27895..60b9b74 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -166,6 +166,9 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
// Add optional security for tests that need it
configureBrokerSecurity(server);
+ // Add extra configuration
+ addConfiguration(server);
+
server.start();
// Prepare all addresses and queues for client tests.
@@ -174,6 +177,10 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
return server;
}
+ protected void addConfiguration(ActiveMQServer server) {
+
+ }
+
protected TransportConfiguration addAcceptorConfiguration(ActiveMQServer server, int port) {
HashMap<String, Object> params = new HashMap<>();
params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port));
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 201a96b..da695ca 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.client;
-import javax.management.MBeanServer;
import java.lang.management.ManagementFactory;
import java.util.LinkedList;
import java.util.Map;
@@ -26,10 +25,12 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import javax.management.MBeanServer;
+
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
-
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -53,7 +54,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
@@ -234,8 +234,10 @@ public class HangConsumerTest extends ActiveMQTestBase {
final PostOffice postOffice,
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
- final Executor executor) {
- super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, deliveryMode, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+ final Executor executor, final ActiveMQServer server) {
+ super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, deliveryMode,
+ maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager,
+ addressSettingsRepository, executor, server);
}
@Override
@@ -256,13 +258,18 @@ public class HangConsumerTest extends ActiveMQTestBase {
LocalFactory(final ExecutorFactory executorFactory,
final ScheduledExecutorService scheduledExecutor,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
- final StorageManager storageManager) {
- super(executorFactory, scheduledExecutor, addressSettingsRepository, storageManager);
+ final StorageManager storageManager, final ActiveMQServer server) {
+ super(executorFactory, scheduledExecutor, addressSettingsRepository, storageManager, server);
}
@Override
public Queue createQueueWith(final QueueConfig config) {
- queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+ queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(),
+ config.user(), config.pageSubscription(), config.isDurable(),
+ config.isTemporary(), config.isAutoCreated(), config.deliveryMode(),
+ config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor,
+ postOffice, storageManager, addressSettingsRepository,
+ executorFactory.getExecutor(), server);
return queue;
}
@@ -277,13 +284,18 @@ public class HangConsumerTest extends ActiveMQTestBase {
final boolean durable,
final boolean temporary,
final boolean autoCreated) {
- queue = new MyQueueWithBlocking(persistenceID, address, name, filter, user, pageSubscription, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+ queue = new MyQueueWithBlocking(persistenceID, address, name, filter, user, pageSubscription, durable,
+ temporary, autoCreated, RoutingType.MULTICAST, null, null,
+ scheduledExecutor, postOffice, storageManager, addressSettingsRepository,
+ executorFactory.getExecutor(), server);
return queue;
}
}
- LocalFactory queueFactory = new LocalFactory(server.getExecutorFactory(), server.getScheduledPool(), server.getAddressSettingsRepository(), server.getStorageManager());
+ LocalFactory queueFactory =
+ new LocalFactory(server.getExecutorFactory(), server.getScheduledPool(),
+ server.getAddressSettingsRepository(), server.getStorageManager(), server);
queueFactory.setPostOffice(server.getPostOffice());
@@ -359,7 +371,10 @@ public class HangConsumerTest extends ActiveMQTestBase {
long txID = server.getStorageManager().generateID();
// Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally
- LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false, false, null, null, null, null, null), server.getNodeID());
+ LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE,
+ new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false,
+ false, null, null, null, null, null, null),
+ server.getNodeID());
server.getStorageManager().addQueueBinding(txID, newBinding);
server.getStorageManager().commitBindings(txID);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
index 540baf6..44015e1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
@@ -16,12 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.client;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
@@ -29,9 +23,17 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@@ -52,7 +54,6 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@@ -518,7 +519,8 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
StorageManager storageManager,
HierarchicalRepository<AddressSettings> addressSettingsRepository,
Executor executor) {
- super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+ super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor,
+ postOffice, storageManager, addressSettingsRepository, executor, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
index f8094a1..63743ed 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
@@ -16,14 +16,15 @@
*/
package org.apache.activemq.artemis.tests.integration.jms.client;
+import java.util.List;
+import java.util.Map;
+
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
-import java.util.List;
-import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
@@ -81,7 +82,12 @@ public class TopicCleanupTest extends JMSTestBase {
for (int i = 0; i < 100; i++) {
long txid = storage.generateID();
- final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("topic"), SimpleString.toSimpleString("topic"), FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), null, true, false, false, server.getScheduledPool(), server.getPostOffice(), storage, server.getAddressSettingsRepository(), server.getExecutorFactory().getExecutor());
+ final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("topic"),
+ SimpleString.toSimpleString("topic"),
+ FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), null,
+ true, false, false, server.getScheduledPool(), server.getPostOffice(),
+ storage, server.getAddressSettingsRepository(),
+ server.getExecutorFactory().getExecutor(), server);
LocalQueueBinding binding = new LocalQueueBinding(queue.getAddress(), queue, server.getNodeID());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java
index 056891a..bd8cfd8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java
@@ -45,7 +45,7 @@ public class JmsResourceProvider {
/**
* Creates a connection.
*
- * @see org.apache.activemq.test.JmsResourceProvider#createConnection(javax.jms.ConnectionFactory)
+ * @see org.apache.activemq.test.JmsResourceProvider#afterCreateConnection(javax.jms.ConnectionFactory)
*/
public Connection createConnection(ConnectionFactory cf) throws JMSException {
Connection connection = cf.createConnection();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java
new file mode 100644
index 0000000..d918b27
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.plugin;
+
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test basic send and receive scenarios using only AMQP sender and receiver links.
+ */
+public class AmqpPluginTest extends AmqpClientTestSupport {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(AmqpPluginTest.class);
+
+ private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
+ private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls);
+
+ @Override
+ protected void addConfiguration(ActiveMQServer server) {
+ super.addConfiguration(server);
+ server.registerBrokerPlugin(verifier);
+ }
+
+ @Test(timeout = 60000)
+ public void testQueueReceiverReadAndAckMessage() throws Exception {
+ sendMessages(getQueueName(), 1);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
+
+ Queue queueView = getProxyToQueue(getQueueName());
+ assertEquals(1, queueView.getMessageCount());
+
+ receiver.flow(1);
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ message.accept();
+ receiver.close();
+ connection.close();
+
+ verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
+ verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION,
+ BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION,
+ BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
+ BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND,
+ AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER);
+ }
+
+ @Override
+ public void sendMessages(String destinationName, int count) throws Exception {
+ sendMessages(destinationName, count, null);
+ }
+
+ @Override
+ public void sendMessages(String destinationName, int count, RoutingType routingType) throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ try {
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(destinationName);
+
+ for (int i = 0; i < count; ++i) {
+ AmqpMessage message = new AmqpMessage();
+ message.setMessageId("MessageID:" + i);
+ if (routingType != null) {
+ message.setMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE.toString(), routingType.getType());
+ }
+ sender.send(message);
+ }
+ } finally {
+ connection.close();
+ }
+ }
+}
[3/3] activemq-artemis git commit: This closes #1242
Posted by cl...@apache.org.
This closes #1242
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5391d42e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5391d42e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5391d42e
Branch: refs/heads/master
Commit: 5391d42e474a323b84bcd0e2dd7162bbeaffeefe
Parents: 303d97c 1e1ede8
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed May 3 12:17:01 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed May 3 12:17:01 2017 -0400
----------------------------------------------------------------------
.../artemis/core/config/Configuration.java | 21 ++
.../core/config/impl/ConfigurationImpl.java | 24 ++
.../core/postoffice/impl/PostOfficeImpl.java | 4 +
.../server/impl/RemotingServiceImpl.java | 7 +-
.../artemis/core/server/ActiveMQServer.java | 18 +-
.../core/server/cluster/ClusterManager.java | 3 +
.../core/server/impl/ActiveMQServerImpl.java | 57 +++-
.../core/server/impl/LastValueQueue.java | 6 +-
.../core/server/impl/QueueFactoryImpl.java | 19 +-
.../artemis/core/server/impl/QueueImpl.java | 31 +-
.../core/server/impl/ServerConsumerImpl.java | 7 +
.../core/server/impl/ServerSessionImpl.java | 33 +-
.../server/plugin/ActiveMQPluginRunnable.java | 24 ++
.../server/plugin/ActiveMQServerPlugin.java | 336 +++++++++++++++++++
.../integration/amqp/AmqpClientTestSupport.java | 7 +
.../integration/client/HangConsumerTest.java | 37 +-
.../client/InterruptedLargeMessageTest.java | 18 +-
.../jms/client/TopicCleanupTest.java | 12 +-
.../openwire/amq/JmsResourceProvider.java | 2 +-
.../integration/plugin/AmqpPluginTest.java | 131 ++++++++
.../integration/plugin/CorePluginTest.java | 257 ++++++++++++++
.../plugin/MethodCalledVerifier.java | 276 +++++++++++++++
.../integration/plugin/MqttPluginTest.java | 132 ++++++++
.../integration/plugin/OpenwirePluginTest.java | 109 ++++++
.../integration/plugin/StompPluginTest.java | 126 +++++++
.../timing/core/server/impl/QueueImplTest.java | 15 +-
.../unit/core/server/impl/QueueImplTest.java | 3 +-
.../server/impl/fakes/FakeQueueFactory.java | 7 +-
28 files changed, 1664 insertions(+), 58 deletions(-)
----------------------------------------------------------------------