You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2018/08/09 12:38:21 UTC

[2/2] activemq-artemis git commit: ARTEMIS-2018 - Add bridge events to plugin API

ARTEMIS-2018 - Add bridge events to plugin API

Add plugin callbacks to handle bridge events, including beforeDeliverBridge,
afterDeliverBridge and afterAcknowledgeBridge.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e9155452
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e9155452
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e9155452

Branch: refs/heads/master
Commit: e9155452784f3241f61c4eb87ec0de20a10d0002
Parents: 24c13fa
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Wed Aug 8 14:10:22 2018 -0400
Committer: Michael Andre Pearce <mi...@me.com>
Committed: Thu Aug 9 13:37:56 2018 +0100

----------------------------------------------------------------------
 .../core/server/cluster/impl/BridgeImpl.java    | 14 +++-
 .../server/plugin/ActiveMQServerPlugin.java     | 37 +++++++++-
 .../integration/plugin/CorePluginTest.java      | 74 +++++++++++++++++---
 .../plugin/MethodCalledVerifier.java            | 22 ++++++
 4 files changed, 136 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9155452/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index c811b63..d2c886b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -521,6 +521,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
                ref.getQueue().acknowledge(ref);
                pendingAcks.countDown();
                metrics.incrementMessagesAcknowledged();
