You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/05/03 16:17:12 UTC
[2/3] activemq-artemis git commit: ARTEMIS-898 - Adding Plugin Support
ARTEMIS-898 - Adding Plugin Support
Adding a new ActievMQServerPlugin interface to support adding custom
behavior to the broker at certain events such as connection or session
creation.
https://issues.apache.org/jira/browse/ARTEMIS-898
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1e1ede84
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1e1ede84
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1e1ede84
Branch: refs/heads/master
Commit: 1e1ede84c0483099f27741bc046ef95c08e1d090
Parents: 303d97c
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Tue May 2 09:46:17 2017 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Wed May 3 11:21:32 2017 -0400
----------------------------------------------------------------------
.../artemis/core/config/Configuration.java | 21 ++
.../core/config/impl/ConfigurationImpl.java | 24 ++
.../core/postoffice/impl/PostOfficeImpl.java | 4 +
.../server/impl/RemotingServiceImpl.java | 7 +-
.../artemis/core/server/ActiveMQServer.java | 18 +-
.../core/server/cluster/ClusterManager.java | 3 +
.../core/server/impl/ActiveMQServerImpl.java | 57 +++-
.../core/server/impl/LastValueQueue.java | 6 +-
.../core/server/impl/QueueFactoryImpl.java | 19 +-
.../artemis/core/server/impl/QueueImpl.java | 31 +-
.../core/server/impl/ServerConsumerImpl.java | 7 +
.../core/server/impl/ServerSessionImpl.java | 33 +-
.../server/plugin/ActiveMQPluginRunnable.java | 24 ++
.../server/plugin/ActiveMQServerPlugin.java | 336 +++++++++++++++++++
.../integration/amqp/AmqpClientTestSupport.java | 7 +
.../integration/client/HangConsumerTest.java | 37 +-
.../client/InterruptedLargeMessageTest.java | 18 +-
.../jms/client/TopicCleanupTest.java | 12 +-
.../openwire/amq/JmsResourceProvider.java | 2 +-
.../integration/plugin/AmqpPluginTest.java | 131 ++++++++
.../integration/plugin/CorePluginTest.java | 257 ++++++++++++++
.../plugin/MethodCalledVerifier.java | 276 +++++++++++++++
.../integration/plugin/MqttPluginTest.java | 132 ++++++++
.../integration/plugin/OpenwirePluginTest.java | 109 ++++++
.../integration/plugin/StompPluginTest.java | 126 +++++++
.../timing/core/server/impl/QueueImplTest.java | 15 +-
.../unit/core/server/impl/QueueImplTest.java | 3 +-
.../server/impl/fakes/FakeQueueFactory.java | 7 +-
28 files changed, 1664 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 7dfb1a5..7da5b02 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
@@ -1081,4 +1082,24 @@ public interface Configuration {
Configuration setNetworkCheckPing6Command(String command);
String getInternalNamingPrefix();
+
+ /**
+ * @param plugins
+ */
+ void registerBrokerPlugins(List<ActiveMQServerPlugin> plugins);
+
+ /**
+ * @param plugin
+ */
+ void registerBrokerPlugin(ActiveMQServerPlugin plugin);
+
+ /**
+ * @param plugin
+ */
+ void unRegisterBrokerPlugin(ActiveMQServerPlugin plugin);
+
+ /**
+ * @return
+ */
+ List<ActiveMQServerPlugin> getBrokerPlugins();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 2a538ca..8edeb5b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -38,6 +38,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
@@ -63,6 +64,7 @@ import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
import org.apache.activemq.artemis.utils.Env;
@@ -232,6 +234,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
private List<SecuritySettingPlugin> securitySettingPlugins = new ArrayList<>();
+ private final List<ActiveMQServerPlugin> brokerPlugins = new CopyOnWriteArrayList<>();
+
private Map<String, Set<String>> securityRoleNameMappings = new HashMap<>();
protected List<ConnectorServiceConfiguration> connectorServiceConfigurations = new ArrayList<>();
@@ -1321,6 +1325,26 @@ public class ConfigurationImpl implements Configuration, Serializable {
}
@Override
+ public void registerBrokerPlugins(final List<ActiveMQServerPlugin> plugins) {
+ brokerPlugins.addAll(plugins);
+ }
+
+ @Override
+ public void registerBrokerPlugin(final ActiveMQServerPlugin plugin) {
+ brokerPlugins.add(plugin);
+ }
+
+ @Override
+ public void unRegisterBrokerPlugin(final ActiveMQServerPlugin plugin) {
+ brokerPlugins.remove(plugin);
+ }
+
+ @Override
+ public List<ActiveMQServerPlugin> getBrokerPlugins() {
+ return brokerPlugins;
+ }
+
+ @Override
public File getBrokerInstance() {
if (artemisInstance != null) {
return artemisInstance;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 2ef7657..a927768 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -783,7 +783,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
} else {
try {
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeMessageRoute(message, context, direct, rejectDuplicates) : null);
processRoute(message, context, direct);
+ final RoutingStatus finalResult = result;
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterMessageRoute(message, context, direct,
+ rejectDuplicates, finalResult) : null);
} catch (ActiveMQAddressFullException e) {
if (startedTX.get()) {
context.getTransaction().rollback();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index e0e5b52..7c9c675 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -514,6 +514,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
}
ConnectionEntry entry = protocol.createConnectionEntry((Acceptor) component, connection);
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCreateConnection(entry.connection) : null);
if (logger.isTraceEnabled()) {
logger.trace("Connection created " + connection);
@@ -534,8 +535,10 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
ConnectionEntry conn = connections.get(connectionID);
if (conn != null && !conn.connection.isSupportReconnect()) {
- removeConnection(connectionID);
-
+ RemotingConnection removedConnection = removeConnection(connectionID);
+ if (removedConnection != null) {
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDestroyConnection(removedConnection) : null);
+ }
conn.connection.fail(new ActiveMQRemoteDisconnectException());
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index bfd9aec..e16557f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.server;
-import javax.management.MBeanServer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -24,6 +23,8 @@ import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import javax.management.MBeanServer;
+
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
@@ -47,6 +48,8 @@ import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ConnectorsService;
import org.apache.activemq.artemis.core.server.management.ManagementService;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -186,6 +189,18 @@ public interface ActiveMQServer extends ServiceComponent {
*/
void callPostQueueDeletionCallbacks(SimpleString address, SimpleString queueName) throws Exception;
+ void registerBrokerPlugin(ActiveMQServerPlugin plugin);
+
+ void unRegisterBrokerPlugin(ActiveMQServerPlugin plugin);
+
+ void registerBrokerPlugins(List<ActiveMQServerPlugin> plugins);
+
+ List<ActiveMQServerPlugin> getBrokerPlugins();
+
+ void callBrokerPlugins(ActiveMQPluginRunnable pluginRun);
+
+ boolean hasBrokerPlugins();
+
void checkQueueCreationLimit(String username) throws Exception;
ServerSession createSession(String name,
@@ -447,4 +462,5 @@ public interface ActiveMQServer extends ServiceComponent {
void removeAddressInfo(SimpleString address, SecurityAuth auth) throws Exception;
String getInternalNamingPrefix();
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
index d2219c2..70edb68 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
@@ -405,6 +405,8 @@ public final class ClusterManager implements ActiveMQComponent {
return;
}
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeDeployBridge(config) : null);
+
Queue queue = (Queue) binding.getBindable();
ServerLocatorInternal serverLocator;
@@ -478,6 +480,7 @@ public final class ClusterManager implements ActiveMQComponent {
bridge.start();
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDeployBridge(bridge) : null);
}
public static class IncomingInterceptorLookingForExceptionMessage implements Interceptor {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 8482cb3..06964ee 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -16,8 +16,6 @@
*/
package org.apache.activemq.artemis.core.server.impl;
-import javax.management.MBeanServer;
-import javax.security.cert.X509Certificate;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
@@ -48,6 +46,9 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.management.MBeanServer;
+import javax.security.cert.X509Certificate;
+
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
import org.apache.activemq.artemis.api.core.Pair;
@@ -144,6 +145,8 @@ import org.apache.activemq.artemis.core.server.group.impl.LocalGroupingHandler;
import org.apache.activemq.artemis.core.server.group.impl.RemoteGroupingHandler;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.server.reload.ReloadCallback;
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
import org.apache.activemq.artemis.core.server.reload.ReloadManagerImpl;
@@ -1309,10 +1312,15 @@ public class ActiveMQServerImpl implements ActiveMQServer {
checkSessionLimit(validatedUser);
+ callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateSession(name, username, minLargeMessageSize, connection,
+ autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, autoCreateQueues, context, prefixes) : null);
+
final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues, prefixes);
sessions.put(name, session);
+ callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateSession(session) : null);
+
return session;
}
@@ -1705,6 +1713,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return;
}
+ callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeDestroyQueue(queueName, session, checkConsumerCount,
+ removeConsumers, autoDeleteAddress) : null);
+
addressSettingsRepository.clearCache();
Binding binding = postOffice.getBinding(queueName);
@@ -1743,6 +1754,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
callPostQueueDeletionCallbacks(address, queueName);
+
+ callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount,
+ removeConsumers, autoDeleteAddress) : null);
}
@Override
@@ -1808,6 +1822,38 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
+ public void registerBrokerPlugins(final List<ActiveMQServerPlugin> plugins) {
+ configuration.registerBrokerPlugins(plugins);
+ }
+
+ @Override
+ public void registerBrokerPlugin(final ActiveMQServerPlugin plugin) {
+ configuration.registerBrokerPlugin(plugin);
+ }
+
+ @Override
+ public void unRegisterBrokerPlugin(final ActiveMQServerPlugin plugin) {
+ configuration.unRegisterBrokerPlugin(plugin);
+ }
+
+ @Override
+ public List<ActiveMQServerPlugin> getBrokerPlugins() {
+ return configuration.getBrokerPlugins();
+ }
+
+ @Override
+ public void callBrokerPlugins(final ActiveMQPluginRunnable pluginRun) {
+ if (pluginRun != null) {
+ getBrokerPlugins().forEach(plugin -> pluginRun.run(plugin));
+ }
+ }
+
+ @Override
+ public boolean hasBrokerPlugins() {
+ return !getBrokerPlugins().isEmpty();
+ }
+
+ @Override
public ExecutorFactory getExecutorFactory() {
return executorFactory;
}
@@ -2103,7 +2149,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
securityStore = new SecurityStoreImpl(securityRepository, securityManager, configuration.getSecurityInvalidationInterval(), configuration.isSecurityEnabled(), configuration.getClusterUser(), configuration.getClusterPassword(), managementService);
- queueFactory = new QueueFactoryImpl(executorFactory, scheduledPool, addressSettingsRepository, storageManager);
+ queueFactory = new QueueFactoryImpl(executorFactory, scheduledPool, addressSettingsRepository, storageManager, this);
pagingManager = createPagingManager();
@@ -2508,6 +2554,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).routingType(routingType).maxConsumers(maxConsumers).purgeOnNoConsumers(purgeOnNoConsumers).build();
+ callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateQueue(queueConfig) : null);
+
final Queue queue = queueFactory.createQueueWith(queueConfig);
if (transientQueue) {
@@ -2550,6 +2598,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
callPostQueueCreationCallbacks(queue.getName());
+ callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateQueue(queue) : null);
+
return queue;
}
@@ -2763,4 +2813,5 @@ public class ActiveMQServerImpl implements ActiveMQServer {
deployAddressesFromConfiguration(config);
}
}
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index ceec92c..8370839 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.MessageReference;
@@ -63,8 +64,9 @@ public class LastValueQueue extends QueueImpl {
final PostOffice postOffice,
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
- final Executor executor) {
- super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+ final Executor executor,
+ final ActiveMQServer server) {
+ super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
index 9258a07..3d8ceb1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory;
@@ -49,17 +50,19 @@ public class QueueFactoryImpl implements QueueFactory {
protected final ExecutorFactory executorFactory;
+ protected final ActiveMQServer server;
+
public QueueFactoryImpl(final ExecutorFactory executorFactory,
final ScheduledExecutorService scheduledExecutor,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
- final StorageManager storageManager) {
- this.addressSettingsRepository = addressSettingsRepository;
+ final StorageManager storageManager,
+ final ActiveMQServer server) {
+ this.addressSettingsRepository = addressSettingsRepository;
this.scheduledExecutor = scheduledExecutor;
-
this.storageManager = storageManager;
-
this.executorFactory = executorFactory;
+ this.server = server;
}
@Override
@@ -72,9 +75,9 @@ public class QueueFactoryImpl implements QueueFactory {
final AddressSettings addressSettings = addressSettingsRepository.getMatch(config.address().toString());
final Queue queue;
if (addressSettings.isLastValueQueue()) {
- queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+ queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server);
} else {
- queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+ queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server);
}
return queue;
}
@@ -98,9 +101,9 @@ public class QueueFactoryImpl implements QueueFactory {
Queue queue;
if (addressSettings.isLastValueQueue()) {
- queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+ queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server);
} else {
- queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+ queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server);
}
return queue;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index a62ae79..c2cfdef 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -44,6 +44,7 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
@@ -69,7 +70,6 @@ import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor;
@@ -198,6 +198,8 @@ public class QueueImpl implements Queue {
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
+ private final ActiveMQServer server;
+
private final ScheduledExecutorService scheduledExecutor;
private final SimpleString address;
@@ -330,8 +332,9 @@ public class QueueImpl implements Queue {
final PostOffice postOffice,
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
- final Executor executor) {
- this(id, address, name, filter, null, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+ final Executor executor,
+ final ActiveMQServer server) {
+ this(id, address, name, filter, null, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server);
}
public QueueImpl(final long id,
@@ -347,8 +350,9 @@ public class QueueImpl implements Queue {
final PostOffice postOffice,
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
- final Executor executor) {
- this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+ final Executor executor,
+ final ActiveMQServer server) {
+ this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server);
}
public QueueImpl(final long id,
@@ -367,7 +371,8 @@ public class QueueImpl implements Queue {
final PostOffice postOffice,
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
- final Executor executor) {
+ final Executor executor,
+ final ActiveMQServer server) {
this.id = id;
@@ -401,6 +406,8 @@ public class QueueImpl implements Queue {
this.scheduledExecutor = scheduledExecutor;
+ this.server = server;
+
scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
if (addressSettingsRepository != null) {
@@ -1078,6 +1085,9 @@ public class QueueImpl implements Queue {
messagesAcknowledged++;
}
+ if (server != null) {
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.messageAcknowledged(ref, reason) : null);
+ }
}
@Override
@@ -1112,6 +1122,10 @@ public class QueueImpl implements Queue {
} else {
messagesAcknowledged++;
}
+
+ if (server != null) {
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.messageAcknowledged(ref, reason) : null);
+ }
}
@Override
@@ -1195,6 +1209,11 @@ public class QueueImpl implements Queue {
}
acknowledge(ref, AckReason.EXPIRED);
}
+
+ if (server != null) {
+ final SimpleString expiryAddress = messageExpiryAddress;
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.messageExpired(ref, expiryAddress) : null);
+ }
}
private SimpleString expiryAddressFromMessageAddress(MessageReference ref) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 9e33602..af8524d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -416,6 +416,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
try {
Message message = reference.getMessage();
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeDeliver(reference) : null);
+
if (message.isLargeMessage() && supportLargeMessage) {
if (largeMessageDeliverer == null) {
// This can't really happen as handle had already crated the deliverer
@@ -432,6 +434,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} finally {
lockDelivery.readLock().unlock();
callback.afterDelivery();
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDeliver(reference) : null);
}
}
@@ -447,6 +450,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
logger.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace"));
}
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCloseConsumer(this, failed) : null);
+
setStarted(false);
LargeMessageDeliverer del = largeMessageDeliverer;
@@ -501,6 +506,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
managementService.sendNotification(notification);
}
+
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCloseConsumer(this, failed) : null);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index edd7afc..7245843 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -16,10 +16,8 @@
*/
package org.apache.activemq.artemis.core.server.impl;
-import javax.json.JsonArrayBuilder;
-import javax.json.JsonObjectBuilder;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.Xid;
+import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -31,6 +29,11 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import javax.json.JsonArrayBuilder;
+import javax.json.JsonObjectBuilder;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.Xid;
+
import org.apache.activemq.artemis.Closeable;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
@@ -89,8 +92,6 @@ import org.apache.activemq.artemis.utils.PrefixUtil;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.jboss.logging.Logger;
-import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe;
-
/**
* Server side Session implementation
*/
@@ -345,6 +346,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
protected void doClose(final boolean failed) throws Exception {
synchronized (this) {
+ if (!closed) {
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCloseSession(this, failed) : null);
+ }
this.setStarted(false);
if (closed)
return;
@@ -395,6 +399,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
closed = true;
+
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCloseSession(this, failed) : null);
}
}
@@ -444,9 +450,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
Filter filter = FilterImpl.createFilter(filterString);
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCreateConsumer(consumerID, unPrefixedQueueName,
+ filterString, browseOnly, supportLargeMessage) : null);
+
ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
consumers.put(consumer.getID(), consumer);
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCreateConsumer(consumer) : null);
+
if (!browseOnly) {
TypedProperties props = new TypedProperties();
@@ -1290,6 +1301,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final boolean direct,
boolean noAutoCreateQueue) throws Exception {
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeSend(tx, message, direct, noAutoCreateQueue) : null);
+
// If the protocol doesn't support flow control, we have no choice other than fail the communication
if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit();
@@ -1333,10 +1346,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
if (message.getAddressSimpleString().equals(managementAddress)) {
// It's a management message
- handleManagementMessage(tx, message, direct);
+ result = handleManagementMessage(tx, message, direct);
} else {
result = doSend(tx, message, address, direct, noAutoCreateQueue);
}
+
+ final RoutingStatus finalResult = result;
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterSend(tx, message, direct, noAutoCreateQueue, finalResult) : null);
+
return result;
}
@@ -1367,10 +1384,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public void addMetaData(String key, String data) {
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeSessionMetadataAdded(this, key, data) : null);
if (metaData == null) {
metaData = new HashMap<>();
}
metaData.put(key, data);
+ server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterSessionMetadataAdded(this, key, data) : null);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java
new file mode 100644
index 0000000..bc85475
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.plugin;
+
+public interface ActiveMQPluginRunnable {
+
+ void run(ActiveMQServerPlugin plugin);
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
new file mode 100644
index 0000000..95296f0
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.plugin;
+
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.BridgeConfiguration;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
+import org.apache.activemq.artemis.core.security.SecurityAuth;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.QueueConfig;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.cluster.Bridge;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+
+
+public interface ActiveMQServerPlugin {
+
+
+ /**
+ * A connection has been created.
+ *
+ * @param connection The newly created connection
+ */
+ default void afterCreateConnection(RemotingConnection connection) {
+
+ }
+
+ /**
+ * A connection has been destroyed.
+ *
+ * @param connection
+ */
+ default void afterDestroyConnection(RemotingConnection connection) {
+
+ }
+
+ /**
+ * Before a session is created.
+ *
+ * @param name
+ * @param username
+ * @param minLargeMessageSize
+ * @param connection
+ * @param autoCommitSends
+ * @param autoCommitAcks
+ * @param preAcknowledge
+ * @param xa
+ * @param defaultAddress
+ * @param callback
+ * @param autoCreateQueues
+ * @param context
+ * @param prefixes
+ */
+ default void beforeCreateSession(String name, String username, int minLargeMessageSize,
+ RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge,
+ boolean xa, String defaultAddress, SessionCallback callback, boolean autoCreateQueues, OperationContext context,
+ Map<SimpleString, RoutingType> prefixes) {
+
+ }
+
+ /**
+ * After a session has been created.
+ *
+ * @param session The newly created session
+ */
+ default void afterCreateSession(ServerSession session) {
+
+ }
+
+ /**
+ * Before a session is closed
+ *
+ * @param session
+ * @param failed
+ */
+ default void beforeCloseSession(ServerSession session, boolean failed) {
+
+ }
+
+ /**
+ * After a session is closed
+ *
+ * @param session
+ * @param failed
+ */
+ default void afterCloseSession(ServerSession session, boolean failed) {
+
+ }
+
+ /**
+ * Before session metadata is added to the session
+ *
+ * @param session
+ * @param key
+ * @param data
+ */
+ default void beforeSessionMetadataAdded(ServerSession session, String key, String data) {
+
+ }
+
+ /**
+ * After session metadata is added to the session
+ *
+ * @param session
+ * @param key
+ * @param data
+ */
+ default void afterSessionMetadataAdded(ServerSession session, String key, String data) {
+
+ }
+
+ /**
+ * Before a consumer is created
+ *
+ * @param consumerID
+ * @param queueName
+ * @param filterString
+ * @param browseOnly
+ * @param supportLargeMessage
+ */
+ default void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString,
+ boolean browseOnly, boolean supportLargeMessage) {
+
+ }
+
+ /**
+ * After a consumer has been created
+ *
+ * @param consumer the created consumer
+ */
+ default void afterCreateConsumer(ServerConsumer consumer) {
+
+ }
+
+ /**
+ * Before a consumer is closed
+ *
+ * @param consumer
+ * @param failed
+ */
+ default void beforeCloseConsumer(ServerConsumer consumer, boolean failed) {
+
+ }
+
+ /**
+ * After a consumer is closed
+ *
+ * @param consumer
+ * @param failed
+ */
+ default void afterCloseConsumer(ServerConsumer consumer, boolean failed) {
+
+ }
+
+ /**
+ * Before a queue is created
+ *
+ * @param queueConfig
+ */
+ default void beforeCreateQueue(QueueConfig queueConfig) {
+
+ }
+
+ /**
+ * After a queue has been created
+ *
+ * @param queue The newly created queue
+ */
+ default void afterCreateQueue(Queue queue) {
+
+ }
+
+ /**
+ * Before a queue is destroyed
+ *
+ * @param queueName
+ * @param session
+ * @param checkConsumerCount
+ * @param removeConsumers
+ * @param autoDeleteAddress
+ */
+ default void beforeDestroyQueue(SimpleString queueName, final SecurityAuth session, boolean checkConsumerCount,
+ boolean removeConsumers, boolean autoDeleteAddress) {
+
+ }
+
+ /**
+ * After a queue has been destroyed
+ *
+ * @param queue
+ * @param address
+ * @param session
+ * @param checkConsumerCount
+ * @param removeConsumers
+ * @param autoDeleteAddress
+ */
+ default void afterDestroyQueue(Queue queue, SimpleString address, final SecurityAuth session, boolean checkConsumerCount,
+ boolean removeConsumers, boolean autoDeleteAddress) {
+
+ }
+
+ /**
+ * Before a message is sent
+ *
+ * @param tx
+ * @param message
+ * @param direct
+ * @param noAutoCreateQueue
+ */
+ default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) {
+
+ }
+
+ /**
+ * After a message is sent
+ *
+ * @param tx
+ * @param message
+ * @param direct
+ * @param noAutoCreateQueue
+ * @param result
+ */
+ default void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
+ RoutingStatus result) {
+
+ }
+
+ /**
+ * Before a message is routed
+ *
+ * @param message
+ * @param context
+ * @param direct
+ * @param rejectDuplicates
+ */
+ default void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) {
+
+ }
+
+ /**
+ * After a message is routed
+ *
+ * @param message
+ * @param context
+ * @param direct
+ * @param rejectDuplicates
+ * @param result
+ */
+ default void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates,
+ RoutingStatus result) {
+
+ }
+
+ /**
+ * Before a message is delivered to a client consumer
+ *
+ * @param reference
+ */
+ default void beforeDeliver(MessageReference reference) {
+
+ }
+
+ /**
+ * After a message is delivered to a client consumer
+ *
+ * @param reference
+ */
+ default void afterDeliver(MessageReference reference) {
+
+ }
+
+ /**
+ * A message has been expired
+ *
+ * @param message The expired message
+ * @param messageExpiryAddress The message expiry address if exists
+ */
+ default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) {
+
+ }
+
+ /**
+ * A message has been acknowledged
+ *
+ * @param ref The acked message
+ * @param reason The ack reason
+ */
+ default void messageAcknowledged(MessageReference ref, AckReason reason) {
+
+ }
+
+ /**
+ * Before a bridge is deployed
+ *
+ * @param config The bridge configuration
+ */
+ default void beforeDeployBridge(BridgeConfiguration config) {
+
+ }
+
+ /**
+ * After a bridge has been deployed
+ *
+ * @param bridge The newly deployed bridge
+ */
+ default void afterDeployBridge(Bridge bridge) {
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index 8d27895..60b9b74 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -166,6 +166,9 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
// Add optional security for tests that need it
configureBrokerSecurity(server);
+ // Add extra configuration
+ addConfiguration(server);
+
server.start();
// Prepare all addresses and queues for client tests.
@@ -174,6 +177,10 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
return server;
}
+ protected void addConfiguration(ActiveMQServer server) {
+
+ }
+
protected TransportConfiguration addAcceptorConfiguration(ActiveMQServer server, int port) {
HashMap<String, Object> params = new HashMap<>();
params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port));
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 201a96b..da695ca 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.client;
-import javax.management.MBeanServer;
import java.lang.management.ManagementFactory;
import java.util.LinkedList;
import java.util.Map;
@@ -26,10 +25,12 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import javax.management.MBeanServer;
+
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
-
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -53,7 +54,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
@@ -234,8 +234,10 @@ public class HangConsumerTest extends ActiveMQTestBase {
final PostOffice postOffice,
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
- final Executor executor) {
- super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, deliveryMode, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+ final Executor executor, final ActiveMQServer server) {
+ super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, deliveryMode,
+ maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager,
+ addressSettingsRepository, executor, server);
}
@Override
@@ -256,13 +258,18 @@ public class HangConsumerTest extends ActiveMQTestBase {
LocalFactory(final ExecutorFactory executorFactory,
final ScheduledExecutorService scheduledExecutor,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
- final StorageManager storageManager) {
- super(executorFactory, scheduledExecutor, addressSettingsRepository, storageManager);
+ final StorageManager storageManager, final ActiveMQServer server) {
+ super(executorFactory, scheduledExecutor, addressSettingsRepository, storageManager, server);
}
@Override
public Queue createQueueWith(final QueueConfig config) {
- queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+ queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(),
+ config.user(), config.pageSubscription(), config.isDurable(),
+ config.isTemporary(), config.isAutoCreated(), config.deliveryMode(),
+ config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor,
+ postOffice, storageManager, addressSettingsRepository,
+ executorFactory.getExecutor(), server);
return queue;
}
@@ -277,13 +284,18 @@ public class HangConsumerTest extends ActiveMQTestBase {
final boolean durable,
final boolean temporary,
final boolean autoCreated) {
- queue = new MyQueueWithBlocking(persistenceID, address, name, filter, user, pageSubscription, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+ queue = new MyQueueWithBlocking(persistenceID, address, name, filter, user, pageSubscription, durable,
+ temporary, autoCreated, RoutingType.MULTICAST, null, null,
+ scheduledExecutor, postOffice, storageManager, addressSettingsRepository,
+ executorFactory.getExecutor(), server);
return queue;
}
}
- LocalFactory queueFactory = new LocalFactory(server.getExecutorFactory(), server.getScheduledPool(), server.getAddressSettingsRepository(), server.getStorageManager());
+ LocalFactory queueFactory =
+ new LocalFactory(server.getExecutorFactory(), server.getScheduledPool(),
+ server.getAddressSettingsRepository(), server.getStorageManager(), server);
queueFactory.setPostOffice(server.getPostOffice());
@@ -359,7 +371,10 @@ public class HangConsumerTest extends ActiveMQTestBase {
long txID = server.getStorageManager().generateID();
// Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally
- LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false, false, null, null, null, null, null), server.getNodeID());
+ LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE,
+ new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false,
+ false, null, null, null, null, null, null),
+ server.getNodeID());
server.getStorageManager().addQueueBinding(txID, newBinding);
server.getStorageManager().commitBindings(txID);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
index 540baf6..44015e1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
@@ -16,12 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.client;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
@@ -29,9 +23,17 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@@ -52,7 +54,6 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@@ -518,7 +519,8 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
StorageManager storageManager,
HierarchicalRepository<AddressSettings> addressSettingsRepository,
Executor executor) {
- super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+ super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor,
+ postOffice, storageManager, addressSettingsRepository, executor, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
index f8094a1..63743ed 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
@@ -16,14 +16,15 @@
*/
package org.apache.activemq.artemis.tests.integration.jms.client;
+import java.util.List;
+import java.util.Map;
+
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
-import java.util.List;
-import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
@@ -81,7 +82,12 @@ public class TopicCleanupTest extends JMSTestBase {
for (int i = 0; i < 100; i++) {
long txid = storage.generateID();
- final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("topic"), SimpleString.toSimpleString("topic"), FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), null, true, false, false, server.getScheduledPool(), server.getPostOffice(), storage, server.getAddressSettingsRepository(), server.getExecutorFactory().getExecutor());
+ final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("topic"),
+ SimpleString.toSimpleString("topic"),
+ FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), null,
+ true, false, false, server.getScheduledPool(), server.getPostOffice(),
+ storage, server.getAddressSettingsRepository(),
+ server.getExecutorFactory().getExecutor(), server);
LocalQueueBinding binding = new LocalQueueBinding(queue.getAddress(), queue, server.getNodeID());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java
index 056891a..bd8cfd8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java
@@ -45,7 +45,7 @@ public class JmsResourceProvider {
/**
* Creates a connection.
*
- * @see org.apache.activemq.test.JmsResourceProvider#createConnection(javax.jms.ConnectionFactory)
+ * @see org.apache.activemq.test.JmsResourceProvider#afterCreateConnection(javax.jms.ConnectionFactory)
*/
public Connection createConnection(ConnectionFactory cf) throws JMSException {
Connection connection = cf.createConnection();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java
new file mode 100644
index 0000000..d918b27
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.plugin;
+
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test basic send and receive scenarios using only AMQP sender and receiver links.
+ */
+public class AmqpPluginTest extends AmqpClientTestSupport {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(AmqpPluginTest.class);
+
+ private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
+ private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls);
+
+ @Override
+ protected void addConfiguration(ActiveMQServer server) {
+ super.addConfiguration(server);
+ server.registerBrokerPlugin(verifier);
+ }
+
+ @Test(timeout = 60000)
+ public void testQueueReceiverReadAndAckMessage() throws Exception {
+ sendMessages(getQueueName(), 1);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
+
+ Queue queueView = getProxyToQueue(getQueueName());
+ assertEquals(1, queueView.getMessageCount());
+
+ receiver.flow(1);
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ message.accept();
+ receiver.close();
+ connection.close();
+
+ verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
+ verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION,
+ BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION,
+ BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
+ BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND,
+ AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER);
+ }
+
+ @Override
+ public void sendMessages(String destinationName, int count) throws Exception {
+ sendMessages(destinationName, count, null);
+ }
+
+ @Override
+ public void sendMessages(String destinationName, int count, RoutingType routingType) throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ try {
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(destinationName);
+
+ for (int i = 0; i < count; ++i) {
+ AmqpMessage message = new AmqpMessage();
+ message.setMessageId("MessageID:" + i);
+ if (routingType != null) {
+ message.setMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE.toString(), routingType.getType());
+ }
+ sender.send(message);
+ }
+ } finally {
+ connection.close();
+ }
+ }
+}