Posted to gitbox@activemq.apache.org by GitBox <gi...@apache.org> on 2021/11/29 14:12:57 UTC

[GitHub] [activemq-artemis] gemmellr commented on a change in pull request #3863: elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

gemmellr commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r758286287



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
##########
@@ -695,6 +697,14 @@ public boolean checkMemory(final Runnable runWhenAvailable) {
    @Override
    public boolean checkMemory(boolean runOnFailure, final Runnable runWhenAvailable) {
 
+      if (blocked) {
+         if (runWhenAvailable != null) {
+            onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
+         }
+         ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, sizeInBytes.get(), maxSize, pagingManager.getGlobalSize());

Review comment:
       This could be spammy, since it logs on every call. The existing usage of this logger call is guarded by the 'blocking' boolean so that it acts more as a log that the state has toggled. The equivalent here would seem to be making this logger call in the block() method (and checking the boolean) rather than here.
   
   Perhaps it should have its own logger call given that its matching 'unblock' is really separate from the existing 'blocking' mechanism.
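
A minimal sketch of logging only on the state toggle, rather than on every checkMemory call. The class `ManualBlockState`, its methods, and the log text are hypothetical stand-ins, not the real PagingStoreImpl or ActiveMQServerLogger API:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the paging store; not the real PagingStoreImpl.
final class ManualBlockState {

   private final AtomicBoolean manuallyBlocked = new AtomicBoolean(false);

   // Counts log events so the behaviour can be checked without a real logger.
   final AtomicInteger logCount = new AtomicInteger();

   // Logs only when the flag actually changes from false to true.
   void block() {
      if (manuallyBlocked.compareAndSet(false, true)) {
         log("address manually blocked");
      }
   }

   // Logs only when the flag actually changes from true to false.
   void unblock() {
      if (manuallyBlocked.compareAndSet(true, false)) {
         log("address manually unblocked");
      }
   }

   // Hot path, called for every send attempt, so it deliberately does not log.
   boolean isBlocked() {
      return manuallyBlocked.get();
   }

   private void log(String message) {
      logCount.incrementAndGet();
      System.out.println(message);
   }
}
```

With this shape, repeated block() calls or repeated producer checks while blocked produce a single log line per transition.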

##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
##########
@@ -1153,6 +1161,31 @@ public boolean isFull() {
       return maxSize > 0 && getAddressSize() > maxSize || pagingManager.isGlobalFull();
    }
 
+
+   @Override
+   public int getAddressLimitPercent() {
+      final long currentUsage = getAddressSize();
+      if (currentUsage != 0) {
+         if (maxSize > 0) {
+            return Math.toIntExact((currentUsage / maxSize) * 100);
+         } else if (pagingManager.isUsingGlobalSize()) {
+            return Math.toIntExact((currentUsage / pagingManager.getGlobalSize()) * 100);
+         }
+      }
+      return 0;
+   }
+
+   @Override
+   public void block() {
+      blocked = true;
+   }
+
+   @Override
+   public void unBlock() {

Review comment:
       No capital needed in unblock.

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java
##########
@@ -85,6 +89,34 @@ public void testCreditsAreAllocatedOnceOnLinkCreated() throws Exception {
       }
    }
 
+   @Test(timeout = 60000)
+   public void testCreditsAreNotAllocatedOnceOnLinkCreatedWhileBlockedAndWhenUnBlocked() throws Exception {

Review comment:
       ```suggestion
      public void testCreditIsNotGivenOnLinkCreationWhileBlockedAndIsGivenOnceThenUnblocked() throws Exception {
   ```

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java
##########
@@ -130,6 +162,51 @@ public void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception {
       assertTrue(addressSize >= MAX_SIZE_BYTES_REJECT_THRESHOLD);
    }
 
+   @Test(timeout = 10000)
+   public void testSendHangsWhenBlockedAndNotAfterUnBlocked() throws Exception {
+      Connection connection = createConnection(new URI(singleCreditAcceptorURI.replace("tcp", "amqp")), null, null, null, true);
+      final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Destination d = session.createQueue(getQueueName());
+      final MessageProducer p = session.createProducer(d);
+
+      final CountDownLatch running = new CountDownLatch(1);
+      final CountDownLatch done = new CountDownLatch(1);
+
+      AddressControl addressControl = ManagementControlHelper.createAddressControl(SimpleString.toSimpleString(getQueueName()), mBeanServer);
+
+      assertTrue("blocked ok", addressControl.block());
+
+      // one credit
+      p.send(session.createBytesMessage());
+
+      // this send will block, no credit
+      new Thread(new Runnable() {
+         @Override
+         public void run() {
+            try {
+               running.countDown();
+               p.send(session.createBytesMessage());
+            } catch (JMSException ignored) {
+            } finally {
+               done.countDown();
+            }
+         }
+      }).start();
+
+      assertTrue(running.await(5, TimeUnit.SECONDS));
+
+      assertFalse(done.await(1, TimeUnit.SECONDS));

Review comment:
       I don't think it needs to wait as long as this, especially if you verify there is no credit left. The message will never actually leave the client until some credit arrives, so waiting an entire second after knowing the thread has started and is about to call send doesn't seem necessary in the typical case.

##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
##########
@@ -126,6 +126,8 @@
 
    private volatile boolean blocking = false;
 
+   private volatile boolean blocked = false;

Review comment:
       I would give this (and possibly also the existing 'blocking' variable) a different name so it's more distinguishable, e.g. manuallyBlocked. It's not at all clear with blocked+blocking that they are really quite unrelated, and that setting the store 'blocked' does not mean it is 'blocking'. Which leads to the next comment.

##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
##########
@@ -695,6 +697,14 @@ public boolean checkMemory(final Runnable runWhenAvailable) {
    @Override
    public boolean checkMemory(boolean runOnFailure, final Runnable runWhenAvailable) {
 
+      if (blocked) {
+         if (runWhenAvailable != null) {
+            onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));

Review comment:
       This seems questionable: the onMemoryFreedRunnables queue is already used by the existing disk usage based 'blocking' mechanism. Using it for this new but separate 'blocked' mechanism as well means there are likely states in which the other [un]'blocking' mechanism could actually run these actions even though the store was and is still 'blocked'.
   
   (Perhaps also vice versa when unBlock() is called.)
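
One way to avoid the cross-talk, sketched under assumptions: keep a separate queue per mechanism so that each release path only runs the actions it deferred. The class `SeparateBlockQueues` and all of its method names are hypothetical, and the real store's threading model is not reproduced here:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical illustration only; not the real PagingStoreImpl.
final class SeparateBlockQueues {

   // Actions deferred by the existing resource-based 'blocking' mechanism.
   private final Queue<Runnable> onMemoryFreed = new ArrayDeque<>();
   // Actions deferred by the new manual 'blocked' mechanism.
   private final Queue<Runnable> onManualUnblock = new ArrayDeque<>();
   private boolean manuallyBlocked;

   synchronized void block() {
      manuallyBlocked = true;
   }

   // Returns true if the action was deferred because the store is manually blocked.
   synchronized boolean deferIfManuallyBlocked(Runnable action) {
      if (!manuallyBlocked) {
         return false;
      }
      onManualUnblock.add(action);
      return true;
   }

   synchronized void deferUntilMemoryFreed(Runnable action) {
      onMemoryFreed.add(action);
   }

   // Releases only the resource-based waiters; manual waiters stay queued.
   void memoryFreed() {
      runAll(takeAll(onMemoryFreed));
   }

   // Clears the manual flag and releases only the manual waiters.
   void unblock() {
      List<Runnable> pending;
      synchronized (this) {
         manuallyBlocked = false;
         pending = takeAll(onManualUnblock);
      }
      runAll(pending);
   }

   private synchronized List<Runnable> takeAll(Queue<Runnable> queue) {
      List<Runnable> copy = new ArrayList<>(queue);
      queue.clear();
      return copy;
   }

   // Runs the actions outside the lock so they cannot deadlock against it.
   private static void runAll(List<Runnable> actions) {
      for (Runnable action : actions) {
         action.run();
      }
   }
}
```

This sketch does not address what should happen when both conditions hold at once; an action released by one mechanism would still need to re-check the other before proceeding.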

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java
##########
@@ -85,6 +89,34 @@ public void testCreditsAreAllocatedOnceOnLinkCreated() throws Exception {
       }
    }
 
+   @Test(timeout = 60000)
+   public void testCreditsAreNotAllocatedOnceOnLinkCreatedWhileBlockedAndWhenUnBlocked() throws Exception {
+      AmqpClient client = createAmqpClient(new URI(singleCreditAcceptorURI));
+      AmqpConnection connection = addConnection(client.connect());
+
+      try {
+         AddressControl addressControl = ManagementControlHelper.createAddressControl(SimpleString.toSimpleString(getQueueName()), mBeanServer);
+         addressControl.block();
+         AmqpSession session = connection.createSession();
+         final AmqpSender sender = session.createSender(getQueueName());
+         assertEquals("Should get 0 credit", 0, sender.getSender().getCredit());
+
+         addressControl.unBlock();
+         assertTrue("Should now get issued one credit", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisfied() throws Exception {
+               return 1 == sender.getSender().getCredit();
+            }
+         }));

Review comment:
       The default check interval is 100ms and the default limit is 30sec. I would set a smaller interval and a smaller limit for this, as the credit is unlikely to have arrived by the first check but shouldn't take anything like that long to then arrive.
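
As a rough illustration of polling with a short interval and a short overall limit, here is a self-contained sketch. `ShortWait.waitFor` is a hypothetical stand-in written for this example, not the project's Wait utility, and the numbers are only suggestions:

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a test wait utility; names are illustrative only.
final class ShortWait {

   // Polls the condition every intervalMillis until it is true or timeoutMillis elapses.
   static boolean waitFor(BooleanSupplier condition, long timeoutMillis, long intervalMillis) {
      final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
      while (!condition.getAsBoolean()) {
         if (System.nanoTime() - deadline >= 0) {
            return false;
         }
         try {
            Thread.sleep(intervalMillis);
         } catch (InterruptedException e) {
            // Preserve the interrupt and give up waiting.
            Thread.currentThread().interrupt();
            return false;
         }
      }
      return true;
   }
}
```

A call such as `ShortWait.waitFor(() -> sender.getSender().getCredit() == 1, 5000, 10)` would then notice the credit within roughly 10ms of its arrival and fail after 5 seconds rather than 30.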

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java
##########
@@ -130,6 +162,51 @@ public void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception {
       assertTrue(addressSize >= MAX_SIZE_BYTES_REJECT_THRESHOLD);
    }
 
+   @Test(timeout = 10000)
+   public void testSendHangsWhenBlockedAndNotAfterUnBlocked() throws Exception {

Review comment:
       ```suggestion
      public void testSendBlocksWhenAddressBlockedAndCompletesAfterUnblocked() throws Exception {
   ```

##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
##########
@@ -201,4 +201,16 @@ public TargetResult getTarget(String key) {
 
       return result != null ? result : TargetResult.REFUSED_UNAVAILABLE_RESULT;
    }
+
+   public void setLocalTargetFilter(String regExp) {
+      if (regExp == null || regExp.isBlank()) {

Review comment:
       Advanced Java 11 features! :D ;)

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java
##########
@@ -85,6 +89,34 @@ public void testCreditsAreAllocatedOnceOnLinkCreated() throws Exception {
       }
    }
 
+   @Test(timeout = 60000)
+   public void testCreditsAreNotAllocatedOnceOnLinkCreatedWhileBlockedAndWhenUnBlocked() throws Exception {
+      AmqpClient client = createAmqpClient(new URI(singleCreditAcceptorURI));
+      AmqpConnection connection = addConnection(client.connect());
+
+      try {
+         AddressControl addressControl = ManagementControlHelper.createAddressControl(SimpleString.toSimpleString(getQueueName()), mBeanServer);
+         addressControl.block();
+         AmqpSession session = connection.createSession();
+         final AmqpSender sender = session.createSender(getQueueName());
+         assertEquals("Should get 0 credit", 0, sender.getSender().getCredit());

Review comment:
       This test is susceptible to the inherent race of checking credit: credit is granted separately, so it may not have arrived by the time the create method returns and you check it. It should probably use a very small wait (e.g. 5-10ms) here to give more confidence in this state, and to leave a small window in which the test could fail if some credit should arrive unexpectedly before it is checked, before or regardless of the unblock call.
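
A minimal sketch of such a check, assuming a small helper that verifies a condition stays true across a brief window. `BriefHold.holdsFor` is hypothetical and written only for this example:

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper for illustration; not part of the project's test utilities.
final class BriefHold {

   // Returns true only if the condition was true on every check during the window.
   static boolean holdsFor(BooleanSupplier condition, long windowMillis, long intervalMillis) {
      final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(windowMillis);
      while (System.nanoTime() - deadline < 0) {
         if (!condition.getAsBoolean()) {
            return false;
         }
         try {
            Thread.sleep(intervalMillis);
         } catch (InterruptedException e) {
            // Preserve the interrupt and report the check as failed.
            Thread.currentThread().interrupt();
            return false;
         }
      }
      // Final check at the end of the window.
      return condition.getAsBoolean();
   }
}
```

In the test this could replace the single assertEquals with something like `assertTrue("Should get 0 credit", BriefHold.holdsFor(() -> sender.getSender().getCredit() == 0, 10, 1))`. It cannot prove that credit will never arrive, only that none arrived within the window.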



