You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/05/29 22:56:12 UTC

[activemq-artemis] branch master updated (9e3dbde -> 2918bdf)

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git.


    from 9e3dbde  This closes #2675
     new 1ccb688  ARTEMIS-2355: Marking message as changed after setting routing type, because it is set after divert
     new 4fa2e75  ARTEMIS-2361 Bridge should make a copy of messages and some improvements into ARTEMIS-2355
     new 2918bdf  This closes #2686

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../core/server/cluster/impl/BridgeImpl.java       |  12 +-
 .../cluster/impl/ClusterConnectionBridge.java      |   2 +-
 .../artemis/core/server/impl/DivertImpl.java       |   2 +
 .../amqp/AmqpBridgeClusterRedistributionTest.java  | 262 +++++++++++++++++++++
 .../integration/cluster/bridge/BridgeTest.java     |  20 +-
 5 files changed, 295 insertions(+), 3 deletions(-)
 create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java


[activemq-artemis] 01/03: ARTEMIS-2355: Marking message as changed after setting routing type, because it is set after divert

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 1ccb688eec47b30d05245dddd55dc6822b646726
Author: Luis De Bello <lu...@mulesoft.com>
AuthorDate: Thu May 23 16:19:12 2019 -0300

    ARTEMIS-2355: Marking message as changed after setting routing type, because it is set after divert
---
 .../amqp/AmqpBridgeClusterRedistributionTest.java  | 262 +++++++++++++++++++++
 1 file changed, 262 insertions(+)

diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java
new file mode 100644
index 0000000..3365723
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.BridgeConfiguration;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class AmqpBridgeClusterRedistributionTest extends AmqpClientTestSupport {
+
+   protected ActiveMQServer[] servers = new ActiveMQServer[3];
+   private ActiveMQServer server0;
+   private ActiveMQServer server1;
+   private ActiveMQServer server2;
+   private SimpleString customNotificationQueue;
+   private SimpleString frameworkNotificationsQueue;
+   private SimpleString bridgeNotificationsQueue;
+   private SimpleString notificationsQueue;
+
+   private String getServer0URL() {
+      return "tcp://localhost:61616";
+   }
+
+   private String getServer1URL() {
+      return "tcp://localhost:61617";
+   }
+
+   private String getServer2URL() {
+      return "tcp://localhost:61618";
+   }
+
+   @Override
+   public URI getBrokerAmqpConnectionURI() {
+      try {
+         return new URI(getServer0URL());
+      } catch (URISyntaxException e) {
+         throw new RuntimeException(e);
+      }
+   }
+
+   @Override
+   protected ActiveMQServer createServer(final boolean realFiles,
+                                         final Configuration configuration,
+                                         final long pageSize,
+                                         final long maxAddressSize,
+                                         final Map<String, AddressSettings> settings) {
+      ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(configuration, realFiles));
+
+      if (settings != null) {
+         for (Map.Entry<String, AddressSettings> setting : settings.entrySet()) {
+            server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
+         }
+      }
+
+      AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(pageSize).setMaxSizeBytes(maxAddressSize).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setRedeliveryDelay(0).setRedistributionDelay(0).setAutoCreateQueues(true).setAutoCreateAddresses(true);
+
+      server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+      return server;
+   }
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      server0 = createServer(false, createBasicConfig());
+      server1 = createServer(false, createBasicConfig());
+      server2 = createServer(false, createBasicConfig());
+
+      servers[0] = server0;
+      servers[1] = server1;
+      servers[2] = server2;
+
+      server0.getConfiguration().addAcceptorConfiguration("acceptor", getServer0URL());
+      server0.getConfiguration().addConnectorConfiguration("notification-broker", getServer1URL());
+
+      server1.getConfiguration().addAcceptorConfiguration("acceptor", getServer1URL());
+      server2.getConfiguration().addAcceptorConfiguration("acceptor", getServer2URL());
+
+      DivertConfiguration customNotificationsDivert = new DivertConfiguration().setName("custom-notifications-divert").setAddress("*.Provider.*.Agent.*.CustomNotification").setForwardingAddress("FrameworkNotifications").setExclusive(true);
+
+      DivertConfiguration frameworkNotificationsDivertServer1 = new DivertConfiguration().setName("framework-notifications-divert").setAddress("BridgeNotifications").setForwardingAddress("Notifications").setRoutingType(ComponentConfigurationRoutingType.MULTICAST).setExclusive(true);
+      DivertConfiguration frameworkNotificationsDivertServer2 = new DivertConfiguration().setName("framework-notifications-divert").setAddress("BridgeNotifications").setForwardingAddress("Notifications").setRoutingType(ComponentConfigurationRoutingType.MULTICAST).setExclusive(true);
+
+      server0.getConfiguration().addDivertConfiguration(customNotificationsDivert);
+
+      server1.getConfiguration().addDivertConfiguration(frameworkNotificationsDivertServer1);
+      server2.getConfiguration().addDivertConfiguration(frameworkNotificationsDivertServer2);
+
+      customNotificationQueue = SimpleString.toSimpleString("*.Provider.*.Agent.*.CustomNotification");
+      frameworkNotificationsQueue = SimpleString.toSimpleString("FrameworkNotifications");
+      bridgeNotificationsQueue = SimpleString.toSimpleString("BridgeNotifications");
+      notificationsQueue = SimpleString.toSimpleString("Notifications");
+
+      setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 2);
+      setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 2, 1);
+
+      server0.start();
+
+      server1.start();
+      server2.start();
+
+      server0.createQueue(customNotificationQueue, RoutingType.ANYCAST, customNotificationQueue, null, true, false);
+      server0.createQueue(frameworkNotificationsQueue, RoutingType.ANYCAST, frameworkNotificationsQueue, null, true, false);
+
+      server1.createQueue(bridgeNotificationsQueue, RoutingType.ANYCAST, bridgeNotificationsQueue, null, true, false);
+      server1.createQueue(notificationsQueue, RoutingType.MULTICAST, notificationsQueue, null, true, false);
+
+      server2.createQueue(bridgeNotificationsQueue, RoutingType.ANYCAST, bridgeNotificationsQueue, null, true, false);
+      server2.createQueue(notificationsQueue, RoutingType.MULTICAST, notificationsQueue, null, true, false);
+
+      server0.deployBridge(new BridgeConfiguration().setName("notifications-bridge").setQueueName(frameworkNotificationsQueue.toString()).setForwardingAddress(bridgeNotificationsQueue.toString()).setConfirmationWindowSize(10).setStaticConnectors(Arrays.asList("notification-broker")));
+   }
+
+   @After
+   @Override
+   public void tearDown() throws Exception {
+      try {
+         if (server0 != null) {
+            server0.stop();
+         }
+         if (server1 != null) {
+            server1.stop();
+         }
+         if (server2 != null) {
+            server2.stop();
+         }
+      } finally {
+         super.tearDown();
+      }
+   }
+
+   @Test
+   public void testSendMessageToBroker0GetFromBroker1() throws Exception {
+      try (ServerLocator locator = ActiveMQClient.createServerLocator(getServer1URL()); ClientSessionFactory sessionFactory = locator.createSessionFactory(); ClientSession session = sessionFactory.createSession(); ClientConsumer consumer = session.createConsumer(notificationsQueue)) {
+
+         session.start();
+
+         sendMessages("uswest.Provider.AMC.Agent.DIVERTED.CustomNotification", 1, RoutingType.ANYCAST, true);
+
+         ClientMessage message = consumer.receive(5000);
+         assertNotNull(message);
+
+         message = consumer.receiveImmediate();
+         assertNull(message);
+      }
+   }
+
+   @Test
+   public void testSendMessageToBroker0GetFromBroker2() throws Exception {
+      try (ServerLocator locator = ActiveMQClient.createServerLocator(getServer2URL()); ClientSessionFactory sessionFactory = locator.createSessionFactory(); ClientSession session = sessionFactory.createSession(); ClientConsumer consumer = session.createConsumer(notificationsQueue)) {
+
+         session.start();
+
+         sendMessages("uswest.Provider.AMC.Agent.DIVERTED.CustomNotification", 1, RoutingType.ANYCAST, true);
+
+         ClientMessage message = consumer.receive(5000);
+         assertNotNull(message);
+
+         message = consumer.receiveImmediate();
+         assertNull(message);
+      }
+   }
+
+   protected void setupClusterConnection(final String name,
+                                         final String address,
+                                         final MessageLoadBalancingType messageLoadBalancingType,
+                                         final int maxHops,
+                                         final boolean netty,
+                                         final int nodeFrom,
+                                         final int... nodesTo) {
+      setupClusterConnection(name, address, messageLoadBalancingType, maxHops, netty, null, nodeFrom, nodesTo);
+   }
+
+   protected void setupClusterConnection(final String name,
+                                         final String address,
+                                         final MessageLoadBalancingType messageLoadBalancingType,
+                                         final int maxHops,
+                                         final boolean netty,
+                                         final ClusterTestBase.ClusterConfigCallback cb,
+                                         final int nodeFrom,
+                                         final int... nodesTo) {
+      ActiveMQServer serverFrom = servers[nodeFrom];
+
+      if (serverFrom == null) {
+         throw new IllegalStateException("No server at node " + nodeFrom);
+      }
+
+      TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
+      serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom);
+
+      List<String> pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo);
+      Configuration config = serverFrom.getConfiguration();
+      ClusterConnectionConfiguration clusterConf = createClusterConfig(name, address, messageLoadBalancingType, maxHops, connectorFrom, pairs);
+
+      if (cb != null) {
+         cb.configure(clusterConf);
+      }
+      config.getClusterConfigurations().add(clusterConf);
+   }
+
+   private List<String> getClusterConnectionTCNames(boolean netty, ActiveMQServer serverFrom, int[] nodesTo) {
+      List<String> pairs = new ArrayList<>();
+      for (int element : nodesTo) {
+         TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty));
+         serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
+         pairs.add(serverTotc.getName());
+      }
+      return pairs;
+   }
+
+   private ClusterConnectionConfiguration createClusterConfig(final String name,
+                                                              final String address,
+                                                              final MessageLoadBalancingType messageLoadBalancingType,
+                                                              final int maxHops,
+                                                              TransportConfiguration connectorFrom,
+                                                              List<String> pairs) {
+      return new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorFrom.getName()).setRetryInterval(250).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs);
+   }
+}
\ No newline at end of file


