You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/07 19:36:44 UTC

activemq-artemis git commit: ARTEMIS-2065 Change routing-type isn't destructive.

Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x 4e03b4029 -> 410b3c1dd


ARTEMIS-2065 Change routing-type isn't destructive.

Revert previous fix
Keep original ConfigChangeTest
Apply new non-destructive fix.
Enhance tests to ensure messages in queues are not lost, either on reload while the broker is running or when the config is changed on restart (i.e. the queue is not destroyed).
(cherry picked from commit dbfdc18f49cfa3fed89f722d78aa49dd170028b9)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/410b3c1d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/410b3c1d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/410b3c1d

Branch: refs/heads/2.6.x
Commit: 410b3c1dde46a1eb54ae9b9e963633e088e10188
Parents: 4e03b40
Author: Michael André Pearce <mi...@me.com>
Authored: Thu Sep 6 18:41:21 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Sep 7 15:36:24 2018 -0400

----------------------------------------------------------------------
 .../core/server/ActiveMQServerLogger.java       |  5 +-
 .../core/server/impl/ActiveMQServerImpl.java    | 54 ++++++++++++++------
 .../tests/integration/jms/RedeployTest.java     | 31 ++++++++---
 .../persistence/ConfigChangeTest.java           | 41 ++++++++-------
 .../reload-queue-routingtype-updated.xml        | 18 ++++---
 .../test/resources/reload-queue-routingtype.xml | 17 +++---
 6 files changed, 106 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/410b3c1d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 1fb67ab..6ebecd6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -44,7 +44,6 @@ import io.netty.channel.Channel;
 
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.Pair;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
 import org.apache.activemq.artemis.core.config.Configuration;
@@ -1903,8 +1902,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
    void undeployAddress(SimpleString addressName);
 
    @LogMessage(level = Logger.Level.INFO)
-   @Message(id = 224077, value = "Undeploying {0} queue {1}", format = Message.Format.MESSAGE_FORMAT)
-   void undeployQueue(RoutingType routingType, SimpleString queueName);
+   @Message(id = 224077, value = "Undeploying queue {0}", format = Message.Format.MESSAGE_FORMAT)
+   void undeployQueue(SimpleString queueName);
 
    @LogMessage(level = Logger.Level.WARN)
    @Message(id = 224078, value = "The size of duplicate cache detection (<id_cache-size/>) appears to be too large {0}. It should be no greater than the number of messages that can be squeezed into conformation buffer (<confirmation-window-size/>) {1}.", format = Message.Format.MESSAGE_FORMAT)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/410b3c1d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index f73f0a5..8557852 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -2389,9 +2389,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          }, 0, dumpInfoInterval, TimeUnit.MILLISECONDS);
       }
 
-      // Undeploy any addresses and queues not in config
-      undeployAddressesAndQueueNotInConfiguration();
-
       // Deploy the rest of the stuff
 
       // Deploy predefined addresses
@@ -2400,6 +2397,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       // Deploy any predefined queues
       deployQueuesFromConfiguration();
 
+      // Undeploy any addresses and queues not in config
+      undeployAddressesAndQueueNotInConfiguration();
+
       // We need to call this here, this gives any dependent server a chance to deploy its own addresses
       // this needs to be done before clustering is fully activated
       callActivateCallbacks();
