You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/04/05 14:41:27 UTC

[activemq-artemis] 02/03: ARTEMIS-3219 Improve FQQN message routing

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 08ec7c67c870e9f8996bd7505e81689b0300b85d
Author: franz1981 <ni...@gmail.com>
AuthorDate: Fri Apr 2 09:50:43 2021 +0200

    ARTEMIS-3219 Improve FQQN message routing
---
 .../artemis/core/postoffice/impl/BindingsImpl.java | 383 ++++++++-------------
 .../core/postoffice/impl/CopyOnWriteBindings.java  | 270 +++++++++++++++
 2 files changed, 414 insertions(+), 239 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index 5c13c6c..4cc45b1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -25,12 +25,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.postoffice.Binding;
@@ -54,9 +53,7 @@ public final class BindingsImpl implements Bindings {
    // This is public as we use on test assertions
    public static final int MAX_GROUP_RETRY = 10;
 
-   private final ConcurrentMap<SimpleString, List<Binding>> routingNameBindingMap = new ConcurrentHashMap<>();
-
-   private final Map<SimpleString, Integer> routingNamePositions = new ConcurrentHashMap<>();
+   private final CopyOnWriteBindings routingNameBindingMap = new CopyOnWriteBindings();
 
    private final Map<Long, Binding> bindingsIdMap = new ConcurrentHashMap<>();
 
@@ -113,6 +110,8 @@ public final class BindingsImpl implements Bindings {
       }
    }
 