[activemq-artemis] 02/03: ARTEMIS-2361 Bridge should make a copy of messages and some improvements into ARTEMIS-2355

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 4fa2e75cdc5572e2e98d311fdcde0c2010726780
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Wed May 29 13:37:45 2019 -0400

    ARTEMIS-2361 Bridge should make a copy of messages
    and some improvements into ARTEMIS-2355
---
 .../artemis/core/server/cluster/impl/BridgeImpl.java | 12 +++++++++++-
 .../server/cluster/impl/ClusterConnectionBridge.java |  2 +-
 .../artemis/core/server/impl/DivertImpl.java         |  2 ++
 .../tests/integration/cluster/bridge/BridgeTest.java | 20 +++++++++++++++++++-
 4 files changed, 33 insertions(+), 3 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index ac08185..38e2294 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -550,7 +550,15 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
    }
 
    /* Hook for processing message before forwarding */
-   protected Message beforeForward(final Message message, final SimpleString forwardingAddress) {
+   protected Message beforeForward(Message message, final SimpleString forwardingAddress) {
+      message = message.copy();
+
+      return beforeForwardingNoCopy(message, forwardingAddress);
+   }
+
+   /** ClusterConnectionBridge already makes a copy of the message.
+    * So I needed a hook where the message is not copied. */
+   protected Message beforeForwardingNoCopy(Message message, SimpleString forwardingAddress) {
       if (useDuplicateDetection) {
          // We keep our own DuplicateID for the Bridge, so bouncing back and forth will work fine
          byte[] bytes = getDuplicateBytes(nodeUUID, message.getMessageID());
@@ -577,6 +585,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
             break;
       }
 
+      message.messageChanged();
+
       if (transformer != null) {
          final Message transformedMessage = transformer.transform(message);
          if (transformedMessage != message) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index f7e2817..96caf46 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -190,7 +190,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
 
       messageCopy.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, queueIds);
 
-      messageCopy = super.beforeForward(messageCopy, forwardingAddress);
+      messageCopy = super.beforeForwardingNoCopy(messageCopy, forwardingAddress);
 
       return messageCopy;
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index dce20e7..7fbf20c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -125,6 +125,8 @@ public class DivertImpl implements Divert {
          if (transformer != null) {
             copy = transformer.transform(copy);
          }
+
+         copy.messageChanged();
       } else {
          copy = message;
       }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
index e9d8a21..c6d6fa6 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
@@ -806,6 +806,7 @@ public class BridgeTest extends ActiveMQTestBase {
 
       final String testAddress = "testAddress";
       final String queueName0 = "queue0";
+      final String secondQueue = "queue1";
       final String forwardAddress = "forwardAddress";
       final String queueName1 = "forwardQueue";
 
@@ -827,6 +828,8 @@ public class BridgeTest extends ActiveMQTestBase {
       CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0);
       List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
       queueConfigs0.add(queueConfig0);
+      queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(secondQueue);
+      queueConfigs0.add(queueConfig0);
       server0.getConfiguration().setQueueConfigurations(queueConfigs0);
 
       server0.start();
@@ -882,7 +885,8 @@ public class BridgeTest extends ActiveMQTestBase {
          tx.commit();
       }
 
-      Thread.sleep(1000);
+      Thread.sleep(100);
+
 
       ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
 
@@ -909,6 +913,20 @@ public class BridgeTest extends ActiveMQTestBase {
 
       Assert.assertNull(consumer1.receiveImmediate());
 
+      ClientConsumer otherConsumer = session0.createConsumer(secondQueue);
+      session0.start();
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = otherConsumer.receive(5000);
+         Assert.assertNotNull(message);
+         // This validates that the Bridge does not alter the original message:
+         // it should make a copy of the message before sending it
+         Assert.assertEquals(2, message.getPropertyNames().size());
+         Assert.assertEquals(i, message.getIntProperty(propKey).intValue());
+         Assert.assertEquals(new SimpleString("monkey" + i), message.getSimpleStringProperty(selectorKey));
+         message.acknowledge();
+
+      }
+
       consumer1.close();
 
       session1.deleteQueue(queueName1);


[activemq-artemis] 03/03: This closes #2686

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 2918bdffd28a9a10f9ad06dc34ab10ae12f73ab4
Merge: 9e3dbde 4fa2e75
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Wed May 29 18:56:01 2019 -0400

    This closes #2686

 .../core/server/cluster/impl/BridgeImpl.java       |  12 +-
 .../cluster/impl/ClusterConnectionBridge.java      |   2 +-
 .../artemis/core/server/impl/DivertImpl.java       |   2 +
 .../amqp/AmqpBridgeClusterRedistributionTest.java  | 262 +++++++++++++++++++++
 .../integration/cluster/bridge/BridgeTest.java     |  20 +-
 5 files changed, 295 insertions(+), 3 deletions(-)