You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/04/05 14:41:25 UTC

[activemq-artemis] branch master updated (7e6c9eb -> 7868959)

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git.


    from 7e6c9eb  This closes #3524
     new 17dd86f  ARTEMIS-3219 Improve message routing
     new 08ec7c6  ARTEMIS-3219 Improve FQQN message routing
     new 7868959  This closes #3526

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../artemis/core/postoffice/impl/BindingsImpl.java | 383 +++++++----------
 .../core/postoffice/impl/CopyOnWriteBindings.java  | 270 ++++++++++++
 .../core/postoffice/impl/PostOfficeImpl.java       | 458 +++++++++++----------
 .../artemis/core/server/RoutingContext.java        |   2 +-
 .../core/server/impl/RoutingContextImpl.java       |   4 +-
 5 files changed, 656 insertions(+), 461 deletions(-)
 create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CopyOnWriteBindings.java

[activemq-artemis] 03/03: This closes #3526

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 7868959b52d8ec5d3374622d97d377d172373c33
Merge: 7e6c9eb 08ec7c6
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Mon Apr 5 10:01:00 2021 -0400

    This closes #3526

 .../artemis/core/postoffice/impl/BindingsImpl.java | 383 +++++++----------
 .../core/postoffice/impl/CopyOnWriteBindings.java  | 270 ++++++++++++
 .../core/postoffice/impl/PostOfficeImpl.java       | 458 +++++++++++----------
 .../artemis/core/server/RoutingContext.java        |   2 +-
 .../core/server/impl/RoutingContextImpl.java       |   4 +-
 5 files changed, 656 insertions(+), 461 deletions(-)

[activemq-artemis] 01/03: ARTEMIS-3219 Improve message routing

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 17dd86ff4b72eb81c606cf97439808028c2c7e94
Author: franz1981 <ni...@gmail.com>
AuthorDate: Thu Apr 1 17:39:56 2021 +0200

    ARTEMIS-3219 Improve message routing
---
 .../core/postoffice/impl/PostOfficeImpl.java       | 458 +++++++++++----------
 .../artemis/core/server/RoutingContext.java        |   2 +-
 .../core/server/impl/RoutingContextImpl.java       |   4 +-
 3 files changed, 242 insertions(+), 222 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index b027c27..365ce01 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.postoffice.impl;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -32,7 +33,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Stream;
 
 import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
@@ -1079,34 +1079,46 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
     * if a DLA still not found, it should then use previous semantics.
     * */
    private RoutingStatus route(final Message message,
-                              final RoutingContext context,
-                              final boolean direct,
-                              boolean rejectDuplicates,
-                              final Binding bindingMove, boolean sendToDLA) throws Exception {
+                               final RoutingContext context,
+                               final boolean direct,
+                               final boolean rejectDuplicates,
+                               final Binding bindingMove,
+                               final boolean sendToDLA) throws Exception {
 
-
-      RoutingStatus result;
       // Sanity check
       if (message.getRefCount() > 0) {
          throw new IllegalStateException("Message cannot be routed more than once");
       }
 
       final SimpleString address = context.getAddress(message);
+      final AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
+      if (settings != null) {
+         applyExpiryDelay(message, settings);
+      }
 
-      AtomicBoolean startedTX = new AtomicBoolean(false);
-
-      applyExpiryDelay(message, address);
+      final boolean startedTX;
+      if (context.isDuplicateDetection()) {
+         final DuplicateCheckResult duplicateCheckResult = checkDuplicateID(message, context, rejectDuplicates);
+         switch (duplicateCheckResult) {
 
-      if (context.isDuplicateDetection() && !checkDuplicateID(message, context, rejectDuplicates, startedTX)) {
-         return RoutingStatus.DUPLICATED_ID;
+            case DuplicateNotStartedTX:
+               return RoutingStatus.DUPLICATED_ID;
+            case NoDuplicateStartedTX:
+               startedTX = true;
+               break;
+            case NoDuplicateNotStartedTX:
+               startedTX = false;
+               //nop
+               break;
+            default:
+               throw new IllegalStateException("Unexpected value: " + duplicateCheckResult);
+         }
+      } else {
+         startedTX = false;
       }
-
       message.clearInternalProperties();
-
-      Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
-
-      AddressInfo addressInfo = addressManager.getAddressInfo(address);
-
+      Bindings bindings;
+      final AddressInfo addressInfo = addressManager.getAddressInfo(address);
       if (bindingMove != null) {
          context.clear();
          context.setReusable(false);
@@ -1114,7 +1126,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          if (addressInfo != null) {
             addressInfo.incrementRoutedMessageCount();
          }
-      } else if (bindings != null) {
+      } else if ((bindings = addressManager.getBindingsForRoutingAddress(address)) != null) {
          bindings.route(message, context);
          if (addressInfo != null) {
             addressInfo.incrementRoutedMessageCount();
@@ -1126,7 +1138,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          }
          // this is a debug and not warn because this could be a regular scenario on publish-subscribe queues (or topic subscriptions on JMS)
          if (logger.isDebugEnabled()) {
-            logger.debug("Couldn't find any bindings for address=" + address + " on message=" + message);
+            logger.debugf("Couldn't find any bindings for address=%s on message=%s", message, address, message);
          }
       }
 
@@ -1135,62 +1147,19 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       }
 
       if (logger.isTraceEnabled()) {
-         logger.trace("Message after routed=" + message + "\n" + context.toString());
+         logger.tracef("Message after routed=%s\n%s", message, context);
       }
 
       try {
+         final RoutingStatus status;
          if (context.getQueueCount() == 0) {
-            // Send to DLA if appropriate
-
-            AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
-
-
-            if (sendToDLA) {
-               // it's already been through here once, giving up now
-               sendToDLA = false;
-            } else {
-               sendToDLA = addressSettings != null ? addressSettings.isSendToDLAOnNoRoute() : AddressSettings.DEFAULT_SEND_TO_DLA_ON_NO_ROUTE;
-            }
-
-            if (sendToDLA) {
-               // Send to the DLA for the address
-
-               SimpleString dlaAddress = addressSettings != null ? addressSettings.getDeadLetterAddress() : null;
-
-               if (logger.isDebugEnabled()) {
-                  logger.debug("sending message to dla address = " + dlaAddress + ", message=" + message);
-               }
-
-               if (dlaAddress == null) {
-                  result = RoutingStatus.NO_BINDINGS;
-                  ActiveMQServerLogger.LOGGER.noDLA(address);
-               } else {
-                  message.referenceOriginalMessage(message, null);
-
-                  message.setAddress(dlaAddress);
-
-                  message.reencode();
-
-                  route(message, new RoutingContextImpl(context.getTransaction()), false, true, null, sendToDLA);
-                  result = RoutingStatus.NO_BINDINGS_DLA;
-               }
-            } else {
-               result = RoutingStatus.NO_BINDINGS;
-
-               if (logger.isDebugEnabled()) {
-                  logger.debug("Message " + message + " is not going anywhere as it didn't have a binding on address:" + address);
-               }
-
-               if (message.isLargeMessage()) {
-                  ((LargeServerMessage) message).deleteFile();
-               }
-            }
+            status = maybeSendToDLA(message, context, address, sendToDLA);
          } else {
-            result = RoutingStatus.OK;
+            status = RoutingStatus.OK;
             try {
                processRoute(message, context, direct);
             } catch (ActiveMQAddressFullException e) {
-               if (startedTX.get()) {
+               if (startedTX) {
                   context.getTransaction().rollback();
                } else if (context.getTransaction() != null) {
                   context.getTransaction().markAsRollbackOnly(e);
@@ -1198,45 +1167,83 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                throw e;
             }
          }
-
-         if (startedTX.get()) {
+         if (startedTX) {
             context.getTransaction().commit();
          }
+         if (server.hasBrokerMessagePlugins()) {
+            server.callBrokerMessagePlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, status));
+         }
+         return status;
       } catch (Exception e) {
          if (server.hasBrokerMessagePlugins()) {
             server.callBrokerMessagePlugins(plugin -> plugin.onMessageRouteException(message, context, direct, rejectDuplicates, e));
          }
          throw e;
       }
+   }
 
-      if (server.hasBrokerMessagePlugins()) {
-         server.callBrokerMessagePlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, result));
+   private RoutingStatus maybeSendToDLA(final Message message,
+                                        final RoutingContext context,
+                                        final SimpleString address,
+                                        final boolean sendToDLAHint) throws Exception {
+      final RoutingStatus status;
+      final AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
+      final boolean sendToDLA;
+      if (sendToDLAHint) {
+         // it's already been through here once, giving up now
+         sendToDLA = false;
+      } else {
+         sendToDLA = addressSettings != null ? addressSettings.isSendToDLAOnNoRoute() : AddressSettings.DEFAULT_SEND_TO_DLA_ON_NO_ROUTE;
       }
+      if (sendToDLA) {
+         // Send to the DLA for the address
+         final SimpleString dlaAddress = addressSettings != null ? addressSettings.getDeadLetterAddress() : null;
+         if (logger.isDebugEnabled()) {
+            logger.debugf("sending message to dla address = %s, message=%s", dlaAddress, message);
+         }
+         if (dlaAddress == null) {
+            status = RoutingStatus.NO_BINDINGS;
+            ActiveMQServerLogger.LOGGER.noDLA(address);
+         } else {
+            message.referenceOriginalMessage(message, null);
+
+            message.setAddress(dlaAddress);
+
+            message.reencode();
 
-      return result;
+            route(message, new RoutingContextImpl(context.getTransaction()), false, true, null, true);
+            status = RoutingStatus.NO_BINDINGS_DLA;
+         }
+      } else {
+         status = RoutingStatus.NO_BINDINGS;
+         if (logger.isDebugEnabled()) {
+            logger.debugf("Message %s is not going anywhere as it didn't have a binding on address:%s", message, address);
+         }
+         if (message.isLargeMessage()) {
+            ((LargeServerMessage) message).deleteFile();
+         }
+      }
+      return status;
    }
 
    // HORNETQ-1029
