You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@activemq.apache.org by "gemmellr (via GitHub)" <gi...@apache.org> on 2023/01/30 17:50:09 UTC

[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4349: ARTEMIS-3178 Page Limitting (max messages and max bytes)

gemmellr commented on code in PR #4349:
URL: https://github.com/apache/activemq-artemis/pull/4349#discussion_r1090865601


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java:
##########
@@ -60,6 +62,19 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
 
    AddressFullMessagePolicy getAddressFullMessagePolicy();
 
+   PageFullMessagePolicy getPageFullMessagePolicy();
+
+   Long getPageLimitMessages();

Review Comment:
   Presumably this is being null-checked somewhere given the Long usage. Since the config setting is defined as having default -1, might it be nicer to just use that for the checks? Since a non-null value would also have to be checked for being >0 anyway, in case someone explicitly sets it to the -1 default?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java:
##########
@@ -241,6 +249,23 @@ public void resumeCleanup() {
       scheduleCleanup();
    }
 
+   private long getNumberOfMessagesOnSubscriptions() {
+      AtomicLong largerCounter = new AtomicLong();
+      activeCursors.forEach((id, sub) -> {
+         long value = sub.getCounter().getValue();
+         if (value > largerCounter.get()) {
+            largerCounter.set(value);
+         }
+      });
+
+      return largerCounter.get();
+   }
+
+   void checkClearPageLimit() {
+

Review Comment:
   Superfluous newline



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -225,13 +237,100 @@ public void applySetting(final AddressSettings addressSettings) {
       addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
 
       rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
+
+      pageFullMessagePolicy = addressSettings.getPageFullMessagePolicy();
+
+      pageLimitBytes = addressSettings.getPageLimitBytes();
+
+      pageLimitMessages = addressSettings.getPageLimitMessages();
+
+      if (pageLimitBytes != null && pageSize > 0) {
+         estimatedMaxPages = pageLimitBytes / pageSize;
+         logger.debug("Address {} should not allow more than {} pages", storeName, estimatedMaxPages);
+      }
    }
 
    @Override
    public String toString() {
       return "PagingStoreImpl(" + this.address + ")";
    }
 
+   @Override
+   public PageFullMessagePolicy getPageFullMessagePolicy() {
+      return pageFullMessagePolicy;
+   }
+
+   @Override
+   public Long getPageLimitMessages() {
+      return pageLimitMessages;
+   }
+
+   @Override
+   public Long getPageLimitBytes() {
+      return pageLimitBytes;
+   }
+
+   @Override
+   public void pageFull(PageSubscription subscription) {
+      this.pageFull = true;
+      try {
+         ActiveMQServerLogger.LOGGER.pageFull(subscription.getQueue().getName(), subscription.getQueue().getAddress(), pageLimitMessages, subscription.getCounter().getValue());
+      } catch (Throwable e) {
+         // I don't think subscription would ever have a null queue. I'm being cautious here for tests
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   @Override
+   public boolean isPageFull() {
+      return pageFull;
+   }
+
+   private boolean isBellowPageLimitBytes() {
+      if (estimatedMaxPages != null) {
+         return (numberOfPages <= estimatedMaxPages.longValue());
+      }  else {
+         return true;
+      }
+   }
+
+   private void checkNumberOfPages() {
+      if (!isBellowPageLimitBytes()) {
+         this.pageFull = true;
+         ActiveMQServerLogger.LOGGER.pageFullMaxBytes(storeName, numberOfPages, estimatedMaxPages, pageLimitBytes, pageSize);
+      }
+   }
+
+   @Override
+   public void checkPageLimit(long numberOfMessages) {
+
+

Review Comment:
   Superfluous newlines



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -225,13 +237,100 @@ public void applySetting(final AddressSettings addressSettings) {
       addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
 
       rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
+
+      pageFullMessagePolicy = addressSettings.getPageFullMessagePolicy();
+
+      pageLimitBytes = addressSettings.getPageLimitBytes();
+
+      pageLimitMessages = addressSettings.getPageLimitMessages();
+
+      if (pageLimitBytes != null && pageSize > 0) {
+         estimatedMaxPages = pageLimitBytes / pageSize;
+         logger.debug("Address {} should not allow more than {} pages", storeName, estimatedMaxPages);
+      }
    }
 
    @Override
    public String toString() {
       return "PagingStoreImpl(" + this.address + ")";
    }
 
+   @Override
+   public PageFullMessagePolicy getPageFullMessagePolicy() {
+      return pageFullMessagePolicy;
+   }
+
+   @Override
+   public Long getPageLimitMessages() {
+      return pageLimitMessages;
+   }
+
+   @Override
+   public Long getPageLimitBytes() {
+      return pageLimitBytes;
+   }
+
+   @Override
+   public void pageFull(PageSubscription subscription) {
+      this.pageFull = true;
+      try {
+         ActiveMQServerLogger.LOGGER.pageFull(subscription.getQueue().getName(), subscription.getQueue().getAddress(), pageLimitMessages, subscription.getCounter().getValue());
+      } catch (Throwable e) {
+         // I don't think subscription would ever have a null queue. I'm being cautious here for tests
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   @Override
+   public boolean isPageFull() {
+      return pageFull;
+   }
+
+   private boolean isBellowPageLimitBytes() {

Review Comment:
   typo, Below



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -1029,6 +1131,25 @@ public boolean page(Message message,
          lock.readLock().unlock();
       }
 
+      if (pageFull && pageFullMessagePolicy != null) {

Review Comment:
   Related to earlier comment, if it validates you have a policy, can this then just simply check "if (pageFull)" ?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -225,13 +237,100 @@ public void applySetting(final AddressSettings addressSettings) {
       addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
 
       rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
+
+      pageFullMessagePolicy = addressSettings.getPageFullMessagePolicy();
+
+      pageLimitBytes = addressSettings.getPageLimitBytes();
+
+      pageLimitMessages = addressSettings.getPageLimitMessages();

Review Comment:
   Should it validate you have actually set a policy if setting one or both of the others to enforcing values? That way you can't accidentally think you have a limit when you don't.



##########
artemis-server/src/main/resources/schema/artemis-configuration.xsd:
##########
@@ -4062,6 +4079,20 @@
                </xsd:simpleType>
             </xsd:element>
 
+            <xsd:element name="page-full-policy" maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     After entering page mode, a second limit will be set by page-limit-bytes, page-limit-messages. page-full-policy will configure what to do when that limit is reach.

Review Comment:
   page-limit-bytes and/or page-limit-messages. The page-full-policy...
   
   ...when that limit is reach**ed.**



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java:
##########
@@ -60,6 +62,19 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
 
    AddressFullMessagePolicy getAddressFullMessagePolicy();
 
+   PageFullMessagePolicy getPageFullMessagePolicy();
+
+   Long getPageLimitMessages();
+
+   Long getPageLimitBytes();

Review Comment:
   ditto



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -225,13 +237,100 @@ public void applySetting(final AddressSettings addressSettings) {
       addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
 
       rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
+
+      pageFullMessagePolicy = addressSettings.getPageFullMessagePolicy();
+
+      pageLimitBytes = addressSettings.getPageLimitBytes();
+
+      pageLimitMessages = addressSettings.getPageLimitMessages();
+
+      if (pageLimitBytes != null && pageSize > 0) {
+         estimatedMaxPages = pageLimitBytes / pageSize;
+         logger.debug("Address {} should not allow more than {} pages", storeName, estimatedMaxPages);
+      }

Review Comment:
   What if someone explicitly sets pageLimitBytes to -1, which is noted as the default and allowed by the validators? This would still look to calculate an [incorrect] value for the max?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -225,13 +237,100 @@ public void applySetting(final AddressSettings addressSettings) {
       addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
 
       rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
+
+      pageFullMessagePolicy = addressSettings.getPageFullMessagePolicy();
+
+      pageLimitBytes = addressSettings.getPageLimitBytes();
+
+      pageLimitMessages = addressSettings.getPageLimitMessages();
+
+      if (pageLimitBytes != null && pageSize > 0) {
+         estimatedMaxPages = pageLimitBytes / pageSize;
+         logger.debug("Address {} should not allow more than {} pages", storeName, estimatedMaxPages);
+      }
    }
 
    @Override
    public String toString() {
       return "PagingStoreImpl(" + this.address + ")";
    }
 
+   @Override
+   public PageFullMessagePolicy getPageFullMessagePolicy() {
+      return pageFullMessagePolicy;
+   }
+
+   @Override
+   public Long getPageLimitMessages() {
+      return pageLimitMessages;
+   }
+
+   @Override
+   public Long getPageLimitBytes() {
+      return pageLimitBytes;
+   }
+
+   @Override
+   public void pageFull(PageSubscription subscription) {
+      this.pageFull = true;
+      try {
+         ActiveMQServerLogger.LOGGER.pageFull(subscription.getQueue().getName(), subscription.getQueue().getAddress(), pageLimitMessages, subscription.getCounter().getValue());
+      } catch (Throwable e) {
+         // I don't think subscription would ever have a null queue. I'm being cautious here for tests
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   @Override
+   public boolean isPageFull() {
+      return pageFull;
+   }
+
+   private boolean isBellowPageLimitBytes() {
+      if (estimatedMaxPages != null) {
+         return (numberOfPages <= estimatedMaxPages.longValue());
+      }  else {
+         return true;
+      }
+   }
+
+   private void checkNumberOfPages() {
+      if (!isBellowPageLimitBytes()) {
+         this.pageFull = true;
+         ActiveMQServerLogger.LOGGER.pageFullMaxBytes(storeName, numberOfPages, estimatedMaxPages, pageLimitBytes, pageSize);
+      }
+   }
+
+   @Override
+   public void checkPageLimit(long numberOfMessages) {
+
+
+      boolean pageMessageMessagesClear = true;
+      Long pageLimitMessages = getPageLimitMessages();
+
+      if (pageLimitMessages != null) {
+         if (logger.isDebugEnabled()) { // gate to avoid boxing of msgCount
+            logger.debug("Address {} has {} messages on the larger queue", storeName, numberOfMessages);
+         }
+
+         pageMessageMessagesClear = (numberOfMessages < pageLimitMessages.longValue());
+      }
+
+      boolean pageMessageBytesClear = isBellowPageLimitBytes();
+
+

Review Comment:
   Superfluous newline



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -225,13 +237,100 @@ public void applySetting(final AddressSettings addressSettings) {
       addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
 
       rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
+
+      pageFullMessagePolicy = addressSettings.getPageFullMessagePolicy();
+
+      pageLimitBytes = addressSettings.getPageLimitBytes();
+
+      pageLimitMessages = addressSettings.getPageLimitMessages();
+
+      if (pageLimitBytes != null && pageSize > 0) {
+         estimatedMaxPages = pageLimitBytes / pageSize;
+         logger.debug("Address {} should not allow more than {} pages", storeName, estimatedMaxPages);
+      }
    }
 
    @Override
    public String toString() {
       return "PagingStoreImpl(" + this.address + ")";
    }
 
+   @Override
+   public PageFullMessagePolicy getPageFullMessagePolicy() {
+      return pageFullMessagePolicy;
+   }
+
+   @Override
+   public Long getPageLimitMessages() {
+      return pageLimitMessages;
+   }
+
+   @Override
+   public Long getPageLimitBytes() {
+      return pageLimitBytes;
+   }
+
+   @Override
+   public void pageFull(PageSubscription subscription) {
+      this.pageFull = true;
+      try {
+         ActiveMQServerLogger.LOGGER.pageFull(subscription.getQueue().getName(), subscription.getQueue().getAddress(), pageLimitMessages, subscription.getCounter().getValue());
+      } catch (Throwable e) {
+         // I don't think subscription would ever have a null queue. I'm being cautious here for tests
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   @Override
+   public boolean isPageFull() {
+      return pageFull;
+   }
+
+   private boolean isBellowPageLimitBytes() {
+      if (estimatedMaxPages != null) {
+         return (numberOfPages <= estimatedMaxPages.longValue());
+      }  else {
+         return true;
+      }
+   }
+
+   private void checkNumberOfPages() {
+      if (!isBellowPageLimitBytes()) {
+         this.pageFull = true;
+         ActiveMQServerLogger.LOGGER.pageFullMaxBytes(storeName, numberOfPages, estimatedMaxPages, pageLimitBytes, pageSize);
+      }
+   }
+
+   @Override
+   public void checkPageLimit(long numberOfMessages) {
+
+
+      boolean pageMessageMessagesClear = true;
+      Long pageLimitMessages = getPageLimitMessages();
+
+      if (pageLimitMessages != null) {
+         if (logger.isDebugEnabled()) { // gate to avoid boxing of msgCount

Review Comment:
   msgCount should be numberOfMessages.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org