You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/14 18:50:37 UTC

[1/2] activemq-artemis git commit: ARTEMIS-2000: For ScaleDown set the RoutingType header property on the message

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 86c4d1b85 -> d6bdb1a7f


ARTEMIS-2000: For ScaleDown set the RoutingType header property on the message

so that, if the address does not exist on the other end, it is created with the correct routing type.

Test added by Andy Taylor (commits squashed during rebase on #2202)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/34df9679
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/34df9679
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/34df9679

Branch: refs/heads/master
Commit: 34df9679d7606c92a74bad133af76b9ece0b1210
Parents: 86c4d1b
Author: Roddie Kieley <rk...@unifiedsoftworx.com>
Authored: Tue Jul 31 10:04:56 2018 -0230
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Sep 14 14:48:09 2018 -0400

----------------------------------------------------------------------
 .../core/server/impl/ScaleDownHandler.java      | 40 ++++++++++++++------
 .../tests/integration/server/ScaleDownTest.java | 27 +++++++++++++
 2 files changed, 55 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34df9679/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
index a1c2229..02fe1bf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
@@ -31,6 +31,7 @@ import java.util.TreeSet;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
@@ -162,14 +163,28 @@ public class ScaleDownHandler {
          }
 
          // compile a list of all the relevant queues and queue iterators for this address
+         RoutingType routingType;
+         Integer routingTypeOrdinal;
+         String routingTypeString = "";
          for (Queue loopQueue : queues) {
             logger.debug("Scaling down messages on address " + address + " / performing loop on queue " + loopQueue);
 
             try (LinkedListIterator<MessageReference> messagesIterator = loopQueue.browserIterator()) {
 
+               routingType = loopQueue.getRoutingType();
+               if (null != routingType) {
+                  routingTypeOrdinal = routingType.ordinal();
+                  routingTypeString = routingTypeOrdinal.toString();
+               }
+
                while (messagesIterator.hasNext()) {
                   MessageReference messageReference = messagesIterator.next();
-                  Message message = messageReference.getMessage().copy();
+                  Message originalMessage = messageReference.getMessage();
+
+                  if (null != routingType) {
+                     originalMessage.putStringProperty(Message.HDR_ROUTING_TYPE.toString(), routingTypeString);
+                  }
+                  Message message = originalMessage.copy();
 
                   logger.debug("Reading message " + message + " from queue " + loopQueue);
                   Set<QueuesXRefInnerManager> queuesFound = new HashSet<>();
@@ -179,7 +194,7 @@ public class ScaleDownHandler {
                         // no need to lookup on itself, we just add it
                         queuesFound.add(controlEntry.getValue());
                      } else if (controlEntry.getValue().lookup(messageReference)) {
-                        logger.debug("Message existed on queue " + controlEntry.getKey().getID() + " removeID=" + controlEntry.getValue().getQueueID());
+                        logger.debug("Message existed on queue " + controlEntry.getKey().getID() + " removeID=" + controlEntry.getValue().getQueueID(message));
                         queuesFound.add(controlEntry.getValue());
                      }
                   }
@@ -188,7 +203,7 @@ public class ScaleDownHandler {
                   ByteBuffer buffer = ByteBuffer.allocate(queuesFound.size() * 8);
 
                   for (QueuesXRefInnerManager control : queuesFound) {
-                     long queueID = control.getQueueID();
+                     long queueID = control.getQueueID(message);
                      buffer.putLong(queueID);
                   }
 
@@ -339,7 +354,7 @@ public class ScaleDownHandler {
                   if (queueIDs.containsKey(queueName)) {
                      queueID = queueIDs.get(queueName);
                   } else {
-                     queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString());
+                     queueID = createQueueWithRoutingTypeIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString(), message.getRoutingType());
                      queueIDs.put(queueName, queueID);  // store it so we don't have to look it up every time
                   }
                   Pair<List<Long>, List<Long>> queueIds = queuesToSendTo.get(message);
@@ -361,7 +376,7 @@ public class ScaleDownHandler {
                   if (queueIDs.containsKey(queueName)) {
                      queueID = queueIDs.get(queueName);
                   } else {
-                     queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString());
+                     queueID = createQueueWithRoutingTypeIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString(), message.getRoutingType());
                      queueIDs.put(queueName, queueID);  // store it so we don't have to look it up every time
                   }
                   Pair<List<Long>, List<Long>> queueIds = queuesToSendTo.get(message);
@@ -426,13 +441,14 @@ public class ScaleDownHandler {
     * send directly to a queue, we have to send to an address instead but not all the queues related to the
     * address may need the message
     */
-   private long createQueueIfNecessaryAndGetID(ClientSession session,
-                                               Queue queue,
-                                               SimpleString addressName) throws Exception {
+   private long createQueueWithRoutingTypeIfNecessaryAndGetID(ClientSession session,
+                                                              Queue queue,
+                                                              SimpleString addressName,
+                                                              RoutingType routingType) throws Exception {
       long queueID = getQueueID(session, queue.getName());
       if (queueID == -1) {
-         session.createQueue(addressName, queue.getName(), queue.getFilter() == null ? null : queue.getFilter().getFilterString(), queue.isDurable());
-         logger.debug("Failed to get queue ID, creating queue [addressName=" + addressName + ", queueName=" + queue.getName() + ", filter=" + (queue.getFilter() == null ? "" : queue.getFilter().getFilterString()) + ", durable=" + queue.isDurable() + "]");
+         session.createQueue(addressName, routingType, queue.getName(), queue.getFilter() == null ? null : queue.getFilter().getFilterString(), queue.isDurable());
+         logger.debug("Failed to get queue ID, creating queue [addressName=" + addressName + ", queueName=" + queue.getName() + ", routingType=" + queue.getRoutingType() + ", filter=" + (queue.getFilter() == null ? "" : queue.getFilter().getFilterString()) + ", durable=" + queue.isDurable() + "]");
          queueID = getQueueID(session, queue.getName());
       }
 
@@ -523,10 +539,10 @@ public class ScaleDownHandler {
          return queue;
       }
 
-      public long getQueueID() throws Exception {
+      public long getQueueID(Message message) throws Exception {
 
          if (targetQueueID < 0) {
-            targetQueueID = createQueueIfNecessaryAndGetID(clientSession, queue, queue.getAddress());
+            targetQueueID = createQueueWithRoutingTypeIfNecessaryAndGetID(clientSession, queue, queue.getAddress(), message.getRoutingType());
          }
          return targetQueueID;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34df9679/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
index ec49ece..1a1d007 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@@ -33,6 +34,7 @@ import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@@ -291,6 +293,31 @@ public class ScaleDownTest extends ClusterTestBase {
    }
 
    @Test
+   public void testScaleDownWithMissingAnycastQueue() throws Exception {
+      final int TEST_SIZE = 2;
+      final String addressName = "testAddress";
+      final String queueName1 = "testQueue1";
+      final String queueName2 = "testQueue2";
+
+      // create a single ANYCAST queue on node 0 only, so the queue is missing on the scale-down target
+      createQueue(0, addressName, queueName2, null, false, null, null, RoutingType.ANYCAST);
+
+      // send messages to node 0
+      send(0, addressName, TEST_SIZE, false, null);
+
+      // trigger scaleDown from node 0 to node 1
+      servers[0].stop();
+      
+      Assert.assertEquals(((QueueImpl)((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName2))).getBindable()).getRoutingType(), RoutingType.ANYCAST);
+      // receive one of the scaled-down messages from testQueue2
+      addConsumer(0, 1, queueName2, null);
+      ClientMessage clientMessage = consumers[0].getConsumer().receive(250);
+      Assert.assertNotNull(clientMessage);
+      clientMessage.acknowledge();
+
+   }
+
+   @Test
    public void testMessageProperties() throws Exception {
       final int TEST_SIZE = 5;
       final String addressName = "testAddress";


[2/2] activemq-artemis git commit: This closes #2202

Posted by cl...@apache.org.
This closes #2202


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d6bdb1a7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d6bdb1a7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d6bdb1a7

Branch: refs/heads/master
Commit: d6bdb1a7fd93ab5e48a109be8c48a6a74c8dbb84
Parents: 86c4d1b 34df967
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Sep 14 14:49:42 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Sep 14 14:49:42 2018 -0400

----------------------------------------------------------------------
 .../core/server/impl/ScaleDownHandler.java      | 40 ++++++++++++++------
 .../tests/integration/server/ScaleDownTest.java | 27 +++++++++++++
 2 files changed, 55 insertions(+), 12 deletions(-)
----------------------------------------------------------------------