-   private void applyExpiryDelay(Message message, SimpleString address) {
-      AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
-      if (settings != null) {
-         long expirationOverride = settings.getExpiryDelay();
-
-         // A -1 <expiry-delay> means don't do anything
-         if (expirationOverride >= 0) {
-            // only override the expiration on messages where the expiration hasn't been set by the user
-            if (message.getExpiration() == 0) {
-               message.setExpiration(System.currentTimeMillis() + expirationOverride);
-            }
-         } else {
-            long minExpiration = settings.getMinExpiryDelay();
-            long maxExpiration = settings.getMaxExpiryDelay();
+   private static void applyExpiryDelay(Message message, AddressSettings settings) {
+      long expirationOverride = settings.getExpiryDelay();
+
+      // A -1 <expiry-delay> means don't do anything
+      if (expirationOverride >= 0) {
+         // only override the expiration on messages where the expiration hasn't been set by the user
+         if (message.getExpiration() == 0) {
+            message.setExpiration(System.currentTimeMillis() + expirationOverride);
+         }
+      } else {
+         long minExpiration = settings.getMinExpiryDelay();
+         long maxExpiration = settings.getMaxExpiryDelay();
 
-            if (maxExpiration != AddressSettings.DEFAULT_MAX_EXPIRY_DELAY && (message.getExpiration() == 0 || message.getExpiration() > (System.currentTimeMillis() + maxExpiration))) {
-               message.setExpiration(System.currentTimeMillis() + maxExpiration);
-            } else if (minExpiration != AddressSettings.DEFAULT_MIN_EXPIRY_DELAY && message.getExpiration() < (System.currentTimeMillis() + minExpiration)) {
-               message.setExpiration(System.currentTimeMillis() + minExpiration);
-            }
+         if (maxExpiration != AddressSettings.DEFAULT_MAX_EXPIRY_DELAY && (message.getExpiration() == 0 || message.getExpiration() > (System.currentTimeMillis() + maxExpiration))) {
+            message.setExpiration(System.currentTimeMillis() + maxExpiration);
+         } else if (minExpiration != AddressSettings.DEFAULT_MIN_EXPIRY_DELAY && message.getExpiration() < (System.currentTimeMillis() + minExpiration)) {
+            message.setExpiration(System.currentTimeMillis() + minExpiration);
          }
       }
    }
@@ -1489,20 +1496,22 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
    public void processRoute(final Message message,
                             final RoutingContext context,
                             final boolean direct) throws Exception {
-      final List<MessageReference> refs = new ArrayList<>();
+      final ArrayList<MessageReference> refs = new ArrayList<>();
 
-      Transaction tx = context.getTransaction();
+      final Transaction tx = context.getTransaction();
 
-      Long deliveryTime = null;
+      final Long deliveryTime;
       if (message.hasScheduledDeliveryTime()) {
          deliveryTime = message.getScheduledDeliveryTime();
+      } else {
+         deliveryTime = null;
       }
-
-      PagingStore owningStore = pagingManager.getPageStore(message.getAddressSimpleString());
+      final SimpleString messageAddress = message.getAddressSimpleString();
+      final PagingStore owningStore = pagingManager.getPageStore(messageAddress);
       message.setOwner(owningStore);
       for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
-         PagingStore store;
-         if (entry.getKey() == message.getAddressSimpleString() || entry.getKey().equals(message.getAddressSimpleString())) {
+         final PagingStore store;
+         if (entry.getKey().equals(messageAddress)) {
             store = owningStore;
          } else {
             store = pagingManager.getPageStore(entry.getKey());
@@ -1518,68 +1527,22 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
             continue;
          }
 
-         for (Queue queue : entry.getValue().getNonDurableQueues()) {
-            MessageReference reference = MessageReference.Factory.createReference(message, queue);
-
-            if (deliveryTime != null) {
-               reference.setScheduledDeliveryTime(deliveryTime);
-            }
-            refs.add(reference);
-
-            queue.refUp(reference);
-         }
-
-         Iterator<Queue> iter = entry.getValue().getDurableQueues().iterator();
-
-         while (iter.hasNext()) {
-            Queue queue = iter.next();
-
-            MessageReference reference = MessageReference.Factory.createReference(message, queue);
-
-            if (context.isAlreadyAcked(context.getAddress(message), queue)) {
-               reference.setAlreadyAcked();
-               if (tx != null) {
-                  queue.acknowledge(tx, reference);
-               }
-            }
-
-            if (deliveryTime != null) {
-               reference.setScheduledDeliveryTime(deliveryTime);
-            }
-            refs.add(reference);
-            queue.refUp(reference);
-
-            if (message.isDurable()) {
-               int durableRefCount = queue.durableUp(message);
-
-               if (durableRefCount == 1) {
-                  if (tx != null) {
-                     storageManager.storeMessageTransactional(tx.getID(), message);
-                  } else {
-                     storageManager.storeMessage(message);
-                  }
-
-                  if (message.isLargeMessage()) {
-                     confirmLargeMessageSend(tx, message);
-                  }
-               }
-
-               if (tx != null) {
-                  storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID());
-
-                  tx.setContainsPersistent();
-               } else {
-                  storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext());
+         final List<Queue> nonDurableQueues = entry.getValue().getNonDurableQueues();
+         if (!nonDurableQueues.isEmpty()) {
+            refs.ensureCapacity(nonDurableQueues.size());
+            nonDurableQueues.forEach(queue -> {
+               final MessageReference reference = MessageReference.Factory.createReference(message, queue);
+               if (deliveryTime != null) {
+                  reference.setScheduledDeliveryTime(deliveryTime);
                }
+               refs.add(reference);
+               queue.refUp(reference);
+            });
+         }
 
-               if (deliveryTime != null && deliveryTime > 0) {
-                  if (tx != null) {
-                     storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
-                  } else {
-                     storageManager.updateScheduledDeliveryTime(reference);
-                  }
-               }
-            }
+         final List<Queue> durableQueues = entry.getValue().getDurableQueues();
+         if (!durableQueues.isEmpty()) {
+            processRouteToDurableQueues(message, context, deliveryTime, tx, durableQueues, refs);
          }
       }
 
@@ -1608,6 +1571,59 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       }
    }
 
+   private void processRouteToDurableQueues(final Message message,
+                                            final RoutingContext context,
+                                            final Long deliveryTime,
+                                            final Transaction tx,
+                                            final List<Queue> durableQueues,
+                                            final ArrayList<MessageReference> refs) throws Exception {
+      final int durableQueuesCount = durableQueues.size();
+      refs.ensureCapacity(durableQueuesCount);
+      final Iterator<Queue> iter = durableQueues.iterator();
+      for (int i = 0; i < durableQueuesCount; i++) {
+         final Queue queue = iter.next();
+         final MessageReference reference = MessageReference.Factory.createReference(message, queue);
+         if (context.isAlreadyAcked(message, queue)) {
+            reference.setAlreadyAcked();
+            if (tx != null) {
+               queue.acknowledge(tx, reference);
+            }
+         }
+         if (deliveryTime != null) {
+            reference.setScheduledDeliveryTime(deliveryTime);
+         }
+         refs.add(reference);
+         queue.refUp(reference);
+         if (message.isDurable()) {
+            final int durableRefCount = queue.durableUp(message);
+            if (durableRefCount == 1) {
+               if (tx != null) {
+                  storageManager.storeMessageTransactional(tx.getID(), message);
+               } else {
+                  storageManager.storeMessage(message);
+               }
+               if (message.isLargeMessage()) {
+                  confirmLargeMessageSend(tx, message);
+               }
+            }
+            if (tx != null) {
+               storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID());
+               tx.setContainsPersistent();
+            } else {
+               final boolean last = i == (durableQueuesCount - 1);
+               storageManager.storeReference(queue.getID(), message.getMessageID(), last);
+            }
+            if (deliveryTime != null && deliveryTime > 0) {
+               if (tx != null) {
+                  storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
+               } else {
+                  storageManager.updateScheduledDeliveryTime(reference);
+               }
+            }
+         }
+      }
+   }
+
    /**
     * @param tx
     * @param message
@@ -1671,72 +1687,76 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       }
    }
 
-   private boolean checkDuplicateID(final Message message,
-                                    final RoutingContext context,
-                                    boolean rejectDuplicates,
-                                    AtomicBoolean startedTX) throws Exception {
-      // Check the DuplicateCache for the Bridge first
+   private enum DuplicateCheckResult {
+      DuplicateNotStartedTX, NoDuplicateStartedTX, NoDuplicateNotStartedTX
+   }
 
-      Object bridgeDup = message.removeExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID);
+   private DuplicateCheckResult checkDuplicateID(final Message message,
+                                                 final RoutingContext context,
+                                                 final boolean rejectDuplicates) throws Exception {
+      // Check the DuplicateCache for the Bridge first
+      final Object bridgeDup = message.removeExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID);
       if (bridgeDup != null) {
-         // if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one
-         byte[] bridgeDupBytes = (byte[]) bridgeDup;
-
-         DuplicateIDCache cacheBridge = getDuplicateIDCache(BRIDGE_CACHE_STR.concat(context.getAddress(message).toString()));
-
-         if (context.getTransaction() == null) {
-            context.setTransaction(new TransactionImpl(storageManager));
-            startedTX.set(true);
-         }
-
-         if (!cacheBridge.atomicVerify(bridgeDupBytes, context.getTransaction())) {
-            context.getTransaction().rollback();
-            startedTX.set(false);
-            message.usageDown(); // this will cause large message delete
-            return false;
+         return checkBridgeDuplicateID(message, context, (byte[]) bridgeDup);
+      }
+      // if used BridgeDuplicate, it's not going to use the regular duplicate
+      // since this will would break redistribution (re-setting the duplicateId)
+      final byte[] duplicateIDBytes = message.getDuplicateIDBytes();
+      if (duplicateIDBytes == null) {
+         return DuplicateCheckResult.NoDuplicateNotStartedTX;
+      }
+      return checkNotBridgeDuplicateID(message, context, rejectDuplicates, duplicateIDBytes);
+   }
+
+   private DuplicateCheckResult checkNotBridgeDuplicateID(final Message message,
+                                                          final RoutingContext context,
+                                                          final boolean rejectDuplicates,
+                                                          final byte[] duplicateIDBytes) throws Exception {
+      assert duplicateIDBytes != null && Arrays.equals(message.getDuplicateIDBytes(), duplicateIDBytes);
+      final DuplicateIDCache cache = getDuplicateIDCache(context.getAddress(message));
+      final boolean isDuplicate = cache.contains(duplicateIDBytes);
+      if (rejectDuplicates && isDuplicate) {
+         ActiveMQServerLogger.LOGGER.duplicateMessageDetected(message);
+         if (context.getTransaction() != null) {
+            final String warnMessage = "Duplicate message detected - message will not be routed. Message information:" + message;
+            context.getTransaction().markAsRollbackOnly(new ActiveMQDuplicateIdException(warnMessage));
          }
+         message.usageDown(); // this will cause large message delete
+         return DuplicateCheckResult.DuplicateNotStartedTX;
+      }
+      if (isDuplicate) {
+         assert !rejectDuplicates;
+         return DuplicateCheckResult.NoDuplicateNotStartedTX;
+      }
+      final boolean startedTX;
+      if (context.getTransaction() == null) {
+         // We need to store the duplicate id atomically with the message storage, so we need to create a tx for this
+         context.setTransaction(new TransactionImpl(storageManager));
+         startedTX = true;
       } else {
-         // if used BridgeDuplicate, it's not going to use the regular duplicate
-         // since this will would break redistribution (re-setting the duplicateId)
-         byte[] duplicateIDBytes = message.getDuplicateIDBytes();
-
-         DuplicateIDCache cache = null;
-
-         boolean isDuplicate = false;
-
-         if (duplicateIDBytes != null) {
-            cache = getDuplicateIDCache(context.getAddress(message));
-
-            isDuplicate = cache.contains(duplicateIDBytes);
-
-            if (rejectDuplicates && isDuplicate) {
-               ActiveMQServerLogger.LOGGER.duplicateMessageDetected(message);
-
-               String warnMessage = "Duplicate message detected - message will not be routed. Message information:" + message.toString();
-
-               if (context.getTransaction() != null) {
-                  context.getTransaction().markAsRollbackOnly(new ActiveMQDuplicateIdException(warnMessage));
-               }
-
-               message.usageDown(); // this will cause large message delete
-
-               return false;
-            }
-         }
-
-         if (cache != null && !isDuplicate) {
-            if (context.getTransaction() == null) {
-               // We need to store the duplicate id atomically with the message storage, so we need to create a tx for this
-               context.setTransaction(new TransactionImpl(storageManager));
-
-               startedTX.set(true);
-            }
-
-            cache.addToCache(duplicateIDBytes, context.getTransaction(), startedTX.get());
-         }
+         startedTX = false;
       }
+      cache.addToCache(duplicateIDBytes, context.getTransaction(), startedTX);
+      return startedTX ? DuplicateCheckResult.NoDuplicateStartedTX : DuplicateCheckResult.NoDuplicateNotStartedTX;
+   }
 
-      return true;
+   private DuplicateCheckResult checkBridgeDuplicateID(final Message message,
+                                                       final RoutingContext context,
+                                                       final byte[] bridgeDupBytes) throws Exception {
+      assert bridgeDupBytes != null;
+      boolean startedTX = false;
+      if (context.getTransaction() == null) {
+         context.setTransaction(new TransactionImpl(storageManager));
+         startedTX = true;
+      }
+      // if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one
+      final DuplicateIDCache cacheBridge = getDuplicateIDCache(BRIDGE_CACHE_STR.concat(context.getAddress(message).toString()));
+      if (!cacheBridge.atomicVerify(bridgeDupBytes, context.getTransaction())) {
+         context.getTransaction().rollback();
+         message.usageDown(); // this will cause large message delete
+         return DuplicateCheckResult.DuplicateNotStartedTX;
+      }
+      return startedTX ? DuplicateCheckResult.NoDuplicateStartedTX : DuplicateCheckResult.NoDuplicateNotStartedTX;
    }
 
    /**
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
index 1b2b665..a6e11a9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
@@ -68,7 +68,7 @@ public interface RoutingContext {
 
    void addQueueWithAck(SimpleString address, Queue queue);
 
-   boolean isAlreadyAcked(SimpleString address, Queue queue);
+   boolean isAlreadyAcked(Message message, Queue queue);
 
    void setAddress(SimpleString address);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
index 3cd8869..7a99294 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
@@ -192,8 +192,8 @@ public class RoutingContextImpl implements RoutingContext {
    }
 
    @Override
-   public boolean isAlreadyAcked(SimpleString address, Queue queue) {
-      RouteContextList listing = map.get(address);
+   public boolean isAlreadyAcked(Message message, Queue queue) {
+      final RouteContextList listing = map.get(getAddress(message));
       return listing == null ? false : listing.isAlreadyAcked(queue);
    }
 

[activemq-artemis] 02/03: ARTEMIS-3219 Improve FQQN message routing

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 08ec7c67c870e9f8996bd7505e81689b0300b85d
Author: franz1981 <ni...@gmail.com>
AuthorDate: Fri Apr 2 09:50:43 2021 +0200

    ARTEMIS-3219 Improve FQQN message routing
---
 .../artemis/core/postoffice/impl/BindingsImpl.java | 383 ++++++++-------------
 .../core/postoffice/impl/CopyOnWriteBindings.java  | 270 +++++++++++++++
 2 files changed, 414 insertions(+), 239 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index 5c13c6c..4cc45b1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -25,12 +25,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.postoffice.Binding;
@@ -54,9 +53,7 @@ public final class BindingsImpl implements Bindings {
    // This is public as we use on test assertions
    public static final int MAX_GROUP_RETRY = 10;
 
-   private final ConcurrentMap<SimpleString, List<Binding>> routingNameBindingMap = new ConcurrentHashMap<>();
-
-   private final Map<SimpleString, Integer> routingNamePositions = new ConcurrentHashMap<>();
+   private final CopyOnWriteBindings routingNameBindingMap = new CopyOnWriteBindings();
 
    private final Map<Long, Binding> bindingsIdMap = new ConcurrentHashMap<>();
 
@@ -113,6 +110,8 @@ public final class BindingsImpl implements Bindings {
       }
    }
 
+
+
    @Override
    public void addBinding(final Binding binding) {
       try {
@@ -122,30 +121,14 @@ public final class BindingsImpl implements Bindings {
          if (binding.isExclusive()) {
             exclusiveBindings.add(binding);
          } else {
-            SimpleString routingName = binding.getRoutingName();
-
-            List<Binding> bindings = routingNameBindingMap.get(routingName);
-
-            if (bindings == null) {
-               bindings = new CopyOnWriteArrayList<>();
-
-               List<Binding> oldBindings = routingNameBindingMap.putIfAbsent(routingName, bindings);
-
-               if (oldBindings != null) {
-                  bindings = oldBindings;
-               }
-            }
-
-            if (!bindings.contains(binding)) {
-               bindings.add(binding);
-            }
+            routingNameBindingMap.addBindingIfAbsent(binding);
          }
 
          bindingsIdMap.put(binding.getID(), binding);
          bindingsNameMap.put(binding.getUniqueName(), binding);
 
          if (binding instanceof RemoteQueueBinding) {
-            setMessageLoadBalancingType(((RemoteQueueBinding)binding).getMessageLoadBalancingType());
+            setMessageLoadBalancingType(((RemoteQueueBinding) binding).getMessageLoadBalancingType());
          }
          if (logger.isTraceEnabled()) {
             logger.trace("Adding binding " + binding + " into " + this + " bindingTable: " + debugBindings());
@@ -174,17 +157,7 @@ public final class BindingsImpl implements Bindings {
          if (binding.isExclusive()) {
             exclusiveBindings.remove(binding);
          } else {
-            SimpleString routingName = binding.getRoutingName();
-
-            List<Binding> bindings = routingNameBindingMap.get(routingName);
-
-            if (bindings != null) {
-               bindings.remove(binding);
-
-               if (bindings.isEmpty()) {
-                  routingNameBindingMap.remove(routingName);
-               }
-            }
+            routingNameBindingMap.removeBinding(binding);
          }
 
          bindingsIdMap.remove(binding.getID());
@@ -208,78 +181,56 @@ public final class BindingsImpl implements Bindings {
    public boolean redistribute(final Message message,
                                final Queue originatingQueue,
                                final RoutingContext context) throws Exception {
-      if (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) {
+      final MessageLoadBalancingType loadBalancingType = this.messageLoadBalancingType;
+      if (loadBalancingType.equals(MessageLoadBalancingType.STRICT) || loadBalancingType.equals(MessageLoadBalancingType.OFF)) {
          return false;
       }
 
       if (logger.isTraceEnabled()) {
-         logger.trace("Redistributing message " + message);
+         logger.tracef("Redistributing message %s", message);
       }
 
-      SimpleString routingName = originatingQueue.getName();
+      final SimpleString routingName = originatingQueue.getName();
 
-      List<Binding> bindings = routingNameBindingMap.get(routingName);
+      final Pair<Binding[], CopyOnWriteBindings.BindingIndex> bindingsAndPosition = routingNameBindingMap.getBindings(routingName);
 
-      if (bindings == null) {
+      if (bindingsAndPosition == null) {
          // The value can become null if it's concurrently removed while we're iterating - this is expected
          // ConcurrentHashMap behaviour!
          return false;
       }
 
-      Integer ipos = routingNamePositions.get(routingName);
-
-      int pos = ipos != null ? ipos.intValue() : 0;
-
-      int length = bindings.size();
-
-      int startPos = pos;
+      final Binding[] bindings = bindingsAndPosition.getA();
 
-      Binding theBinding = null;
+      final CopyOnWriteBindings.BindingIndex bindingIndex = bindingsAndPosition.getB();
 
-      // TODO - combine this with similar logic in route()
-      while (true) {
-         Binding binding;
-         try {
-            binding = bindings.get(pos);
-         } catch (IndexOutOfBoundsException e) {
-            // This can occur if binding is removed while in route
-            if (!bindings.isEmpty()) {
-               pos = 0;
-               startPos = 0;
-               length = bindings.size();
-
-               continue;
-            } else {
-               break;
-            }
-         }
+      assert bindings.length > 0;
 
-         pos = incrementPos(pos, length);
+      final int bindingsCount = bindings.length;
 
-         Filter filter = binding.getFilter();
+      int nextPosition = bindingIndex.getIndex();
 
-         boolean highPrior = binding.isHighAcceptPriority(message);
+      if (nextPosition >= bindingsCount) {
+         nextPosition = 0;
+      }
 
+      Binding nextBinding = null;
+      for (int i = 0; i < bindingsCount; i++) {
+         final Binding binding = bindings[nextPosition];
+         nextPosition = moveNextPosition(nextPosition, bindingsCount);
+         final Filter filter = binding.getFilter();
+         final boolean highPrior = binding.isHighAcceptPriority(message);
          if (highPrior && binding.getBindable() != originatingQueue && (filter == null || filter.match(message))) {
-            theBinding = binding;
-
-            break;
-         }
-
-         if (pos == startPos) {
+            nextBinding = binding;
             break;
          }
       }
-
-      routingNamePositions.put(routingName, pos);
-
-      if (theBinding != null) {
-         theBinding.route(message, context);
-
-         return true;
-      } else {
+      if (nextBinding == null) {
          return false;
       }
+      bindingIndex.setIndex(nextPosition);
+      nextBinding.route(message, context);
+      return true;
    }
 
    @Override
@@ -290,8 +241,8 @@ public final class BindingsImpl implements Bindings {
    private void route(final Message message,
                       final RoutingContext context,
                       final boolean groupRouting) throws Exception {
-      int currentVersion = version.get();
-      boolean reusableContext = context.isReusable(message, currentVersion);
+      final int currentVersion = version.get();
+      final boolean reusableContext = context.isReusable(message, currentVersion);
 
       if (!reusableContext) {
          context.clear();
@@ -300,54 +251,33 @@ public final class BindingsImpl implements Bindings {
       /* This is a special treatment for scaled-down messages involving SnF queues.
        * See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property
        */
-      byte[] ids = message.removeExtraBytesProperty(Message.HDR_SCALEDOWN_TO_IDS);
+      final byte[] ids = message.removeExtraBytesProperty(Message.HDR_SCALEDOWN_TO_IDS);
 
       if (ids != null) {
-         ByteBuffer buffer = ByteBuffer.wrap(ids);
-         while (buffer.hasRemaining()) {
-            long id = buffer.getLong();
-            for (Map.Entry<Long, Binding> entry : bindingsIdMap.entrySet()) {
-               if (entry.getValue() instanceof RemoteQueueBinding) {
-                  RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue();
-                  if (remoteQueueBinding.getRemoteQueueID() == id) {
-                     message.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array());
-                  }
-               }
-            }
-         }
+         handleScaledDownMessage(message, ids);
       }
 
