You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2018/08/09 12:38:21 UTC
[2/2] activemq-artemis git commit: ARTEMIS-2018 - Add bridge events to plugin API
ARTEMIS-2018 - Add bridge events to plugin API
Add callbacks to handle bridge events including beforeDeliverBridge,
afterDeliverBridge and afterAcknowledgeBridge
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e9155452
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e9155452
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e9155452
Branch: refs/heads/master
Commit: e9155452784f3241f61c4eb87ec0de20a10d0002
Parents: 24c13fa
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Wed Aug 8 14:10:22 2018 -0400
Committer: Michael Andre Pearce <mi...@me.com>
Committed: Thu Aug 9 13:37:56 2018 +0100
----------------------------------------------------------------------
.../core/server/cluster/impl/BridgeImpl.java | 14 +++-
.../server/plugin/ActiveMQServerPlugin.java | 37 +++++++++-
.../integration/plugin/CorePluginTest.java | 74 +++++++++++++++++---
.../plugin/MethodCalledVerifier.java | 22 ++++++
4 files changed, 136 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9155452/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index c811b63..d2c886b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -521,6 +521,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
ref.getQueue().acknowledge(ref);
pendingAcks.countDown();
metrics.incrementMessagesAcknowledged();
+
+ if (server.hasBrokerPlugins()) {
+ server.callBrokerPlugins(plugin -> plugin.afterAcknowledgeBridge(this, ref));
+ }
} else {
if (logger.isTraceEnabled()) {
logger.trace("BridgeImpl::sendAcknowledged bridge " + this + " could not find reference for message " + message);
@@ -614,13 +618,17 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
pendingAcks.countUp();
try {
+ if (server.hasBrokerPlugins()) {
+ server.callBrokerPlugins(plugin -> plugin.beforeDeliverBridge(this, ref));
+ }
+
final HandleStatus status;
if (message.isLargeMessage()) {
deliveringLargeMessage = true;
deliverLargeMessage(dest, ref, (LargeServerMessage) message);
status = HandleStatus.HANDLED;
} else {
- status = deliverStandardMessage(dest, ref, message);
+ status = deliverStandardMessage(dest, ref, message);
}
//Only increment messages pending acknowledgement if handled by bridge
@@ -628,6 +636,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
metrics.incrementMessagesPendingAcknowledgement();
}
+ if (server.hasBrokerPlugins()) {
+ server.callBrokerPlugins(plugin -> plugin.afterDeliverBridge(this, ref, status));
+ }
+
return status;
} catch (Exception e) {
// If an exception happened, we must count down immediately
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9155452/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
index 97e23b4..704420e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
@@ -610,7 +611,6 @@ public interface ActiveMQServerPlugin {
this.messageAcknowledged(ref, reason);
}
-
/**
* Before a bridge is deployed
*
@@ -632,6 +632,41 @@ public interface ActiveMQServerPlugin {
}
/**
+ * Called immediately before a bridge delivers a message
+ *
+ * @param bridge
+ * @param ref
+ * @throws ActiveMQException
+ */
+ default void beforeDeliverBridge(Bridge bridge, MessageReference ref) throws ActiveMQException {
+
+ }
+
+ /**
+ * Called immediately after a bridge delivers a message but before the message
+ * is acknowledged
+ *
+ * @param bridge
+ * @param ref
+ * @param status
+ * @throws ActiveMQException
+ */
+ default void afterDeliverBridge(Bridge bridge, MessageReference ref, HandleStatus status) throws ActiveMQException {
+
+ }
+
+ /**
+ * Called after a message delivered over this bridge has been acknowledged by the remote broker
+ *
+ * @param bridge
+ * @param ref
+ * @throws ActiveMQException
+ */
+ default void afterAcknowledgeBridge(Bridge bridge, MessageReference ref) throws ActiveMQException {
+
+ }
+
+ /**
* A Critical failure has been detected.
* This will be called before the broker is stopped
* @param components
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9155452/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
index f83b3ee..a236b8e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.plugin;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ACKNOWLEDGE_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ADD_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ADD_BINDING;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
@@ -25,6 +26,7 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_QUEUE;
@@ -42,6 +44,7 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DESTROY_QUEUE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
@@ -70,6 +73,13 @@ import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
@@ -78,10 +88,12 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -285,6 +297,7 @@ public class CorePluginTest extends JMSTestBase {
Map<String, Object> server0Params = new HashMap<>();
server0 = createClusteredServerWithParams(false, 0, false, server0Params);
+ server0.registerBrokerPlugin(verifier);
Map<String, Object> server1Params = new HashMap<>();
server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
@@ -293,23 +306,21 @@ public class CorePluginTest extends JMSTestBase {
final String testAddress = "testAddress";
final String queueName0 = "queue0";
final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
+ TransportConfiguration server0tc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, server0Params);
TransportConfiguration server1tc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, server1Params);
HashMap<String, TransportConfiguration> connectors = new HashMap<>();
connectors.put(server1tc.getName(), server1tc);
server0.getConfiguration().setConnectorConfigurations(connectors);
- server0.registerBrokerPlugin(verifier);
+
+ final int messageSize = 1024;
+ final int numMessages = 10;
ArrayList<String> connectorConfig = new ArrayList<>();
connectorConfig.add(server1tc.getName());
- BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1")
- .setQueueName(queueName0)
- .setForwardingAddress(forwardAddress)
- .setRetryInterval(1000)
- .setReconnectAttemptsOnSameNode(-1)
- .setUseDuplicateDetection(false)
- .setStaticConnectors(connectorConfig);
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(numMessages * messageSize / 2).setStaticConnectors(connectorConfig);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
bridgeConfigs.add(bridgeConfiguration);
@@ -320,14 +331,59 @@ public class CorePluginTest extends JMSTestBase {
queueConfigs0.add(queueConfig0);
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration().setAddress(forwardAddress).setName(queueName1);
+ List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<>();
+ queueConfigs1.add(queueConfig1);
+ server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
server1.start();
server0.start();
+ ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
+ ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc));
+ ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc));
+
+ ClientSession session0 = sf0.createSession(false, true, true);
+ ClientSession session1 = sf1.createSession(false, true, true);
+ ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
+ session1.start();
+
+ final byte[] bytes = new byte[messageSize];
+ final SimpleString propKey = new SimpleString("testkey");
+
+ for (int i = 0; i < numMessages; i++) {
+ ClientMessage message = session0.createMessage(true);
+ message.putIntProperty(propKey, i);
+ message.getBodyBuffer().writeBytes(bytes);
+ producer0.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++) {
+ ClientMessage message = consumer1.receive(5000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals(i, message.getObjectProperty(propKey));
+ message.acknowledge();
+ }
+
+ Assert.assertNull(consumer1.receiveImmediate());
+ session0.close();
+ session1.close();
+
+ sf0.close();
+ sf1.close();
+
+ assertEquals(1, server0.getClusterManager().getBridges().size());
+ BridgeMetrics bridgeMetrics = server0.getClusterManager().getBridges().get("bridge1").getMetrics();
+ assertEquals(10, bridgeMetrics.getMessagesPendingAcknowledgement());
+ assertEquals(10, bridgeMetrics.getMessagesAcknowledged());
+
+ //verify plugin method calls
verifier.validatePluginMethodsEquals(1, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
+ verifier.validatePluginMethodsEquals(10, BEFORE_DELIVER_BRIDGE, AFTER_DELIVER_BRIDGE, AFTER_ACKNOWLEDGE_BRIDGE);
server0.stop();
server1.stop();
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9155452/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
index 8977ba5..9c24505 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.security.SecurityAuth;
+import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
@@ -95,6 +96,9 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
public static final String AFTER_DELIVER = "afterDeliver";
public static final String BEFORE_DEPLOY_BRIDGE = "beforeDeployBridge";
public static final String AFTER_DEPLOY_BRIDGE = "afterDeployBridge";
+ public static final String BEFORE_DELIVER_BRIDGE = "beforeDeliverBridge";
+ public static final String AFTER_DELIVER_BRIDGE = "afterDeliverBridge";
+ public static final String AFTER_ACKNOWLEDGE_BRIDGE = "afterAcknowledgeBridge";
public MethodCalledVerifier(Map<String, AtomicInteger> methodCalls) {
super();
@@ -340,6 +344,24 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
methodCalled(AFTER_DEPLOY_BRIDGE);
}
+ @Override
+ public void beforeDeliverBridge(Bridge bridge, MessageReference ref) throws ActiveMQException {
+ Preconditions.checkNotNull(bridge);
+ methodCalled(BEFORE_DELIVER_BRIDGE);
+ }
+
+ @Override
+ public void afterDeliverBridge(Bridge bridge, MessageReference ref, HandleStatus status) throws ActiveMQException {
+ Preconditions.checkNotNull(bridge);
+ methodCalled(AFTER_DELIVER_BRIDGE);
+ }
+
+ @Override
+ public void afterAcknowledgeBridge(Bridge bridge, MessageReference ref) throws ActiveMQException {
+ Preconditions.checkNotNull(bridge);
+ methodCalled(AFTER_ACKNOWLEDGE_BRIDGE);
+ }
+
public void validatePluginMethodsEquals(int count, String... names) {
Arrays.asList(names).forEach(name -> {
try {