You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2018/08/30 20:18:33 UTC

activemq-artemis git commit: ARTEMIS-2065 Can't change queue routing-type between restarts

Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x 4ce8743f2 -> 715689efa


ARTEMIS-2065 Can't change queue routing-type between restarts

(cherry picked from commit 3827c54c058e7fef062d6d5e7e97e3f6466361fe)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/715689ef
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/715689ef
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/715689ef

Branch: refs/heads/2.6.x
Commit: 715689efaaa91c64e927a4931eabe30c95f745c5
Parents: 4ce8743
Author: Justin Bertram <jb...@apache.org>
Authored: Thu Aug 30 13:21:07 2018 -0500
Committer: Justin Bertram <jb...@apache.org>
Committed: Thu Aug 30 15:18:04 2018 -0500

----------------------------------------------------------------------
 .../core/server/ActiveMQServerLogger.java       |  7 +-
 .../core/server/impl/ActiveMQServerImpl.java    | 25 +++---
 .../tests/integration/jms/RedeployTest.java     | 44 +++++++++-
 .../persistence/ConfigChangeTest.java           | 86 ++++++++++++++++++++
 .../reload-queue-routingtype-updated.xml        | 40 +++++++++
 .../test/resources/reload-queue-routingtype.xml | 40 +++++++++
 6 files changed, 225 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/715689ef/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index cc9674e..1fb67ab 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -44,6 +44,7 @@ import io.netty.channel.Channel;
 
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
 import org.apache.activemq.artemis.core.config.Configuration;
@@ -1898,12 +1899,12 @@ public interface ActiveMQServerLogger extends BasicLogger {
    void criticalSystemLog(Object component);
 
    @LogMessage(level = Logger.Level.INFO)
-   @Message(id = 224076, value = "UnDeploying address {0}", format = Message.Format.MESSAGE_FORMAT)
+   @Message(id = 224076, value = "Undeploying address {0}", format = Message.Format.MESSAGE_FORMAT)
    void undeployAddress(SimpleString addressName);
 
    @LogMessage(level = Logger.Level.INFO)
-   @Message(id = 224077, value = "UnDeploying queue {0}", format = Message.Format.MESSAGE_FORMAT)
-   void undeployQueue(SimpleString queueName);
+   @Message(id = 224077, value = "Undeploying {0} queue {1}", format = Message.Format.MESSAGE_FORMAT)
+   void undeployQueue(RoutingType routingType, SimpleString queueName);
 
    @LogMessage(level = Logger.Level.WARN)
    @Message(id = 224078, value = "The size of duplicate cache detection (<id_cache-size/>) appears to be too large {0}. It should be no greater than the number of messages that can be squeezed into conformation buffer (<confirmation-window-size/>) {1}.", format = Message.Format.MESSAGE_FORMAT)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/715689ef/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 3c427b2..f73f0a5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -2389,6 +2389,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          }, 0, dumpInfoInterval, TimeUnit.MILLISECONDS);
       }
 
+      // Undeploy any addresses and queues not in config
+      undeployAddressesAndQueueNotInConfiguration();
+
       // Deploy the rest of the stuff
 
       // Deploy predefined addresses
@@ -2397,9 +2400,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       // Deploy any predefined queues
       deployQueuesFromConfiguration();
 
-      // Undeploy any addresses and queues not in config
-      undeployAddressesAndQueueNotInConfiguration();
-
       // We need to call this here, this gives any dependent server a chance to deploy its own addresses
       // this needs to be done before clustering is fully activated
       callActivateCallbacks();
@@ -2486,25 +2486,28 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          .map(CoreAddressConfiguration::getName)
          .collect(Collectors.toSet());
 