@@ -2483,31 +2483,28 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
    private void undeployAddressesAndQueueNotInConfiguration(Configuration configuration) throws Exception {
       Set<String> addressesInConfig = configuration.getAddressConfigurations().stream()
-         .map(CoreAddressConfiguration::getName)
-         .collect(Collectors.toSet());
+              .map(CoreAddressConfiguration::getName)
+              .collect(Collectors.toSet());
 
-      Set<String> queuesInConfig = new HashSet<>();
-      for (CoreAddressConfiguration cac : configuration.getAddressConfigurations()) {
-         for (CoreQueueConfiguration cqc : cac.getQueueConfigurations()) {
-            // combine the routing-type and queue name as the unique identifier as it's possible to change the routing-type without changing the name
-            queuesInConfig.add(cqc.getRoutingType().toString() + cqc.getName());
-         }
-      }
+      Set<String> queuesInConfig = configuration.getAddressConfigurations().stream()
+              .map(CoreAddressConfiguration::getQueueConfigurations)
+              .flatMap(List::stream).map(CoreQueueConfiguration::getName)
+              .collect(Collectors.toSet());
 
       for (SimpleString addressName : listAddressNames()) {
          AddressSettings addressSettings = getAddressSettingsRepository().getMatch(addressName.toString());
 
          if (!addressesInConfig.contains(addressName.toString()) && addressSettings.getConfigDeleteAddresses() == DeletionPolicy.FORCE) {
             for (Queue queue : listQueues(addressName)) {
-               ActiveMQServerLogger.LOGGER.undeployQueue(queue.getRoutingType(), queue.getName());
+               ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName());
                queue.deleteQueue(true);
             }
             ActiveMQServerLogger.LOGGER.undeployAddress(addressName);
             removeAddressInfo(addressName, null);
          } else if (addressSettings.getConfigDeleteQueues() == DeletionPolicy.FORCE) {
             for (Queue queue : listConfiguredQueues(addressName)) {
-               if (!queuesInConfig.contains(queue.getRoutingType().toString() + queue.getName().toString())) {
-                  ActiveMQServerLogger.LOGGER.undeployQueue(queue.getRoutingType(), queue.getName());
+               if (!queuesInConfig.contains(queue.getName().toString())) {
+                  ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName());
                   queue.deleteQueue(true);
                }
             }
@@ -2535,15 +2532,38 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       for (CoreAddressConfiguration config : configuration.getAddressConfigurations()) {
          try {
             ActiveMQServerLogger.LOGGER.deployAddress(config.getName(), config.getRoutingTypes().toString());
-            AddressInfo info = new AddressInfo(SimpleString.toSimpleString(config.getName()), config.getRoutingTypes());
-            addOrUpdateAddressInfo(info);
+            SimpleString address = SimpleString.toSimpleString(config.getName());
+
+            AddressInfo tobe = new AddressInfo(address, config.getRoutingTypes());
+
+            //During this stage until all queues re-configured we combine the current (if exists) with to-be routing types to allow changes in queues
+            AddressInfo current = getAddressInfo(address);
+            AddressInfo merged = new AddressInfo(address, tobe.getRoutingType());
+            if (current != null) {
+               merged.getRoutingTypes().addAll(current.getRoutingTypes());
+            }
+            addOrUpdateAddressInfo(merged);
+
             deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations());
+
+            //Now all queues updated we apply the actual address info expected tobe.
+            addOrUpdateAddressInfo(tobe);
          } catch (Exception e) {
             ActiveMQServerLogger.LOGGER.problemDeployingAddress(config.getName(), e.getMessage());
          }
       }
    }
 
