Posted to gitbox@activemq.apache.org by GitBox <gi...@apache.org> on 2021/11/25 15:58:13 UTC

[GitHub] [activemq-artemis] gtully opened a new pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

gtully opened a new pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863


   Scenario: avoid paging. If an address is full, chain another broker and produce to the head while consuming from the tail, using producer and consumer roles to partition connections. When the tail is drained, drop it.
    - adds an option to treat an idle consumer as slow
    - adds basic support for credit-based address blocking (ARTEMIS-2097)
    - adds some more visibility into address memory usage and balancer attribute modifier operations


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [activemq-artemis] gtully commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r761958322



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
##########
@@ -612,6 +612,45 @@ public void testMinuteSurviving() throws Exception {
       Wait.assertEquals(0, queue::getConsumerCount);
    }
 
+
+   @Test
+   public void testKilledOnNoMessagesSoCanBeRebalanced() throws Exception {
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setSlowConsumerCheckPeriod(2);
+      addressSettings.setSlowConsumerThresholdMeasurementUnit(MESSAGES_PER_SECOND);
+      addressSettings.setSlowConsumerThreshold(0); // if there are no messages pending, kill me
+      addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
+
+      server.getAddressSettingsRepository().removeMatch(QUEUE.toString());
+      server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
+
+
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = addClientSession(sf.createSession(false, true, true, false));
+
+      ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
+
+      int messages = 1;
+
+      for (int i = 0; i < messages; i++) {
+         producer.send(session.createMessage(true));
+      }
+      session.commit();
+
+      ConcurrentHashSet<ClientMessage> receivedMessages = new ConcurrentHashSet<>();
+      FixedRateConsumer consumer = new FixedRateConsumer(10, MESSAGES_PER_SECOND, receivedMessages, sf, QUEUE, 0);
+      consumer.start();
+
+      Queue queue = server.locateQueue(QUEUE);
+      Wait.assertEquals(1, queue::getConsumerCount);

Review comment:
       is it the locate queue that may not yet be present?







[GitHub] [activemq-artemis] gtully commented on pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully commented on pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#issuecomment-988663421


   any more feedback?





[GitHub] [activemq-artemis] gtully commented on pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully commented on pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#issuecomment-979325745


   The block feature needs some work; I don't fully have a handle on how flows are collected, and I need some help here. The intent is to have an address where consumers can drain and producers are blocked rather than whacked. The use case is partitioning an address by producers/consumers while still supporting the scenario where a single connection has both producers and consumers!
   I would like to block a producer via 0 link credit for that address, but the flow control is based on refreshing credit at the moment. I may be in the wrong place altogether :-)
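
The behaviour described above can be illustrated with a toy model. This is only a sketch under stated assumptions, not Artemis or AMQP client code: the class `LinkCreditSketch` and all of its methods are hypothetical, and it only models blocking an address after the link has already been granted credit. Credit is topped up after each send unless the address is blocked, so a blocked producer can spend whatever credit it still holds and then stalls until unblock.

```java
public final class LinkCreditSketch {

   private final int creditWindow;
   private int credit;
   private boolean blocked;

   LinkCreditSketch(int creditWindow) {
      this.creditWindow = creditWindow;
      this.credit = creditWindow; // assumption: credit is granted when the link attaches
   }

   // Sender side: a send only proceeds while credit remains.
   synchronized boolean trySend() {
      if (credit == 0) {
         return false; // a real client would wait here for more credit
      }
      credit--;
      refreshCredit();
      return true;
   }

   // Receiver side: top credit back up once it is used, unless the address is blocked.
   private void refreshCredit() {
      if (!blocked && credit == 0) {
         credit = creditWindow;
      }
   }

   synchronized void block() {
      blocked = true;
   }

   // Unblocking issues the refresh that was withheld while blocked.
   synchronized void unblock() {
      blocked = false;
      refreshCredit();
   }

   synchronized int credit() {
      return credit;
   }
}
```

With a window of one credit, a send issued after `block()` still succeeds because it uses the outstanding credit; the next one does not proceed until `unblock()` is called.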





[GitHub] [activemq-artemis] gtully commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r760440122



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
##########
@@ -1153,6 +1161,31 @@ public boolean isFull() {
       return maxSize > 0 && getAddressSize() > maxSize || pagingManager.isGlobalFull();
    }
 
+
+   @Override
+   public int getAddressLimitPercent() {
+      final long currentUsage = getAddressSize();
+      if (currentUsage != 0) {
+         if (maxSize > 0) {
+            return Math.toIntExact((currentUsage / maxSize) * 100);

Review comment:
       that does need work and tests, on it, thanks!







[GitHub] [activemq-artemis] gtully commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r761993146



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
##########
@@ -612,6 +612,45 @@ public void testMinuteSurviving() throws Exception {
       Wait.assertEquals(0, queue::getConsumerCount);
    }
 
+
+   @Test
+   public void testKilledOnNoMessagesSoCanBeRebalanced() throws Exception {
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setSlowConsumerCheckPeriod(2);
+      addressSettings.setSlowConsumerThresholdMeasurementUnit(MESSAGES_PER_SECOND);
+      addressSettings.setSlowConsumerThreshold(0); // if there are no messages pending, kill me
+      addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
+
+      server.getAddressSettingsRepository().removeMatch(QUEUE.toString());
+      server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
+
+
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = addClientSession(sf.createSession(false, true, true, false));
+
+      ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
+
+      int messages = 1;
+
+      for (int i = 0; i < messages; i++) {
+         producer.send(session.createMessage(true));
+      }
+      session.commit();
+
+      ConcurrentHashSet<ClientMessage> receivedMessages = new ConcurrentHashSet<>();
+      FixedRateConsumer consumer = new FixedRateConsumer(10, MESSAGES_PER_SECOND, receivedMessages, sf, QUEUE, 0);
+      consumer.start();
+
+      Queue queue = server.locateQueue(QUEUE);
+      Wait.assertEquals(1, queue::getConsumerCount);

Review comment:
       I have tidied it up a bit, not 100% sure I captured your feedback!







[GitHub] [activemq-artemis] gtully commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r761911445



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/ElasticQueueTest.java
##########
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.balancing;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ResourceAllocationException;
+import javax.jms.Session;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+import javax.security.auth.Subject;
+import java.io.File;
+import java.net.URI;
+import java.util.Set;
+import java.util.Stack;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.AddressControl;
+import org.apache.activemq.artemis.api.core.management.BrokerBalancerControl;
+import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.security.CheckType;
+import org.apache.activemq.artemis.core.security.Role;
+import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
+import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager5;
+import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal;
+import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal;
+import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsConnectionListener;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ElasticQueueTest extends ActiveMQTestBase {
+
+   static final String qName = "EQ";
+   static final SimpleString qNameSimple = SimpleString.toSimpleString(qName);
+
+   final int base_port = 61616;
+   final int opTimeoutMillis = 2000;
+   final Stack<EmbeddedActiveMQ> nodes = new Stack<>();
+   private final String balancerConfigName = "role_name_sharder";
+
+   String urlForNodes(Stack<EmbeddedActiveMQ> nodes) {
+      StringBuilder builder = new StringBuilder("failover:(");
+      int port_start = base_port;
+      for (EmbeddedActiveMQ activeMQ : nodes) {
+         if (port_start != base_port) {
+            builder.append(",");
+         }
+         builder.append("amqp://localhost:" + (port_start++));
+      }
+      // fast reconnect, randomize to get to all brokers and timeout sends that block on no credit
+      builder.append(")?failover.maxReconnectDelay=2000&failover.randomize=true&jms.sendTimeout=" + opTimeoutMillis);
+      return builder.toString();
+   }
+
+   // allow tracking of failover reconnects
+   class ConnectionListener implements JmsConnectionListener {
+
+      AtomicInteger connectionCount;
+
+      ConnectionListener(AtomicInteger connectionCount) {
+         this.connectionCount = connectionCount;
+      }
+
+      @Override
+      public void onConnectionEstablished(URI uri) {
+      }
+
+      @Override
+      public void onConnectionFailure(Throwable throwable) {
+      }
+
+      @Override
+      public void onConnectionInterrupted(URI uri) {
+      }
+
+      @Override
+      public void onConnectionRestored(URI uri) {
+         connectionCount.incrementAndGet();
+      }
+
+      @Override
+      public void onInboundMessage(JmsInboundMessageDispatch jmsInboundMessageDispatch) {
+      }
+
+      @Override
+      public void onSessionClosed(Session session, Throwable throwable) {
+      }
+
+      @Override
+      public void onConsumerClosed(MessageConsumer messageConsumer, Throwable throwable) {
+      }
+
+      @Override
+      public void onProducerClosed(MessageProducer messageProducer, Throwable throwable) {
+      }
+   }
+
+   // slow consumer
+   class EQConsumer extends Thread {

Review comment:
       agree.







[GitHub] [activemq-artemis] gtully commented on pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully commented on pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#issuecomment-988662432


   going to leave the control loop for another day and have a simple isFull check. there is lots to study in https://en.wikipedia.org/wiki/PID_controller#Response_to_disturbances





[GitHub] [activemq-artemis] gtully commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r760349734



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
##########
@@ -1153,6 +1161,31 @@ public boolean isFull() {
       return maxSize > 0 && getAddressSize() > maxSize || pagingManager.isGlobalFull();
    }
 
+
+   @Override
+   public int getAddressLimitPercent() {
+      final long currentUsage = getAddressSize();
+      if (currentUsage != 0) {
+         if (maxSize > 0) {
+            return Math.toIntExact((currentUsage / maxSize) * 100);
+         } else if (pagingManager.isUsingGlobalSize()) {
+            return Math.toIntExact((currentUsage / pagingManager.getGlobalSize()) * 100);
+         }
+      }
+      return 0;
+   }
+
+   @Override
+   public void block() {
+      blocked = true;
+   }
+
+   @Override
+   public void unBlock() {

Review comment:
       thanks!







[GitHub] [activemq-artemis] gtully commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r760370371



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java
##########
@@ -130,6 +162,51 @@ public void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception {
       assertTrue(addressSize >= MAX_SIZE_BYTES_REJECT_THRESHOLD);
    }
 
+   @Test(timeout = 10000)
+   public void testSendHangsWhenBlockedAndNotAfterUnBlocked() throws Exception {
+      Connection connection = createConnection(new URI(singleCreditAcceptorURI.replace("tcp", "amqp")), null, null, null, true);
+      final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Destination d = session.createQueue(getQueueName());
+      final MessageProducer p = session.createProducer(d);
+
+      final CountDownLatch running = new CountDownLatch(1);
+      final CountDownLatch done = new CountDownLatch(1);
+
+      AddressControl addressControl = ManagementControlHelper.createAddressControl(SimpleString.toSimpleString(getQueueName()), mBeanServer);
+
+      assertTrue("blocked ok", addressControl.block());
+
+      // one credit
+      p.send(session.createBytesMessage());
+
+      // this send will block, no credit
+      new Thread(new Runnable() {
+         @Override
+         public void run() {
+            try {
+               running.countDown();
+               p.send(session.createBytesMessage());
+            } catch (JMSException ignored) {
+            } finally {
+               done.countDown();
+            }
+         }
+      }).start();
+
+      assertTrue(running.await(5, TimeUnit.SECONDS));
+
+      assertFalse(done.await(1, TimeUnit.SECONDS));

Review comment:
       true







[GitHub] [activemq-artemis] gemmellr commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gemmellr commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r760409292



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
##########
@@ -201,4 +201,16 @@ public TargetResult getTarget(String key) {
 
       return result != null ? result : TargetResult.REFUSED_UNAVAILABLE_RESULT;
    }
+
+   public void setLocalTargetFilter(String regExp) {
+      if (regExp == null || regExp.isBlank()) {

Review comment:
       I wasn't suggesting not to use it, in case that's what it seemed like.







[GitHub] [activemq-artemis] gemmellr commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gemmellr commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r758286287



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
##########
@@ -695,6 +697,14 @@ public boolean checkMemory(final Runnable runWhenAvailable) {
    @Override
    public boolean checkMemory(boolean runOnFailure, final Runnable runWhenAvailable) {
 
+      if (blocked) {
+         if (runWhenAvailable != null) {
+            onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
+         }
+         ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, sizeInBytes.get(), maxSize, pagingManager.getGlobalSize());

Review comment:
       This could be spammy, logging for each call. The existing usage of this logger call is guarded by the 'blocking' boolean so that it acts more as a log that the state has toggled. The equivalent of that would seem to be doing this logger call in the block() method (and checking the boolean) rather than here.
   
   Perhaps it should have its own logger call given that its matching 'unblock' is really separate from the existing 'blocking' mechanism.
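
As an illustration of logging on the state change rather than on every check, here is a minimal sketch. It is not the PagingStoreImpl code: the class `ToggleLoggingSketch`, the `manuallyBlocked` flag and the counting `log` method are hypothetical stand-ins, and a real store would call its own logger.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public final class ToggleLoggingSketch {

   private final AtomicBoolean manuallyBlocked = new AtomicBoolean(false);
   // Stands in for a real logger so the sketch can count emitted lines.
   final AtomicInteger logLines = new AtomicInteger();

   // Returns true only for the call that actually changed the state.
   boolean block() {
      if (manuallyBlocked.compareAndSet(false, true)) {
         log("address blocked");
         return true;
      }
      return false;
   }

   boolean unblock() {
      if (manuallyBlocked.compareAndSet(true, false)) {
         log("address unblocked");
         return true;
      }
      return false;
   }

   // Hot path: consulted on every send, so it does not log.
   boolean isBlocked() {
      return manuallyBlocked.get();
   }

   private void log(String message) {
      logLines.incrementAndGet();
      System.out.println(message);
   }
}
```

Because `compareAndSet` succeeds only on a real transition, repeated `block()` calls produce a single log line, and the per-send check stays silent.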

##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
##########
@@ -1153,6 +1161,31 @@ public boolean isFull() {
       return maxSize > 0 && getAddressSize() > maxSize || pagingManager.isGlobalFull();
    }
 
+
+   @Override
+   public int getAddressLimitPercent() {
+      final long currentUsage = getAddressSize();
+      if (currentUsage != 0) {
+         if (maxSize > 0) {
+            return Math.toIntExact((currentUsage / maxSize) * 100);
+         } else if (pagingManager.isUsingGlobalSize()) {
+            return Math.toIntExact((currentUsage / pagingManager.getGlobalSize()) * 100);
+         }
+      }
+      return 0;
+   }
+
+   @Override
+   public void block() {
+      blocked = true;
+   }
+
+   @Override
+   public void unBlock() {

Review comment:
       No capital needed in unblock.

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java
##########
@@ -85,6 +89,34 @@ public void testCreditsAreAllocatedOnceOnLinkCreated() throws Exception {
       }
    }
 
+   @Test(timeout = 60000)
+   public void testCreditsAreNotAllocatedOnceOnLinkCreatedWhileBlockedAndWhenUnBlocked() throws Exception {

Review comment:
       ```suggestion
      public void testCreditIsNotGivenOnLinkCreationWhileBlockedAndIsGivenOnceThenUnblocked() throws Exception {
   ```

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java
##########
@@ -130,6 +162,51 @@ public void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception {
       assertTrue(addressSize >= MAX_SIZE_BYTES_REJECT_THRESHOLD);
    }
 
+   @Test(timeout = 10000)
+   public void testSendHangsWhenBlockedAndNotAfterUnBlocked() throws Exception {
+      Connection connection = createConnection(new URI(singleCreditAcceptorURI.replace("tcp", "amqp")), null, null, null, true);
+      final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Destination d = session.createQueue(getQueueName());
+      final MessageProducer p = session.createProducer(d);
+
+      final CountDownLatch running = new CountDownLatch(1);
+      final CountDownLatch done = new CountDownLatch(1);
+
+      AddressControl addressControl = ManagementControlHelper.createAddressControl(SimpleString.toSimpleString(getQueueName()), mBeanServer);
+
+      assertTrue("blocked ok", addressControl.block());
+
+      // one credit
+      p.send(session.createBytesMessage());
+
+      // this send will block, no credit
+      new Thread(new Runnable() {
+         @Override
+         public void run() {
+            try {
+               running.countDown();
+               p.send(session.createBytesMessage());
+            } catch (JMSException ignored) {
+            } finally {
+               done.countDown();
+            }
+         }
+      }).start();
+
+      assertTrue(running.await(5, TimeUnit.SECONDS));
+
+      assertFalse(done.await(1, TimeUnit.SECONDS));

Review comment:
       I don't think it needs to wait as long as this, especially if you verify there is no credit left. The message will never even leave the client until some credit arrives, so waiting an entire second after knowing the thread has started and is about to call send doesn't seem necessary in the typical case.

##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
##########
@@ -126,6 +126,8 @@
 
    private volatile boolean blocking = false;
 
+   private volatile boolean blocked = false;

Review comment:
       I would give this (and possibly also the existing 'blocking' variable) a different name so it's more distinguishable, e.g. manuallyBlocked. It's not at all clear with blocked+blocking that they are really quite unrelated, and that setting the store 'blocked' does not mean it is 'blocking'. Which leads to the next comment.

##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
##########
@@ -695,6 +697,14 @@ public boolean checkMemory(final Runnable runWhenAvailable) {
    @Override
    public boolean checkMemory(boolean runOnFailure, final Runnable runWhenAvailable) {
 
+      if (blocked) {
+         if (runWhenAvailable != null) {
+            onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));

Review comment:
       This seems questionable, the onMemoryFreedRunnables queue is already used by the existing disk usage based 'blocking' mechanism. Using it for this new but separate 'blocked' mechanism as well means there are likely states that the other [un]'blocking' mechanism could actually run these actions even though the store was and is still 'blocked'.
   
   (Perhaps also vice-versa when the unBlock() is called)
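
One possible way to keep the two mechanisms from releasing each other's work is sketched below. This is an assumption-laden illustration, not a proposal for the actual store: `SeparateWaitQueuesSketch` and its methods are hypothetical, and the locking is deliberately coarse. Each condition has its own queue, and work released by one condition is re-evaluated against the other before it runs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public final class SeparateWaitQueuesSketch {

   private final Queue<Runnable> waitingOnMemory = new ArrayDeque<>();
   private final Queue<Runnable> waitingOnUnblock = new ArrayDeque<>();
   private boolean memoryFull;
   private boolean manuallyBlocked;

   // Runs the work now, or parks it behind the condition currently holding it back.
   synchronized void submit(Runnable work) {
      if (manuallyBlocked) {
         waitingOnUnblock.add(work);
      } else if (memoryFull) {
         waitingOnMemory.add(work);
      } else {
         work.run();
      }
   }

   synchronized void setMemoryFull(boolean full) {
      memoryFull = full;
      if (!full) {
         resubmit(waitingOnMemory);
      }
   }

   synchronized void setManuallyBlocked(boolean blocked) {
      manuallyBlocked = blocked;
      if (!blocked) {
         resubmit(waitingOnUnblock);
      }
   }

   // Re-evaluates parked work so it cannot slip past the other condition.
   private void resubmit(Queue<Runnable> parked) {
      List<Runnable> snapshot = new ArrayList<>(parked);
      parked.clear();
      snapshot.forEach(this::submit);
   }
}
```

In this sketch, freeing memory while the address is still manually blocked only moves the parked work to the other queue; it runs once both conditions have cleared.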

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java
##########
@@ -85,6 +89,34 @@ public void testCreditsAreAllocatedOnceOnLinkCreated() throws Exception {
       }
    }
 
+   @Test(timeout = 60000)
+   public void testCreditsAreNotAllocatedOnceOnLinkCreatedWhileBlockedAndWhenUnBlocked() throws Exception {
+      AmqpClient client = createAmqpClient(new URI(singleCreditAcceptorURI));
+      AmqpConnection connection = addConnection(client.connect());
+
+      try {
+         AddressControl addressControl = ManagementControlHelper.createAddressControl(SimpleString.toSimpleString(getQueueName()), mBeanServer);
+         addressControl.block();
+         AmqpSession session = connection.createSession();
+         final AmqpSender sender = session.createSender(getQueueName());
+         assertEquals("Should get 0 credit", 0, sender.getSender().getCredit());
+
+         addressControl.unBlock();
+         assertTrue("Should now get issued one credit", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisfied() throws Exception {
+               return 1 == sender.getSender().getCredit();
+            }
+         }));

Review comment:
       The default check interval is 100ms; I would set a smaller one for this (and a smaller limit than the default 30sec), as it's likely not to happen on the first check but shouldn't take anything like that long to then arrive.
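
The idea of polling with a short interval and a short overall limit can be sketched with a stand-alone helper. `ShortPollSketch.waitFor` is hypothetical and is not the Artemis `Wait` utility; the 2000 ms and 5 ms values in the usage note are illustrative only.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public final class ShortPollSketch {

   // Polls the condition every intervalMillis. Returns true as soon as the
   // condition holds, false once timeoutMillis has elapsed without it holding.
   static boolean waitFor(BooleanSupplier condition, long timeoutMillis, long intervalMillis) {
      final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
      while (!condition.getAsBoolean()) {
         if (System.nanoTime() - deadline >= 0) {
            return false;
         }
         try {
            TimeUnit.MILLISECONDS.sleep(intervalMillis);
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
         }
      }
      return true;
   }
}
```

In the quoted test this could read roughly as `waitFor(() -> sender.getSender().getCredit() == 1, 2000, 5)`, so a credit that arrives a few milliseconds after the unblock call is noticed almost immediately.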

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java
##########
@@ -130,6 +162,51 @@ public void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception {
       assertTrue(addressSize >= MAX_SIZE_BYTES_REJECT_THRESHOLD);
    }
 
+   @Test(timeout = 10000)
+   public void testSendHangsWhenBlockedAndNotAfterUnBlocked() throws Exception {

Review comment:
       ```suggestion
      public void testSendBlocksWhenAddressBlockedAndCompletesAfterUnblocked() throws Exception {
   ```

##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
##########
@@ -201,4 +201,16 @@ public TargetResult getTarget(String key) {
 
       return result != null ? result : TargetResult.REFUSED_UNAVAILABLE_RESULT;
    }
+
+   public void setLocalTargetFilter(String regExp) {
+      if (regExp == null || regExp.isBlank()) {

Review comment:
       Advanced Java 11 features! :D ;)

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java
##########
@@ -85,6 +89,34 @@ public void testCreditsAreAllocatedOnceOnLinkCreated() throws Exception {
       }
    }
 
+   @Test(timeout = 60000)
+   public void testCreditsAreNotAllocatedOnceOnLinkCreatedWhileBlockedAndWhenUnBlocked() throws Exception {
+      AmqpClient client = createAmqpClient(new URI(singleCreditAcceptorURI));
+      AmqpConnection connection = addConnection(client.connect());
+
+      try {
+         AddressControl addressControl = ManagementControlHelper.createAddressControl(SimpleString.toSimpleString(getQueueName()), mBeanServer);
+         addressControl.block();
+         AmqpSession session = connection.createSession();
+         final AmqpSender sender = session.createSender(getQueueName());
+         assertEquals("Should get 0 credit", 0, sender.getSender().getCredit());

Review comment:
       This test is susceptible to the inherent race of checking credit, as credit is granted separately and so may not have arrived before the create method returned and you check it. It should probably use a very small wait (e.g. 5-10ms) here to give more confidence in this state, and a small window in which it could fail if some credit should arrive unexpectedly before checking it, and before/regardless of the unblock call.
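
A check that a condition keeps holding for a short window, rather than at a single instant, could look like the sketch below. `HoldsForSketch.holdsFor` is a hypothetical helper, not part of the Artemis test utilities, and the window and interval in the usage note are assumptions.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public final class HoldsForSketch {

   // Returns false as soon as the condition is observed to be false, and true
   // only if it held at every check across the whole window.
   static boolean holdsFor(BooleanSupplier condition, long windowMillis, long intervalMillis) {
      final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(windowMillis);
      do {
         if (!condition.getAsBoolean()) {
            return false;
         }
         try {
            TimeUnit.MILLISECONDS.sleep(intervalMillis);
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
         }
      } while (System.nanoTime() - deadline < 0);
      // One last look after the window closes.
      return condition.getAsBoolean();
   }
}
```

For the zero-credit assertion this might be used as `holdsFor(() -> sender.getSender().getCredit() == 0, 10, 1)`, which fails if credit shows up at any point during the 10 ms window.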







[GitHub] [activemq-artemis] gemmellr commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gemmellr commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r760084361



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
##########
@@ -1153,6 +1161,31 @@ public boolean isFull() {
       return maxSize > 0 && getAddressSize() > maxSize || pagingManager.isGlobalFull();
    }
 
+
+   @Override
+   public int getAddressLimitPercent() {
+      final long currentUsage = getAddressSize();
+      if (currentUsage != 0) {
+         if (maxSize > 0) {
+            return Math.toIntExact((currentUsage / maxSize) * 100);

Review comment:
       Or 1, if currentUsage equals maxSize (or is bigger but less than double it), since `currentUsage / maxSize` is an integer division.
   
   Phrased another way: are unit tests with 0% != x != 100% checks missing as well?
   
   (the currentUsage != 0 check could probably go away too)
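
   To illustrate the truncation, here is a small self-contained sketch. `AddressLimit` and its methods are hypothetical names, and returning -1 when no limit is set is an assumption made for the example, not necessarily what the PR does.

```java
/** Hypothetical illustration of the percentage calculation; not the PR's code. */
final class AddressLimit {

   /** Mirrors the expression under review: the division truncates before scaling. */
   static int truncatingPercent(long currentUsage, long maxSize) {
      return Math.toIntExact((currentUsage / maxSize) * 100);
   }

   /**
    * Scales before dividing so intermediate percentages survive.
    * Returns -1 when no limit is configured (an assumption for this sketch).
    */
   static int percent(long currentUsage, long maxSize) {
      if (maxSize <= 0) {
         return -1;
      }
      // multiplyExact guards against silent overflow for very large sizes
      return Math.toIntExact(Math.multiplyExact(currentUsage, 100L) / maxSize);
   }
}
```

   With a 100-byte limit and 50 bytes used, `truncatingPercent` gives 0 while `percent` gives 50, which is the kind of in-between value a unit test would need to cover. No special case for zero usage is needed in `percent`.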

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/ElasticQueueTest.java
##########
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.balancing;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ResourceAllocationException;
+import javax.jms.Session;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+import javax.security.auth.Subject;
+import java.io.File;
+import java.net.URI;
+import java.util.Set;
+import java.util.Stack;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.AddressControl;
+import org.apache.activemq.artemis.api.core.management.BrokerBalancerControl;
+import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.security.CheckType;
+import org.apache.activemq.artemis.core.security.Role;
+import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
+import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager5;
+import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal;
+import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal;
+import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsConnectionListener;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ElasticQueueTest extends ActiveMQTestBase {
+
+   static final String qName = "EQ";
+   static final SimpleString qNameSimple = SimpleString.toSimpleString(qName);
+
+   final int base_port = 61616;
+   final int opTimeoutMillis = 2000;
+   final Stack<EmbeddedActiveMQ> nodes = new Stack<>();

Review comment:
       Does anything clean these up when the test fails mid-way?
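
   One possible shape for such cleanup, as a sketch only. `NodeCleanup` and `Stoppable` are hypothetical names; the assumption is that each started broker is registered and that `stopAll()` is called from a teardown method (for example a JUnit `@After`), so it runs even when a test fails mid-way.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical cleanup helper: stops every registered node, most recent first. */
final class NodeCleanup {

   /** Minimal stand-in for anything with a stop() that may throw. */
   interface Stoppable {
      void stop() throws Exception;
   }

   private final Deque<Stoppable> started = new ArrayDeque<>();

   void register(Stoppable node) {
      started.push(node);
   }

   /** Stops all registered nodes even if some of them fail; returns the failure count. */
   int stopAll() {
      int failures = 0;
      while (!started.isEmpty()) {
         try {
            started.pop().stop();
         } catch (Exception e) {
            failures++; // keep going so later nodes are still stopped
         }
      }
      return failures;
   }
}
```

   A broker could be registered with a method reference such as `cleanup.register(node::stop)` right after it is started.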

##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
##########
@@ -695,6 +697,14 @@ public boolean checkMemory(final Runnable runWhenAvailable) {
    @Override
    public boolean checkMemory(boolean runOnFailure, final Runnable runWhenAvailable) {
 
+      if (blocked) {
+         if (runWhenAvailable != null) {
+            onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));

Review comment:
       unBlock() would need to do the reverse too, I guess: check whether `blocking` is set.
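
   A simplified sketch of the coordination, not the real PagingStoreImpl. `GatedStore`, the `full` flag and `setFull` are hypothetical stand-ins (the real class tracks `blocking` differently); the point is that queued runnables are released only when neither condition holds, whichever one clears last.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Simplified, hypothetical stand-in for the paging store's credit gating. */
final class GatedStore {

   private final Queue<Runnable> onMemoryFreedRunnables = new ArrayDeque<>();
   private boolean blocked; // set by the management block() operation
   private boolean full;    // set when the address reaches its size limit

   /** Runs the callback now if the store is open, otherwise queues it for later. */
   synchronized boolean checkMemory(Runnable runWhenAvailable) {
      if (blocked || full) {
         if (runWhenAvailable != null) {
            onMemoryFreedRunnables.add(runWhenAvailable);
         }
         return false;
      }
      if (runWhenAvailable != null) {
         runWhenAvailable.run();
      }
      return true;
   }

   synchronized void block() {
      blocked = true;
   }

   synchronized void unBlock() {
      blocked = false;
      releaseIfOpen();
   }

   synchronized void setFull(boolean isFull) {
      full = isFull;
      releaseIfOpen();
   }

   /** Releases queued callbacks only when neither condition still applies. */
   private void releaseIfOpen() {
      if (blocked || full) {
         return;
      }
      Runnable next;
      while ((next = onMemoryFreedRunnables.poll()) != null) {
         next.run();
      }
   }
}
```

   Running callbacks while holding the lock keeps the sketch short; a real implementation would more likely hand them to an executor.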

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/ElasticQueueTest.java
##########
@@ -0,0 +1,697 @@
+public class ElasticQueueTest extends ActiveMQTestBase {
+
+   static final String qName = "EQ";
+   static final SimpleString qNameSimple = SimpleString.toSimpleString(qName);
+
+   final int base_port = 61616;
+   final int opTimeoutMillis = 2000;
+   final Stack<EmbeddedActiveMQ> nodes = new Stack<>();
+   private final String balancerConfigName = "role_name_sharder";
+
+   String urlForNodes(Stack<EmbeddedActiveMQ> nodes) {
+      StringBuilder builder = new StringBuilder("failover:(");
+      int port_start = base_port;
+      for (EmbeddedActiveMQ activeMQ : nodes) {
+         if (port_start != base_port) {
+            builder.append(",");
+         }
+         builder.append("amqp://localhost:" + (port_start++));
+      }
+      // fast reconnect, randomize to get to all brokers and timeout sends that block on no credit
+      builder.append(")?failover.maxReconnectDelay=2000&failover.randomize=true&jms.sendTimeout=" + opTimeoutMillis);
+      return builder.toString();
+   }
+
+   // allow tracking of failover reconnects
+   class ConnectionListener implements JmsConnectionListener {
+
+      AtomicInteger connectionCount;
+
+      ConnectionListener(AtomicInteger connectionCount) {
+         this.connectionCount = connectionCount;
+      }
+
+      @Override
+      public void onConnectionEstablished(URI uri) {
+      }
+
+      @Override
+      public void onConnectionFailure(Throwable throwable) {
+      }
+
+      @Override
+      public void onConnectionInterrupted(URI uri) {
+      }
+
+      @Override
+      public void onConnectionRestored(URI uri) {
+         connectionCount.incrementAndGet();
+      }
+
+      @Override
+      public void onInboundMessage(JmsInboundMessageDispatch jmsInboundMessageDispatch) {
+      }
+
+      @Override
+      public void onSessionClosed(Session session, Throwable throwable) {
+      }
+
+      @Override
+      public void onConsumerClosed(MessageConsumer messageConsumer, Throwable throwable) {
+      }
+
+      @Override
+      public void onProducerClosed(MessageProducer messageProducer, Throwable throwable) {
+      }
+   }
+
+   // slow consumer
+   class EQConsumer extends Thread {

Review comment:
       Probably nicer to use an executor, which can then be cleaned up afterwards.
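
   A rough sketch of what that could look like with the JDK executor API. `WorkerPool` is a hypothetical wrapper name; the consumer and producer loops would be submitted as `Runnable`s instead of extending `Thread`, and the assumption is that they react to interruption.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical wrapper that owns the test's worker threads. */
final class WorkerPool {

   private final ExecutorService executor = Executors.newCachedThreadPool();

   void start(Runnable worker) {
      executor.execute(worker);
   }

   /** Interrupts all workers and waits for them; returns true if they all finished in time. */
   boolean shutdown(long timeoutMillis) {
      executor.shutdownNow();
      try {
         return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return false;
      }
   }
}
```

   Calling `shutdown(...)` from the test's teardown would then stop any worker still running after a failure, provided the worker does not swallow the interrupt and keep looping.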







[GitHub] [activemq-artemis] gtully commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r760352426



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java
##########
@@ -85,6 +89,34 @@ public void testCreditsAreAllocatedOnceOnLinkCreated() throws Exception {
       }
    }
 
+   @Test(timeout = 60000)
+   public void testCreditsAreNotAllocatedOnceOnLinkCreatedWhileBlockedAndWhenUnBlocked() throws Exception {

Review comment:
       better







[GitHub] [activemq-artemis] gtully merged pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully merged pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863


   





[GitHub] [activemq-artemis] gtully commented on pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully commented on pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#issuecomment-987959268


   Draft until I rebase and possibly pull in a PID controller.





[GitHub] [activemq-artemis] gtully commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r760328952



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
##########
@@ -695,6 +697,14 @@ public boolean checkMemory(final Runnable runWhenAvailable) {
    @Override
    public boolean checkMemory(boolean runOnFailure, final Runnable runWhenAvailable) {
 
+      if (blocked) {
+         if (runWhenAvailable != null) {
+            onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));

Review comment:
       Good catch, they do need to coordinate.







[GitHub] [activemq-artemis] brusdev commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
brusdev commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r759323382



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/ElasticQueueTest.java
##########
@@ -0,0 +1,697 @@
+public class ElasticQueueTest extends ActiveMQTestBase {
+
+   static final String qName = "EQ";
+   static final SimpleString qNameSimple = SimpleString.toSimpleString(qName);
+
+   final int base_port = 61616;
+   final int opTimeoutMillis = 2000;
+   final Stack<EmbeddedActiveMQ> nodes = new Stack<>();
+   private final String balancerConfigName = "role_name_sharder";
+
+   String urlForNodes(Stack<EmbeddedActiveMQ> nodes) {
+      StringBuilder builder = new StringBuilder("failover:(");
+      int port_start = base_port;
+      for (EmbeddedActiveMQ activeMQ : nodes) {
+         if (port_start != base_port) {
+            builder.append(",");
+         }
+         builder.append("amqp://localhost:" + (port_start++));
+      }
+      // fast reconnect, randomize to get to all brokers and timeout sends that block on no credit
+      builder.append(")?failover.maxReconnectDelay=2000&failover.randomize=true&jms.sendTimeout=" + opTimeoutMillis);
+      return builder.toString();
+   }
+
+   // allow tracking of failover reconnects
+   class ConnectionListener implements JmsConnectionListener {
+
+      AtomicInteger connectionCount;
+
+      ConnectionListener(AtomicInteger connectionCount) {
+         this.connectionCount = connectionCount;
+      }
+
+      @Override
+      public void onConnectionEstablished(URI uri) {
+      }
+
+      @Override
+      public void onConnectionFailure(Throwable throwable) {
+      }
+
+      @Override
+      public void onConnectionInterrupted(URI uri) {
+      }
+
+      @Override
+      public void onConnectionRestored(URI uri) {
+         connectionCount.incrementAndGet();
+      }
+
+      @Override
+      public void onInboundMessage(JmsInboundMessageDispatch jmsInboundMessageDispatch) {
+      }
+
+      @Override
+      public void onSessionClosed(Session session, Throwable throwable) {
+      }
+
+      @Override
+      public void onConsumerClosed(MessageConsumer messageConsumer, Throwable throwable) {
+      }
+
+      @Override
+      public void onProducerClosed(MessageProducer messageProducer, Throwable throwable) {
+      }
+   }
+
+   // slow consumer
+   class EQConsumer extends Thread {
+
+      final AtomicInteger consumedCount = new AtomicInteger();
+      final AtomicInteger connectionCount = new AtomicInteger();
+      final AtomicBoolean done = new AtomicBoolean();
+      private final String url;
+      private final int delayMillis;
+      long lastConsumed = 0;
+
+      EQConsumer(String url) {
+         this(url, 1000);
+      }
+
+      EQConsumer(String url, int delay) {
+         this.url = url;
+         this.delayMillis = delay;
+      }
+
+      @Override
+      public void run() {
+
+         while (!done.get()) {
+            JmsConnectionFactory factory = new JmsConnectionFactory("CONSUMER", "PASSWORD", url);
+
+            try (JmsConnection connection = (JmsConnection) factory.createConnection()) {
+
+               // track disconnects via failover listener
+               connectionCount.incrementAndGet();
+               connection.addConnectionListener(new ConnectionListener(connectionCount));
+               connection.start();
+
+               Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+               MessageConsumer messageConsumer = session.createConsumer(session.createQueue(qName));
+
+               while (!done.get()) {
+                  Message receivedMessage = messageConsumer.receiveNoWait();
+                  if (receivedMessage != null) {
+                     consumedCount.incrementAndGet();
+                     lastConsumed = receivedMessage.getLongProperty("PID");
+                  } else {
+                     TimeUnit.MILLISECONDS.sleep(delayMillis);
+                  }
+               }
+            } catch (JMSException | InterruptedException e) {
+            }
+         }
+      }
+
+      public long getLastConsumed() {
+         return lastConsumed;
+      }
+   }
+
+   // regular producer
+   class EQProducer extends Thread {
+
+      final AtomicInteger producedCount = new AtomicInteger();
+      final AtomicInteger connectionCount = new AtomicInteger();
+      final AtomicBoolean done = new AtomicBoolean();
+      private final String url;
+
+      EQProducer(String url) {
+         this.url = url;
+      }
+
+      @Override
+      public void run() {
+
+         while (!done.get()) {
+            JmsConnectionFactory factory = new JmsConnectionFactory("PRODUCER", "PASSWORD", url);
+
+            try (JmsConnection connection = (JmsConnection) factory.createConnection()) {
+
+               // track disconnects via failover listener
+               connectionCount.incrementAndGet();
+               connection.addConnectionListener(new ConnectionListener(connectionCount));
+               connection.start();
+
+               Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+               MessageProducer messageProducer = session.createProducer(session.createQueue(qName));
+
+               BytesMessage message = session.createBytesMessage();
+               message.writeBytes(new byte[1024]);
+               while (!done.get()) {
+                  message.setLongProperty("PID", producedCount.get() + 1);
+                  messageProducer.send(message);
+                  producedCount.incrementAndGet();
+               }
+            } catch (ResourceAllocationException expected) {
+            } catch (JMSException e) {
+            }
+         }
+      }
+
+      public long getLastProduced() {
+         return producedCount.get();
+      }
+   }
+
+   // combined producer/ async consumer
+   class EQProducerAsyncConsumer extends Thread {
+
+      final AtomicInteger producedCount = new AtomicInteger();
+      final AtomicInteger connectionCount = new AtomicInteger();
+      final AtomicBoolean done = new AtomicBoolean();
+      final AtomicBoolean producerDone = new AtomicBoolean();
+      private final String url;
+      final AtomicInteger consumedCount = new AtomicInteger();
+      private final String user;
+      private long lastConsumed;
+
+      EQProducerAsyncConsumer(String url, String user) {
+         this.url = url;
+         this.user = user;
+      }
+
+      @Override
+      public void run() {
+
+         while (!done.get()) {
+            JmsConnectionFactory factory = new JmsConnectionFactory(user, "PASSWORD", url);
+
+            try (JmsConnection connection = (JmsConnection) factory.createConnection()) {
+
+               // track disconnects via failover listener
+               connectionCount.incrementAndGet();
+               connection.addConnectionListener(new ConnectionListener(connectionCount));
+               connection.start();
+
+               Session clientSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+               MessageConsumer messageConsumer = clientSession.createConsumer(clientSession.createQueue(qName));
+               // consume async
+               messageConsumer.setMessageListener(new MessageListener() {
+                  @Override
+                  public void onMessage(Message message) {
+                     consumedCount.incrementAndGet();
+                     try {
+                        lastConsumed = message.getLongProperty("PID");
+                        if (!producerDone.get()) {
+                           TimeUnit.SECONDS.sleep(1);
+                        }
+                        message.acknowledge();
+                     } catch (JMSException | InterruptedException ignored) {
+                     }
+                  }
+               });
+
+               Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+               MessageProducer messageProducer = session.createProducer(session.createQueue(qName));
+               BytesMessage message = session.createBytesMessage();
+               message.writeBytes(new byte[1024]);
+               while (!done.get()) {
+                  if (!producerDone.get()) {
+                     message.setLongProperty("PID", producedCount.get() + 1);
+                     messageProducer.send(message);
+                     producedCount.incrementAndGet();
+                  } else {
+                     // just hang about and let the consumer listener work
+                     TimeUnit.SECONDS.sleep(5);
+                  }
+               }
+            } catch (JMSException | InterruptedException ignored) {
+            }
+         }
+      }
+
+      public long getLastProduced() {
+         return producedCount.get();
+      }
+
+      public long getLastConsumed() {
+         return lastConsumed;
+      }
+   }
+
+   MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer();
+
+   // hardwire authentication to map USER to EQ_USER etc
+   final ActiveMQSecurityManager5 customSecurityManager = new ActiveMQSecurityManager5() {
+      @Override
+      public Subject authenticate(String user,
+                                  String password,
+                                  RemotingConnection remotingConnection,
+                                  String securityDomain) {
+         Subject subject = null;
+         if (validateUser(user, password)) {
+            subject = new Subject();
+            subject.getPrincipals().add(new UserPrincipal(user));
+            subject.getPrincipals().add(new RolePrincipal("EQ_" + user));
+            if (user.equals("BOTH")) {
+               subject.getPrincipals().add(new RolePrincipal("EQ_PRODUCER"));
+               subject.getPrincipals().add(new RolePrincipal("EQ_CONSUMER"));
+            }
+         }
+         return subject;
+      }
+
+      @Override
+      public boolean authorize(Subject subject, Set<Role> roles, CheckType checkType, String address) {
+         return true;
+      }
+
+      @Override
+      public boolean validateUser(final String username, final String password) {
+         return (username.equals("CONSUMER") || username.equals("PRODUCER") || username.equals("BOTH"));
+      }
+
+      @Override
+      public boolean validateUserAndRole(final String username,
+                                         final String password,
+                                         final Set<Role> requiredRoles,
+                                         final CheckType checkType) {
+         if (username.equals("CONSUMER") || username.equals("PRODUCER") || username.equals("BOTH")) {
+            return true;
+         }
+         return false;
+      }
+   };
+
+   final ObjectNameBuilder node0NameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "Node0", true);
+   final ObjectNameBuilder node1NameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "Node1", true);
+
+
+   /*
+    use case is dispatch from memory, with non-blocking producers
+    producers add to the head of the broker chain, consumers receive from the tail
+    when head == tail we are back to one broker for that address, the end of the chain
+   */
+   private void prepareNodesAndStartCombinedHeadTail() throws Exception {
+      prepareNodesAndStartCombinedHeadTail(1000, 300);
+   }
+
+   private void prepareNodesAndStartCombinedHeadTail(int credit, int creditMin) throws Exception {
+      Assert.assertTrue(credit > 0);
+
+      AddressSettings blockingQueue = new AddressSettings();
+      blockingQueue.setMaxSizeBytes(100 * 1024)
+         .setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL)
+         .setSlowConsumerPolicy(SlowConsumerPolicy.KILL).setSlowConsumerThreshold(0).setSlowConsumerCheckPeriod(TimeUnit.MILLISECONDS.toSeconds(opTimeoutMillis))
+         .setAutoDeleteQueues(false).setAutoDeleteAddresses(false); // so slow consumer can kick in!
+
+      Configuration baseConfig = new ConfigurationImpl();
+      baseConfig.getAddressesSettings().put(qName, blockingQueue);
+
+      BrokerBalancerConfiguration balancerConfiguration = new BrokerBalancerConfiguration();
+      balancerConfiguration.setName(balancerConfigName).setTargetKey(TargetKey.ROLE_NAME).setTargetKeyFilter("(?<=^EQ_).*"); // strip EQ_ prefix
+      baseConfig.addBalancerConfiguration(balancerConfiguration);
+
+      // prepare two nodes
+      for (int nodeId = 0; nodeId < 2; nodeId++) {
+         Configuration configuration = baseConfig.copy();
+         configuration.setName("Node" + nodeId);
+         configuration.setBrokerInstance(new File(getTestDirfile(), configuration.getName()));
+         configuration.addAcceptorConfiguration("tcp", "tcp://localhost:" + (base_port + (nodeId)) + "?redirect-to=" + balancerConfigName + ";amqpCredits=" + credit + ";amqpMinCredits=" + creditMin);
+         nodes.add(new EmbeddedActiveMQ().setConfiguration(configuration));
+         nodes.get(nodeId).setSecurityManager(customSecurityManager);
+         nodes.get(nodeId).setMbeanServer(mBeanServer);
+      }
+
+      // node0 initially handles both producer & consumer (head & tail)
+      nodes.get(0).getConfiguration().getBalancerConfigurations().get(0).setLocalTargetFilter("PRODUCER|CONSUMER");
+      nodes.get(0).start();
+   }
+
+   @Test (timeout = 20000)
+   public void testScale0_1() throws Exception {
+
+      prepareNodesAndStartCombinedHeadTail();
+
+      // slow consumer, delay on each message received
+      EQConsumer eqConsumer = new EQConsumer(urlForNodes(nodes));
+      eqConsumer.start();
+
+      // verify consumer reconnects on no messages
+      assertTrue(Wait.waitFor(() -> eqConsumer.connectionCount.get() > 1));
+
+      EQProducer eqProducer = new EQProducer(urlForNodes(nodes));
+      eqProducer.start();
+
+      // verify producer reconnects on fail full!
+      assertTrue(Wait.waitFor(() -> eqProducer.connectionCount.get() > 1));

Review comment:
       Why not use Wait.assertTrue?
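
For readers following the suggestion above: wrapping a polling wait in assertTrue and using a combined wait-and-assert helper both poll the same condition. The combined form folds the assertion into the helper and can fail with a more descriptive message. Below is a minimal, self-contained sketch of that pattern. PollingWait, waitFor and assertEventually are hypothetical names used only for illustration; this is not the Artemis Wait utility, and the timeouts are arbitrary.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a test wait helper; not the Artemis Wait class.
final class PollingWait {

   // Poll until the condition holds or the timeout elapses; returns the last observed result.
   static boolean waitFor(BooleanSupplier condition, long timeoutMillis, long sleepMillis) {
      long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
      while (!condition.getAsBoolean()) {
         if (System.nanoTime() - deadline >= 0) {
            return condition.getAsBoolean();
         }
         try {
            Thread.sleep(sleepMillis);
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
         }
      }
      return true;
   }

   // Combined form: wait, then fail with a message that names the condition and the timeout.
   static void assertEventually(String what, BooleanSupplier condition, long timeoutMillis, long sleepMillis) {
      if (!waitFor(condition, timeoutMillis, sleepMillis)) {
         throw new AssertionError(what + " was not true within " + timeoutMillis + " ms");
      }
   }

   public static void main(String[] args) {
      AtomicInteger connectionCount = new AtomicInteger();
      // Simulate a client that connects and then reconnects once.
      new Thread(() -> {
         connectionCount.incrementAndGet();
         connectionCount.incrementAndGet();
      }).start();

      assertEventually("connectionCount > 1", () -> connectionCount.get() > 1, 2000, 10);
      System.out.println("condition met");
   }
}
```

The practical difference is in the failure report: a bare assertTrue on a boolean says only that it was false, while the combined helper can say what was being waited for and for how long.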

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/ElasticQueueTest.java
##########
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.balancing;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ResourceAllocationException;
+import javax.jms.Session;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+import javax.security.auth.Subject;
+import java.io.File;
+import java.net.URI;
+import java.util.Set;
+import java.util.Stack;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.AddressControl;
+import org.apache.activemq.artemis.api.core.management.BrokerBalancerControl;
+import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.security.CheckType;
+import org.apache.activemq.artemis.core.security.Role;
+import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
+import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager5;
+import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal;
+import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal;
+import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsConnectionListener;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ElasticQueueTest extends ActiveMQTestBase {
+
+   static final String qName = "EQ";
+   static final SimpleString qNameSimple = SimpleString.toSimpleString(qName);
+
+   final int base_port = 61616;
+   final int opTimeoutMillis = 2000;
+   final Stack<EmbeddedActiveMQ> nodes = new Stack<>();
+   private final String balancerConfigName = "role_name_sharder";
+
+   String urlForNodes(Stack<EmbeddedActiveMQ> nodes) {
+      StringBuilder builder = new StringBuilder("failover:(");
+      int port_start = base_port;
+      for (EmbeddedActiveMQ activeMQ : nodes) {
+         if (port_start != base_port) {
+            builder.append(",");
+         }
+         builder.append("amqp://localhost:" + (port_start++));
+      }
+      // fast reconnect, randomize to get to all brokers and timeout sends that block on no credit
+      builder.append(")?failover.maxReconnectDelay=2000&failover.randomize=true&jms.sendTimeout=" + opTimeoutMillis);
+      return builder.toString();
+   }
+
+   // allow tracking of failover reconnects
+   class ConnectionListener implements JmsConnectionListener {
+
+      AtomicInteger connectionCount;
+
+      ConnectionListener(AtomicInteger connectionCount) {
+         this.connectionCount = connectionCount;
+      }
+
+      @Override
+      public void onConnectionEstablished(URI uri) {
+      }
+
+      @Override
+      public void onConnectionFailure(Throwable throwable) {
+      }
+
+      @Override
+      public void onConnectionInterrupted(URI uri) {
+      }
+
+      @Override
+      public void onConnectionRestored(URI uri) {
+         connectionCount.incrementAndGet();
+      }
+
+      @Override
+      public void onInboundMessage(JmsInboundMessageDispatch jmsInboundMessageDispatch) {
+      }
+
+      @Override
+      public void onSessionClosed(Session session, Throwable throwable) {
+      }
+
+      @Override
+      public void onConsumerClosed(MessageConsumer messageConsumer, Throwable throwable) {
+      }
+
+      @Override
+      public void onProducerClosed(MessageProducer messageProducer, Throwable throwable) {
+      }
+   }
+
+   // slow consumer
+   class EQConsumer extends Thread {
+
+      final AtomicInteger consumedCount = new AtomicInteger();
+      final AtomicInteger connectionCount = new AtomicInteger();
+      final AtomicBoolean done = new AtomicBoolean();
+      private final String url;
+      private final int delayMillis;
+      long lastConsumed = 0;
+
+      EQConsumer(String url) {
+         this(url, 1000);
+      }
+
+      EQConsumer(String url, int delay) {
+         this.url = url;
+         this.delayMillis = delay;
+      }
+
+      @Override
+      public void run() {
+
+         while (!done.get()) {
+            JmsConnectionFactory factory = new JmsConnectionFactory("CONSUMER", "PASSWORD", url);
+
+            try (JmsConnection connection = (JmsConnection) factory.createConnection()) {
+
+               // track disconnects via failover listener
+               connectionCount.incrementAndGet();
+               connection.addConnectionListener(new ConnectionListener(connectionCount));
+               connection.start();
+
+               Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+               MessageConsumer messageConsumer = session.createConsumer(session.createQueue(qName));
+
+               while (!done.get()) {
+                  Message receivedMessage = messageConsumer.receiveNoWait();
+                  if (receivedMessage != null) {
+                     consumedCount.incrementAndGet();
+                     lastConsumed = receivedMessage.getLongProperty("PID");
+                  } else {
+                     TimeUnit.MILLISECONDS.sleep(delayMillis);
+                  }
+               }
+            } catch (JMSException | InterruptedException e) {
+            }
+         }
+      }
+
+      public long getLastConsumed() {
+         return lastConsumed;
+      }
+   }
+
+   // regular producer
+   class EQProducer extends Thread {
+
+      final AtomicInteger producedCount = new AtomicInteger();
+      final AtomicInteger connectionCount = new AtomicInteger();
+      final AtomicBoolean done = new AtomicBoolean();
+      private final String url;
+
+      EQProducer(String url) {
+         this.url = url;
+      }
+
+      @Override
+      public void run() {
+
+         while (!done.get()) {
+            JmsConnectionFactory factory = new JmsConnectionFactory("PRODUCER", "PASSWORD", url);
+
+            try (JmsConnection connection = (JmsConnection) factory.createConnection()) {
+
+               // track disconnects via failover listener
+               connectionCount.incrementAndGet();
+               connection.addConnectionListener(new ConnectionListener(connectionCount));
+               connection.start();
+
+               Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+               MessageProducer messageProducer = session.createProducer(session.createQueue(qName));
+
+               BytesMessage message = session.createBytesMessage();
+               message.writeBytes(new byte[1024]);
+               while (!done.get()) {
+                  message.setLongProperty("PID", producedCount.get() + 1);
+                  messageProducer.send(message);
+                  producedCount.incrementAndGet();
+               }
+            } catch (ResourceAllocationException expected) {
+            } catch (JMSException e) {
+            }
+         }
+      }
+
+      public long getLastProduced() {
+         return producedCount.get();
+      }
+   }
+
+   // combined producer/ async consumer
+   class EQProducerAsyncConsumer extends Thread {
+
+      final AtomicInteger producedCount = new AtomicInteger();
+      final AtomicInteger connectionCount = new AtomicInteger();
+      final AtomicBoolean done = new AtomicBoolean();
+      final AtomicBoolean producerDone = new AtomicBoolean();
+      private final String url;
+      final AtomicInteger consumedCount = new AtomicInteger();
+      private final String user;
+      private long lastConsumed;
+
+      EQProducerAsyncConsumer(String url, String user) {
+         this.url = url;
+         this.user = user;
+      }
+
+      @Override
+      public void run() {
+
+         while (!done.get()) {
+            JmsConnectionFactory factory = new JmsConnectionFactory(user, "PASSWORD", url);
+
+            try (JmsConnection connection = (JmsConnection) factory.createConnection()) {
+
+               // track disconnects via failover listener
+               connectionCount.incrementAndGet();
+               connection.addConnectionListener(new ConnectionListener(connectionCount));
+               connection.start();
+
+               Session clientSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+               MessageConsumer messageConsumer = clientSession.createConsumer(clientSession.createQueue(qName));
+               // consume async
+               messageConsumer.setMessageListener(new MessageListener() {
+                  @Override
+                  public void onMessage(Message message) {
+                     consumedCount.incrementAndGet();
+                     try {
+                        lastConsumed = message.getLongProperty("PID");
+                        if (!producerDone.get()) {
+                           TimeUnit.SECONDS.sleep(1);
+                        }
+                        message.acknowledge();
+                     } catch (JMSException | InterruptedException ignored) {
+                     }
+                  }
+               });
+
+               Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+               MessageProducer messageProducer = session.createProducer(session.createQueue(qName));
+               BytesMessage message = session.createBytesMessage();
+               message.writeBytes(new byte[1024]);
+               while (!done.get()) {
+                  if (!producerDone.get()) {
+                     message.setLongProperty("PID", producedCount.get() + 1);
+                     messageProducer.send(message);
+                     producedCount.incrementAndGet();
+                  } else {
+                     // just hang about and let the consumer listener work
+                     TimeUnit.SECONDS.sleep(5);
+                  }
+               }
+            } catch (JMSException | InterruptedException ignored) {
+            }
+         }
+      }
+
+      public long getLastProduced() {
+         return producedCount.get();
+      }
+
+      public long getLastConsumed() {
+         return lastConsumed;
+      }
+   }
+
+   MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer();
+
+   // hardwire authentication to map USER to EQ_USER etc
+   final ActiveMQSecurityManager5 customSecurityManager = new ActiveMQSecurityManager5() {
+      @Override
+      public Subject authenticate(String user,
+                                  String password,
+                                  RemotingConnection remotingConnection,
+                                  String securityDomain) {
+         Subject subject = null;
+         if (validateUser(user, password)) {
+            subject = new Subject();
+            subject.getPrincipals().add(new UserPrincipal(user));
+            subject.getPrincipals().add(new RolePrincipal("EQ_" + user));
+            if (user.equals("BOTH")) {
+               subject.getPrincipals().add(new RolePrincipal("EQ_PRODUCER"));
+               subject.getPrincipals().add(new RolePrincipal("EQ_CONSUMER"));
+            }
+         }
+         return subject;
+      }
+
+      @Override
+      public boolean authorize(Subject subject, Set<Role> roles, CheckType checkType, String address) {
+         return true;
+      }
+
+      @Override
+      public boolean validateUser(final String username, final String password) {
+         return (username.equals("CONSUMER") || username.equals("PRODUCER") || username.equals("BOTH"));
+      }
+
+      @Override
+      public boolean validateUserAndRole(final String username,
+                                         final String password,
+                                         final Set<Role> requiredRoles,
+                                         final CheckType checkType) {
+         if (username.equals("CONSUMER") || username.equals("PRODUCER") || username.equals("BOTH")) {
+            return true;
+         }
+         return false;
+      }
+   };
+
+   final ObjectNameBuilder node0NameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "Node0", true);
+   final ObjectNameBuilder node1NameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "Node1", true);
+
+
+   /*
+    use case is dispatch from memory, with non-blocking producers
+    producers add to the head of the broker chain, consumers receive from the tail
+    when head == tail we are back to one broker for that address, the end of the chain
+   */
+   private void prepareNodesAndStartCombinedHeadTail() throws Exception {
+      prepareNodesAndStartCombinedHeadTail(1000, 300);
+   }
+
+   private void prepareNodesAndStartCombinedHeadTail(int credit, int creditMin) throws Exception {
+      Assert.assertTrue(credit > 0);
+
+      AddressSettings blockingQueue = new AddressSettings();
+      blockingQueue.setMaxSizeBytes(100 * 1024)
+         .setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL)
+         .setSlowConsumerPolicy(SlowConsumerPolicy.KILL).setSlowConsumerThreshold(0).setSlowConsumerCheckPeriod(TimeUnit.MILLISECONDS.toSeconds(opTimeoutMillis))
+         .setAutoDeleteQueues(false).setAutoDeleteAddresses(false); // so slow consumer can kick in!
+
+      Configuration baseConfig = new ConfigurationImpl();
+      baseConfig.getAddressesSettings().put(qName, blockingQueue);
+
+      BrokerBalancerConfiguration balancerConfiguration = new BrokerBalancerConfiguration();
+      balancerConfiguration.setName(balancerConfigName).setTargetKey(TargetKey.ROLE_NAME).setTargetKeyFilter("(?<=^EQ_).*"); // strip EQ_ prefix
+      baseConfig.addBalancerConfiguration(balancerConfiguration);
+
+      // prepare two nodes
+      for (int nodeId = 0; nodeId < 2; nodeId++) {
+         Configuration configuration = baseConfig.copy();
+         configuration.setName("Node" + nodeId);
+         configuration.setBrokerInstance(new File(getTestDirfile(), configuration.getName()));
+         configuration.addAcceptorConfiguration("tcp", "tcp://localhost:" + (base_port + (nodeId)) + "?redirect-to=" + balancerConfigName + ";amqpCredits=" + credit + ";amqpMinCredits=" + creditMin);
+         nodes.add(new EmbeddedActiveMQ().setConfiguration(configuration));
+         nodes.get(nodeId).setSecurityManager(customSecurityManager);
+         nodes.get(nodeId).setMbeanServer(mBeanServer);
+      }
+
+      // node0 initially handles both producer & consumer (head & tail)
+      nodes.get(0).getConfiguration().getBalancerConfigurations().get(0).setLocalTargetFilter("PRODUCER|CONSUMER");
+      nodes.get(0).start();
+   }
+
+   @Test (timeout = 20000)
+   public void testScale0_1() throws Exception {
+
+      prepareNodesAndStartCombinedHeadTail();
+
+      // slow consumer, delay on each message received
+      EQConsumer eqConsumer = new EQConsumer(urlForNodes(nodes));
+      eqConsumer.start();
+
+      // verify consumer reconnects on no messages
+      assertTrue(Wait.waitFor(() -> eqConsumer.connectionCount.get() > 1));
+
+      EQProducer eqProducer = new EQProducer(urlForNodes(nodes));
+      eqProducer.start();
+
+      // verify producer reconnects on fail full!
+      assertTrue(Wait.waitFor(() -> eqProducer.connectionCount.get() > 1));
+
+      // operator mode, poll queue control - to allow producer to continue, activate next broker in the 'chain'
+      AddressControl addressControl0 = (AddressControl) ManagementControlHelper.createProxy(node0NameBuilder.getAddressObjectName(qNameSimple), AddressControl.class, mBeanServer);
+
+      AddressControl finalAddressControl = addressControl0;
+      assertTrue(Wait.waitFor(() -> {
+         int usage = finalAddressControl.getAddressLimitPercent();
+         System.out.println("Node0 usage % " + usage);
+         return usage == 100;

Review comment:
       I guess usage percent could never be 100
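
One way to read this concern, sketched under loud assumptions: if the usage figure is a truncating integer percentage of address size over the configured limit, then fixed-size messages will usually either stop just short of the limit or overshoot it, so an exact match on 100 is fragile. The formula, the two fill strategies and the 3000-byte message size below are assumptions chosen for illustration; they are not taken from the broker source and do not claim to describe how getAddressLimitPercent is actually computed.

```java
// Illustration only: assumed percentage formula and assumed fill behaviour.
final class UsagePercentSketch {

   // Assumed formula: integer percentage, truncated toward zero.
   static int percent(long sizeBytes, long maxBytes) {
      return (int) (sizeBytes * 100 / maxBytes);
   }

   // Accept messages while the address is below its limit; the last one may overshoot.
   static long fillAllowingOvershoot(long maxBytes, long messageBytes) {
      long size = 0;
      while (size < maxBytes) {
         size += messageBytes;
      }
      return size;
   }

   // Accept a message only if it still fits under the limit.
   static long fillWithoutOvershoot(long maxBytes, long messageBytes) {
      long size = 0;
      while (size + messageBytes <= maxBytes) {
         size += messageBytes;
      }
      return size;
   }

   public static void main(String[] args) {
      long maxBytes = 100 * 1024;   // same limit as the test's setMaxSizeBytes
      long messageBytes = 3000;     // hypothetical per-message footprint

      int over = percent(fillAllowingOvershoot(maxBytes, messageBytes), maxBytes);
      int under = percent(fillWithoutOvershoot(maxBytes, messageBytes), maxBytes);

      System.out.println("overshoot fill:    " + over + "%");
      System.out.println("no-overshoot fill: " + under + "%");
      System.out.println("exactly 100 reached: " + (over == 100 || under == 100));
   }
}
```

Under these assumptions neither strategy lands on exactly 100, which is why a threshold comparison such as usage >= 99 tends to be a safer wait condition than usage == 100.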







[GitHub] [activemq-artemis] gtully commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r761933505



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
##########
@@ -695,6 +697,14 @@ public boolean checkMemory(final Runnable runWhenAvailable) {
    @Override
    public boolean checkMemory(boolean runOnFailure, final Runnable runWhenAvailable) {
 
+      if (blocked) {
+         if (runWhenAvailable != null) {
+            onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
+         }
+         ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, sizeInBytes.get(), maxSize, pagingManager.getGlobalSize());

Review comment:
       Agreed. I have added a dedicated log message on state change. The audit logger on operations also covers it, but I think the regular server log should carry an indication of something with repercussions as large as blocking.
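
The idea in this reply, logging once when the blocked state changes rather than on every memory check, can be sketched in isolation. The class below is a simplified, hypothetical model: BlockGateSketch, its in-memory log list and its method names are invented for illustration and are not PagingStoreImpl or the broker's logger. It also shows the behaviour from the hunk above, where a check made while blocked parks its runnable for later.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of a blockable address; not the PagingStoreImpl code.
final class BlockGateSketch {

   private final Queue<Runnable> runWhenUnblocked = new ArrayDeque<>();
   final List<String> log = new ArrayList<>();   // stands in for the server log
   private boolean blocked;

   // Log only on an actual state transition; on unblock, run everything that was parked.
   synchronized void setBlocked(boolean newState) {
      if (blocked == newState) {
         return;
      }
      blocked = newState;
      log.add(newState ? "address blocked" : "address unblocked");
      if (!newState) {
         Runnable next;
         while ((next = runWhenUnblocked.poll()) != null) {
            next.run();
         }
      }
   }

   // Returns true if the caller may proceed now; otherwise parks the runnable and returns false.
   synchronized boolean check(Runnable runWhenAvailable) {
      if (blocked) {
         if (runWhenAvailable != null) {
            runWhenUnblocked.add(runWhenAvailable);
         }
         return false;
      }
      return true;
   }

   public static void main(String[] args) {
      BlockGateSketch gate = new BlockGateSketch();
      gate.setBlocked(true);
      gate.setBlocked(true);   // no second log entry, state did not change
      gate.check(() -> System.out.println("deferred work runs after unblock"));
      gate.setBlocked(false);
      System.out.println(gate.log);
   }
}
```

Keeping the log statement on the transition avoids flooding the log when many producers hit a blocked address, while still leaving a visible record that blocking started and stopped.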







[GitHub] [activemq-artemis] gtully commented on pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully commented on pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#issuecomment-980038594


   On further investigation: in the absence of a drain, we respect existing credit and track requests up to the credit limits. This works for the already limited case that needs to get an exception (FAIL strategy), and it allows resumption/unblocking to renew credit when there are no other constraints. Eventually such a producer will run out of credit and block.
   For new links/senders there is no new credit grant, which forces blocking on credit; this is exactly the behaviour we want. Senders can stay blocked or use a send timeout to error out of that state.
   In short, I think the blocking is in the right place.
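
The behaviour described above can be modelled in a few lines. This is a toy simulation under stated assumptions, not the AMQP protocol handler: CreditSketch and its fields are hypothetical names, credit is counted per message, and the top-up rule (refill to the full grant once credit drops to a minimum) is a simplification loosely modelled on the amqpCredits and amqpMinCredits acceptor parameters used in the test.

```java
// Toy model of credit-based blocking; names and top-up rule are assumptions.
final class CreditSketch {

   private final int grant;       // credit level restored on each top-up
   private final int minCredit;   // top up once remaining credit falls to this level
   private int credit;            // credit currently held by the sender
   private boolean blocked;       // whether the address is blocked

   // A link attaching while the address is blocked gets no initial credit.
   CreditSketch(int grant, int minCredit, boolean blockedAtAttach) {
      this.grant = grant;
      this.minCredit = minCredit;
      this.blocked = blockedAtAttach;
      this.credit = blockedAtAttach ? 0 : grant;
   }

   // Blocking withholds new grants but leaves existing credit alone; unblocking renews credit.
   void setBlocked(boolean blocked) {
      this.blocked = blocked;
      replenish();
   }

   // Returns true if the send was accepted; false means the sender has to wait for credit.
   boolean trySend() {
      if (credit == 0) {
         return false;
      }
      credit--;
      replenish();
      return true;
   }

   private void replenish() {
      if (!blocked && credit <= minCredit) {
         credit = grant;
      }
   }

   public static void main(String[] args) {
      CreditSketch existing = new CreditSketch(10, 3, false);
      existing.setBlocked(true);
      int sent = 0;
      while (existing.trySend()) {
         sent++;
      }
      System.out.println("existing link sent " + sent + " messages before stalling");

      CreditSketch fresh = new CreditSketch(10, 3, true);
      System.out.println("new link can send while blocked: " + fresh.trySend());

      existing.setBlocked(false);
      System.out.println("existing link can send after unblock: " + existing.trySend());
   }
}
```

In this model a stalled sender either waits until the address is unblocked or, as noted above, gives up through its own send timeout; the sketch leaves the timeout to the caller.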
   





[GitHub] [activemq-artemis] gtully commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r761912473



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
##########
@@ -126,6 +126,8 @@
 
    private volatile boolean blocking = false;
 
+   private volatile boolean blocked = false;

Review comment:
       done







[GitHub] [activemq-artemis] gtully commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r760368457



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java
##########
@@ -130,6 +162,51 @@ public void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception {
       assertTrue(addressSize >= MAX_SIZE_BYTES_REJECT_THRESHOLD);
    }
 
+   @Test(timeout = 10000)
+   public void testSendHangsWhenBlockedAndNotAfterUnBlocked() throws Exception {

Review comment:
       better







[GitHub] [activemq-artemis] gemmellr commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gemmellr commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r760110656



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/ElasticQueueTest.java
##########
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.balancing;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ResourceAllocationException;
+import javax.jms.Session;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+import javax.security.auth.Subject;
+import java.io.File;
+import java.net.URI;
+import java.util.Set;
+import java.util.Stack;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.AddressControl;
+import org.apache.activemq.artemis.api.core.management.BrokerBalancerControl;
+import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.security.CheckType;
+import org.apache.activemq.artemis.core.security.Role;
+import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
+import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager5;
+import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal;
+import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal;
+import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsConnectionListener;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ElasticQueueTest extends ActiveMQTestBase {
+
+   static final String qName = "EQ";
+   static final SimpleString qNameSimple = SimpleString.toSimpleString(qName);
+
+   final int base_port = 61616;
+   final int opTimeoutMillis = 2000;
+   final Stack<EmbeddedActiveMQ> nodes = new Stack<>();
+   private final String balancerConfigName = "role_name_sharder";
+
+   String urlForNodes(Stack<EmbeddedActiveMQ> nodes) {
+      StringBuilder builder = new StringBuilder("failover:(");
+      int port_start = base_port;
+      for (EmbeddedActiveMQ activeMQ : nodes) {
+         if (port_start != base_port) {
+            builder.append(",");
+         }
+         builder.append("amqp://localhost:" + (port_start++));
+      }
+      // fast reconnect, randomize to get to all brokers and timeout sends that block on no credit
+      builder.append(")?failover.maxReconnectDelay=2000&failover.randomize=true&jms.sendTimeout=" + opTimeoutMillis);
+      return builder.toString();
+   }
+
+   // allow tracking of failover reconnects
+   class ConnectionListener implements JmsConnectionListener {
+
+      AtomicInteger connectionCount;
+
+      ConnectionListener(AtomicInteger connectionCount) {
+         this.connectionCount = connectionCount;
+      }
+
+      @Override
+      public void onConnectionEstablished(URI uri) {
+      }
+
+      @Override
+      public void onConnectionFailure(Throwable throwable) {
+      }
+
+      @Override
+      public void onConnectionInterrupted(URI uri) {
+      }
+
+      @Override
+      public void onConnectionRestored(URI uri) {
+         connectionCount.incrementAndGet();
+      }
+
+      @Override
+      public void onInboundMessage(JmsInboundMessageDispatch jmsInboundMessageDispatch) {
+      }
+
+      @Override
+      public void onSessionClosed(Session session, Throwable throwable) {
+      }
+
+      @Override
+      public void onConsumerClosed(MessageConsumer messageConsumer, Throwable throwable) {
+      }
+
+      @Override
+      public void onProducerClosed(MessageProducer messageProducer, Throwable throwable) {
+      }
+   }
+
+   // slow consumer
+   class EQConsumer extends Thread {
+
+      final AtomicInteger consumedCount = new AtomicInteger();
+      final AtomicInteger connectionCount = new AtomicInteger();
+      final AtomicBoolean done = new AtomicBoolean();
+      private final String url;
+      private final int delayMillis;
+      long lastConsumed = 0;
+
+      EQConsumer(String url) {
+         this(url, 1000);
+      }
+
+      EQConsumer(String url, int delay) {
+         this.url = url;
+         this.delayMillis = delay;
+      }
+
+      @Override
+      public void run() {
+
+         while (!done.get()) {
+            JmsConnectionFactory factory = new JmsConnectionFactory("CONSUMER", "PASSWORD", url);
+
+            try (JmsConnection connection = (JmsConnection) factory.createConnection()) {
+
+               // track disconnects via failover listener
+               connectionCount.incrementAndGet();
+               connection.addConnectionListener(new ConnectionListener(connectionCount));
+               connection.start();
+
+               Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+               MessageConsumer messageConsumer = session.createConsumer(session.createQueue(qName));
+
+               while (!done.get()) {
+                  Message receivedMessage = messageConsumer.receiveNoWait();
+                  if (receivedMessage != null) {
+                     consumedCount.incrementAndGet();
+                     lastConsumed = receivedMessage.getLongProperty("PID");
+                  } else {
+                     TimeUnit.MILLISECONDS.sleep(delayMillis);
+                  }
+               }
+            } catch (JMSException | InterruptedException e) {
+            }
+         }
+      }
+
+      public long getLastConsumed() {
+         return lastConsumed;
+      }
+   }
+
+   // regular producer
+   class EQProducer extends Thread {
+
+      final AtomicInteger producedCount = new AtomicInteger();
+      final AtomicInteger connectionCount = new AtomicInteger();
+      final AtomicBoolean done = new AtomicBoolean();
+      private final String url;
+
+      EQProducer(String url) {
+         this.url = url;
+      }
+
+      @Override
+      public void run() {
+
+         while (!done.get()) {
+            JmsConnectionFactory factory = new JmsConnectionFactory("PRODUCER", "PASSWORD", url);
+
+            try (JmsConnection connection = (JmsConnection) factory.createConnection()) {
+
+               // track disconnects via failover listener
+               connectionCount.incrementAndGet();
+               connection.addConnectionListener(new ConnectionListener(connectionCount));
+               connection.start();
+
+               Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+               MessageProducer messageProducer = session.createProducer(session.createQueue(qName));
+
+               BytesMessage message = session.createBytesMessage();
+               message.writeBytes(new byte[1024]);
+               while (!done.get()) {
+                  message.setLongProperty("PID", producedCount.get() + 1);
+                  messageProducer.send(message);
+                  producedCount.incrementAndGet();
+               }
+            } catch (ResourceAllocationException expected) {
+            } catch (JMSException e) {
+            }
+         }
+      }
+
+      public long getLastProduced() {
+         return producedCount.get();
+      }
+   }
+
+   // combined producer/ async consumer
+   class EQProducerAsyncConsumer extends Thread {
+
+      final AtomicInteger producedCount = new AtomicInteger();
+      final AtomicInteger connectionCount = new AtomicInteger();
+      final AtomicBoolean done = new AtomicBoolean();
+      final AtomicBoolean producerDone = new AtomicBoolean();
+      private final String url;
+      final AtomicInteger consumedCount = new AtomicInteger();
+      private final String user;
+      private volatile long lastConsumed; // written by the listener thread, read by the test thread
+
+      EQProducerAsyncConsumer(String url, String user) {
+         this.url = url;
+         this.user = user;
+      }
+
+      @Override
+      public void run() {
+
+         while (!done.get()) {
+            JmsConnectionFactory factory = new JmsConnectionFactory(user, "PASSWORD", url);
+
+            try (JmsConnection connection = (JmsConnection) factory.createConnection()) {
+
+               // track disconnects via failover listener
+               connectionCount.incrementAndGet();
+               connection.addConnectionListener(new ConnectionListener(connectionCount));
+               connection.start();
+
+               Session clientSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+               MessageConsumer messageConsumer = clientSession.createConsumer(clientSession.createQueue(qName));
+               // consume async
+               messageConsumer.setMessageListener(new MessageListener() {
+                  @Override
+                  public void onMessage(Message message) {
+                     consumedCount.incrementAndGet();
+                     try {
+                        lastConsumed = message.getLongProperty("PID");
+                        if (!producerDone.get()) {
+                           TimeUnit.SECONDS.sleep(1);
+                        }
+                        message.acknowledge();
+                     } catch (JMSException | InterruptedException ignored) {
+                     }
+                  }
+               });
+
+               Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+               MessageProducer messageProducer = session.createProducer(session.createQueue(qName));
+               BytesMessage message = session.createBytesMessage();
+               message.writeBytes(new byte[1024]);
+               while (!done.get()) {
+                  if (!producerDone.get()) {
+                     message.setLongProperty("PID", producedCount.get() + 1);
+                     messageProducer.send(message);
+                     producedCount.incrementAndGet();
+                  } else {
+                     // just hang about and let the consumer listener work
+                     TimeUnit.SECONDS.sleep(5);
+                  }
+               }
+            } catch (JMSException | InterruptedException ignored) {
+            }
+         }
+      }
+
+      public long getLastProduced() {
+         return producedCount.get();
+      }
+
+      public long getLastConsumed() {
+         return lastConsumed;
+      }
+   }
+
+   MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer();
+
+   // hardwire authentication to map USER to EQ_USER etc
+   final ActiveMQSecurityManager5 customSecurityManager = new ActiveMQSecurityManager5() {
+      @Override
+      public Subject authenticate(String user,
+                                  String password,
+                                  RemotingConnection remotingConnection,
+                                  String securityDomain) {
+         Subject subject = null;
+         if (validateUser(user, password)) {
+            subject = new Subject();
+            subject.getPrincipals().add(new UserPrincipal(user));
+            subject.getPrincipals().add(new RolePrincipal("EQ_" + user));
+            if (user.equals("BOTH")) {
+               subject.getPrincipals().add(new RolePrincipal("EQ_PRODUCER"));
+               subject.getPrincipals().add(new RolePrincipal("EQ_CONSUMER"));
+            }
+         }
+         return subject;
+      }
+
+      @Override
+      public boolean authorize(Subject subject, Set<Role> roles, CheckType checkType, String address) {
+         return true;
+      }
+
+      @Override
+      public boolean validateUser(final String username, final String password) {
+         return (username.equals("CONSUMER") || username.equals("PRODUCER") || username.equals("BOTH"));
+      }
+
+      @Override
+      public boolean validateUserAndRole(final String username,
+                                         final String password,
+                                         final Set<Role> requiredRoles,
+                                         final CheckType checkType) {
+         return validateUser(username, password);
+      }
+   };
+
+   final ObjectNameBuilder node0NameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "Node0", true);
+   final ObjectNameBuilder node1NameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "Node1", true);
+
+
+   /*
+    use case is dispatch from memory, with non-blocking producers
+    producers add to the head of the broker chain, consumers receive from the tail
+    when head == tail we are back to one broker for that address, the end of the chain
+   */
+   private void prepareNodesAndStartCombinedHeadTail() throws Exception {
+      prepareNodesAndStartCombinedHeadTail(1000, 300);
+   }
+
+   private void prepareNodesAndStartCombinedHeadTail(int credit, int creditMin) throws Exception {
+      Assert.assertTrue(credit > 0);
+
+      AddressSettings blockingQueue = new AddressSettings();
+      blockingQueue.setMaxSizeBytes(100 * 1024)
+         .setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL)
+         .setSlowConsumerPolicy(SlowConsumerPolicy.KILL).setSlowConsumerThreshold(0).setSlowConsumerCheckPeriod(TimeUnit.MILLISECONDS.toSeconds(opTimeoutMillis))
+         .setAutoDeleteQueues(false).setAutoDeleteAddresses(false); // so slow consumer can kick in!
+
+      Configuration baseConfig = new ConfigurationImpl();
+      baseConfig.getAddressesSettings().put(qName, blockingQueue);
+
+      BrokerBalancerConfiguration balancerConfiguration = new BrokerBalancerConfiguration();
+      balancerConfiguration.setName(balancerConfigName).setTargetKey(TargetKey.ROLE_NAME).setTargetKeyFilter("(?<=^EQ_).*"); // strip EQ_ prefix
+      baseConfig.addBalancerConfiguration(balancerConfiguration);
+
+      // prepare two nodes
+      for (int nodeId = 0; nodeId < 2; nodeId++) {
+         Configuration configuration = baseConfig.copy();
+         configuration.setName("Node" + nodeId);
+         configuration.setBrokerInstance(new File(getTestDirfile(), configuration.getName()));
+         configuration.addAcceptorConfiguration("tcp", "tcp://localhost:" + (base_port + (nodeId)) + "?redirect-to=" + balancerConfigName + ";amqpCredits=" + credit + ";amqpMinCredits=" + creditMin);
+         nodes.add(new EmbeddedActiveMQ().setConfiguration(configuration));
+         nodes.get(nodeId).setSecurityManager(customSecurityManager);
+         nodes.get(nodeId).setMbeanServer(mBeanServer);
+      }
+
+      // node0 initially handles both producer & consumer (head & tail)
+      nodes.get(0).getConfiguration().getBalancerConfigurations().get(0).setLocalTargetFilter("PRODUCER|CONSUMER");
+      nodes.get(0).start();
+   }
+
+   @Test (timeout = 20000)
+   public void testScale0_1() throws Exception {
+
+      prepareNodesAndStartCombinedHeadTail();
+
+      // slow consumer, delay on each message received
+      EQConsumer eqConsumer = new EQConsumer(urlForNodes(nodes));
+      eqConsumer.start();
+
+      // verify consumer reconnects on no messages
+      assertTrue(Wait.waitFor(() -> eqConsumer.connectionCount.get() > 1));
+
+      EQProducer eqProducer = new EQProducer(urlForNodes(nodes));
+      eqProducer.start();
+
+      // verify producer reconnects when the address is full (FAIL policy)
+      assertTrue(Wait.waitFor(() -> eqProducer.connectionCount.get() > 1));

Review comment:
       The test also has an overall 20-second timeout, but this single check (like all the others) uses a 30-second timeout. Each check should be bounded by how long it can reasonably be expected to take.
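
       As a minimal sketch of what bounding each check could look like, assuming a hypothetical `waitFor(condition, timeoutMillis, sleepMillis)` helper written in plain Java for illustration (it is not the project's `Wait` class, and the 5-second budget is only an example value):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical helper, written for illustration; it is not the project's Wait class.
final class BoundedWaitSketch {

   // Polls the condition every sleepMillis until it holds or timeoutMillis has elapsed.
   static boolean waitFor(BooleanSupplier condition, long timeoutMillis, long sleepMillis) {
      final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
      while (!condition.getAsBoolean()) {
         if (System.nanoTime() - deadline >= 0) {
            return false; // gave up within the stated budget
         }
         try {
            Thread.sleep(sleepMillis);
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
         }
      }
      return true;
   }

   public static void main(String[] args) {
      AtomicInteger connectionCount = new AtomicInteger();
      // stand-in for a client that connects twice
      new Thread(() -> {
         connectionCount.incrementAndGet();
         connectionCount.incrementAndGet();
      }).start();

      // a 5 s budget fits inside a 20 s test timeout; a 30 s budget cannot
      boolean reconnected = waitFor(() -> connectionCount.get() > 1, 5_000, 50);
      System.out.println("reconnected=" + reconnected);
   }
}
```

       With per-check budgets like this, a failing check reports its own assertion instead of tripping the outer test timeout first.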







[GitHub] [activemq-artemis] gtully commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r760371018



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java
##########
@@ -85,6 +89,34 @@ public void testCreditsAreAllocatedOnceOnLinkCreated() throws Exception {
       }
    }
 
+   @Test(timeout = 60000)
+   public void testCreditsAreNotAllocatedOnceOnLinkCreatedWhileBlockedAndWhenUnBlocked() throws Exception {
+      AmqpClient client = createAmqpClient(new URI(singleCreditAcceptorURI));
+      AmqpConnection connection = addConnection(client.connect());
+
+      try {
+         AddressControl addressControl = ManagementControlHelper.createAddressControl(SimpleString.toSimpleString(getQueueName()), mBeanServer);
+         addressControl.block();
+         AmqpSession session = connection.createSession();
+         final AmqpSender sender = session.createSender(getQueueName());
+         assertEquals("Should get 0 credit", 0, sender.getSender().getCredit());

Review comment:
       thanks for that pointer







[GitHub] [activemq-artemis] brusdev commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
brusdev commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r759173212



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
##########
@@ -695,6 +697,14 @@ public boolean checkMemory(final Runnable runWhenAvailable) {
    @Override
    public boolean checkMemory(boolean runOnFailure, final Runnable runWhenAvailable) {
 
+      if (blocked) {
+         if (runWhenAvailable != null) {
+            onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));

Review comment:
       maybe checkReleaseMemory could check if it is `blocked`
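
       One possible reading of this suggestion, sketched with a hypothetical stand-in class (`BlockableStoreSketch` and its methods are illustrative assumptions; only the `blocked` flag and the `onMemoryFreedRunnables` name come from the diff above): the release path consults the flag itself, so queued callers are not released while the address is blocked.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for a paging store; only the blocked/release interplay is modelled.
final class BlockableStoreSketch {

   private final Queue<Runnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<>();
   private volatile boolean blocked;

   // While blocked, callers are parked instead of being run.
   boolean checkMemory(Runnable runWhenAvailable) {
      if (blocked) {
         if (runWhenAvailable != null) {
            onMemoryFreedRunnables.add(runWhenAvailable);
         }
         return false;
      }
      if (runWhenAvailable != null) {
         runWhenAvailable.run();
      }
      return true;
   }

   // The release path checks the flag, so parked callers stay parked until unblock().
   boolean checkReleaseMemory() {
      if (blocked) {
         return false;
      }
      Runnable next;
      while ((next = onMemoryFreedRunnables.poll()) != null) {
         next.run();
      }
      return true;
   }

   void block() {
      blocked = true;
   }

   void unblock() {
      blocked = false;
      checkReleaseMemory();
   }
}
```

       The sketch ignores actual memory accounting; it only shows where the `blocked` check would sit.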







[GitHub] [activemq-artemis] gtully commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r761911893



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/ElasticQueueTest.java
##########
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.balancing;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ResourceAllocationException;
+import javax.jms.Session;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+import javax.security.auth.Subject;
+import java.io.File;
+import java.net.URI;
+import java.util.Set;
+import java.util.Stack;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.AddressControl;
+import org.apache.activemq.artemis.api.core.management.BrokerBalancerControl;
+import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.security.CheckType;
+import org.apache.activemq.artemis.core.security.Role;
+import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
+import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager5;
+import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal;
+import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal;
+import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsConnectionListener;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ElasticQueueTest extends ActiveMQTestBase {
+
+   static final String qName = "EQ";
+   static final SimpleString qNameSimple = SimpleString.toSimpleString(qName);
+
+   final int base_port = 61616;
+   final int opTimeoutMillis = 2000;
+   final Stack<EmbeddedActiveMQ> nodes = new Stack<>();

Review comment:
       sorted







[GitHub] [activemq-artemis] brusdev commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
brusdev commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r759129124



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
##########
@@ -612,6 +612,45 @@ public void testMinuteSurviving() throws Exception {
       Wait.assertEquals(0, queue::getConsumerCount);
    }
 
+
+   @Test
+   public void testKilledOnNoMessagesSoCanBeRebalanced() throws Exception {
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setSlowConsumerCheckPeriod(2);
+      addressSettings.setSlowConsumerThresholdMeasurementUnit(MESSAGES_PER_SECOND);
+      addressSettings.setSlowConsumerThreshold(0); // if there are no messages pending, kill me
+      addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
+
+      server.getAddressSettingsRepository().removeMatch(QUEUE.toString());
+      server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
+
+
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = addClientSession(sf.createSession(false, true, true, false));
+
+      ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
+
+      int messages = 1;
+
+      for (int i = 0; i < messages; i++) {
+         producer.send(session.createMessage(true));
+      }
+      session.commit();
+
+      ConcurrentHashSet<ClientMessage> receivedMessages = new ConcurrentHashSet<>();
+      FixedRateConsumer consumer = new FixedRateConsumer(10, MESSAGES_PER_SECOND, receivedMessages, sf, QUEUE, 0);
+      consumer.start();
+
+      Queue queue = server.locateQueue(QUEUE);
+      Wait.assertEquals(1, queue::getConsumerCount);

Review comment:
       This could be time-sensitive. What about replacing FixedRateConsumer with a manual consumer and checking consumerCount before receiving the message?

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BrokerBalancerControlTest.java
##########
@@ -141,6 +141,27 @@ public void testGetLocalTarget() throws Exception {
 
       CompositeData connectorData = (CompositeData)targetData.get("connector");
       Assert.assertNull(connectorData);
+
+      assertNull(brokerBalancerControl.getLocalTargetFilter());

Review comment:
       Why not create separate tests for the new BrokerBalancerControl methods?







[GitHub] [activemq-artemis] gtully commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r760372412



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BrokerBalancerControlTest.java
##########
@@ -141,6 +141,27 @@ public void testGetLocalTarget() throws Exception {
 
       CompositeData connectorData = (CompositeData)targetData.get("connector");
       Assert.assertNull(connectorData);
+
+      assertNull(brokerBalancerControl.getLocalTargetFilter());

Review comment:
       would be better, thanks!







[GitHub] [activemq-artemis] gtully commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r760371411



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
##########
@@ -201,4 +201,16 @@ public TargetResult getTarget(String key) {
 
       return result != null ? result : TargetResult.REFUSED_UNAVAILABLE_RESULT;
    }
+
+   public void setLocalTargetFilter(String regExp) {
+      if (regExp == null || regExp.isBlank()) {

Review comment:
       no need for that, well spotted!







[GitHub] [activemq-artemis] gtully commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r761909992



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java
##########
@@ -85,6 +89,34 @@ public void testCreditsAreAllocatedOnceOnLinkCreated() throws Exception {
       }
    }
 
+   @Test(timeout = 60000)
+   public void testCreditsAreNotAllocatedOnceOnLinkCreatedWhileBlockedAndWhenUnBlocked() throws Exception {
+      AmqpClient client = createAmqpClient(new URI(singleCreditAcceptorURI));
+      AmqpConnection connection = addConnection(client.connect());
+
+      try {
+         AddressControl addressControl = ManagementControlHelper.createAddressControl(SimpleString.toSimpleString(getQueueName()), mBeanServer);
+         addressControl.block();
+         AmqpSession session = connection.createSession();
+         final AmqpSender sender = session.createSender(getQueueName());
+         assertEquals("Should get 0 credit", 0, sender.getSender().getCredit());
+
+         addressControl.unBlock();
+         assertTrue("Should now get issued one credit", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisfied() throws Exception {
+               return 1 == sender.getSender().getCredit();
+            }
+         }));

Review comment:
       fair, it will speed up the test a little







[GitHub] [activemq-artemis] gtully commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r761958888



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
##########
@@ -612,6 +612,45 @@ public void testMinuteSurviving() throws Exception {
       Wait.assertEquals(0, queue::getConsumerCount);
    }
 
+
+   @Test
+   public void testKilledOnNoMessagesSoCanBeRebalanced() throws Exception {
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setSlowConsumerCheckPeriod(2);
+      addressSettings.setSlowConsumerThresholdMeasurementUnit(MESSAGES_PER_SECOND);
+      addressSettings.setSlowConsumerThreshold(0); // if there are no messages pending, kill me
+      addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
+
+      server.getAddressSettingsRepository().removeMatch(QUEUE.toString());
+      server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
+
+
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = addClientSession(sf.createSession(false, true, true, false));
+
+      ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
+
+      int messages = 1;
+
+      for (int i = 0; i < messages; i++) {
+         producer.send(session.createMessage(true));
+      }
+      session.commit();
+
+      ConcurrentHashSet<ClientMessage> receivedMessages = new ConcurrentHashSet<>();
+      FixedRateConsumer consumer = new FixedRateConsumer(10, MESSAGES_PER_SECOND, receivedMessages, sf, QUEUE, 0);
+      consumer.start();
+
+      Queue queue = server.locateQueue(QUEUE);
+      Wait.assertEquals(1, queue::getConsumerCount);

Review comment:
       I suppose there is no need for a fixed-rate consumer though; the rate is irrelevant to the test. A simple consumer would suffice.







[GitHub] [activemq-artemis] brusdev commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
brusdev commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r759091887



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
##########
@@ -1153,6 +1161,31 @@ public boolean isFull() {
       return maxSize > 0 && getAddressSize() > maxSize || pagingManager.isGlobalFull();
    }
 
+
+   @Override
+   public int getAddressLimitPercent() {
+      final long currentUsage = getAddressSize();
+      if (currentUsage != 0) {
+         if (maxSize > 0) {
+            return Math.toIntExact((currentUsage / maxSize) * 100);
+         } else if (pagingManager.isUsingGlobalSize()) {
+            return Math.toIntExact((currentUsage / pagingManager.getGlobalSize()) * 100);

Review comment:
       Same as the comment on the maxSize branch: dividing currentUsage by pagingManager.getGlobalSize() is integer division, so the result is 0 whenever usage is below the global size.

##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
##########
@@ -1153,6 +1161,31 @@ public boolean isFull() {
       return maxSize > 0 && getAddressSize() > maxSize || pagingManager.isGlobalFull();
    }
 
+
+   @Override
+   public int getAddressLimitPercent() {
+      final long currentUsage = getAddressSize();
+      if (currentUsage != 0) {
+         if (maxSize > 0) {
+            return Math.toIntExact((currentUsage / maxSize) * 100);

Review comment:
       Dividing currentUsage by maxSize is integer division, so this returns 0 whenever currentUsage is below maxSize.
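
       A minimal sketch of the effect, using a hypothetical helper class (`LimitPercentSketch` and both method names are illustrative, not part of PagingStoreImpl). With `long` operands the quotient is truncated before the multiplication; multiplying first is one way to keep the percentage, assuming currentUsage * 100 stays within the `long` range.

```java
// Hypothetical helper, not Artemis code: contrasts the two orders of operation.
final class LimitPercentSketch {

   // Mirrors the expression under review: the long division truncates first.
   static int divideThenMultiply(long currentUsage, long maxSize) {
      return Math.toIntExact((currentUsage / maxSize) * 100);
   }

   // Multiplies first, so the fraction survives until the division.
   // Assumes currentUsage * 100 does not overflow a long.
   static int multiplyThenDivide(long currentUsage, long maxSize) {
      return Math.toIntExact((currentUsage * 100) / maxSize);
   }

   public static void main(String[] args) {
      long maxSize = 100 * 1024;     // the limit used in the ElasticQueueTest address settings
      long currentUsage = 50 * 1024; // half full

      System.out.println(divideThenMultiply(currentUsage, maxSize)); // prints 0
      System.out.println(multiplyThenDivide(currentUsage, maxSize)); // prints 50
   }
}
```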







[GitHub] [activemq-artemis] gtully commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r761910918



##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/ElasticQueueTest.java
##########
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.balancing;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ResourceAllocationException;
+import javax.jms.Session;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+import javax.security.auth.Subject;
+import java.io.File;
+import java.net.URI;
+import java.util.Set;
+import java.util.Stack;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.AddressControl;
+import org.apache.activemq.artemis.api.core.management.BrokerBalancerControl;
+import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.security.CheckType;
+import org.apache.activemq.artemis.core.security.Role;
+import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
+import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager5;
+import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal;
+import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal;
+import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsConnectionListener;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ElasticQueueTest extends ActiveMQTestBase {
+
+   static final String qName = "EQ";
+   static final SimpleString qNameSimple = SimpleString.toSimpleString(qName);
+
+   final int base_port = 61616;
+   final int opTimeoutMillis = 2000;
+   final Stack<EmbeddedActiveMQ> nodes = new Stack<>();
+   private final String balancerConfigName = "role_name_sharder";
+
+   String urlForNodes(Stack<EmbeddedActiveMQ> nodes) {
+      StringBuilder builder = new StringBuilder("failover:(");
+      int port_start = base_port;
+      for (EmbeddedActiveMQ activeMQ : nodes) {
+         if (port_start != base_port) {
+            builder.append(",");
+         }
+         builder.append("amqp://localhost:" + (port_start++));
+      }
+      // fast reconnect, randomize to get to all brokers and timeout sends that block on no credit
+      builder.append(")?failover.maxReconnectDelay=2000&failover.randomize=true&jms.sendTimeout=" + opTimeoutMillis);
+      return builder.toString();
+   }
+
+   // allow tracking of failover reconnects
+   class ConnectionListener implements JmsConnectionListener {
+
+      AtomicInteger connectionCount;
+
+      ConnectionListener(AtomicInteger connectionCount) {
+         this.connectionCount = connectionCount;
+      }
+
+      @Override
+      public void onConnectionEstablished(URI uri) {
+      }
+
+      @Override
+      public void onConnectionFailure(Throwable throwable) {
+      }
+
+      @Override
+      public void onConnectionInterrupted(URI uri) {
+      }
+
+      @Override
+      public void onConnectionRestored(URI uri) {
+         connectionCount.incrementAndGet();
+      }
+
+      @Override
+      public void onInboundMessage(JmsInboundMessageDispatch jmsInboundMessageDispatch) {
+      }
+
+      @Override
+      public void onSessionClosed(Session session, Throwable throwable) {
+      }
+
+      @Override
+      public void onConsumerClosed(MessageConsumer messageConsumer, Throwable throwable) {
+      }
+
+      @Override
+      public void onProducerClosed(MessageProducer messageProducer, Throwable throwable) {
+      }
+   }
+
+   // slow consumer
+   class EQConsumer extends Thread {
+
+      final AtomicInteger consumedCount = new AtomicInteger();
+      final AtomicInteger connectionCount = new AtomicInteger();
+      final AtomicBoolean done = new AtomicBoolean();
+      private final String url;
+      private final int delayMillis;
+      long lastConsumed = 0;
+
+      EQConsumer(String url) {
+         this(url, 1000);
+      }
+
+      EQConsumer(String url, int delay) {
+         this.url = url;
+         this.delayMillis = delay;
+      }
+
+      @Override
+      public void run() {
+
+         while (!done.get()) {
+            JmsConnectionFactory factory = new JmsConnectionFactory("CONSUMER", "PASSWORD", url);
+
+            try (JmsConnection connection = (JmsConnection) factory.createConnection()) {
+
+               // track disconnects via failover listener
+               connectionCount.incrementAndGet();
+               connection.addConnectionListener(new ConnectionListener(connectionCount));
+               connection.start();
+
+               Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+               MessageConsumer messageConsumer = session.createConsumer(session.createQueue(qName));
+
+               while (!done.get()) {
+                  Message receivedMessage = messageConsumer.receiveNoWait();
+                  if (receivedMessage != null) {
+                     consumedCount.incrementAndGet();
+                     lastConsumed = receivedMessage.getLongProperty("PID");
+                  } else {
+                     TimeUnit.MILLISECONDS.sleep(delayMillis);
+                  }
+               }
+            } catch (JMSException | InterruptedException e) {
+            }
+         }
+      }
+
+      public long getLastConsumed() {
+         return lastConsumed;
+      }
+   }
+
+   // regular producer
+   class EQProducer extends Thread {
+
+      final AtomicInteger producedCount = new AtomicInteger();
+      final AtomicInteger connectionCount = new AtomicInteger();
+      final AtomicBoolean done = new AtomicBoolean();
+      private final String url;
+
+      EQProducer(String url) {
+         this.url = url;
+      }
+
+      @Override
+      public void run() {
+
+         while (!done.get()) {
+            JmsConnectionFactory factory = new JmsConnectionFactory("PRODUCER", "PASSWORD", url);
+
+            try (JmsConnection connection = (JmsConnection) factory.createConnection()) {
+
+               // track disconnects via failover listener
+               connectionCount.incrementAndGet();
+               connection.addConnectionListener(new ConnectionListener(connectionCount));
+               connection.start();
+
+               Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+               MessageProducer messageProducer = session.createProducer(session.createQueue(qName));
+
+               BytesMessage message = session.createBytesMessage();
+               message.writeBytes(new byte[1024]);
+               while (!done.get()) {
+                  message.setLongProperty("PID", producedCount.get() + 1);
+                  messageProducer.send(message);
+                  producedCount.incrementAndGet();
+               }
+            } catch (ResourceAllocationException expected) {
+            } catch (JMSException e) {
+            }
+         }
+      }
+
+      public long getLastProduced() {
+         return producedCount.get();
+      }
+   }
+
+   // combined producer/ async consumer
+   class EQProducerAsyncConsumer extends Thread {
+
+      final AtomicInteger producedCount = new AtomicInteger();
+      final AtomicInteger connectionCount = new AtomicInteger();
+      final AtomicBoolean done = new AtomicBoolean();
+      final AtomicBoolean producerDone = new AtomicBoolean();
+      private final String url;
+      final AtomicInteger consumedCount = new AtomicInteger();
+      private final String user;
+      private long lastConsumed;
+
+      EQProducerAsyncConsumer(String url, String user) {
+         this.url = url;
+         this.user = user;
+      }
+
+      @Override
+      public void run() {
+
+         while (!done.get()) {
+            JmsConnectionFactory factory = new JmsConnectionFactory(user, "PASSWORD", url);
+
+            try (JmsConnection connection = (JmsConnection) factory.createConnection()) {
+
+               // track disconnects via failover listener
+               connectionCount.incrementAndGet();
+               connection.addConnectionListener(new ConnectionListener(connectionCount));
+               connection.start();
+
+               Session clientSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+               MessageConsumer messageConsumer = clientSession.createConsumer(clientSession.createQueue(qName));
+               // consume async
+               messageConsumer.setMessageListener(new MessageListener() {
+                  @Override
+                  public void onMessage(Message message) {
+                     consumedCount.incrementAndGet();
+                     try {
+                        lastConsumed = message.getLongProperty("PID");
+                        if (!producerDone.get()) {
+                           TimeUnit.SECONDS.sleep(1);
+                        }
+                        message.acknowledge();
+                     } catch (JMSException | InterruptedException ignored) {
+                     }
+                  }
+               });
+
+               Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+               MessageProducer messageProducer = session.createProducer(session.createQueue(qName));
+               BytesMessage message = session.createBytesMessage();
+               message.writeBytes(new byte[1024]);
+               while (!done.get()) {
+                  if (!producerDone.get()) {
+                     message.setLongProperty("PID", producedCount.get() + 1);
+                     messageProducer.send(message);
+                     producedCount.incrementAndGet();
+                  } else {
+                     // just hang about and let the consumer listener work
+                     TimeUnit.SECONDS.sleep(5);
+                  }
+               }
+            } catch (JMSException | InterruptedException ignored) {
+            }
+         }
+      }
+
+      public long getLastProduced() {
+         return producedCount.get();
+      }
+
+      public long getLastConsumed() {
+         return lastConsumed;
+      }
+   }
+
+   MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer();
+
+   // hardwire authentication to map USER to EQ_USER etc
+   final ActiveMQSecurityManager5 customSecurityManager = new ActiveMQSecurityManager5() {
+      @Override
+      public Subject authenticate(String user,
+                                  String password,
+                                  RemotingConnection remotingConnection,
+                                  String securityDomain) {
+         Subject subject = null;
+         if (validateUser(user, password)) {
+            subject = new Subject();
+            subject.getPrincipals().add(new UserPrincipal(user));
+            subject.getPrincipals().add(new RolePrincipal("EQ_" + user));
+            if (user.equals("BOTH")) {
+               subject.getPrincipals().add(new RolePrincipal("EQ_PRODUCER"));
+               subject.getPrincipals().add(new RolePrincipal("EQ_CONSUMER"));
+            }
+         }
+         return subject;
+      }
+
+      @Override
+      public boolean authorize(Subject subject, Set<Role> roles, CheckType checkType, String address) {
+         return true;
+      }
+
+      @Override
+      public boolean validateUser(final String username, final String password) {
+         return (username.equals("CONSUMER") || username.equals("PRODUCER") || username.equals("BOTH"));
+      }
+
+      @Override
+      public boolean validateUserAndRole(final String username,
+                                         final String password,
+                                         final Set<Role> requiredRoles,
+                                         final CheckType checkType) {
+         if (username.equals("CONSUMER") || username.equals("PRODUCER") || username.equals("BOTH")) {
+            return true;
+         }
+         return false;
+      }
+   };
+
+   final ObjectNameBuilder node0NameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "Node0", true);
+   final ObjectNameBuilder node1NameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "Node1", true);
+
+
+   /*
+    use case is dispatch from memory, with non-blocking producers
+    producers add to the head of the broker chain, consumers receive from the tail
+    when head == tail we are back to one broker for that address, the end of the chain
+   */
+   private void prepareNodesAndStartCombinedHeadTail() throws Exception {
+      prepareNodesAndStartCombinedHeadTail(1000, 300);
+   }
+
+   private void prepareNodesAndStartCombinedHeadTail(int credit, int creditMin) throws Exception {
+      Assert.assertTrue(credit > 0);
+
+      AddressSettings blockingQueue = new AddressSettings();
+      blockingQueue.setMaxSizeBytes(100 * 1024)
+         .setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL)
+         .setSlowConsumerPolicy(SlowConsumerPolicy.KILL).setSlowConsumerThreshold(0).setSlowConsumerCheckPeriod(TimeUnit.MILLISECONDS.toSeconds(opTimeoutMillis))
+         .setAutoDeleteQueues(false).setAutoDeleteAddresses(false); // so slow consumer can kick in!
+
+      Configuration baseConfig = new ConfigurationImpl();
+      baseConfig.getAddressesSettings().put(qName, blockingQueue);
+
+      BrokerBalancerConfiguration balancerConfiguration = new BrokerBalancerConfiguration();
+      balancerConfiguration.setName(balancerConfigName).setTargetKey(TargetKey.ROLE_NAME).setTargetKeyFilter("(?<=^EQ_).*"); // strip EQ_ prefix
+      baseConfig.addBalancerConfiguration(balancerConfiguration);
+
+      // prepare two nodes
+      for (int nodeId = 0; nodeId < 2; nodeId++) {
+         Configuration configuration = baseConfig.copy();
+         configuration.setName("Node" + nodeId);
+         configuration.setBrokerInstance(new File(getTestDirfile(), configuration.getName()));
+         configuration.addAcceptorConfiguration("tcp", "tcp://localhost:" + (base_port + (nodeId)) + "?redirect-to=" + balancerConfigName + ";amqpCredits=" + credit + ";amqpMinCredits=" + creditMin);
+         nodes.add(new EmbeddedActiveMQ().setConfiguration(configuration));
+         nodes.get(nodeId).setSecurityManager(customSecurityManager);
+         nodes.get(nodeId).setMbeanServer(mBeanServer);
+      }
+
+      // node0 initially handles both producer & consumer (head & tail)
+      nodes.get(0).getConfiguration().getBalancerConfigurations().get(0).setLocalTargetFilter("PRODUCER|CONSUMER");
+      nodes.get(0).start();
+   }
+
+   @Test (timeout = 20000)
+   public void testScale0_1() throws Exception {
+
+      prepareNodesAndStartCombinedHeadTail();
+
+      // slow consumer, delay on each message received
+      EQConsumer eqConsumer = new EQConsumer(urlForNodes(nodes));
+      eqConsumer.start();
+
+      // verify consumer reconnects on no messages
+      assertTrue(Wait.waitFor(() -> eqConsumer.connectionCount.get() > 1));
+
+      EQProducer eqProducer = new EQProducer(urlForNodes(nodes));
+      eqProducer.start();
+
+      // verify producer reconnects on fail full!
+      assertTrue(Wait.waitFor(() -> eqProducer.connectionCount.get() > 1));
+
+      // operator mode, poll queue control - to allow producer to continue, activate next broker in the 'chain'
+      AddressControl addressControl0 = (AddressControl) ManagementControlHelper.createProxy(node0NameBuilder.getAddressObjectName(qNameSimple), AddressControl.class, mBeanServer);
+
+      AddressControl finalAddressControl = addressControl0;
+      assertTrue(Wait.waitFor(() -> {
+         int usage = finalAddressControl.getAddressLimitPercent();
+         System.out.println("Node0 usage % " + usage);
+         return usage == 100;

Review comment:
       good catch, the test did indeed need some work with the fixed getAddressLimitPercent :-)
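
For readers following the test hunk above: the balancer is configured with the target key filter "(?<=^EQ_).*", commented as "strip EQ_ prefix". A minimal sketch of what that expression selects from a role name, using only java.util.regex. The class and method names are hypothetical, and applying the pattern with find() is an assumption; the broker's own handling of the filter may differ.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TargetKeyFilterSketch {

    // Same expression the test passes to setTargetKeyFilter(...)
    static final Pattern FILTER = Pattern.compile("(?<=^EQ_).*");

    // Returns the text that follows a leading "EQ_", or null when the role
    // name does not start with that prefix.
    // Assumption: the pattern is applied with find(), not matches().
    static String filteredKey(String roleName) {
        Matcher matcher = FILTER.matcher(roleName);
        return matcher.find() ? matcher.group() : null;
    }

    public static void main(String[] args) {
        System.out.println(filteredKey("EQ_PRODUCER")); // PRODUCER
        System.out.println(filteredKey("EQ_CONSUMER")); // CONSUMER
        System.out.println(filteredKey("ADMIN"));       // null
    }
}
```

The lookbehind keeps the prefix out of the match, so the remaining key (PRODUCER or CONSUMER) is what can be compared against a local target filter such as "PRODUCER|CONSUMER".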




[GitHub] [activemq-artemis] gtully commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gtully commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r761934263



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
##########
@@ -1153,6 +1161,31 @@ public boolean isFull() {
       return maxSize > 0 && getAddressSize() > maxSize || pagingManager.isGlobalFull();
    }
 
+
+   @Override
+   public int getAddressLimitPercent() {
+      final long currentUsage = getAddressSize();
+      if (currentUsage != 0) {
+         if (maxSize > 0) {
+            return Math.toIntExact((currentUsage / maxSize) * 100);
+         } else if (pagingManager.isUsingGlobalSize()) {
+            return Math.toIntExact((currentUsage / pagingManager.getGlobalSize()) * 100);

Review comment:
       thanks, sorted.
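
The reviewer's remark and the applied fix are not quoted in this message. One issue visible in the hunk as shown is that (currentUsage / maxSize) * 100 divides two long values first, so the result stays at 0 until usage reaches maxSize. A minimal sketch of the difference, with hypothetical class and method names; multiplying before dividing is one possible correction, not necessarily the one committed.

```java
public class AddressLimitPercentSketch {

    // Mirrors the expression in the hunk: the long division truncates
    // to 0 (or 1, 2, ...) before the result is scaled by 100.
    static int truncatingPercent(long currentUsage, long maxSize) {
        return Math.toIntExact((currentUsage / maxSize) * 100);
    }

    // Scale first, then divide, so partial usage is visible.
    // Assumes currentUsage * 100 fits in a long, which holds for
    // realistic address sizes.
    static int scaledPercent(long currentUsage, long maxSize) {
        return Math.toIntExact(currentUsage * 100 / maxSize);
    }

    public static void main(String[] args) {
        long maxSize = 100 * 1024;      // same limit as the test's setMaxSizeBytes
        long halfFull = 50 * 1024;
        System.out.println(truncatingPercent(halfFull, maxSize)); // 0
        System.out.println(scaledPercent(halfFull, maxSize));     // 50
    }
}
```

With the scaled form the value can also exceed 100 when the address overshoots its limit, so a test polling for usage == 100 may be better served by a threshold comparison.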




[GitHub] [activemq-artemis] gemmellr commented on pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

Posted by GitBox <gi...@apache.org>.
gemmellr commented on pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#issuecomment-988973390


   Not from me.