-      Set<String> queuesInConfig = configuration.getAddressConfigurations().stream()
-         .map(CoreAddressConfiguration::getQueueConfigurations)
-         .flatMap(List::stream).map(CoreQueueConfiguration::getName)
-         .collect(Collectors.toSet());
+      Set<String> queuesInConfig = new HashSet<>();
+      for (CoreAddressConfiguration cac : configuration.getAddressConfigurations()) {
+         for (CoreQueueConfiguration cqc : cac.getQueueConfigurations()) {
+            // combine the routing-type and queue name as the unique identifier as it's possible to change the routing-type without changing the name
+            queuesInConfig.add(cqc.getRoutingType().toString() + cqc.getName());
+         }
+      }
 
       for (SimpleString addressName : listAddressNames()) {
          AddressSettings addressSettings = getAddressSettingsRepository().getMatch(addressName.toString());
 
          if (!addressesInConfig.contains(addressName.toString()) && addressSettings.getConfigDeleteAddresses() == DeletionPolicy.FORCE) {
             for (Queue queue : listQueues(addressName)) {
-               ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName());
+               ActiveMQServerLogger.LOGGER.undeployQueue(queue.getRoutingType(), queue.getName());
                queue.deleteQueue(true);
             }
             ActiveMQServerLogger.LOGGER.undeployAddress(addressName);
             removeAddressInfo(addressName, null);
          } else if (addressSettings.getConfigDeleteQueues() == DeletionPolicy.FORCE) {
             for (Queue queue : listConfiguredQueues(addressName)) {
-               if (!queuesInConfig.contains(queue.getName().toString())) {
-                  ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName());
+               if (!queuesInConfig.contains(queue.getRoutingType().toString() + queue.getName().toString())) {
+                  ActiveMQServerLogger.LOGGER.undeployQueue(queue.getRoutingType(), queue.getName());
                   queue.deleteQueue(true);
                }
             }
@@ -3164,8 +3167,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
             }
 
             ActiveMQServerLogger.LOGGER.reloadingConfiguration("addresses");
-            deployAddressesFromConfiguration(config);
             undeployAddressesAndQueueNotInConfiguration(config);
+            deployAddressesFromConfiguration(config);
             configuration.setAddressConfigurations(config.getAddressConfigurations());
             configuration.setQueueConfigurations(config.getQueueConfigurations());
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/715689ef/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
index 2ba7f3d..fb7e20f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.security.Role;
@@ -196,9 +197,6 @@ public class RedeployTest extends ActiveMQTestBase {
 
    }
 
-
-
-
    @Test
    public void testRedeployAddressQueue() throws Exception {
       Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
@@ -265,6 +263,46 @@ public class RedeployTest extends ActiveMQTestBase {
       }
    }
 
