You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/07/21 19:12:53 UTC

[activemq-artemis] branch main updated: ARTEMIS-3392 Scale down would fail if target queue's id greater than max int

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a88c5f  ARTEMIS-3392 Scale down would fail if target queue's id greater than max int
8a88c5f is described below

commit 8a88c5f91307d6633f2cb7cf7eaa3035007b4fcf
Author: Howard Gao <ho...@gmail.com>
AuthorDate: Tue Jul 20 18:43:26 2021 +0800

    ARTEMIS-3392 Scale down would fail if target queue's id greater than max int
---
 .../artemis/core/server/impl/ScaleDownHandler.java |  6 +--
 .../integration/server/ScaleDownDirectTest.java    | 51 ++++++++++++++++++++++
 2 files changed, 54 insertions(+), 3 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
index 0e75305..0e7c405 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
@@ -451,8 +451,8 @@ public class ScaleDownHandler {
       return queueID;
    }
 
-   private Integer getQueueID(ClientSession session, SimpleString queueName) throws Exception {
-      Integer queueID = -1;
+   private Long getQueueID(ClientSession session, SimpleString queueName) throws Exception {
+      Long queueID = -1L;
       Object result;
       try (ClientRequestor requestor = new ClientRequestor(session, "activemq.management")) {
          ClientMessage managementMessage = session.createMessage(false);
@@ -463,7 +463,7 @@ public class ScaleDownHandler {
          result = ManagementHelper.getResult(reply);
       }
       if (result != null && result instanceof Number) {
-         queueID = ((Number) result).intValue();
+         queueID = ((Number) result).longValue();
       }
       return queueID;
    }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java
index b9f2685..7c6c6b3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java
@@ -29,6 +29,8 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.core.persistence.impl.journal.BatchingIDGenerator;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
 import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.server.impl.ScaleDownHandler;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -294,6 +296,55 @@ public class ScaleDownDirectTest extends ClusterTestBase {
       Assert.assertNull(servers[1].getPostOffice().getBinding(new SimpleString(queueName3)));
    }
 
+   @Test
+   public void testScaleDownWithBigQueueID() throws Exception {
+      final int TEST_SIZE = 2;
+      final String addressName = "testAddress";
+      final String queueName1 = "testQueue1";
+
+      JournalStorageManager manager = (JournalStorageManager) servers[0].getStorageManager();
+      BatchingIDGenerator idGenerator = (BatchingIDGenerator) manager.getIDGenerator();
+      idGenerator.forceNextID((Integer.MAX_VALUE) + 100L);
+
+      long nextId = idGenerator.generateID();
+      assertTrue(nextId > Integer.MAX_VALUE);
+
+      manager = (JournalStorageManager) servers[1].getStorageManager();
+      idGenerator = (BatchingIDGenerator) manager.getIDGenerator();
+      idGenerator.forceNextID((Integer.MAX_VALUE) + 200L);
+
+      nextId = idGenerator.generateID();
+      assertTrue(nextId > Integer.MAX_VALUE);
+
+      // create one queue (testQueue1) on each of the 2 nodes, both using the same address name
+      createQueue(0, addressName, queueName1, null, true);
+      createQueue(1, addressName, queueName1, null, true);
+
+      // send TEST_SIZE messages to the address on node 0
+      send(0, addressName, TEST_SIZE, true, null);
+
+      // at this point on node 0 there should be TEST_SIZE (2) messages in testQueue1
+      Wait.assertEquals(TEST_SIZE, () -> getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()));
+
+      assertEquals(TEST_SIZE, performScaledown());
+      // NOTE(review): scale-down was already run explicitly by performScaledown() above; node 0 is then stopped
+      servers[0].stop();
+
+      // receive the 2 messages from testQueue1 (presumably consumer 0 on node 1 - confirm addConsumer's argument order)
+      addConsumer(0, 1, queueName1, null);
+      ClientMessage clientMessage = consumers[0].getConsumer().receive(250);
+      Assert.assertNotNull(clientMessage);
+      clientMessage.acknowledge();
+      clientMessage = consumers[0].getConsumer().receive(250);
+      Assert.assertNotNull(clientMessage);
+      clientMessage.acknowledge();
+
+      // ensure there are no more messages in testQueue1 (a third receive must return null)
+      clientMessage = consumers[0].getConsumer().receive(250);
+      Assert.assertNull(clientMessage);
+      removeConsumer(0);
+   }
+
    private void checkBody(ClientMessage message, int bufferSize) {
       assertEquals(bufferSize, message.getBodySize());
       byte[] body = new byte[message.getBodySize()];