You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2021/10/29 12:37:25 UTC
[activemq-artemis] branch main updated: ARTEMIS-1925 - ensure
OFF_WITH_REDISTRIBUTION behaves like OFF for initial routing
This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 2167ac2 ARTEMIS-1925 - ensure OFF_WITH_REDISTRIBUTION behaves like OFF for initial routing
2167ac2 is described below
commit 2167ac2e30be04208da46b93af362a061f0b88c9
Author: gtully <ga...@gmail.com>
AuthorDate: Fri Oct 29 09:03:58 2021 +0100
ARTEMIS-1925 - ensure OFF_WITH_REDISTRIBUTION behaves like OFF for initial routing
---
.../artemis/core/postoffice/impl/BindingsImpl.java | 2 +-
.../cluster/impl/RemoteQueueBindingImpl.java | 2 +-
.../core/postoffice/impl/BindingsImplTest.java | 99 +++++++++++++++++++---
.../cluster/impl/RemoteQueueBindImplTest.java | 17 ++++
4 files changed, 105 insertions(+), 15 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index d7420d3..e193296 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -408,7 +408,7 @@ public final class BindingsImpl implements Bindings {
private static boolean matchBinding(final Message message,
final Binding binding,
final MessageLoadBalancingType loadBalancingType) {
- if (loadBalancingType.equals(MessageLoadBalancingType.OFF) && binding instanceof RemoteQueueBinding) {
+ if ((loadBalancingType.equals(MessageLoadBalancingType.OFF) || loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION)) && binding instanceof RemoteQueueBinding) {
return false;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
index e87ded4..8a2f88c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
@@ -154,7 +154,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
@Override
public synchronized boolean isHighAcceptPriority(final Message message) {
- if (consumerCount == 0 || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) {
+ if (consumerCount == 0 || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF) || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION)) {
return false;
}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
index 72eb506..2b00dad 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
@@ -35,6 +36,8 @@ import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
@@ -45,15 +48,39 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Test;
public class BindingsImplTest extends ActiveMQTestBase {
- // Constants -----------------------------------------------------
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
+ @Test
+ public void testGetNextBindingWithLoadBalancingOnDemand() throws Exception {
+ final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a"));
+ fake.filter = null; // such that it will match all messages
+ fake.messageLoadBalancingType = MessageLoadBalancingType.ON_DEMAND;
+ final Bindings bind = new BindingsImpl(null, null);
+ bind.addBinding(fake);
+ bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction()));
+ assertEquals(1, fake.routedCount.get());
+ }
- // Constructors --------------------------------------------------
+ @Test
+ public void testGetNextBindingWithLoadBalancingOff() throws Exception {
+ final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a"));
+ fake.filter = null; // such that it will match all messages
+ fake.messageLoadBalancingType = MessageLoadBalancingType.OFF;
+ final Bindings bind = new BindingsImpl(null, null);
+ bind.addBinding(fake);
+ bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction()));
+ assertEquals(0, fake.routedCount.get());
+ }
- // Public --------------------------------------------------------
+ @Test
+ public void testGetNextBindingWithLoadBalancingOffWithRedistribution() throws Exception {
+ final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a"));
+ fake.filter = null; // such that it will match all messages
+ fake.messageLoadBalancingType = MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION;
+ final Bindings bind = new BindingsImpl(null, null);
+ bind.addBinding(fake);
+ bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction()));
+ assertEquals(0, fake.routedCount.get());
+ }
@Test
public void testRemoveWhileRouting() throws Exception {
@@ -299,8 +326,10 @@ public class BindingsImplTest extends ActiveMQTestBase {
}
- private final class FakeBinding implements Binding {
+ private class FakeBinding implements Binding {
+ Filter filter = new FakeFilter();
+ AtomicInteger routedCount = new AtomicInteger();
@Override
public void close() throws Exception {
@@ -354,7 +383,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
*/
@Override
public Filter getFilter() {
- return new FakeFilter();
+ return filter;
}
@Override
@@ -399,7 +428,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
@Override
public void route(final Message message, final RoutingContext context) throws Exception {
-
+ routedCount.incrementAndGet();
}
/* (non-Javadoc)
@@ -422,12 +451,56 @@ public class BindingsImplTest extends ActiveMQTestBase {
}
- // Package protected ---------------------------------------------
+ private final class FakeRemoteBinding extends FakeBinding implements RemoteQueueBinding {
+ MessageLoadBalancingType messageLoadBalancingType;
+ FakeRemoteBinding(SimpleString name) {
+ super(name);
+ }
- // Protected -----------------------------------------------------
+ @Override
+ public boolean isLocal() {
+ return false;
+ }
- // Private -------------------------------------------------------
+ @Override
+ public int consumerCount() {
+ return 0;
+ }
- // Inner classes -------------------------------------------------
+ @Override
+ public Queue getQueue() {
+ return null;
+ }
+ @Override
+ public void addConsumer(SimpleString filterString) throws Exception {
+
+ }
+
+ @Override
+ public void removeConsumer(SimpleString filterString) throws Exception {
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public void disconnect() {
+ }
+
+ @Override
+ public void connect() {
+ }
+
+ @Override
+ public long getRemoteQueueID() {
+ return 0;
+ }
+
+ @Override
+ public MessageLoadBalancingType getMessageLoadBalancingType() {
+ return messageLoadBalancingType;
+ }
+ }
}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
index 1aee0fe..82f5239 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
@@ -81,4 +81,21 @@ public class RemoteQueueBindImplTest extends ActiveMQTestBase {
testAddRemoveConsumerWithFilter(i -> null, 1, 0);
}
+ @Test
+ public void testIsHighAcceptPriority() throws Exception {
+ final long id = RandomUtil.randomLong();
+ final SimpleString address = RandomUtil.randomSimpleString();
+ final SimpleString uniqueName = RandomUtil.randomSimpleString();
+ final SimpleString routingName = RandomUtil.randomSimpleString();
+ final Long remoteQueueID = RandomUtil.randomLong();
+ final SimpleString filterString = new SimpleString("A>B");
+ final Queue storeAndForwardQueue = new FakeQueue(null);
+ final SimpleString bridgeName = RandomUtil.randomSimpleString();
+ final int distance = 0;
+ RemoteQueueBindingImpl bindingOff = new RemoteQueueBindingImpl(id, address, uniqueName, routingName, remoteQueueID, filterString, storeAndForwardQueue, bridgeName, distance, MessageLoadBalancingType.OFF);
+ assertFalse(bindingOff.isHighAcceptPriority(null));
+
+ RemoteQueueBindingImpl bindingOffWithRedistribution = new RemoteQueueBindingImpl(id, address, uniqueName, routingName, remoteQueueID, filterString, storeAndForwardQueue, bridgeName, distance, MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION);
+ assertFalse(bindingOffWithRedistribution.isHighAcceptPriority(null));
+ }
}