+
+
    @Override
    public void addBinding(final Binding binding) {
       try {
@@ -122,30 +121,14 @@ public final class BindingsImpl implements Bindings {
          if (binding.isExclusive()) {
             exclusiveBindings.add(binding);
          } else {
-            SimpleString routingName = binding.getRoutingName();
-
-            List<Binding> bindings = routingNameBindingMap.get(routingName);
-
-            if (bindings == null) {
-               bindings = new CopyOnWriteArrayList<>();
-
-               List<Binding> oldBindings = routingNameBindingMap.putIfAbsent(routingName, bindings);
-
-               if (oldBindings != null) {
-                  bindings = oldBindings;
-               }
-            }
-
-            if (!bindings.contains(binding)) {
-               bindings.add(binding);
-            }
+            routingNameBindingMap.addBindingIfAbsent(binding);
          }
 
          bindingsIdMap.put(binding.getID(), binding);
          bindingsNameMap.put(binding.getUniqueName(), binding);
 
          if (binding instanceof RemoteQueueBinding) {
-            setMessageLoadBalancingType(((RemoteQueueBinding)binding).getMessageLoadBalancingType());
+            setMessageLoadBalancingType(((RemoteQueueBinding) binding).getMessageLoadBalancingType());
          }
          if (logger.isTraceEnabled()) {
             logger.trace("Adding binding " + binding + " into " + this + " bindingTable: " + debugBindings());
@@ -174,17 +157,7 @@ public final class BindingsImpl implements Bindings {
          if (binding.isExclusive()) {
             exclusiveBindings.remove(binding);
          } else {
-            SimpleString routingName = binding.getRoutingName();
-
-            List<Binding> bindings = routingNameBindingMap.get(routingName);
-
-            if (bindings != null) {
-               bindings.remove(binding);
-
-               if (bindings.isEmpty()) {
-                  routingNameBindingMap.remove(routingName);
-               }
-            }
+            routingNameBindingMap.removeBinding(binding);
          }
 
          bindingsIdMap.remove(binding.getID());
@@ -208,78 +181,56 @@ public final class BindingsImpl implements Bindings {
    public boolean redistribute(final Message message,
                                final Queue originatingQueue,
                                final RoutingContext context) throws Exception {
-      if (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) {
+      final MessageLoadBalancingType loadBalancingType = this.messageLoadBalancingType;
+      if (loadBalancingType.equals(MessageLoadBalancingType.STRICT) || loadBalancingType.equals(MessageLoadBalancingType.OFF)) {
          return false;
       }
 
       if (logger.isTraceEnabled()) {
-         logger.trace("Redistributing message " + message);
+         logger.tracef("Redistributing message %s", message);
       }
 
-      SimpleString routingName = originatingQueue.getName();
+      final SimpleString routingName = originatingQueue.getName();
 
-      List<Binding> bindings = routingNameBindingMap.get(routingName);
+      final Pair<Binding[], CopyOnWriteBindings.BindingIndex> bindingsAndPosition = routingNameBindingMap.getBindings(routingName);
 
-      if (bindings == null) {
+      if (bindingsAndPosition == null) {
          // The value can become null if it's concurrently removed while we're iterating - this is expected
          // ConcurrentHashMap behaviour!
          return false;
       }
 
-      Integer ipos = routingNamePositions.get(routingName);
-
-      int pos = ipos != null ? ipos.intValue() : 0;
-
-      int length = bindings.size();
-
-      int startPos = pos;
+      final Binding[] bindings = bindingsAndPosition.getA();
 
-      Binding theBinding = null;
+      final CopyOnWriteBindings.BindingIndex bindingIndex = bindingsAndPosition.getB();
 
-      // TODO - combine this with similar logic in route()
-      while (true) {
-         Binding binding;
-         try {
-            binding = bindings.get(pos);
-         } catch (IndexOutOfBoundsException e) {
-            // This can occur if binding is removed while in route
-            if (!bindings.isEmpty()) {
-               pos = 0;
-               startPos = 0;
-               length = bindings.size();
-
-               continue;
-            } else {
-               break;
-            }
-         }
+      assert bindings.length > 0;
 
-         pos = incrementPos(pos, length);
+      final int bindingsCount = bindings.length;
 
-         Filter filter = binding.getFilter();
+      int nextPosition = bindingIndex.getIndex();
 
-         boolean highPrior = binding.isHighAcceptPriority(message);
+      if (nextPosition >= bindingsCount) {
+         nextPosition = 0;
+      }
 
+      Binding nextBinding = null;
+      for (int i = 0; i < bindingsCount; i++) {
+         final Binding binding = bindings[nextPosition];
+         nextPosition = moveNextPosition(nextPosition, bindingsCount);
+         final Filter filter = binding.getFilter();
+         final boolean highPrior = binding.isHighAcceptPriority(message);
          if (highPrior && binding.getBindable() != originatingQueue && (filter == null || filter.match(message))) {
-            theBinding = binding;
-
-            break;
-         }
-
-         if (pos == startPos) {
+            nextBinding = binding;
             break;
          }
       }
-
-      routingNamePositions.put(routingName, pos);
-
-      if (theBinding != null) {
-         theBinding.route(message, context);
-
-         return true;
-      } else {
+      if (nextBinding == null) {
          return false;
       }
+      bindingIndex.setIndex(nextPosition);
+      nextBinding.route(message, context);
+      return true;
    }
 
    @Override
@@ -290,8 +241,8 @@ public final class BindingsImpl implements Bindings {
    private void route(final Message message,
                       final RoutingContext context,
                       final boolean groupRouting) throws Exception {
-      int currentVersion = version.get();
-      boolean reusableContext = context.isReusable(message, currentVersion);
+      final int currentVersion = version.get();
+      final boolean reusableContext = context.isReusable(message, currentVersion);
 
       if (!reusableContext) {
          context.clear();
@@ -300,54 +251,33 @@ public final class BindingsImpl implements Bindings {
       /* This is a special treatment for scaled-down messages involving SnF queues.
        * See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property
        */
-      byte[] ids = message.removeExtraBytesProperty(Message.HDR_SCALEDOWN_TO_IDS);
+      final byte[] ids = message.removeExtraBytesProperty(Message.HDR_SCALEDOWN_TO_IDS);
 
       if (ids != null) {
-         ByteBuffer buffer = ByteBuffer.wrap(ids);
-         while (buffer.hasRemaining()) {
-            long id = buffer.getLong();
-            for (Map.Entry<Long, Binding> entry : bindingsIdMap.entrySet()) {
-               if (entry.getValue() instanceof RemoteQueueBinding) {
-                  RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue();
-                  if (remoteQueueBinding.getRemoteQueueID() == id) {
-                     message.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array());
-                  }
-               }
-            }
-         }
+         handleScaledDownMessage(message, ids);
       }
 
-      boolean routed = false;
-
-      boolean hasExclusives = false;
-
-      for (Binding binding : exclusiveBindings) {
-         if (!hasExclusives) {
-            context.clear().setReusable(false);
-            hasExclusives = true;
-         }
-
-         if (binding.getFilter() == null || binding.getFilter().match(message)) {
-            binding.getBindable().route(message, context);
-            routed = true;
-         }
+      final boolean routed;
+      // checking isEmpty() before iterating may race with concurrent changes, but it saves allocating an iterator for an empty set
+      if (!exclusiveBindings.isEmpty()) {
+         routed = routeToExclusiveBindings(message, context);
+      } else {
+         routed = false;
       }
       if (!routed) {
          // Remove the ids now, in order to avoid double check
-         ids = message.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS);
-
-         // Fetch the groupId now, in order to avoid double checking
-         SimpleString groupId = message.getGroupID();
+         final byte[] routeToIds = message.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS);
 
-         if (ids != null) {
+         SimpleString groupId;
+         if (routeToIds != null) {
             context.clear().setReusable(false);
-            routeFromCluster(message, context, ids);
-         } else if (groupingHandler != null && groupRouting && groupId != null) {
+            routeFromCluster(message, context, routeToIds);
+         } else if (groupRouting && groupingHandler != null && (groupId = message.getGroupID()) != null) {
             context.clear().setReusable(false);
             routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0);
          } else if (CompositeAddress.isFullyQualified(message.getAddress())) {
             context.clear().setReusable(false);
-            Binding theBinding = bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString()));
+            final Binding theBinding = bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString()));
             if (theBinding != null) {
                theBinding.route(message, context);
             }
@@ -361,35 +291,58 @@ public final class BindingsImpl implements Bindings {
       }
    }
 
-   private void simpleRouting(Message message, RoutingContext context, int currentVersion) throws Exception {
-      if (logger.isTraceEnabled()) {
-         logger.trace("Routing message " + message + " on binding=" + this + " current context::" + context);
+   private boolean routeToExclusiveBindings(final Message message, final RoutingContext context) throws Exception {
+      boolean hasExclusives = false;
+      boolean routed = false;
+      for (Binding binding : exclusiveBindings) {
+         if (!hasExclusives) {
+            context.clear().setReusable(false);
+            hasExclusives = true;
+         }
+         final Filter filter = binding.getFilter();
+         if (filter == null || filter.match(message)) {
+            binding.getBindable().route(message, context);
+            routed = true;
+         }
       }
+      return routed;
+   }
 
-      for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) {
-         SimpleString routingName = entry.getKey();
-
-         List<Binding> bindings = entry.getValue();
-
-         if (bindings == null) {
-            // The value can become null if it's concurrently removed while we're iterating - this is expected
-            // ConcurrentHashMap behaviour!
-            continue;
+   private void handleScaledDownMessage(final Message message, final byte[] ids) {
+      ByteBuffer buffer = ByteBuffer.wrap(ids);
+      while (buffer.hasRemaining()) {
+         long id = buffer.getLong();
+         for (Map.Entry<Long, Binding> entry : bindingsIdMap.entrySet()) {
+            if (entry.getValue() instanceof RemoteQueueBinding) {
+               RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue();
+               if (remoteQueueBinding.getRemoteQueueID() == id) {
+                  message.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array());
+               }
+            }
          }
+      }
+   }
 
-         Binding theBinding = getNextBinding(message, routingName, bindings);
+   private void simpleRouting(final Message message,
+                              final RoutingContext context,
+                              final int currentVersion) throws Exception {
+      if (logger.isTraceEnabled()) {
+         logger.tracef("Routing message %s on binding=%s current context::%s", message, this, context);
+      }
 
-         if (theBinding != null && theBinding.getFilter() == null && bindings.size() == 1 && theBinding.isLocal()) {
+      routingNameBindingMap.forEach((routingName, bindings, nextPosition) -> { // at most one binding is routed to per routing name
+         final Binding nextBinding = getNextBinding(message, bindings, nextPosition);
+         if (nextBinding != null && nextBinding.getFilter() == null && nextBinding.isLocal() && bindings.length == 1) {
             context.setReusable(true, currentVersion);
          } else {
             // notice that once this is set to false, any calls to setReusable(true) will be moot as the context will ignore it
             context.setReusable(false, currentVersion);
          }
 
-         if (theBinding != null) {
-            theBinding.route(message, context);
+         if (nextBinding != null) {
+            nextBinding.route(message, context);
          }
-      }
+      });
    }
 
    @Override
@@ -406,99 +359,65 @@ public final class BindingsImpl implements Bindings {
     * (depending if you are using multi-thread), and not lose messages.
     */
    private Binding getNextBinding(final Message message,
-                                  final SimpleString routingName,
-                                  final List<Binding> bindings) {
-      Integer ipos = routingNamePositions.get(routingName);
+                                  final Binding[] bindings,
+                                  final CopyOnWriteBindings.BindingIndex bindingIndex) {
+      int nextPosition = bindingIndex.getIndex();
 
-      int pos = ipos != null ? ipos : 0;
+      final int bindingsCount = bindings.length;
 
-      int length = bindings.size();
-
-      int startPos = pos;
-
-      Binding theBinding = null;
+      if (nextPosition >= bindingsCount) {
+         nextPosition = 0;
+      }
 
+      Binding nextBinding = null;
       int lastLowPriorityBinding = -1;
+      // snapshot this, to save loading it on each iteration
+      final MessageLoadBalancingType loadBalancingType = this.messageLoadBalancingType;
 
-      while (true) {
-         Binding binding;
-         try {
-            binding = bindings.get(pos);
-         } catch (IndexOutOfBoundsException e) {
-            // This can occur if binding is removed while in route
-            if (!bindings.isEmpty()) {
-               pos = 0;
-               startPos = 0;
-               length = bindings.size();
-
-               continue;
-            } else {
-               break;
-            }
-         }
-
-         if (matchBinding(message, binding)) {
+      for (int i = 0; i < bindingsCount; i++) {
+         final Binding binding = bindings[nextPosition];
+         if (matchBinding(message, binding, loadBalancingType)) {
             // bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an
             // unnecessary overhead)
-            if (length == 1 || (binding.isConnected() && (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || binding.isHighAcceptPriority(message)))) {
-               theBinding = binding;
-
-               pos = incrementPos(pos, length);
-
+            if (bindingsCount == 1 || (binding.isConnected() && (loadBalancingType.equals(MessageLoadBalancingType.STRICT) || binding.isHighAcceptPriority(message)))) {
+               nextBinding = binding;
+               nextPosition = moveNextPosition(nextPosition, bindingsCount);
                break;
-            } else {
-               //https://issues.jboss.org/browse/HORNETQ-1254 When !routeWhenNoConsumers,
-               // the localQueue should always have the priority over the secondary bindings
-               if (lastLowPriorityBinding == -1 || messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND) && binding instanceof LocalQueueBinding) {
-                  lastLowPriorityBinding = pos;
-               }
             }
-         }
-
-         pos = incrementPos(pos, length);
-
-         if (pos == startPos) {
-
-            // if no bindings were found, we will apply a secondary level on the routing logic
-            if (lastLowPriorityBinding != -1) {
-               try {
-                  theBinding = bindings.get(lastLowPriorityBinding);
-               } catch (IndexOutOfBoundsException e) {
-                  // This can occur if binding is removed while in route
-                  if (!bindings.isEmpty()) {
-                     pos = 0;
-
-                     lastLowPriorityBinding = -1;
-
-                     continue;
-                  } else {
-                     break;
-                  }
-               }
-
-               pos = incrementPos(lastLowPriorityBinding, length);
+            //https://issues.jboss.org/browse/HORNETQ-1254 When !routeWhenNoConsumers,
+            // the localQueue should always have the priority over the secondary bindings
+            if (lastLowPriorityBinding == -1 || loadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND) && binding instanceof LocalQueueBinding) {
+               lastLowPriorityBinding = nextPosition;
             }
-            break;
          }
+         nextPosition = moveNextPosition(nextPosition, bindingsCount);
       }
-      if (pos != startPos) {
-         routingNamePositions.put(routingName, pos);
+      if (nextBinding == null) {
+         // if no bindings were found, we will apply a secondary level on the routing logic
+         if (lastLowPriorityBinding != -1) {
+            nextBinding = bindings[lastLowPriorityBinding];
+            nextPosition = moveNextPosition(lastLowPriorityBinding, bindingsCount);
+         }
+      }
+      if (nextBinding != null) {
+         bindingIndex.setIndex(nextPosition);
       }
-      return theBinding;
+      return nextBinding;
    }
 
-   private boolean matchBinding(Message message, Binding binding) {
-      if (messageLoadBalancingType.equals(MessageLoadBalancingType.OFF) && binding instanceof RemoteQueueBinding) {
+   private static boolean matchBinding(final Message message,
+                                       final Binding binding,
+                                       final MessageLoadBalancingType loadBalancingType) {
+      if (loadBalancingType.equals(MessageLoadBalancingType.OFF) && binding instanceof RemoteQueueBinding) {
          return false;
       }
 
-      Filter filter = binding.getFilter();
+      final Filter filter = binding.getFilter();
 
       if (filter == null || filter.match(message)) {
          return true;
-      } else {
-         return false;
       }
+      return false;
    }
 
    private void routeUsingStrictOrdering(final Message message,
@@ -506,17 +425,7 @@ public final class BindingsImpl implements Bindings {
                                          final GroupingHandler groupingGroupingHandler,
                                          final SimpleString groupId,
                                          final int tries) throws Exception {
-      for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) {
-         SimpleString routingName = entry.getKey();
-
-         List<Binding> bindings = entry.getValue();
-
-         if (bindings == null) {
-            // The value can become null if it's concurrently removed while we're iterating - this is expected
-            // ConcurrentHashMap behaviour!
-            continue;
-         }
-
+      routingNameBindingMap.forEach((routingName, bindings, nextPosition) -> {
          // concat a full group id, this is for when a binding has multiple bindings
          // NOTE: In case a dev ever change this rule, QueueImpl::unproposed is using this rule to determine if
          //       the binding belongs to its Queue before removing it
@@ -527,9 +436,9 @@ public final class BindingsImpl implements Bindings {
 
          if (resp == null) {
             // ok let's find the next binding to propose
-            Binding theBinding = getNextBinding(message, routingName, bindings);
+            Binding theBinding = getNextBinding(message, bindings, nextPosition);
             if (theBinding == null) {
-               continue;
+               return;
             }
 
             resp = groupingGroupingHandler.propose(new Proposal(fullID, theBinding.getClusterName()));
@@ -554,10 +463,10 @@ public final class BindingsImpl implements Bindings {
 
             routeAndCheckNull(message, context, resp, chosen, groupId, tries);
          }
-      }
+      });
    }
 
-   private Binding locateBinding(SimpleString clusterName, List<Binding> bindings) {
+   private static Binding locateBinding(SimpleString clusterName, Binding[] bindings) {
       for (Binding binding : bindings) {
          if (binding.getClusterName().equals(clusterName)) {
             return binding;
@@ -603,21 +512,13 @@ public final class BindingsImpl implements Bindings {
       if (routingNameBindingMap.isEmpty()) {
          out.println("\tEMPTY!");
       }
-      for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) {
-         out.println("\tkey=" + entry.getKey() + ", value(s):");
-         for (Binding bind : entry.getValue()) {
+      routingNameBindingMap.forEach((routingName, bindings, nextPosition) -> {
+         out.println("\tkey=" + routingName + ",\tposition=" + nextPosition.getIndex() + "\tvalue(s):");
+         for (Binding bind : bindings) {
             out.println("\t\t" + bind);
          }
          out.println();
-      }
-
-      out.println("routingNamePositions:");
-      if (routingNamePositions.isEmpty()) {
-         out.println("\tEMPTY!");
-      }
-      for (Map.Entry<SimpleString, Integer> entry : routingNamePositions.entrySet()) {
-         out.println("\tkey=" + entry.getKey() + ", value=" + entry.getValue());
-      }
+      });
 
       out.println();
 
@@ -679,17 +580,21 @@ public final class BindingsImpl implements Bindings {
       }
    }
 
-   private int incrementPos(int pos, final int length) {
-      pos++;
+   private static int moveNextPosition(int position, final int length) {
+      position++;
 
-      if (pos == length) {
-         pos = 0;
+      if (position == length) {
+         position = 0;
       }
 
-      return pos;
+      return position;
    }
 
+   /**
+    * Debug method: used only by tests.
+    * @return the result of {@code copyAsMap()} on the routing-name bindings (routing name to list of bindings)
+    */
    public Map<SimpleString, List<Binding>> getRoutingNameBindingMap() {
-      return routingNameBindingMap;
+      return routingNameBindingMap.copyAsMap();
    }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CopyOnWriteBindings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CopyOnWriteBindings.java
new file mode 100644
index 0000000..fc79da4
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CopyOnWriteBindings.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.postoffice.impl;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+
+/**
+ * Copy-on-write map from routing name to an array of {@link Binding}, each paired with the last index set for it.<br>
+ */
+final class CopyOnWriteBindings {
+
+   public interface BindingIndex {
+
+      /**
+       * Returns the stored index: it is never negative and is {@code 0} if it was never set.
+       */
+      int getIndex();
+
+      /**
+       * Stores {@code v} as the index; {@code v} cannot be negative.
+       */
+      void setIndex(int v);
+   }
+
+   private static final class BindingsAndPosition extends AtomicReference<Binding[]> implements BindingIndex {
+
+      private static final AtomicIntegerFieldUpdater<BindingsAndPosition> NEXT_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(BindingsAndPosition.class, "nextPosition");
+
+      public volatile int nextPosition; // written by field name through NEXT_POSITION_UPDATER
+
+      BindingsAndPosition(Binding[] bindings) {
+         super(bindings);
+         NEXT_POSITION_UPDATER.lazySet(this, 0);
+      }
+
+      @Override
+      public int getIndex() {
+         return nextPosition;
+      }
+
+      @Override
+      public void setIndex(int v) {
+         if (v < 0) {
+            throw new IllegalArgumentException("cannot set a negative position");
+         }
+         NEXT_POSITION_UPDATER.lazySet(this, v);
+      }
+   }
+
+   private final ConcurrentHashMap<SimpleString, BindingsAndPosition> map;
+
+   CopyOnWriteBindings() {
+      map = new ConcurrentHashMap<>();
+   }
+
+   /**
+    * Add the specified {@code binding} under its routing name, if not already present.
+    */
+   public void addBindingIfAbsent(Binding binding) {
+      Objects.requireNonNull(binding);
+      final SimpleString routingName = binding.getRoutingName();
+      Objects.requireNonNull(routingName);
+      BindingsAndPosition bindings;
+      do { // retried when the entry turns into a tombstone before the binding could be appended
+         bindings = map.get(routingName);
+         if (bindings == null || bindings.get() == TOMBSTONE_BINDINGS) { // no live entry: try to install one holding just this binding
+            final BindingsAndPosition newBindings = new BindingsAndPosition(new Binding[]{binding});
+            bindings = map.compute(routingName, (ignored, bindingsAndPosition) -> {
+               if (bindingsAndPosition == null || bindingsAndPosition.get() == TOMBSTONE_BINDINGS) {
+                  return newBindings;
+               }
+               return bindingsAndPosition;
+            });
+            assert bindings != null;
+            if (bindings == newBindings) { // our entry was installed, so the binding is already in place
+               return;
+            }
+         }
+      }
+      while (!addBindingIfAbsent(bindings, binding));
+   }
+
+   /**
+    * Remove the specified {@code binding} from the entry of its routing name, if present.
+    */
+   public void removeBinding(Binding binding) {
+      Objects.requireNonNull(binding);
+      final SimpleString routingName = binding.getRoutingName();
+      Objects.requireNonNull(routingName);
+      final BindingsAndPosition bindings = map.get(routingName);
+      if (bindings == null) {
+         return;
+      }
+      final Binding[] newBindings = removeBindingIfPresent(bindings, binding);
+      if (newBindings == TOMBSTONE_BINDINGS) {
+         // GC attempt: drop the map entry, but only if it still holds the tombstone
+         map.computeIfPresent(routingName, (bindingsRoutingName, existingBindings) -> {
+            if (existingBindings.get() == TOMBSTONE_BINDINGS) {
+               return null;
+            }
+            return existingBindings;
+         });
+      }
+   }
+
+   /**
+    * Returns a snapshot of the bindings along with a "lazy" binding index if present, otherwise {@code null}.<br>
+    * There is no strong commitment on preserving the index value if the related bindings are concurrently modified
+    * or the index itself is concurrently modified.
+    */
+   public Pair<Binding[], BindingIndex> getBindings(SimpleString routingName) {
+      Objects.requireNonNull(routingName);
+      BindingsAndPosition bindings = map.get(routingName);
+      if (bindings == null) {
+         return null;
+      }
+      final Binding[] bindingsSnapshot = bindings.get();
+      if (bindingsSnapshot == TOMBSTONE_BINDINGS) { // a tombstoned entry is reported as absent
+         return null;
+      }
+      assert bindingsSnapshot != null && bindingsSnapshot.length > 0;
+      return new Pair<>(bindingsSnapshot, bindings);
+   }
+
+   @FunctionalInterface
+   public interface BindingsConsumer<T extends Throwable> {
+
+      /**
+       * {@code routingName} cannot be {@code null}.
+       * {@code bindings} cannot be {@code null} or empty.
+       * {@code nextPosition} cannot be null.
+       */
+      void accept(SimpleString routingName, Binding[] bindings, BindingIndex nextPosition) throws T;
+   }
+
+   /**
+    * Iterates through the bindings of each routing name and their related index, skipping tombstoned entries.<br>
+    */
+   public <T extends Throwable> void forEach(BindingsConsumer<T> bindingsConsumer) throws T {
+      Objects.requireNonNull(bindingsConsumer);
+      if (map.isEmpty()) {
+         return;
+      }
+      for (Map.Entry<SimpleString, BindingsAndPosition> entry : map.entrySet()) {
+         final BindingsAndPosition value = entry.getValue();
+         final Binding[] bindings = value.get();
+         if (bindings == TOMBSTONE_BINDINGS) {
+            continue;
+         }
+         assert bindings != null && bindings.length > 0;
+         bindingsConsumer.accept(entry.getKey(), bindings, value);
+      }
+   }
+
+   public boolean isEmpty() {
+      return map.isEmpty();
+   }
+
+   public Map<SimpleString, List<Binding>> copyAsMap() {
+      if (map.isEmpty()) {
+         return Collections.emptyMap();
+      }
+      final HashMap<SimpleString, List<Binding>> copy = new HashMap<>(map.size());
+      map.forEach((routingName, bindings) -> {
+         final Binding[] bindingArray = bindings.get();
+         if (bindingArray == TOMBSTONE_BINDINGS) {
+            return;
+         }
+         copy.put(routingName, Arrays.asList(bindingArray)); // NOTE(review): Arrays.asList is a view over the internal array, not a copy
+      });
+      return copy;
+   }
+
+   private static final Binding[] TOMBSTONE_BINDINGS = new Binding[0]; // sentinel compared by identity: set when the last binding of an entry is removed
+
+   private static int indexOfBinding(final Binding[] bindings, final Binding toFind) {
+      for (int i = 0, size = bindings.length; i < size; i++) {
+         final Binding binding = bindings[i];
+         if (binding.equals(toFind)) {
+            return i;
+         }
+      }
+      return -1;
+   }
+
+   /**
+    * Removes the binding if present and returns the new bindings ({@code TOMBSTONE_BINDINGS} if it was the last one), {@code null} otherwise.
+    */
+   private static Binding[] removeBindingIfPresent(final AtomicReference<Binding[]> bindings,
+                                                   final Binding bindingToRemove) {
+      Objects.requireNonNull(bindings);
+      Objects.requireNonNull(bindingToRemove);
+      Binding[] oldBindings;
+      Binding[] newBindings;
+      do {
+         oldBindings = bindings.get();
+         // no need to check vs TOMBSTONE_BINDINGS: it is empty, so found == -1
+         final int found = indexOfBinding(oldBindings, bindingToRemove);
+         if (found == -1) {
+            return null;
+         }
+         final int oldBindingsCount = oldBindings.length;
+         if (oldBindingsCount == 1) {
+            newBindings = TOMBSTONE_BINDINGS;
+         } else {
+            final int newBindingsCount = oldBindingsCount - 1;
+            newBindings = new Binding[newBindingsCount];
+            System.arraycopy(oldBindings, 0, newBindings, 0, found);
+            final int remaining = newBindingsCount - found;
+            if (remaining > 0) {
+               System.arraycopy(oldBindings, found + 1, newBindings, found, remaining);
+            }
+         }
+      }
+      while (!bindings.compareAndSet(oldBindings, newBindings)); // retry if the array was replaced concurrently
+      return newBindings;
+   }
+
+   /**
+    * Returns {@code true} if the given binding has been added or was already present,
+    * {@code false} if the bindings are tombstoned, i.e. they are going to be garbage-collected.
+    */
+   private static boolean addBindingIfAbsent(final AtomicReference<Binding[]> bindings, final Binding newBinding) {
+      Objects.requireNonNull(bindings);
+      Objects.requireNonNull(newBinding);
+      Binding[] oldBindings;
+      Binding[] newBindings;
+      do {
+         oldBindings = bindings.get();
+         if (oldBindings == TOMBSTONE_BINDINGS) {
+            return false;
+         }
+         if (indexOfBinding(oldBindings, newBinding) >= 0) {
+            return true;
+         }
+         final int oldLength = oldBindings.length;
+         newBindings = Arrays.copyOf(oldBindings, oldLength + 1);
+         assert newBindings[oldLength] == null;
+         newBindings[oldLength] = newBinding;
+      }
+      while (!bindings.compareAndSet(oldBindings, newBindings)); // retry if the array was replaced concurrently
+      return true;
+   }
+}