You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/07 19:36:44 UTC
activemq-artemis git commit: ARTEMIS-2065 Change routing-type isnt
destructive.
Repository: activemq-artemis
Updated Branches:
refs/heads/2.6.x 4e03b4029 -> 410b3c1dd
ARTEMIS-2065 Change routing-type isnt destructive.
Revert previous fix
Keep original ConfigChangeTest
Apply new non-destructive fix.
Enhance tests to ensure messages in queues are not lost either on reload when running or when config changed on-restart (e.g. queue i not destroyed)
(cherry picked from commit dbfdc18f49cfa3fed89f722d78aa49dd170028b9)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/410b3c1d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/410b3c1d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/410b3c1d
Branch: refs/heads/2.6.x
Commit: 410b3c1dde46a1eb54ae9b9e963633e088e10188
Parents: 4e03b40
Author: Michael André Pearce <mi...@me.com>
Authored: Thu Sep 6 18:41:21 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Sep 7 15:36:24 2018 -0400
----------------------------------------------------------------------
.../core/server/ActiveMQServerLogger.java | 5 +-
.../core/server/impl/ActiveMQServerImpl.java | 54 ++++++++++++++------
.../tests/integration/jms/RedeployTest.java | 31 ++++++++---
.../persistence/ConfigChangeTest.java | 41 ++++++++-------
.../reload-queue-routingtype-updated.xml | 18 ++++---
.../test/resources/reload-queue-routingtype.xml | 17 +++---
6 files changed, 106 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/410b3c1d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 1fb67ab..6ebecd6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -44,7 +44,6 @@ import io.netty.channel.Channel;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.Pair;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.Configuration;
@@ -1903,8 +1902,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
void undeployAddress(SimpleString addressName);
@LogMessage(level = Logger.Level.INFO)
- @Message(id = 224077, value = "Undeploying {0} queue {1}", format = Message.Format.MESSAGE_FORMAT)
- void undeployQueue(RoutingType routingType, SimpleString queueName);
+ @Message(id = 224077, value = "Undeploying queue {0}", format = Message.Format.MESSAGE_FORMAT)
+ void undeployQueue(SimpleString queueName);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 224078, value = "The size of duplicate cache detection (<id_cache-size/>) appears to be too large {0}. It should be no greater than the number of messages that can be squeezed into conformation buffer (<confirmation-window-size/>) {1}.", format = Message.Format.MESSAGE_FORMAT)
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/410b3c1d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index f73f0a5..8557852 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -2389,9 +2389,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}, 0, dumpInfoInterval, TimeUnit.MILLISECONDS);
}
- // Undeploy any addresses and queues not in config
- undeployAddressesAndQueueNotInConfiguration();
-
// Deploy the rest of the stuff
// Deploy predefined addresses
@@ -2400,6 +2397,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// Deploy any predefined queues
deployQueuesFromConfiguration();
+ // Undeploy any addresses and queues not in config
+ undeployAddressesAndQueueNotInConfiguration();
+
// We need to call this here, this gives any dependent server a chance to deploy its own addresses
// this needs to be done before clustering is fully activated
callActivateCallbacks();
@@ -2483,31 +2483,28 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private void undeployAddressesAndQueueNotInConfiguration(Configuration configuration) throws Exception {
Set<String> addressesInConfig = configuration.getAddressConfigurations().stream()
- .map(CoreAddressConfiguration::getName)
- .collect(Collectors.toSet());
+ .map(CoreAddressConfiguration::getName)
+ .collect(Collectors.toSet());
- Set<String> queuesInConfig = new HashSet<>();
- for (CoreAddressConfiguration cac : configuration.getAddressConfigurations()) {
- for (CoreQueueConfiguration cqc : cac.getQueueConfigurations()) {
- // combine the routing-type and queue name as the unique identifier as it's possible to change the routing-type without changing the name
- queuesInConfig.add(cqc.getRoutingType().toString() + cqc.getName());
- }
- }
+ Set<String> queuesInConfig = configuration.getAddressConfigurations().stream()
+ .map(CoreAddressConfiguration::getQueueConfigurations)
+ .flatMap(List::stream).map(CoreQueueConfiguration::getName)
+ .collect(Collectors.toSet());
for (SimpleString addressName : listAddressNames()) {
AddressSettings addressSettings = getAddressSettingsRepository().getMatch(addressName.toString());
if (!addressesInConfig.contains(addressName.toString()) && addressSettings.getConfigDeleteAddresses() == DeletionPolicy.FORCE) {
for (Queue queue : listQueues(addressName)) {
- ActiveMQServerLogger.LOGGER.undeployQueue(queue.getRoutingType(), queue.getName());
+ ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName());
queue.deleteQueue(true);
}
ActiveMQServerLogger.LOGGER.undeployAddress(addressName);
removeAddressInfo(addressName, null);
} else if (addressSettings.getConfigDeleteQueues() == DeletionPolicy.FORCE) {
for (Queue queue : listConfiguredQueues(addressName)) {
- if (!queuesInConfig.contains(queue.getRoutingType().toString() + queue.getName().toString())) {
- ActiveMQServerLogger.LOGGER.undeployQueue(queue.getRoutingType(), queue.getName());
+ if (!queuesInConfig.contains(queue.getName().toString())) {
+ ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName());
queue.deleteQueue(true);
}
}
@@ -2535,15 +2532,38 @@ public class ActiveMQServerImpl implements ActiveMQServer {
for (CoreAddressConfiguration config : configuration.getAddressConfigurations()) {
try {
ActiveMQServerLogger.LOGGER.deployAddress(config.getName(), config.getRoutingTypes().toString());
- AddressInfo info = new AddressInfo(SimpleString.toSimpleString(config.getName()), config.getRoutingTypes());
- addOrUpdateAddressInfo(info);
+ SimpleString address = SimpleString.toSimpleString(config.getName());
+
+ AddressInfo tobe = new AddressInfo(address, config.getRoutingTypes());
+
+ //During this stage until all queues re-configured we combine the current (if exists) with to-be routing types to allow changes in queues
+ AddressInfo current = getAddressInfo(address);
+ AddressInfo merged = new AddressInfo(address, tobe.getRoutingType());
+ if (current != null) {
+ merged.getRoutingTypes().addAll(current.getRoutingTypes());
+ }
+ addOrUpdateAddressInfo(merged);
+
deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations());
+
+ //Now all queues updated we apply the actual address info expected tobe.
+ addOrUpdateAddressInfo(tobe);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.problemDeployingAddress(config.getName(), e.getMessage());
}
}
}
+ private AddressInfo mergedRoutingTypes(SimpleString address, AddressInfo... addressInfos) {
+ EnumSet<RoutingType> mergedRoutingTypes = EnumSet.noneOf(RoutingType.class);
+ for (AddressInfo addressInfo : addressInfos) {
+ if (addressInfo != null) {
+ mergedRoutingTypes.addAll(addressInfo.getRoutingTypes());
+ }
+ }
+ return new AddressInfo(address, mergedRoutingTypes);
+ }
+
private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception {
for (CoreQueueConfiguration config : queues) {
try {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/410b3c1d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
index 126534e..b9999d0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
@@ -17,12 +17,6 @@
package org.apache.activemq.artemis.tests.integration.jms;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -32,6 +26,16 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
@@ -286,9 +290,14 @@ public class RedeployTest extends ActiveMQTestBase {
embeddedJMS.getActiveMQServer().getReloadManager().setTick(tick);
try {
+ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://0.0.0.0:61616");
+ try (JMSContext context = connectionFactory.createContext()) {
+ context.createProducer().send(context.createQueue("myAddress"), "hello");
+ }
+
latch.await(10, TimeUnit.SECONDS);
Assert.assertNotNull(getAddressInfo(embeddedJMS, "myAddress"));
- Assert.assertEquals(RoutingType.MULTICAST, getQueue(embeddedJMS, "myQueue").getRoutingType());
+ Assert.assertEquals(RoutingType.ANYCAST, getQueue(embeddedJMS, "myQueue").getRoutingType());
Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
@@ -297,7 +306,13 @@ public class RedeployTest extends ActiveMQTestBase {
latch.await(10, TimeUnit.SECONDS);
Assert.assertNotNull(getAddressInfo(embeddedJMS, "myAddress"));
- Assert.assertEquals(RoutingType.ANYCAST, getQueue(embeddedJMS, "myQueue").getRoutingType());
+ Assert.assertEquals(RoutingType.MULTICAST, getQueue(embeddedJMS, "myQueue").getRoutingType());
+
+ //Ensures the queue isnt detroyed by checking message sent before change is consumable after (e.g. no message loss)
+ try (JMSContext context = connectionFactory.createContext()) {
+ Message message = context.createSharedDurableConsumer(context.createTopic("myAddress"), "myQueue").receive();
+ assertEquals("hello", ((TextMessage) message).getText());
+ }
} finally {
embeddedJMS.stop();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/410b3c1d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java
index 3a2264f..fefc735 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java
@@ -20,6 +20,10 @@ package org.apache.activemq.artemis.tests.integration.persistence;
import java.util.ArrayList;
import java.util.List;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSContext;
+import javax.jms.Message;
+import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
@@ -27,7 +31,7 @@ import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Test;
@@ -37,21 +41,8 @@ public class ConfigChangeTest extends ActiveMQTestBase {
@Test
public void testChangeQueueRoutingTypeOnRestart() throws Exception {
- internalTestChangeQueueRoutingTypeOnRestart(false);
- }
-
- @Test
- public void testChangeQueueRoutingTypeOnRestartNegative() throws Exception {
- internalTestChangeQueueRoutingTypeOnRestart(true);
- }
-
- public void internalTestChangeQueueRoutingTypeOnRestart(boolean negative) throws Exception {
- // if negative == true then the queue's routing type should *not* change
-
Configuration configuration = createDefaultInVMConfig();
- configuration.addAddressesSetting("#", new AddressSettings()
- .setConfigDeleteQueues(negative ? DeletionPolicy.OFF : DeletionPolicy.FORCE)
- .setConfigDeleteAddresses(negative ? DeletionPolicy.OFF : DeletionPolicy.FORCE));
+ configuration.addAddressesSetting("#", new AddressSettings());
List addressConfigurations = new ArrayList();
CoreAddressConfiguration addressConfiguration = new CoreAddressConfiguration()
@@ -65,6 +56,14 @@ public class ConfigChangeTest extends ActiveMQTestBase {
configuration.setAddressConfigurations(addressConfigurations);
server = createServer(true, configuration);
server.start();
+
+
+ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://0");
+ try (JMSContext context = connectionFactory.createContext()) {
+ context.createProducer().send(context.createQueue("myAddress"), "hello");
+ }
+
+
server.stop();
addressConfiguration = new CoreAddressConfiguration()
@@ -77,10 +76,16 @@ public class ConfigChangeTest extends ActiveMQTestBase {
addressConfigurations.clear();
addressConfigurations.add(addressConfiguration);
configuration.setAddressConfigurations(addressConfigurations);
-
server.start();
- assertEquals(negative ? RoutingType.ANYCAST : RoutingType.MULTICAST, server.getAddressInfo(SimpleString.toSimpleString("myAddress")).getRoutingType());
- assertEquals(negative ? RoutingType.ANYCAST : RoutingType.MULTICAST, server.locateQueue(SimpleString.toSimpleString("myQueue")).getRoutingType());
+ assertEquals(RoutingType.MULTICAST, server.getAddressInfo(SimpleString.toSimpleString("myAddress")).getRoutingType());
+ assertEquals(RoutingType.MULTICAST, server.locateQueue(SimpleString.toSimpleString("myQueue")).getRoutingType());
+
+ //Ensures the queue isnt detroyed by checking message sent before change is consumable after (e.g. no message loss)
+ try (JMSContext context = connectionFactory.createContext()) {
+ Message message = context.createSharedDurableConsumer(context.createTopic("myAddress"), "myQueue").receive();
+ assertEquals("hello", ((TextMessage) message).getText());
+ }
+
server.stop();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/410b3c1d/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml b/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml
index e5bbe4f..a031359 100644
--- a/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml
+++ b/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml
@@ -23,17 +23,21 @@ under the License.
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
- <address-settings>
- <address-setting match="#">
- <config-delete-queues>FORCE</config-delete-queues>
- </address-setting>
- </address-settings>
+ <security-enabled>false</security-enabled>
+
+ <acceptors>
+ <!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
+ <!-- performance tests have shown that openWire performs best with these buffer sizes -->
+ <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
+
+ </acceptors>
+
<addresses>
<address name="myAddress">
- <anycast>
+ <multicast>
<queue name="myQueue"/>
- </anycast>
+ </multicast>
</address>
</addresses>
</core>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/410b3c1d/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml b/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml
index 61ae86a..6737a05 100644
--- a/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml
+++ b/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml
@@ -23,17 +23,20 @@ under the License.
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
- <address-settings>
- <address-setting match="#">
- <config-delete-queues>FORCE</config-delete-queues>
- </address-setting>
- </address-settings>
+ <security-enabled>false</security-enabled>
+
+ <acceptors>
+ <!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
+ <!-- performance tests have shown that openWire performs best with these buffer sizes -->
+ <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
+
+ </acceptors>
<addresses>
<address name="myAddress">
- <multicast>
+ <anycast>
<queue name="myQueue"/>
- </multicast>
+ </anycast>
</address>
</addresses>
</core>