-      boolean routed = false;
-
-      boolean hasExclusives = false;
-
-      for (Binding binding : exclusiveBindings) {
-         if (!hasExclusives) {
-            context.clear().setReusable(false);
-            hasExclusives = true;
-         }
-
-         if (binding.getFilter() == null || binding.getFilter().match(message)) {
-            binding.getBindable().route(message, context);
-            routed = true;
-         }
+      final boolean routed;
+      // despite the double check can lead to some race, this can save allocating an iterator for an empty set
+      if (!exclusiveBindings.isEmpty()) {
+         routed = routeToExclusiveBindings(message, context);
+      } else {
+         routed = false;
       }
       if (!routed) {
          // Remove the ids now, in order to avoid double check
-         ids = message.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS);
-
-         // Fetch the groupId now, in order to avoid double checking
-         SimpleString groupId = message.getGroupID();
+         final byte[] routeToIds = message.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS);
 
-         if (ids != null) {
+         SimpleString groupId;
+         if (routeToIds != null) {
             context.clear().setReusable(false);
-            routeFromCluster(message, context, ids);
-         } else if (groupingHandler != null && groupRouting && groupId != null) {
+            routeFromCluster(message, context, routeToIds);
+         } else if (groupRouting && groupingHandler != null && (groupId = message.getGroupID()) != null) {
             context.clear().setReusable(false);
             routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0);
          } else if (CompositeAddress.isFullyQualified(message.getAddress())) {
             context.clear().setReusable(false);
-            Binding theBinding = bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString()));
+            final Binding theBinding = bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString()));
             if (theBinding != null) {
                theBinding.route(message, context);
             }