+
+               if (server.hasBrokerPlugins()) {
+                  server.callBrokerPlugins(plugin -> plugin.afterAcknowledgeBridge(this, ref));
+               }
             } else {
                if (logger.isTraceEnabled()) {
                   logger.trace("BridgeImpl::sendAcknowledged bridge " + this + " could not find reference for message " + message);
@@ -614,13 +618,17 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
          pendingAcks.countUp();
 
          try {
+            if (server.hasBrokerPlugins()) {
+               server.callBrokerPlugins(plugin -> plugin.beforeDeliverBridge(this, ref));
+            }
+
             final HandleStatus status;
             if (message.isLargeMessage()) {
                deliveringLargeMessage = true;
                deliverLargeMessage(dest, ref, (LargeServerMessage) message);
                status = HandleStatus.HANDLED;
             } else {
-               status =  deliverStandardMessage(dest, ref, message);
+               status = deliverStandardMessage(dest, ref, message);
             }
 
             //Only increment messages pending acknowledgement if handled by bridge
@@ -628,6 +636,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
                metrics.incrementMessagesPendingAcknowledgement();
             }
 
+            if (server.hasBrokerPlugins()) {
+               server.callBrokerPlugins(plugin -> plugin.afterDeliverBridge(this, ref, status));
+            }
+
             return status;
          } catch (Exception e) {
             // If an exception happened, we must count down immediately

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9155452/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
index 97e23b4..704420e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
 import org.apache.activemq.artemis.core.security.SecurityAuth;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.HandleStatus;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueConfig;
@@ -610,7 +611,6 @@ public interface ActiveMQServerPlugin {
       this.messageAcknowledged(ref, reason);
    }
 
-
    /**
     * Before a bridge is deployed
     *
@@ -632,6 +632,41 @@ public interface ActiveMQServerPlugin {
    }
 
    /**
+    * Called immediately before a bridge delivers a message
+    *
+    * @param bridge
+    * @param ref
+    * @throws ActiveMQException
+    */
+   default void beforeDeliverBridge(Bridge bridge, MessageReference ref) throws ActiveMQException {
+
+   }
+
+   /**
+    * Called immediately after a bridge delivers a message but before the message
+    * is acknowledged
+    *
+    * @param bridge
+    * @param ref
+    * @param status
+    * @throws ActiveMQException
+    */
+   default void afterDeliverBridge(Bridge bridge, MessageReference ref, HandleStatus status) throws ActiveMQException {
+
+   }
+
+   /**
+    * Called after a message delivered over this bridge has been acknowledged by the remote broker
+    *
+    * @param bridge
+    * @param ref
+    * @throws ActiveMQException
+    */
+   default void afterAcknowledgeBridge(Bridge bridge, MessageReference ref) throws ActiveMQException {
+
+   }
+
+   /**
     * A Critical failure has been detected.
     * This will be called before the broker is stopped
     * @param components

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9155452/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
index f83b3ee..a236b8e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.tests.integration.plugin;
 
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ACKNOWLEDGE_BRIDGE;
 import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ADD_ADDRESS;
 import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ADD_BINDING;
 import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
@@ -25,6 +26,7 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV
 import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
 import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
 import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER_BRIDGE;
 import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
 import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
 import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_QUEUE;
@@ -42,6 +44,7 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV
 import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
 import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
 import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER_BRIDGE;
 import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
 import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DESTROY_QUEUE;
 import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
@@ -70,6 +73,13 @@ import javax.jms.Topic;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.config.BridgeConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
@@ -78,10 +88,12 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
 import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -285,6 +297,7 @@ public class CorePluginTest extends JMSTestBase {
 
       Map<String, Object> server0Params = new HashMap<>();
       server0 = createClusteredServerWithParams(false, 0, false, server0Params);
+      server0.registerBrokerPlugin(verifier);
 
       Map<String, Object> server1Params = new HashMap<>();
       server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
@@ -293,23 +306,21 @@ public class CorePluginTest extends JMSTestBase {
       final String testAddress = "testAddress";
       final String queueName0 = "queue0";
       final String forwardAddress = "forwardAddress";
+      final String queueName1 = "queue1";
 
+      TransportConfiguration server0tc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, server0Params);
       TransportConfiguration server1tc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, server1Params);
 
       HashMap<String, TransportConfiguration> connectors = new HashMap<>();
       connectors.put(server1tc.getName(), server1tc);
       server0.getConfiguration().setConnectorConfigurations(connectors);
-      server0.registerBrokerPlugin(verifier);
+
+      final int messageSize = 1024;
+      final int numMessages = 10;
 
       ArrayList<String> connectorConfig = new ArrayList<>();
       connectorConfig.add(server1tc.getName());
-      BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1")
-                                                                         .setQueueName(queueName0)
-                                                                         .setForwardingAddress(forwardAddress)
-                                                                         .setRetryInterval(1000)
-                                                                         .setReconnectAttemptsOnSameNode(-1)
-                                                                         .setUseDuplicateDetection(false)
-                                                                         .setStaticConnectors(connectorConfig);
+      BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(numMessages * messageSize / 2).setStaticConnectors(connectorConfig);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
       bridgeConfigs.add(bridgeConfiguration);
@@ -320,14 +331,59 @@ public class CorePluginTest extends JMSTestBase {
       queueConfigs0.add(queueConfig0);
       server0.getConfiguration().setQueueConfigurations(queueConfigs0);
 
+      CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration().setAddress(forwardAddress).setName(queueName1);
+      List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<>();
+      queueConfigs1.add(queueConfig1);
+      server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
       server1.start();
       server0.start();
 
+      ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
+      ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc));
+      ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc));
+
+      ClientSession session0 = sf0.createSession(false, true, true);
+      ClientSession session1 = sf1.createSession(false, true, true);
+      ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+      ClientConsumer consumer1 = session1.createConsumer(queueName1);
+      session1.start();
+
+      final byte[] bytes = new byte[messageSize];
+      final SimpleString propKey = new SimpleString("testkey");
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = session0.createMessage(true);
+         message.putIntProperty(propKey, i);
+         message.getBodyBuffer().writeBytes(bytes);
+         producer0.send(message);
+      }
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = consumer1.receive(5000);
+         Assert.assertNotNull(message);
+         Assert.assertEquals(i, message.getObjectProperty(propKey));
+         message.acknowledge();
+      }
+
+      Assert.assertNull(consumer1.receiveImmediate());
+      session0.close();
+      session1.close();
+
+      sf0.close();
+      sf1.close();
+
+      assertEquals(1, server0.getClusterManager().getBridges().size());
+      BridgeMetrics bridgeMetrics = server0.getClusterManager().getBridges().get("bridge1").getMetrics();
+      assertEquals(10, bridgeMetrics.getMessagesPendingAcknowledgement());
+      assertEquals(10, bridgeMetrics.getMessagesAcknowledged());
+
+      //verify plugin method calls
       verifier.validatePluginMethodsEquals(1, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
+      verifier.validatePluginMethodsEquals(10, BEFORE_DELIVER_BRIDGE, AFTER_DELIVER_BRIDGE, AFTER_ACKNOWLEDGE_BRIDGE);
 
       server0.stop();
       server1.stop();
-
    }
 
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9155452/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
index 8977ba5..9c24505 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
 import org.apache.activemq.artemis.core.security.SecurityAuth;
+import org.apache.activemq.artemis.core.server.HandleStatus;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueConfig;
@@ -95,6 +96,9 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
    public static final String AFTER_DELIVER = "afterDeliver";
    public static final String BEFORE_DEPLOY_BRIDGE = "beforeDeployBridge";
    public static final String AFTER_DEPLOY_BRIDGE = "afterDeployBridge";
+   public static final String BEFORE_DELIVER_BRIDGE = "beforeDeliverBridge";
+   public static final String AFTER_DELIVER_BRIDGE = "afterDeliverBridge";
+   public static final String AFTER_ACKNOWLEDGE_BRIDGE = "afterAcknowledgeBridge";
 
    public MethodCalledVerifier(Map<String, AtomicInteger> methodCalls) {
       super();
@@ -340,6 +344,24 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
       methodCalled(AFTER_DEPLOY_BRIDGE);
    }
 
+   @Override
+   public void beforeDeliverBridge(Bridge bridge, MessageReference ref) throws ActiveMQException {
+      Preconditions.checkNotNull(bridge);
+      methodCalled(BEFORE_DELIVER_BRIDGE);
+   }
+
+   @Override
+   public void afterDeliverBridge(Bridge bridge, MessageReference ref, HandleStatus status) throws ActiveMQException {
+      Preconditions.checkNotNull(bridge);
+      methodCalled(AFTER_DELIVER_BRIDGE);
+   }
+
+   @Override
+   public void afterAcknowledgeBridge(Bridge bridge, MessageReference ref) throws ActiveMQException {
+      Preconditions.checkNotNull(bridge);
+      methodCalled(AFTER_ACKNOWLEDGE_BRIDGE);
+   }
+
    public void validatePluginMethodsEquals(int count, String... names) {
       Arrays.asList(names).forEach(name -> {
          try {