You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/04/05 14:41:27 UTC
[activemq-artemis] 02/03: ARTEMIS-3219 Improve FQQN message routing
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 08ec7c67c870e9f8996bd7505e81689b0300b85d
Author: franz1981 <ni...@gmail.com>
AuthorDate: Fri Apr 2 09:50:43 2021 +0200
ARTEMIS-3219 Improve FQQN message routing
---
.../artemis/core/postoffice/impl/BindingsImpl.java | 383 ++++++++-------------
.../core/postoffice/impl/CopyOnWriteBindings.java | 270 +++++++++++++++
2 files changed, 414 insertions(+), 239 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index 5c13c6c..4cc45b1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -25,12 +25,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.postoffice.Binding;
@@ -54,9 +53,7 @@ public final class BindingsImpl implements Bindings {
// This is public as we use on test assertions
public static final int MAX_GROUP_RETRY = 10;
- private final ConcurrentMap<SimpleString, List<Binding>> routingNameBindingMap = new ConcurrentHashMap<>();
-
- private final Map<SimpleString, Integer> routingNamePositions = new ConcurrentHashMap<>();
+ private final CopyOnWriteBindings routingNameBindingMap = new CopyOnWriteBindings();
private final Map<Long, Binding> bindingsIdMap = new ConcurrentHashMap<>();
@@ -113,6 +110,8 @@ public final class BindingsImpl implements Bindings {
}
}
+
+
@Override
public void addBinding(final Binding binding) {
try {
@@ -122,30 +121,14 @@ public final class BindingsImpl implements Bindings {
if (binding.isExclusive()) {
exclusiveBindings.add(binding);
} else {
- SimpleString routingName = binding.getRoutingName();
-
- List<Binding> bindings = routingNameBindingMap.get(routingName);
-
- if (bindings == null) {
- bindings = new CopyOnWriteArrayList<>();
-
- List<Binding> oldBindings = routingNameBindingMap.putIfAbsent(routingName, bindings);
-
- if (oldBindings != null) {
- bindings = oldBindings;
- }
- }
-
- if (!bindings.contains(binding)) {
- bindings.add(binding);
- }
+ routingNameBindingMap.addBindingIfAbsent(binding);
}
bindingsIdMap.put(binding.getID(), binding);
bindingsNameMap.put(binding.getUniqueName(), binding);
if (binding instanceof RemoteQueueBinding) {
- setMessageLoadBalancingType(((RemoteQueueBinding)binding).getMessageLoadBalancingType());
+ setMessageLoadBalancingType(((RemoteQueueBinding) binding).getMessageLoadBalancingType());
}
if (logger.isTraceEnabled()) {
logger.trace("Adding binding " + binding + " into " + this + " bindingTable: " + debugBindings());
@@ -174,17 +157,7 @@ public final class BindingsImpl implements Bindings {
if (binding.isExclusive()) {
exclusiveBindings.remove(binding);
} else {
- SimpleString routingName = binding.getRoutingName();
-
- List<Binding> bindings = routingNameBindingMap.get(routingName);
-
- if (bindings != null) {
- bindings.remove(binding);
-
- if (bindings.isEmpty()) {
- routingNameBindingMap.remove(routingName);
- }
- }
+ routingNameBindingMap.removeBinding(binding);
}
bindingsIdMap.remove(binding.getID());
@@ -208,78 +181,56 @@ public final class BindingsImpl implements Bindings {
public boolean redistribute(final Message message,
final Queue originatingQueue,
final RoutingContext context) throws Exception {
// Tries to hand the message to one other binding registered under originatingQueue's name.
// Returns false without routing when load balancing is STRICT or OFF, when nothing is registered
// under that name, or when no binding accepts with high priority and passes its filter.
- if (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) {
+ final MessageLoadBalancingType loadBalancingType = this.messageLoadBalancingType; // read the field once so both checks see the same value
+ if (loadBalancingType.equals(MessageLoadBalancingType.STRICT) || loadBalancingType.equals(MessageLoadBalancingType.OFF)) {
return false;
}
if (logger.isTraceEnabled()) {
- logger.trace("Redistributing message " + message);
+ logger.tracef("Redistributing message %s", message);
}
- SimpleString routingName = originatingQueue.getName();
+ final SimpleString routingName = originatingQueue.getName();
- List<Binding> bindings = routingNameBindingMap.get(routingName);
+ final Pair<Binding[], CopyOnWriteBindings.BindingIndex> bindingsAndPosition = routingNameBindingMap.getBindings(routingName);
- if (bindings == null) {
+ if (bindingsAndPosition == null) {
// The value can become null if it's concurrently removed while we're iterating - this is expected
// ConcurrentHashMap behaviour!
return false;
}
- Integer ipos = routingNamePositions.get(routingName);
-
- int pos = ipos != null ? ipos.intValue() : 0;
-
- int length = bindings.size();
-
- int startPos = pos;
+ final Binding[] bindings = bindingsAndPosition.getA(); // the bindings currently held for routingName
- Binding theBinding = null;
+ final CopyOnWriteBindings.BindingIndex bindingIndex = bindingsAndPosition.getB(); // holder of the next position to try
- // TODO - combine this with similar logic in route()
- while (true) {
- Binding binding;
- try {
- binding = bindings.get(pos);
- } catch (IndexOutOfBoundsException e) {
- // This can occur if binding is removed while in route
- if (!bindings.isEmpty()) {
- pos = 0;
- startPos = 0;
- length = bindings.size();
-
- continue;
- } else {
- break;
- }
- }
+ assert bindings.length > 0;
- pos = incrementPos(pos, length);
+ final int bindingsCount = bindings.length;
- Filter filter = binding.getFilter();
+ int nextPosition = bindingIndex.getIndex();
- boolean highPrior = binding.isHighAcceptPriority(message);
+ if (nextPosition >= bindingsCount) { // the stored index may be past the end of this array: restart from 0
+ nextPosition = 0;
+ }
+ Binding nextBinding = null;
+ for (int i = 0; i < bindingsCount; i++) { // visits each binding at most once, starting at nextPosition
+ final Binding binding = bindings[nextPosition];
+ nextPosition = moveNextPosition(nextPosition, bindingsCount);
+ final Filter filter = binding.getFilter();
+ final boolean highPrior = binding.isHighAcceptPriority(message);
if (highPrior && binding.getBindable() != originatingQueue && (filter == null || filter.match(message))) {
- theBinding = binding;
-
- break;
- }
-
- if (pos == startPos) {
+ nextBinding = binding;
break;
}
}
-
- routingNamePositions.put(routingName, pos);
-
- if (theBinding != null) {
- theBinding.route(message, context);
-
- return true;
- } else {
+ if (nextBinding == null) {
return false;
}
+ bindingIndex.setIndex(nextPosition); // stored only when a binding was chosen; points just past it
+ nextBinding.route(message, context);
+ return true;
}
@Override
@@ -290,8 +241,8 @@ public final class BindingsImpl implements Bindings {
private void route(final Message message,
final RoutingContext context,
final boolean groupRouting) throws Exception {
- int currentVersion = version.get();
- boolean reusableContext = context.isReusable(message, currentVersion);
+ final int currentVersion = version.get();
+ final boolean reusableContext = context.isReusable(message, currentVersion);
if (!reusableContext) {
context.clear();
@@ -300,54 +251,33 @@ public final class BindingsImpl implements Bindings {
/* This is a special treatment for scaled-down messages involving SnF queues.
* See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property
*/
- byte[] ids = message.removeExtraBytesProperty(Message.HDR_SCALEDOWN_TO_IDS);
+ final byte[] ids = message.removeExtraBytesProperty(Message.HDR_SCALEDOWN_TO_IDS);
if (ids != null) {
- ByteBuffer buffer = ByteBuffer.wrap(ids);
- while (buffer.hasRemaining()) {
- long id = buffer.getLong();
- for (Map.Entry<Long, Binding> entry : bindingsIdMap.entrySet()) {
- if (entry.getValue() instanceof RemoteQueueBinding) {
- RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue();
- if (remoteQueueBinding.getRemoteQueueID() == id) {
- message.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array());
- }
- }
- }
- }
+ handleScaledDownMessage(message, ids);
}
- boolean routed = false;
-
- boolean hasExclusives = false;
-
- for (Binding binding : exclusiveBindings) {
- if (!hasExclusives) {
- context.clear().setReusable(false);
- hasExclusives = true;
- }
-
- if (binding.getFilter() == null || binding.getFilter().match(message)) {
- binding.getBindable().route(message, context);
- routed = true;
- }
+ final boolean routed;
+ // the isEmpty() pre-check is racy with concurrent updates, but it avoids allocating an iterator when the set is empty
+ if (!exclusiveBindings.isEmpty()) {
+ routed = routeToExclusiveBindings(message, context);
+ } else {
+ routed = false;
}
if (!routed) {
// Remove the ids now, in order to avoid double check
- ids = message.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS);
-
- // Fetch the groupId now, in order to avoid double checking
- SimpleString groupId = message.getGroupID();
+ final byte[] routeToIds = message.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS);
- if (ids != null) {
+ SimpleString groupId;
+ if (routeToIds != null) {
context.clear().setReusable(false);
- routeFromCluster(message, context, ids);
- } else if (groupingHandler != null && groupRouting && groupId != null) {
+ routeFromCluster(message, context, routeToIds);
+ } else if (groupRouting && groupingHandler != null && (groupId = message.getGroupID()) != null) {
context.clear().setReusable(false);
routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0);
} else if (CompositeAddress.isFullyQualified(message.getAddress())) {
context.clear().setReusable(false);
- Binding theBinding = bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString()));
+ final Binding theBinding = bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString()));
if (theBinding != null) {
theBinding.route(message, context);
}
@@ -361,35 +291,58 @@ public final class BindingsImpl implements Bindings {
}
}
- private void simpleRouting(Message message, RoutingContext context, int currentVersion) throws Exception {
- if (logger.isTraceEnabled()) {
- logger.trace("Routing message " + message + " on binding=" + this + " current context::" + context);
+ /**
+ * Hands the message to every exclusive binding whose filter is absent or matches it.
+ * The context is cleared and marked non-reusable as soon as one exclusive binding is seen.
+ *
+ * @return {@code true} if the message was handed to at least one exclusive binding
+ */
+ private boolean routeToExclusiveBindings(final Message message, final RoutingContext context) throws Exception {
+ boolean contextReset = false;
+ boolean delivered = false;
+ for (final Binding exclusive : exclusiveBindings) {
+ if (!contextReset) {
+ // first exclusive binding seen: reset the context once and forbid its reuse
+ context.clear().setReusable(false);
+ contextReset = true;
+ }
+ final Filter exclusiveFilter = exclusive.getFilter();
+ if (exclusiveFilter != null && !exclusiveFilter.match(message)) {
+ continue;
+ }
+ exclusive.getBindable().route(message, context);
+ delivered = true;
}
+ return delivered;
+ }
- for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) {
- SimpleString routingName = entry.getKey();
-
- List<Binding> bindings = entry.getValue();
-
- if (bindings == null) {
- // The value can become null if it's concurrently removed while we're iterating - this is expected
- // ConcurrentHashMap behaviour!
- continue;
+ private void handleScaledDownMessage(final Message message, final byte[] ids) {
+ ByteBuffer buffer = ByteBuffer.wrap(ids);
+ while (buffer.hasRemaining()) {
+ long id = buffer.getLong();
+ for (Map.Entry<Long, Binding> entry : bindingsIdMap.entrySet()) {
+ if (entry.getValue() instanceof RemoteQueueBinding) {
+ RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue();
+ if (remoteQueueBinding.getRemoteQueueID() == id) {
+ message.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array());
+ }
+ }
}
+ }
+ }
- Binding theBinding = getNextBinding(message, routingName, bindings);
+ /**
+ * Routes the message once per routing name: for each group of bindings the next binding is picked
+ * with {@code getNextBinding} and, when one is found, the message is routed to it.
+ * <p>
+ * The context is flagged reusable for {@code currentVersion} only when the picked binding is local,
+ * has no filter and is the only binding of its group; in every other case it is flagged non-reusable.
+ *
+ * @param message        the message to route
+ * @param context        the routing context that receives the routes
+ * @param currentVersion the bindings version passed on to {@code setReusable}
+ * @throws Exception if routing to a binding fails
+ */
+ private void simpleRouting(final Message message,
+ final RoutingContext context,
+ final int currentVersion) throws Exception {
+ if (logger.isTraceEnabled()) {
+ // every placeholder must be %s: a "$s" is printed literally and its argument is dropped
+ logger.tracef("Routing message %s on binding=%s current context::%s", message, this, context);
+ }
- if (theBinding != null && theBinding.getFilter() == null && bindings.size() == 1 && theBinding.isLocal()) {
+ routingNameBindingMap.forEach((routingName, bindings, nextPosition) -> {
+ final Binding nextBinding = getNextBinding(message, bindings, nextPosition);
+ if (nextBinding != null && nextBinding.getFilter() == null && nextBinding.isLocal() && bindings.length == 1) {
context.setReusable(true, currentVersion);
} else {
// notice that once this is set to false, any calls to setReusable(true) will be moot as the context will ignore it
context.setReusable(false, currentVersion);
}
- if (theBinding != null) {
- theBinding.route(message, context);
+ if (nextBinding != null) {
+ nextBinding.route(message, context);
}
- }
+ });
}
@Override
@@ -406,99 +359,65 @@ public final class BindingsImpl implements Bindings {
* (depending if you are using multi-thread), and not lose messages.
*/
private Binding getNextBinding(final Message message,
- final SimpleString routingName,
- final List<Binding> bindings) {
- Integer ipos = routingNamePositions.get(routingName);
+ final Binding[] bindings,
+ final CopyOnWriteBindings.BindingIndex bindingIndex) {
+ int nextPosition = bindingIndex.getIndex(); // start from the position stored by a previous call
- int pos = ipos != null ? ipos : 0;
+ final int bindingsCount = bindings.length;
- int length = bindings.size();
-
- int startPos = pos;
-
- Binding theBinding = null;
+ if (nextPosition >= bindingsCount) { // the stored index may be past the end of this array: restart from 0
+ nextPosition = 0;
+ }
+ Binding nextBinding = null;
int lastLowPriorityBinding = -1;
+ // snapshot this, to save loading it on each iteration
+ final MessageLoadBalancingType loadBalancingType = this.messageLoadBalancingType;
- while (true) {
- Binding binding;
- try {
- binding = bindings.get(pos);
- } catch (IndexOutOfBoundsException e) {
- // This can occur if binding is removed while in route
- if (!bindings.isEmpty()) {
- pos = 0;
- startPos = 0;
- length = bindings.size();
-
- continue;
- } else {
- break;
- }
- }
-
- if (matchBinding(message, binding)) {
+ for (int i = 0; i < bindingsCount; i++) { // at most one full pass over the array
+ final Binding binding = bindings[nextPosition];
+ if (matchBinding(message, binding, loadBalancingType)) {
// bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an
// unnecessary overhead)
- if (length == 1 || (binding.isConnected() && (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || binding.isHighAcceptPriority(message)))) {
- theBinding = binding;
-
- pos = incrementPos(pos, length);
-
+ if (bindingsCount == 1 || (binding.isConnected() && (loadBalancingType.equals(MessageLoadBalancingType.STRICT) || binding.isHighAcceptPriority(message)))) {
+ nextBinding = binding;
+ nextPosition = moveNextPosition(nextPosition, bindingsCount);
break;
- } else {
- //https://issues.jboss.org/browse/HORNETQ-1254 When !routeWhenNoConsumers,
- // the localQueue should always have the priority over the secondary bindings
- if (lastLowPriorityBinding == -1 || messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND) && binding instanceof LocalQueueBinding) {
- lastLowPriorityBinding = pos;
- }
}
- }
-
- pos = incrementPos(pos, length);
-
- if (pos == startPos) {
-
- // if no bindings were found, we will apply a secondary level on the routing logic
- if (lastLowPriorityBinding != -1) {
- try {
- theBinding = bindings.get(lastLowPriorityBinding);
- } catch (IndexOutOfBoundsException e) {
- // This can occur if binding is removed while in route
- if (!bindings.isEmpty()) {
- pos = 0;
-
- lastLowPriorityBinding = -1;
-
- continue;
- } else {
- break;
- }
- }
-
- pos = incrementPos(lastLowPriorityBinding, length);
+ //https://issues.jboss.org/browse/HORNETQ-1254 When !routeWhenNoConsumers,
+ // the localQueue should always have the priority over the secondary bindings
+ if (lastLowPriorityBinding == -1 || loadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND) && binding instanceof LocalQueueBinding) {
+ lastLowPriorityBinding = nextPosition; // index of this matching binding, kept as a fallback
}
- break;
}
+ nextPosition = moveNextPosition(nextPosition, bindingsCount);
}
- if (pos != startPos) {
- routingNamePositions.put(routingName, pos);
+ if (nextBinding == null) {
+ // if no bindings were found, we will apply a secondary level on the routing logic
+ if (lastLowPriorityBinding != -1) {
+ nextBinding = bindings[lastLowPriorityBinding];
+ nextPosition = moveNextPosition(lastLowPriorityBinding, bindingsCount); // continue just past the fallback
+ }
+ }
+ if (nextBinding != null) {
+ bindingIndex.setIndex(nextPosition); // stored only when a binding was chosen
}
- return theBinding;
+ return nextBinding;
}
- private boolean matchBinding(Message message, Binding binding) {
- if (messageLoadBalancingType.equals(MessageLoadBalancingType.OFF) && binding instanceof RemoteQueueBinding) {
+ private static boolean matchBinding(final Message message,
+ final Binding binding,
+ final MessageLoadBalancingType loadBalancingType) { // tells whether binding is eligible to receive message
+ if (loadBalancingType.equals(MessageLoadBalancingType.OFF) && binding instanceof RemoteQueueBinding) { // OFF: remote queue bindings are never eligible
return false;
}
- Filter filter = binding.getFilter();
+ final Filter filter = binding.getFilter(); // null is treated as "no filter": every message is accepted
if (filter == null || filter.match(message)) {
return true;
- } else {
- return false;
}
+ return false;
}
private void routeUsingStrictOrdering(final Message message,
@@ -506,17 +425,7 @@ public final class BindingsImpl implements Bindings {
final GroupingHandler groupingGroupingHandler,
final SimpleString groupId,
final int tries) throws Exception {
- for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) {
- SimpleString routingName = entry.getKey();
-
- List<Binding> bindings = entry.getValue();
-
- if (bindings == null) {
- // The value can become null if it's concurrently removed while we're iterating - this is expected
- // ConcurrentHashMap behaviour!
- continue;
- }
-
+ routingNameBindingMap.forEach((routingName, bindings, nextPosition) -> {
// concat a full group id, this is for when a binding has multiple bindings
// NOTE: In case a dev ever change this rule, QueueImpl::unproposed is using this rule to determine if
// the binding belongs to its Queue before removing it
@@ -527,9 +436,9 @@ public final class BindingsImpl implements Bindings {
if (resp == null) {
// ok let's find the next binding to propose
- Binding theBinding = getNextBinding(message, routingName, bindings);
+ Binding theBinding = getNextBinding(message, bindings, nextPosition);
if (theBinding == null) {
- continue;
+ return;
}
resp = groupingGroupingHandler.propose(new Proposal(fullID, theBinding.getClusterName()));
@@ -554,10 +463,10 @@ public final class BindingsImpl implements Bindings {
routeAndCheckNull(message, context, resp, chosen, groupId, tries);
}
- }
+ });
}
- private Binding locateBinding(SimpleString clusterName, List<Binding> bindings) {
+ private static Binding locateBinding(SimpleString clusterName, Binding[] bindings) {
for (Binding binding : bindings) {
if (binding.getClusterName().equals(clusterName)) {
return binding;
@@ -603,21 +512,13 @@ public final class BindingsImpl implements Bindings {
if (routingNameBindingMap.isEmpty()) {
out.println("\tEMPTY!");
}
- for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) {
- out.println("\tkey=" + entry.getKey() + ", value(s):");
- for (Binding bind : entry.getValue()) {
+ routingNameBindingMap.forEach((routingName, bindings, nextPosition) -> {
+ out.println("\tkey=" + routingName + ",\tposition=" + nextPosition.getIndex() + "\tvalue(s):");
+ for (Binding bind : bindings) {
out.println("\t\t" + bind);
}
out.println();
- }
-
- out.println("routingNamePositions:");
- if (routingNamePositions.isEmpty()) {
- out.println("\tEMPTY!");
- }
- for (Map.Entry<SimpleString, Integer> entry : routingNamePositions.entrySet()) {
- out.println("\tkey=" + entry.getKey() + ", value=" + entry.getValue());
- }
+ });
out.println();
@@ -679,17 +580,21 @@ public final class BindingsImpl implements Bindings {
}
}
- private int incrementPos(int pos, final int length) {
- pos++;
+ private static int moveNextPosition(int position, final int length) { // returns (position + 1) modulo length, for 0 <= position < length
+ position++;
- if (pos == length) {
- pos = 0;
+ if (position == length) { // stepped past the last slot: wrap around to the first one
+ position = 0;
}
- return pos;
+ return position;
}
+ /**
+ * Debug method: used just for tests!!
+ *
+ * @return the map built by {@code CopyOnWriteBindings.copyAsMap()}: one entry per routing name,
+ * listing the bindings held for it at the time of the call
+ */
public Map<SimpleString, List<Binding>> getRoutingNameBindingMap() {
- return routingNameBindingMap;
+ return routingNameBindingMap.copyAsMap();
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CopyOnWriteBindings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CopyOnWriteBindings.java
new file mode 100644
index 0000000..fc79da4
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CopyOnWriteBindings.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.postoffice.impl;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+
+/**
+ * Copy-on-write registry of {@link Binding}s grouped by routing name; every group also carries the
+ * index of the next binding to try.
+ * <p>
+ * The arrays handed out by {@link #getBindings(SimpleString)} and {@link #forEach(BindingsConsumer)}
+ * are never modified in place: adding or removing a binding installs a new array with a
+ * compare-and-set, so callers can iterate the array they received without copying it.
+ */
+final class CopyOnWriteBindings {
+
+    /**
+     * Marker array installed in a holder once its last binding has been removed. A holder in this
+     * state is never filled again: an addition for the same routing name installs a new holder.
+     */
+    private static final Binding[] TOMBSTONE_BINDINGS = new Binding[0];
+
+    public interface BindingIndex {
+
+        /**
+         * Cannot return a negative value and returns {@code 0} if uninitialized.
+         */
+        int getIndex();
+
+        /**
+         * Cannot set a negative value.
+         */
+        void setIndex(int v);
+    }
+
+    /**
+     * Holder of the current bindings array of one routing name (the {@link AtomicReference} value)
+     * together with the position of the next binding to try.
+     */
+    private static final class BindingsAndPosition extends AtomicReference<Binding[]> implements BindingIndex {
+
+        private static final AtomicIntegerFieldUpdater<BindingsAndPosition> NEXT_POSITION_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(BindingsAndPosition.class, "nextPosition");
+
+        // read through getIndex() and written through the updater only, hence not exposed
+        private volatile int nextPosition;
+
+        BindingsAndPosition(Binding[] bindings) {
+            super(bindings);
+            NEXT_POSITION_UPDATER.lazySet(this, 0);
+        }
+
+        @Override
+        public int getIndex() {
+            return nextPosition;
+        }
+
+        /**
+         * @throws IllegalArgumentException if {@code v} is negative
+         */
+        @Override
+        public void setIndex(int v) {
+            if (v < 0) {
+                throw new IllegalArgumentException("cannot set a negative position");
+            }
+            NEXT_POSITION_UPDATER.lazySet(this, v);
+        }
+    }
+
+    private final ConcurrentHashMap<SimpleString, BindingsAndPosition> map;
+
+    CopyOnWriteBindings() {
+        map = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Add the specified {@code binding}, if not present.
+     *
+     * @throws NullPointerException if {@code binding} or its routing name is {@code null}
+     */
+    public void addBindingIfAbsent(Binding binding) {
+        Objects.requireNonNull(binding);
+        final SimpleString routingName = binding.getRoutingName();
+        Objects.requireNonNull(routingName);
+        BindingsAndPosition bindings;
+        do {
+            bindings = map.get(routingName);
+            if (bindings == null || bindings.get() == TOMBSTONE_BINDINGS) {
+                // no usable holder: try to install a fresh one that already contains the binding
+                final BindingsAndPosition newBindings = new BindingsAndPosition(new Binding[]{binding});
+                bindings = map.compute(routingName, (ignored, bindingsAndPosition) -> {
+                    if (bindingsAndPosition == null || bindingsAndPosition.get() == TOMBSTONE_BINDINGS) {
+                        return newBindings;
+                    }
+                    return bindingsAndPosition;
+                });
+                assert bindings != null;
+                if (bindings == newBindings) {
+                    return;
+                }
+            }
+            // retry from the map lookup if the holder got tombstoned in the meantime
+        }
+        while (!addBindingIfAbsent(bindings, binding));
+    }
+
+    /**
+     * Remove the specified {@code binding}, if present.
+     *
+     * @throws NullPointerException if {@code binding} or its routing name is {@code null}
+     */
+    public void removeBinding(Binding binding) {
+        Objects.requireNonNull(binding);
+        final SimpleString routingName = binding.getRoutingName();
+        Objects.requireNonNull(routingName);
+        final BindingsAndPosition bindings = map.get(routingName);
+        if (bindings == null) {
+            return;
+        }
+        final Binding[] newBindings = removeBindingIfPresent(bindings, binding);
+        if (newBindings == TOMBSTONE_BINDINGS) {
+            // GC attempt
+            map.computeIfPresent(routingName, (bindingsRoutingName, existingBindings) -> {
+                if (existingBindings.get() == TOMBSTONE_BINDINGS) {
+                    return null;
+                }
+                return existingBindings;
+            });
+        }
+    }
+
+    /**
+     * Returns a snapshot of the bindings, if present and a "lazy" binding index, otherwise {@code null}.<br>
+     * There is no strong commitment on preserving the index value if the related bindings are concurrently modified
+     * or the index itself is concurrently modified.
+     */
+    public Pair<Binding[], BindingIndex> getBindings(SimpleString routingName) {
+        Objects.requireNonNull(routingName);
+        BindingsAndPosition bindings = map.get(routingName);
+        if (bindings == null) {
+            return null;
+        }
+        final Binding[] bindingsSnapshot = bindings.get();
+        if (bindingsSnapshot == TOMBSTONE_BINDINGS) {
+            return null;
+        }
+        assert bindingsSnapshot != null && bindingsSnapshot.length > 0;
+        return new Pair<>(bindingsSnapshot, bindings);
+    }
+
+    @FunctionalInterface
+    public interface BindingsConsumer<T extends Throwable> {
+
+        /**
+         * {@code routingName} cannot be {@code null}.
+         * {@code bindings} cannot be {@code null} or empty.
+         * {@code nextPosition} cannot be null.
+         */
+        void accept(SimpleString routingName, Binding[] bindings, BindingIndex nextPosition) throws T;
+    }
+
+    /**
+     * Iterates through the bindings and its related indexes.<br>
+     * Routing names whose bindings have all been removed are skipped.
+     */
+    public <T extends Throwable> void forEach(BindingsConsumer<T> bindingsConsumer) throws T {
+        Objects.requireNonNull(bindingsConsumer);
+        if (map.isEmpty()) {
+            return;
+        }
+        for (Map.Entry<SimpleString, BindingsAndPosition> entry : map.entrySet()) {
+            final BindingsAndPosition value = entry.getValue();
+            final Binding[] bindings = value.get();
+            if (bindings == TOMBSTONE_BINDINGS) {
+                continue;
+            }
+            assert bindings != null && bindings.length > 0;
+            bindingsConsumer.accept(entry.getKey(), bindings, value);
+        }
+    }
+
+    public boolean isEmpty() {
+        return map.isEmpty();
+    }
+
+    /**
+     * Returns a map from routing name to the bindings held for it at the time of the call.
+     * The returned lists are backed by private copies, so changing them does not affect this instance.
+     */
+    public Map<SimpleString, List<Binding>> copyAsMap() {
+        if (map.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        final HashMap<SimpleString, List<Binding>> copy = new HashMap<>(map.size());
+        map.forEach((routingName, bindings) -> {
+            final Binding[] bindingArray = bindings.get();
+            if (bindingArray == TOMBSTONE_BINDINGS) {
+                return;
+            }
+            // Arrays.asList is a write-through view: wrap a clone so that List.set on the result
+            // cannot alter the array handed out by getBindings/forEach
+            copy.put(routingName, Arrays.asList(bindingArray.clone()));
+        });
+        return copy;
+    }
+
+    /**
+     * Returns the position of the first element equal to {@code toFind}, {@code -1} if there is none.
+     */
+    private static int indexOfBinding(final Binding[] bindings, final Binding toFind) {
+        for (int i = 0, size = bindings.length; i < size; i++) {
+            final Binding binding = bindings[i];
+            if (binding.equals(toFind)) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * Remove the binding if present and returns the new bindings ({@code TOMBSTONE_BINDINGS} when the
+     * last one has been removed), {@code null} otherwise.
+     */
+    private static Binding[] removeBindingIfPresent(final AtomicReference<Binding[]> bindings,
+                                                    final Binding bindingToRemove) {
+        Objects.requireNonNull(bindings);
+        Objects.requireNonNull(bindingToRemove);
+        Binding[] oldBindings;
+        Binding[] newBindings;
+        do {
+            oldBindings = bindings.get();
+            // no need to check vs TOMBSTONE_BINDINGS, because found === -1;
+            final int found = indexOfBinding(oldBindings, bindingToRemove);
+            if (found == -1) {
+                return null;
+            }
+            final int oldBindingsCount = oldBindings.length;
+            if (oldBindingsCount == 1) {
+                newBindings = TOMBSTONE_BINDINGS;
+            } else {
+                final int newBindingsCount = oldBindingsCount - 1;
+                newBindings = new Binding[newBindingsCount];
+                // copy what precedes the removed element, then what follows it
+                System.arraycopy(oldBindings, 0, newBindings, 0, found);
+                final int remaining = newBindingsCount - found;
+                if (remaining > 0) {
+                    System.arraycopy(oldBindings, found + 1, newBindings, found, remaining);
+                }
+            }
+        }
+        while (!bindings.compareAndSet(oldBindings, newBindings));
+        return newBindings;
+    }
+
+    /**
+     * Returns {@code true} if the given binding has been added or already present,
+     * {@code false} if bindings are going to be garbage-collected.
+     */
+    private static boolean addBindingIfAbsent(final AtomicReference<Binding[]> bindings, final Binding newBinding) {
+        Objects.requireNonNull(bindings);
+        Objects.requireNonNull(newBinding);
+        Binding[] oldBindings;
+        Binding[] newBindings;
+        do {
+            oldBindings = bindings.get();
+            if (oldBindings == TOMBSTONE_BINDINGS) {
+                return false;
+            }
+            if (indexOfBinding(oldBindings, newBinding) >= 0) {
+                return true;
+            }
+            final int oldLength = oldBindings.length;
+            newBindings = Arrays.copyOf(oldBindings, oldLength + 1);
+            assert newBindings[oldLength] == null;
+            newBindings[oldLength] = newBinding;
+        }
+        while (!bindings.compareAndSet(oldBindings, newBindings));
+        return true;
+    }
+}