You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/05/29 22:56:13 UTC
[activemq-artemis] 01/03: ARTEMIS-2355: Marking message as changed
after setting routing type, because it is set after divert
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 1ccb688eec47b30d05245dddd55dc6822b646726
Author: Luis De Bello <lu...@mulesoft.com>
AuthorDate: Thu May 23 16:19:12 2019 -0300
ARTEMIS-2355: Marking message as changed after setting routing type, because it is set after divert
---
.../amqp/AmqpBridgeClusterRedistributionTest.java | 262 +++++++++++++++++++++
1 file changed, 262 insertions(+)
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java
new file mode 100644
index 0000000..3365723
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.BridgeConfiguration;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class AmqpBridgeClusterRedistributionTest extends AmqpClientTestSupport {
+
+ protected ActiveMQServer[] servers = new ActiveMQServer[3];
+ private ActiveMQServer server0;
+ private ActiveMQServer server1;
+ private ActiveMQServer server2;
+ private SimpleString customNotificationQueue;
+ private SimpleString frameworkNotificationsQueue;
+ private SimpleString bridgeNotificationsQueue;
+ private SimpleString notificationsQueue;
+
+ private String getServer0URL() {
+ return "tcp://localhost:61616";
+ }
+
+ private String getServer1URL() {
+ return "tcp://localhost:61617";
+ }
+
+ private String getServer2URL() {
+ return "tcp://localhost:61618";
+ }
+
+ @Override
+ public URI getBrokerAmqpConnectionURI() {
+ try {
+ return new URI(getServer0URL());
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected ActiveMQServer createServer(final boolean realFiles,
+ final Configuration configuration,
+ final long pageSize,
+ final long maxAddressSize,
+ final Map<String, AddressSettings> settings) {
+ ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(configuration, realFiles));
+
+ if (settings != null) {
+ for (Map.Entry<String, AddressSettings> setting : settings.entrySet()) {
+ server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
+ }
+ }
+
+ AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(pageSize).setMaxSizeBytes(maxAddressSize).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setRedeliveryDelay(0).setRedistributionDelay(0).setAutoCreateQueues(true).setAutoCreateAddresses(true);
+
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+ return server;
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ server0 = createServer(false, createBasicConfig());
+ server1 = createServer(false, createBasicConfig());
+ server2 = createServer(false, createBasicConfig());
+
+ servers[0] = server0;
+ servers[1] = server1;
+ servers[2] = server2;
+
+ server0.getConfiguration().addAcceptorConfiguration("acceptor", getServer0URL());
+ server0.getConfiguration().addConnectorConfiguration("notification-broker", getServer1URL());
+
+ server1.getConfiguration().addAcceptorConfiguration("acceptor", getServer1URL());
+ server2.getConfiguration().addAcceptorConfiguration("acceptor", getServer2URL());
+
+ DivertConfiguration customNotificationsDivert = new DivertConfiguration().setName("custom-notifications-divert").setAddress("*.Provider.*.Agent.*.CustomNotification").setForwardingAddress("FrameworkNotifications").setExclusive(true);
+
+ DivertConfiguration frameworkNotificationsDivertServer1 = new DivertConfiguration().setName("framework-notifications-divert").setAddress("BridgeNotifications").setForwardingAddress("Notifications").setRoutingType(ComponentConfigurationRoutingType.MULTICAST).setExclusive(true);
+ DivertConfiguration frameworkNotificationsDivertServer2 = new DivertConfiguration().setName("framework-notifications-divert").setAddress("BridgeNotifications").setForwardingAddress("Notifications").setRoutingType(ComponentConfigurationRoutingType.MULTICAST).setExclusive(true);
+
+ server0.getConfiguration().addDivertConfiguration(customNotificationsDivert);
+
+ server1.getConfiguration().addDivertConfiguration(frameworkNotificationsDivertServer1);
+ server2.getConfiguration().addDivertConfiguration(frameworkNotificationsDivertServer2);
+
+ customNotificationQueue = SimpleString.toSimpleString("*.Provider.*.Agent.*.CustomNotification");
+ frameworkNotificationsQueue = SimpleString.toSimpleString("FrameworkNotifications");
+ bridgeNotificationsQueue = SimpleString.toSimpleString("BridgeNotifications");
+ notificationsQueue = SimpleString.toSimpleString("Notifications");
+
+ setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 2);
+ setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 2, 1);
+
+ server0.start();
+
+ server1.start();
+ server2.start();
+
+ server0.createQueue(customNotificationQueue, RoutingType.ANYCAST, customNotificationQueue, null, true, false);
+ server0.createQueue(frameworkNotificationsQueue, RoutingType.ANYCAST, frameworkNotificationsQueue, null, true, false);
+
+ server1.createQueue(bridgeNotificationsQueue, RoutingType.ANYCAST, bridgeNotificationsQueue, null, true, false);
+ server1.createQueue(notificationsQueue, RoutingType.MULTICAST, notificationsQueue, null, true, false);
+
+ server2.createQueue(bridgeNotificationsQueue, RoutingType.ANYCAST, bridgeNotificationsQueue, null, true, false);
+ server2.createQueue(notificationsQueue, RoutingType.MULTICAST, notificationsQueue, null, true, false);
+
+ server0.deployBridge(new BridgeConfiguration().setName("notifications-bridge").setQueueName(frameworkNotificationsQueue.toString()).setForwardingAddress(bridgeNotificationsQueue.toString()).setConfirmationWindowSize(10).setStaticConnectors(Arrays.asList("notification-broker")));
+ }
+
+ @After
+ @Override
+ public void tearDown() throws Exception {
+ try {
+ if (server0 != null) {
+ server0.stop();
+ }
+ if (server1 != null) {
+ server1.stop();
+ }
+ if (server2 != null) {
+ server2.stop();
+ }
+ } finally {
+ super.tearDown();
+ }
+ }
+
+ @Test
+ public void testSendMessageToBroker0GetFromBroker1() throws Exception {
+ try (ServerLocator locator = ActiveMQClient.createServerLocator(getServer1URL()); ClientSessionFactory sessionFactory = locator.createSessionFactory(); ClientSession session = sessionFactory.createSession(); ClientConsumer consumer = session.createConsumer(notificationsQueue)) {
+
+ session.start();
+
+ sendMessages("uswest.Provider.AMC.Agent.DIVERTED.CustomNotification", 1, RoutingType.ANYCAST, true);
+
+ ClientMessage message = consumer.receive(5000);
+ assertNotNull(message);
+
+ message = consumer.receiveImmediate();
+ assertNull(message);
+ }
+ }
+
+ @Test
+ public void testSendMessageToBroker0GetFromBroker2() throws Exception {
+ try (ServerLocator locator = ActiveMQClient.createServerLocator(getServer2URL()); ClientSessionFactory sessionFactory = locator.createSessionFactory(); ClientSession session = sessionFactory.createSession(); ClientConsumer consumer = session.createConsumer(notificationsQueue)) {
+
+ session.start();
+
+ sendMessages("uswest.Provider.AMC.Agent.DIVERTED.CustomNotification", 1, RoutingType.ANYCAST, true);
+
+ ClientMessage message = consumer.receive(5000);
+ assertNotNull(message);
+
+ message = consumer.receiveImmediate();
+ assertNull(message);
+ }
+ }
+
+ protected void setupClusterConnection(final String name,
+ final String address,
+ final MessageLoadBalancingType messageLoadBalancingType,
+ final int maxHops,
+ final boolean netty,
+ final int nodeFrom,
+ final int... nodesTo) {
+ setupClusterConnection(name, address, messageLoadBalancingType, maxHops, netty, null, nodeFrom, nodesTo);
+ }
+
+ protected void setupClusterConnection(final String name,
+ final String address,
+ final MessageLoadBalancingType messageLoadBalancingType,
+ final int maxHops,
+ final boolean netty,
+ final ClusterTestBase.ClusterConfigCallback cb,
+ final int nodeFrom,
+ final int... nodesTo) {
+ ActiveMQServer serverFrom = servers[nodeFrom];
+
+ if (serverFrom == null) {
+ throw new IllegalStateException("No server at node " + nodeFrom);
+ }
+
+ TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
+ serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom);
+
+ List<String> pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo);
+ Configuration config = serverFrom.getConfiguration();
+ ClusterConnectionConfiguration clusterConf = createClusterConfig(name, address, messageLoadBalancingType, maxHops, connectorFrom, pairs);
+
+ if (cb != null) {
+ cb.configure(clusterConf);
+ }
+ config.getClusterConfigurations().add(clusterConf);
+ }
+
+ private List<String> getClusterConnectionTCNames(boolean netty, ActiveMQServer serverFrom, int[] nodesTo) {
+ List<String> pairs = new ArrayList<>();
+ for (int element : nodesTo) {
+ TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty));
+ serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
+ pairs.add(serverTotc.getName());
+ }
+ return pairs;
+ }
+
+ private ClusterConnectionConfiguration createClusterConfig(final String name,
+ final String address,
+ final MessageLoadBalancingType messageLoadBalancingType,
+ final int maxHops,
+ TransportConfiguration connectorFrom,
+ List<String> pairs) {
+ return new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorFrom.getName()).setRetryInterval(250).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs);
+ }
+}
\ No newline at end of file