+   private AddressInfo mergedRoutingTypes(SimpleString address, AddressInfo... addressInfos) {
+      EnumSet<RoutingType> mergedRoutingTypes = EnumSet.noneOf(RoutingType.class);
+      for (AddressInfo addressInfo : addressInfos) {
+         if (addressInfo != null) {
+            mergedRoutingTypes.addAll(addressInfo.getRoutingTypes());
+         }
+      }
+      return new AddressInfo(address, mergedRoutingTypes);
+   }
+
    private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception {
       for (CoreQueueConfiguration config : queues) {
          try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/410b3c1d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
index 126534e..b9999d0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
@@ -17,12 +17,6 @@
 
 package org.apache.activemq.artemis.tests.integration.jms;
 
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -32,6 +26,16 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
@@ -286,9 +290,14 @@ public class RedeployTest extends ActiveMQTestBase {
       embeddedJMS.getActiveMQServer().getReloadManager().setTick(tick);
 
       try {
+         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://0.0.0.0:61616");
+         try (JMSContext context = connectionFactory.createContext()) {
+            context.createProducer().send(context.createQueue("myAddress"), "hello");
+         }
+
          latch.await(10, TimeUnit.SECONDS);
          Assert.assertNotNull(getAddressInfo(embeddedJMS, "myAddress"));
-         Assert.assertEquals(RoutingType.MULTICAST, getQueue(embeddedJMS, "myQueue").getRoutingType());
+         Assert.assertEquals(RoutingType.ANYCAST, getQueue(embeddedJMS, "myQueue").getRoutingType());
 
          Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
          brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
@@ -297,7 +306,13 @@ public class RedeployTest extends ActiveMQTestBase {
          latch.await(10, TimeUnit.SECONDS);
 
          Assert.assertNotNull(getAddressInfo(embeddedJMS, "myAddress"));
-         Assert.assertEquals(RoutingType.ANYCAST, getQueue(embeddedJMS, "myQueue").getRoutingType());
+         Assert.assertEquals(RoutingType.MULTICAST, getQueue(embeddedJMS, "myQueue").getRoutingType());
+
+         //Ensures the queue isnt detroyed by checking message sent before change is consumable after (e.g. no message loss)
+         try (JMSContext context = connectionFactory.createContext()) {
+            Message message = context.createSharedDurableConsumer(context.createTopic("myAddress"), "myQueue").receive();
+            assertEquals("hello", ((TextMessage) message).getText());
+         }
       } finally {
          embeddedJMS.stop();
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/410b3c1d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java
index 3a2264f..fefc735 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java
@@ -20,6 +20,10 @@ package org.apache.activemq.artemis.tests.integration.persistence;
 import java.util.ArrayList;
 import java.util.List;
 
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSContext;
+import javax.jms.Message;
+import javax.jms.TextMessage;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.Configuration;
@@ -27,7 +31,7 @@ import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
 import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.junit.Test;
 
@@ -37,21 +41,8 @@ public class ConfigChangeTest extends ActiveMQTestBase {
 
    @Test
    public void testChangeQueueRoutingTypeOnRestart() throws Exception {
-      internalTestChangeQueueRoutingTypeOnRestart(false);
-   }
-
-   @Test
-   public void testChangeQueueRoutingTypeOnRestartNegative() throws Exception {
-      internalTestChangeQueueRoutingTypeOnRestart(true);
-   }
-
-   public void internalTestChangeQueueRoutingTypeOnRestart(boolean negative) throws Exception {
-      // if negative == true then the queue's routing type should *not* change
-
       Configuration configuration = createDefaultInVMConfig();
-      configuration.addAddressesSetting("#", new AddressSettings()
-         .setConfigDeleteQueues(negative ? DeletionPolicy.OFF : DeletionPolicy.FORCE)
-         .setConfigDeleteAddresses(negative ? DeletionPolicy.OFF : DeletionPolicy.FORCE));
+      configuration.addAddressesSetting("#", new AddressSettings());
 
       List addressConfigurations = new ArrayList();
       CoreAddressConfiguration addressConfiguration = new CoreAddressConfiguration()
@@ -65,6 +56,14 @@ public class ConfigChangeTest extends ActiveMQTestBase {
       configuration.setAddressConfigurations(addressConfigurations);
       server = createServer(true, configuration);
       server.start();
+
+
+      ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://0");
+      try (JMSContext context = connectionFactory.createContext()) {
+         context.createProducer().send(context.createQueue("myAddress"), "hello");
+      }
+
+
       server.stop();
 
       addressConfiguration = new CoreAddressConfiguration()
@@ -77,10 +76,16 @@ public class ConfigChangeTest extends ActiveMQTestBase {
       addressConfigurations.clear();
       addressConfigurations.add(addressConfiguration);
       configuration.setAddressConfigurations(addressConfigurations);
-
       server.start();
-      assertEquals(negative ? RoutingType.ANYCAST : RoutingType.MULTICAST, server.getAddressInfo(SimpleString.toSimpleString("myAddress")).getRoutingType());
-      assertEquals(negative ? RoutingType.ANYCAST : RoutingType.MULTICAST, server.locateQueue(SimpleString.toSimpleString("myQueue")).getRoutingType());
+      assertEquals(RoutingType.MULTICAST, server.getAddressInfo(SimpleString.toSimpleString("myAddress")).getRoutingType());
+      assertEquals(RoutingType.MULTICAST, server.locateQueue(SimpleString.toSimpleString("myQueue")).getRoutingType());
+
+      //Ensures the queue isnt detroyed by checking message sent before change is consumable after (e.g. no message loss)
+      try (JMSContext context = connectionFactory.createContext()) {
+         Message message = context.createSharedDurableConsumer(context.createTopic("myAddress"), "myQueue").receive();
+         assertEquals("hello", ((TextMessage) message).getText());
+      }
+
       server.stop();
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/410b3c1d/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml b/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml
index e5bbe4f..a031359 100644
--- a/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml
+++ b/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml
@@ -23,17 +23,21 @@ under the License.
                xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
 
    <core xmlns="urn:activemq:core">
-      <address-settings>
-         <address-setting match="#">
-            <config-delete-queues>FORCE</config-delete-queues>
-         </address-setting>
-      </address-settings>
+      <security-enabled>false</security-enabled>
+
+      <acceptors>
+         <!-- Default ActiveMQ Artemis Acceptor.  Multi-protocol adapter.  Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
+         <!-- performance tests have shown that openWire performs best with these buffer sizes -->
+         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
+
+      </acceptors>
+
 
       <addresses>
          <address name="myAddress">
-            <anycast>
+            <multicast>
                <queue name="myQueue"/>
-            </anycast>
+            </multicast>
          </address>
       </addresses>
    </core>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/410b3c1d/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml b/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml
index 61ae86a..6737a05 100644
--- a/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml
+++ b/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml
@@ -23,17 +23,20 @@ under the License.
                xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
 
    <core xmlns="urn:activemq:core">
-      <address-settings>
-         <address-setting match="#">
-            <config-delete-queues>FORCE</config-delete-queues>
-         </address-setting>
-      </address-settings>
+      <security-enabled>false</security-enabled>
+
+      <acceptors>
+         <!-- Default ActiveMQ Artemis Acceptor.  Multi-protocol adapter.  Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
+         <!-- performance tests have shown that openWire performs best with these buffer sizes -->
+         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
+
+      </acceptors>
 
       <addresses>
          <address name="myAddress">
-            <multicast>
+            <anycast>
                <queue name="myQueue"/>
-            </multicast>
+            </anycast>
          </address>
       </addresses>
    </core>