+   @Test
+   public void testRedeployChangeQueueRoutingType() throws Exception {
+      Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
+      URL url1 = RedeployTest.class.getClassLoader().getResource("reload-queue-routingtype.xml"); // initial config: myQueue declared under <multicast>
+      URL url2 = RedeployTest.class.getClassLoader().getResource("reload-queue-routingtype-updated.xml"); // updated config: myQueue declared under <anycast>
+      Files.copy(url1.openStream(), brokerXML);
+      // Scenario: start the broker from the initial config, overwrite broker.xml with the updated one, and expect myQueue's routing type to switch from MULTICAST to ANYCAST.
+      EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
+      embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
+      embeddedActiveMQ.start();
+      // The latch is counted down by the tick callback registered on the reload manager below.
+      final ReusableLatch latch = new ReusableLatch(1);
+      // NOTE(review): presumably the reload manager runs this callback once per check cycle; confirm against ReloadManager.setTick.
+      Runnable tick = new Runnable() {
+         @Override
+         public void run() {
+            latch.countDown();
+         }
+      };
+
+      embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+      // Everything after this point runs under try/finally so the embedded broker is always stopped.
+      try {
+         latch.await(10, TimeUnit.SECONDS); // NOTE(review): the outcome of this wait is not checked, so a timeout would only surface through the assertions below
+         Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "myAddress"));
+         Assert.assertEquals(RoutingType.MULTICAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType());
+         // Swap in the updated config and wait for another tick before re-checking.
+         Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
+         brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000); // presumably pushes the timestamp forward so the change is noticed; TODO confirm how the reload manager detects modifications
+         latch.setCount(1); // re-arm the latch for the second wait
+         embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+         latch.await(10, TimeUnit.SECONDS);
+         // The address must still exist and the queue must now report the routing type from the updated config.
+         Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "myAddress"));
+         Assert.assertEquals(RoutingType.ANYCAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType());
+      } finally {
+         embeddedActiveMQ.stop();
+      }
+   }
+
 
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/715689ef/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java
new file mode 100644
index 0000000..3a2264f
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.persistence;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Test;
+
+public class ConfigChangeTest extends ActiveMQTestBase {
+   // Checks whether a queue's routing type configured as ANYCAST is reported as MULTICAST after the server is stopped, reconfigured and started again.
+   private ActiveMQServer server;
+   // Positive case: deletion policies are FORCE, so the routing type is expected to change.
+   @Test
+   public void testChangeQueueRoutingTypeOnRestart() throws Exception {
+      internalTestChangeQueueRoutingTypeOnRestart(false);
+   }
+   // Negative case: deletion policies are OFF, so the routing type is expected to stay ANYCAST.
+   @Test
+   public void testChangeQueueRoutingTypeOnRestartNegative() throws Exception {
+      internalTestChangeQueueRoutingTypeOnRestart(true);
+   }
+   // Shared scenario for both tests; 'negative' selects the deletion policies and the expected routing type.
+   public void internalTestChangeQueueRoutingTypeOnRestart(boolean negative) throws Exception {
+      // When negative == true the deletion policies are OFF and the queue's routing type should *not* change.
+      // The "#" address setting carries the config-delete policies for both queues and addresses.
+      Configuration configuration = createDefaultInVMConfig();
+      configuration.addAddressesSetting("#", new AddressSettings()
+         .setConfigDeleteQueues(negative ? DeletionPolicy.OFF : DeletionPolicy.FORCE)
+         .setConfigDeleteAddresses(negative ? DeletionPolicy.OFF : DeletionPolicy.FORCE));
+      // First boot: myAddress and myQueue are both configured as ANYCAST.
+      List addressConfigurations = new ArrayList(); // NOTE(review): raw List/ArrayList; the elements added are CoreAddressConfiguration
+      CoreAddressConfiguration addressConfiguration = new CoreAddressConfiguration()
+         .setName("myAddress")
+         .addRoutingType(RoutingType.ANYCAST)
+         .addQueueConfiguration(new CoreQueueConfiguration()
+                                   .setName("myQueue")
+                                   .setAddress("myAddress")
+                                   .setRoutingType(RoutingType.ANYCAST));
+      addressConfigurations.add(addressConfiguration);
+      configuration.setAddressConfigurations(addressConfigurations);
+      server = createServer(true, configuration); // NOTE(review): 'true' presumably enables a persistent journal so state survives the restart; confirm against ActiveMQTestBase.createServer
+      server.start();
+      server.stop();
+      // Second boot: same names, but address and queue are now configured as MULTICAST.
+      addressConfiguration = new CoreAddressConfiguration()
+         .setName("myAddress")
+         .addRoutingType(RoutingType.MULTICAST)
+         .addQueueConfiguration(new CoreQueueConfiguration()
+                                   .setName("myQueue")
+                                   .setAddress("myAddress")
+                                   .setRoutingType(RoutingType.MULTICAST));
+      addressConfigurations.clear(); // drop the ANYCAST entry so only the MULTICAST definition remains
+      addressConfigurations.add(addressConfiguration);
+      configuration.setAddressConfigurations(addressConfigurations);
+      // Restart the same server instance with the modified configuration and compare the reported routing types.
+      server.start();
+      assertEquals(negative ? RoutingType.ANYCAST : RoutingType.MULTICAST, server.getAddressInfo(SimpleString.toSimpleString("myAddress")).getRoutingType());
+      assertEquals(negative ? RoutingType.ANYCAST : RoutingType.MULTICAST, server.locateQueue(SimpleString.toSimpleString("myQueue")).getRoutingType());
+      server.stop();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/715689ef/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml b/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml
new file mode 100644
index 0000000..e5bbe4f
--- /dev/null
+++ b/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml
@@ -0,0 +1,40 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+   <!-- Updated broker config loaded by RedeployTest.testRedeployChangeQueueRoutingType: myQueue on myAddress is declared anycast here. -->
+   <core xmlns="urn:activemq:core">
+      <address-settings>
+         <address-setting match="#">
+            <config-delete-queues>FORCE</config-delete-queues>
+         </address-setting>
+      </address-settings>
+      <!-- The address and queue names match reload-queue-routingtype.xml; only the routing-type element differs. -->
+      <addresses>
+         <address name="myAddress">
+            <anycast>
+               <queue name="myQueue"/>
+            </anycast>
+         </address>
+      </addresses>
+   </core>
+</configuration>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/715689ef/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml b/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml
new file mode 100644
index 0000000..61ae86a
--- /dev/null
+++ b/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml
@@ -0,0 +1,40 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+   <!-- Initial broker config loaded by RedeployTest.testRedeployChangeQueueRoutingType: myQueue on myAddress is declared multicast here. -->
+   <core xmlns="urn:activemq:core">
+      <address-settings>
+         <address-setting match="#">
+            <config-delete-queues>FORCE</config-delete-queues>
+         </address-setting>
+      </address-settings>
+      <!-- The address and queue names match reload-queue-routingtype-updated.xml; only the routing-type element differs. -->
+      <addresses>
+         <address name="myAddress">
+            <multicast>
+               <queue name="myQueue"/>
+            </multicast>
+         </address>
+      </addresses>
+   </core>
+</configuration>