@@ -361,35 +291,58 @@ public final class BindingsImpl implements Bindings {
       }
    }
 
-   private void simpleRouting(Message message, RoutingContext context, int currentVersion) throws Exception {
-      if (logger.isTraceEnabled()) {
-         logger.trace("Routing message " + message + " on binding=" + this + " current context::" + context);
+   private boolean routeToExclusiveBindings(final Message message, final RoutingContext context) throws Exception {
+      boolean hasExclusives = false;
+      boolean routed = false;
+      for (Binding binding : exclusiveBindings) {
+         if (!hasExclusives) {
+            context.clear().setReusable(false);
+            hasExclusives = true;
+         }
+         final Filter filter = binding.getFilter();
+         if (filter == null || filter.match(message)) {
+            binding.getBindable().route(message, context);
+            routed = true;
+         }
       }
+      return routed;
+   }
 
-      for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) {
-         SimpleString routingName = entry.getKey();
-
-         List<Binding> bindings = entry.getValue();
-
-         if (bindings == null) {
-            // The value can become null if it's concurrently removed while we're iterating - this is expected
-            // ConcurrentHashMap behaviour!
-            continue;
+   private void handleScaledDownMessage(final Message message, final byte[] ids) {
+      ByteBuffer buffer = ByteBuffer.wrap(ids);
+      while (buffer.hasRemaining()) {
+         long id = buffer.getLong();
+         for (Map.Entry<Long, Binding> entry : bindingsIdMap.entrySet()) {
+            if (entry.getValue() instanceof RemoteQueueBinding) {
+               RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue();
+               if (remoteQueueBinding.getRemoteQueueID() == id) {
+                  message.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array());
+               }
+            }
          }
+      }
+   }
 
-         Binding theBinding = getNextBinding(message, routingName, bindings);
+   private void simpleRouting(final Message message,
+                              final RoutingContext context,
+                              final int currentVersion) throws Exception {
+      if (logger.isTraceEnabled()) {
+         logger.tracef("Routing message %s on binding=%s current context::$s", message, this, context);
+      }
 
-         if (theBinding != null && theBinding.getFilter() == null && bindings.size() == 1 && theBinding.isLocal()) {
+      routingNameBindingMap.forEach((routingName, bindings, nextPosition) -> {
+         final Binding nextBinding = getNextBinding(message, bindings, nextPosition);
+         if (nextBinding != null && nextBinding.getFilter() == null && nextBinding.isLocal() && bindings.length == 1) {
             context.setReusable(true, currentVersion);
          } else {
             // notice that once this is set to false, any calls to setReusable(true) will be moot as the context will ignore it
             context.setReusable(false, currentVersion);
          }
 
-         if (theBinding != null) {
-            theBinding.route(message, context);
+         if (nextBinding != null) {
+            nextBinding.route(message, context);
          }
-      }
+      });
    }
 
    @Override
@@ -406,99 +359,65 @@ public final class BindingsImpl implements Bindings {
     * (depending if you are using multi-thread), and not lose messages.
     */
    private Binding getNextBinding(final Message message,
-                                  final SimpleString routingName,
-                                  final List<Binding> bindings) {
-      Integer ipos = routingNamePositions.get(routingName);
+                                  final Binding[] bindings,
+                                  final CopyOnWriteBindings.BindingIndex bindingIndex) {
+      int nextPosition = bindingIndex.getIndex();
 
-      int pos = ipos != null ? ipos : 0;
+      final int bindingsCount = bindings.length;
 
-      int length = bindings.size();
-
-      int startPos = pos;
-
-      Binding theBinding = null;
+      if (nextPosition >= bindingsCount) {
+         nextPosition = 0;
+      }
 
+      Binding nextBinding = null;
       int lastLowPriorityBinding = -1;
+      // snapshot this, to save loading it on each iteration
+      final MessageLoadBalancingType loadBalancingType = this.messageLoadBalancingType;
 
-      while (true) {
-         Binding binding;
-         try {
-            binding = bindings.get(pos);
-         } catch (IndexOutOfBoundsException e) {
-            // This can occur if binding is removed while in route
-            if (!bindings.isEmpty()) {
-               pos = 0;
-               startPos = 0;
-               length = bindings.size();
-
-               continue;
-            } else {
-               break;
-            }
-         }
-
-         if (matchBinding(message, binding)) {
+      for (int i = 0; i < bindingsCount; i++) {
+         final Binding binding = bindings[nextPosition];
+         if (matchBinding(message, binding, loadBalancingType)) {
             // bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an
             // unnecessary overhead)
-            if (length == 1 || (binding.isConnected() && (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || binding.isHighAcceptPriority(message)))) {
-               theBinding = binding;
-
-               pos = incrementPos(pos, length);
-
+            if (bindingsCount == 1 || (binding.isConnected() && (loadBalancingType.equals(MessageLoadBalancingType.STRICT) || binding.isHighAcceptPriority(message)))) {
+               nextBinding = binding;
+               nextPosition = moveNextPosition(nextPosition, bindingsCount);
                break;
-            } else {
-               //https://issues.jboss.org/browse/HORNETQ-1254 When !routeWhenNoConsumers,
-               // the localQueue should always have the priority over the secondary bindings
-               if (lastLowPriorityBinding == -1 || messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND) && binding instanceof LocalQueueBinding) {
-                  lastLowPriorityBinding = pos;
-               }
             }
-         }
-
-         pos = incrementPos(pos, length);
-
-         if (pos == startPos) {
-
-            // if no bindings were found, we will apply a secondary level on the routing logic
-            if (lastLowPriorityBinding != -1) {
-               try {
-                  theBinding = bindings.get(lastLowPriorityBinding);
-               } catch (IndexOutOfBoundsException e) {
-                  // This can occur if binding is removed while in route
-                  if (!bindings.isEmpty()) {
-                     pos = 0;
-
-                     lastLowPriorityBinding = -1;
-
-                     continue;
-                  } else {
-                     break;
-                  }
-               }
-
-               pos = incrementPos(lastLowPriorityBinding, length);
+            //https://issues.jboss.org/browse/HORNETQ-1254 When !routeWhenNoConsumers,
+            // the localQueue should always have the priority over the secondary bindings
+            if (lastLowPriorityBinding == -1 || loadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND) && binding instanceof LocalQueueBinding) {
+               lastLowPriorityBinding = nextPosition;
             }
-            break;
          }
+         nextPosition = moveNextPosition(nextPosition, bindingsCount);
       }
-      if (pos != startPos) {
-         routingNamePositions.put(routingName, pos);
+      if (nextBinding == null) {
+         // if no bindings were found, we will apply a secondary level on the routing logic
+         if (lastLowPriorityBinding != -1) {
+            nextBinding = bindings[lastLowPriorityBinding];
+            nextPosition = moveNextPosition(lastLowPriorityBinding, bindingsCount);
+         }
+      }
+      if (nextBinding != null) {
+         bindingIndex.setIndex(nextPosition);
       }
-      return theBinding;
+      return nextBinding;
    }
 
-   private boolean matchBinding(Message message, Binding binding) {
-      if (messageLoadBalancingType.equals(MessageLoadBalancingType.OFF) && binding instanceof RemoteQueueBinding) {
+   private static boolean matchBinding(final Message message,
+                                       final Binding binding,
+                                       final MessageLoadBalancingType loadBalancingType) {
+      if (loadBalancingType.equals(MessageLoadBalancingType.OFF) && binding instanceof RemoteQueueBinding) {
          return false;
       }
 
-      Filter filter = binding.getFilter();
+      final Filter filter = binding.getFilter();
 
       if (filter == null || filter.match(message)) {
          return true;
-      } else {
-         return false;
       }
+      return false;
    }
 
    private void routeUsingStrictOrdering(final Message message,
@@ -506,17 +425,7 @@ public final class BindingsImpl implements Bindings {
                                          final GroupingHandler groupingGroupingHandler,
                                          final SimpleString groupId,
                                          final int tries) throws Exception {
-      for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) {
-         SimpleString routingName = entry.getKey();
-
-         List<Binding> bindings = entry.getValue();
-
-         if (bindings == null) {
-            // The value can become null if it's concurrently removed while we're iterating - this is expected
-            // ConcurrentHashMap behaviour!
-            continue;
-         }
-
+      routingNameBindingMap.forEach((routingName, bindings, nextPosition) -> {
          // concat a full group id, this is for when a binding has multiple bindings
          // NOTE: In case a dev ever change this rule, QueueImpl::unproposed is using this rule to determine if
          //       the binding belongs to its Queue before removing it
@@ -527,9 +436,9 @@ public final class BindingsImpl implements Bindings {
 
          if (resp == null) {
             // ok let's find the next binding to propose
-            Binding theBinding = getNextBinding(message, routingName, bindings);
+            Binding theBinding = getNextBinding(message, bindings, nextPosition);
             if (theBinding == null) {
-               continue;
+               return;
             }
 
             resp = groupingGroupingHandler.propose(new Proposal(fullID, theBinding.getClusterName()));
@@ -554,10 +463,10 @@ public final class BindingsImpl implements Bindings {
 
             routeAndCheckNull(message, context, resp, chosen, groupId, tries);
          }
-      }
+      });
    }
 
-   private Binding locateBinding(SimpleString clusterName, List<Binding> bindings) {
+   private static Binding locateBinding(SimpleString clusterName, Binding[] bindings) {
       for (Binding binding : bindings) {
          if (binding.getClusterName().equals(clusterName)) {
             return binding;
@@ -603,21 +512,13 @@ public final class BindingsImpl implements Bindings {
       if (routingNameBindingMap.isEmpty()) {
          out.println("\tEMPTY!");
       }
-      for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) {
-         out.println("\tkey=" + entry.getKey() + ", value(s):");
-         for (Binding bind : entry.getValue()) {
+      routingNameBindingMap.forEach((routingName, bindings, nextPosition) -> {
+         out.println("\tkey=" + routingName + ",\tposition=" + nextPosition.getIndex() + "\tvalue(s):");
+         for (Binding bind : bindings) {
             out.println("\t\t" + bind);
          }
          out.println();
-      }
-
-      out.println("routingNamePositions:");
-      if (routingNamePositions.isEmpty()) {
-         out.println("\tEMPTY!");
-      }
-      for (Map.Entry<SimpleString, Integer> entry : routingNamePositions.entrySet()) {
-         out.println("\tkey=" + entry.getKey() + ", value=" + entry.getValue());
-      }
+      });
 
       out.println();
 
@@ -679,17 +580,21 @@ public final class BindingsImpl implements Bindings {
       }
    }
 
-   private int incrementPos(int pos, final int length) {
-      pos++;
+   private static int moveNextPosition(int position, final int length) {
+      position++;
 
-      if (pos == length) {
-         pos = 0;
+      if (position == length) {
+         position = 0;
       }
 
-      return pos;
+      return position;
    }
 
+   /**
+    * debug method: used just for tests!!
+    * @return
+    */
    public Map<SimpleString, List<Binding>> getRoutingNameBindingMap() {
-      return routingNameBindingMap;
+      return routingNameBindingMap.copyAsMap();
    }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CopyOnWriteBindings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CopyOnWriteBindings.java
new file mode 100644
index 0000000..fc79da4
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CopyOnWriteBindings.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.postoffice.impl;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+
+/**
+ * This is a copy-on-write map of {@link Binding} along with the last index set.<br>
+ */
+final class CopyOnWriteBindings {
+
+   public interface BindingIndex {
+
+      /**
+       * Cannot return a negative value and returns {@code 0} if uninitialized.
+       */
+      int getIndex();
+
+      /**
+       * Cannot set a negative value.
+       */
+      void setIndex(int v);
+   }
+
+   private static final class BindingsAndPosition extends AtomicReference<Binding[]> implements BindingIndex {
+
+      private static final AtomicIntegerFieldUpdater<BindingsAndPosition> NEXT_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(BindingsAndPosition.class, "nextPosition");
+
+      public volatile int nextPosition;
+
+      BindingsAndPosition(Binding[] bindings) {
+         super(bindings);
+         NEXT_POSITION_UPDATER.lazySet(this, 0);
+      }
+
+      @Override
+      public int getIndex() {
+         return nextPosition;
+      }
+
+      @Override
+      public void setIndex(int v) {
+         if (v < 0) {
+            throw new IllegalArgumentException("cannot set a negative position");
+         }
+         NEXT_POSITION_UPDATER.lazySet(this, v);
+      }
+   }
+
+   private final ConcurrentHashMap<SimpleString, BindingsAndPosition> map;
+
+   CopyOnWriteBindings() {
+      map = new ConcurrentHashMap<>();
+   }
+
+   /**
+    * Add the specified {@code binding}, if not present.
+    */
+   public void addBindingIfAbsent(Binding binding) {
+      Objects.requireNonNull(binding);
+      final SimpleString routingName = binding.getRoutingName();
+      Objects.requireNonNull(routingName);
+      BindingsAndPosition bindings;
+      do {
+         bindings = map.get(routingName);
+         if (bindings == null || bindings.get() == TOMBSTONE_BINDINGS) {
+            final BindingsAndPosition newBindings = new BindingsAndPosition(new Binding[]{binding});
+            bindings = map.compute(routingName, (ignored, bindingsAndPosition) -> {
+               if (bindingsAndPosition == null || bindingsAndPosition.get() == TOMBSTONE_BINDINGS) {
+                  return newBindings;
+               }
+               return bindingsAndPosition;
+            });
+            assert bindings != null;
+            if (bindings == newBindings) {
+               return;
+            }
+         }
+      }
+      while (!addBindingIfAbsent(bindings, binding));
+   }
+
+   /**
+    * Remove the specified {@code binding}, if present.
+    */
+   public void removeBinding(Binding binding) {
+      Objects.requireNonNull(binding);
+      final SimpleString routingName = binding.getRoutingName();
+      Objects.requireNonNull(routingName);
+      final BindingsAndPosition bindings = map.get(routingName);
+      if (bindings == null) {
+         return;
+      }
+      final Binding[] newBindings = removeBindingIfPresent(bindings, binding);
+      if (newBindings == TOMBSTONE_BINDINGS) {
+         // GC attempt
+         map.computeIfPresent(routingName, (bindingsRoutingName, existingBindings) -> {
+            if (existingBindings.get() == TOMBSTONE_BINDINGS) {
+               return null;
+            }
+            return existingBindings;
+         });
+      }
+   }
+
+   /**
+    * Returns a snapshot of the bindings, if present and a "lazy" binding index, otherwise {@code null}.<br>
+    * There is no strong commitment on preserving the index value if the related bindings are concurrently modified
+    * or the index itself is concurrently modified.
+    */
+   public Pair<Binding[], BindingIndex> getBindings(SimpleString routingName) {
+      Objects.requireNonNull(routingName);
+      BindingsAndPosition bindings = map.get(routingName);
+      if (bindings == null) {
+         return null;
+      }
+      final Binding[] bindingsSnapshot = bindings.get();
+      if (bindingsSnapshot == TOMBSTONE_BINDINGS) {
+         return null;
+      }
+      assert bindingsSnapshot != null && bindingsSnapshot.length > 0;
+      return new Pair<>(bindingsSnapshot, bindings);
+   }
+
+   @FunctionalInterface
+   public interface BindingsConsumer<T extends Throwable> {
+
+      /**
+       * {@code routingName} cannot be {@code null}.
+       * {@code bindings} cannot be {@code null} or empty.
+       * {@code nextPosition} cannot be null.
+       */
+      void accept(SimpleString routingName, Binding[] bindings, BindingIndex nextPosition) throws T;
+   }
+
+   /**
+    * Iterates through the bindings and its related indexes.<br>
+    */
+   public <T extends Throwable> void forEach(BindingsConsumer<T> bindingsConsumer) throws T {
+      Objects.requireNonNull(bindingsConsumer);
+      if (map.isEmpty()) {
+         return;
+      }
+      for (Map.Entry<SimpleString, BindingsAndPosition> entry : map.entrySet()) {
+         final BindingsAndPosition value = entry.getValue();
+         final Binding[] bindings = value.get();
+         if (bindings == TOMBSTONE_BINDINGS) {
+            continue;
+         }
+         assert bindings != null && bindings.length > 0;
+         bindingsConsumer.accept(entry.getKey(), bindings, value);
+      }
+   }
+
+   public boolean isEmpty() {
+      return map.isEmpty();
+   }
+
+   public Map<SimpleString, List<Binding>> copyAsMap() {
+      if (map.isEmpty()) {
+         return Collections.emptyMap();
+      }
+      final HashMap<SimpleString, List<Binding>> copy = new HashMap<>(map.size());
+      map.forEach((routingName, bindings) -> {
+         final Binding[] bindingArray = bindings.get();
+         if (bindingArray == TOMBSTONE_BINDINGS) {
+            return;
+         }
+         copy.put(routingName, Arrays.asList(bindingArray));
+      });
+      return copy;
+   }
+
+   private static final Binding[] TOMBSTONE_BINDINGS = new Binding[0];
+
+   private static int indexOfBinding(final Binding[] bindings, final Binding toFind) {
+      for (int i = 0, size = bindings.length; i < size; i++) {
+         final Binding binding = bindings[i];
+         if (binding.equals(toFind)) {
+            return i;
+         }
+      }
+      return -1;
+   }
+
+   /**
+    * Remove the binding if present and returns the new bindings, {@code null} otherwise.
+    */
+   private static Binding[] removeBindingIfPresent(final AtomicReference<Binding[]> bindings,
+                                                   final Binding bindingToRemove) {
+      Objects.requireNonNull(bindings);
+      Objects.requireNonNull(bindingToRemove);
+      Binding[] oldBindings;
+      Binding[] newBindings;
+      do {
+         oldBindings = bindings.get();
+         // no need to check vs TOMBSTONE_BINDINGS, because found === -1;
+         final int found = indexOfBinding(oldBindings, bindingToRemove);
+         if (found == -1) {
+            return null;
+         }
+         final int oldBindingsCount = oldBindings.length;
+         if (oldBindingsCount == 1) {
+            newBindings = TOMBSTONE_BINDINGS;
+         } else {
+            final int newBindingsCount = oldBindingsCount - 1;
+            newBindings = new Binding[newBindingsCount];
+            System.arraycopy(oldBindings, 0, newBindings, 0, found);
+            final int remaining = newBindingsCount - found;
+            if (remaining > 0) {
+               System.arraycopy(oldBindings, found + 1, newBindings, found, remaining);
+            }
+         }
+      }
+      while (!bindings.compareAndSet(oldBindings, newBindings));
+      return newBindings;
+   }
+
+   /**
+    * Returns {@code true} if the given binding has been added or already present,
+    * {@code false} if bindings are going to be garbage-collected.
+    */
+   private static boolean addBindingIfAbsent(final AtomicReference<Binding[]> bindings, final Binding newBinding) {
+      Objects.requireNonNull(bindings);
+      Objects.requireNonNull(newBinding);
+      Binding[] oldBindings;
+      Binding[] newBindings;
+      do {
+         oldBindings = bindings.get();
+         if (oldBindings == TOMBSTONE_BINDINGS) {
+            return false;
+         }
+         if (indexOfBinding(oldBindings, newBinding) >= 0) {
+            return true;
+         }
+         final int oldLength = oldBindings.length;
+         newBindings = Arrays.copyOf(oldBindings, oldLength + 1);
+         assert newBindings[oldLength] == null;
+         newBindings[oldLength] = newBinding;
+      }
+      while (!bindings.compareAndSet(oldBindings, newBindings));
+      return true;
+   }
+}