You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2021/10/29 12:37:25 UTC

[activemq-artemis] branch main updated: ARTEMIS-1925 - ensure OFF_WITH_REDISTRIBUTION behaves like OFF for initial routing

This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 2167ac2  ARTEMIS-1925 - ensure OFF_WITH_REDISTRIBUTION behaves like OFF for initial routing
2167ac2 is described below

commit 2167ac2e30be04208da46b93af362a061f0b88c9
Author: gtully <ga...@gmail.com>
AuthorDate: Fri Oct 29 09:03:58 2021 +0100

    ARTEMIS-1925 - ensure OFF_WITH_REDISTRIBUTION behaves like OFF for initial routing
---
 .../artemis/core/postoffice/impl/BindingsImpl.java |  2 +-
 .../cluster/impl/RemoteQueueBindingImpl.java       |  2 +-
 .../core/postoffice/impl/BindingsImplTest.java     | 99 +++++++++++++++++++---
 .../cluster/impl/RemoteQueueBindImplTest.java      | 17 ++++
 4 files changed, 105 insertions(+), 15 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index d7420d3..e193296 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -408,7 +408,7 @@ public final class BindingsImpl implements Bindings {
    private static boolean matchBinding(final Message message,
                                        final Binding binding,
                                        final MessageLoadBalancingType loadBalancingType) {
-      if (loadBalancingType.equals(MessageLoadBalancingType.OFF) && binding instanceof RemoteQueueBinding) {
+      if ((loadBalancingType.equals(MessageLoadBalancingType.OFF) || loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION)) && binding instanceof RemoteQueueBinding) {
          return false;
       }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
index e87ded4..8a2f88c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
@@ -154,7 +154,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
 
    @Override
    public synchronized boolean isHighAcceptPriority(final Message message) {
-      if (consumerCount == 0 || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) {
+      if (consumerCount == 0 || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF) || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION)) {
          return false;
       }
 
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
index 72eb506..2b00dad 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
@@ -35,6 +36,8 @@ import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl;
 import org.apache.activemq.artemis.core.server.Bindable;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.RefsOperation;
 import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
@@ -45,15 +48,39 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.junit.Test;
 
 public class BindingsImplTest extends ActiveMQTestBase {
-   // Constants -----------------------------------------------------
 
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
+   @Test
+   public void testGetNextBindingWithLoadBalancingOnDemand() throws Exception {
+      final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a"));
+      fake.filter = null;  // such that it wil match all messages
+      fake.messageLoadBalancingType = MessageLoadBalancingType.ON_DEMAND;
+      final Bindings bind = new BindingsImpl(null, null);
+      bind.addBinding(fake);
+      bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction()));
+      assertEquals(1, fake.routedCount.get());
+   }
 
-   // Constructors --------------------------------------------------
+   @Test
+   public void testGetNextBindingWithLoadBalancingOff() throws Exception {
+      final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a"));
+      fake.filter = null;  // such that it wil match all messages
+      fake.messageLoadBalancingType = MessageLoadBalancingType.OFF;
+      final Bindings bind = new BindingsImpl(null, null);
+      bind.addBinding(fake);
+      bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction()));
+      assertEquals(0, fake.routedCount.get());
+   }
 
-   // Public --------------------------------------------------------
+   @Test
+   public void testGetNextBindingWithLoadBalancingOffWithRedistribution() throws Exception {
+      final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a"));
+      fake.filter = null;  // such that it wil match all messages
+      fake.messageLoadBalancingType = MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION;
+      final Bindings bind = new BindingsImpl(null, null);
+      bind.addBinding(fake);
+      bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction()));
+      assertEquals(0, fake.routedCount.get());
+   }
 
    @Test
    public void testRemoveWhileRouting() throws Exception {
@@ -299,8 +326,10 @@ public class BindingsImplTest extends ActiveMQTestBase {
 
    }
 
-   private final class FakeBinding implements Binding {
+   private class FakeBinding implements Binding {
 
+      Filter filter = new FakeFilter();
+      AtomicInteger routedCount = new AtomicInteger();
       @Override
       public void close() throws Exception {
 
@@ -354,7 +383,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
        */
       @Override
       public Filter getFilter() {
-         return new FakeFilter();
+         return filter;
       }
 
       @Override
@@ -399,7 +428,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
 
       @Override
       public void route(final Message message, final RoutingContext context) throws Exception {
-
+         routedCount.incrementAndGet();
       }
 
       /* (non-Javadoc)
@@ -422,12 +451,56 @@ public class BindingsImplTest extends ActiveMQTestBase {
 
    }
 
-   // Package protected ---------------------------------------------
+   private final class FakeRemoteBinding extends FakeBinding implements RemoteQueueBinding  {
+      MessageLoadBalancingType messageLoadBalancingType;
+      FakeRemoteBinding(SimpleString name) {
+         super(name);
+      }
 
-   // Protected -----------------------------------------------------
+      @Override
+      public boolean isLocal() {
+         return false;
+      }
 
-   // Private -------------------------------------------------------
+      @Override
+      public int consumerCount() {
+         return 0;
+      }
 
-   // Inner classes -------------------------------------------------
+      @Override
+      public Queue getQueue() {
+         return null;
+      }
 
+      @Override
+      public void addConsumer(SimpleString filterString) throws Exception {
+
+      }
+
+      @Override
+      public void removeConsumer(SimpleString filterString) throws Exception {
+      }
+
+      @Override
+      public void reset() {
+      }
+
+      @Override
+      public void disconnect() {
+      }
+
+      @Override
+      public void connect() {
+      }
+
+      @Override
+      public long getRemoteQueueID() {
+         return 0;
+      }
+
+      @Override
+      public MessageLoadBalancingType getMessageLoadBalancingType() {
+         return messageLoadBalancingType;
+      }
+   }
 }
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
index 1aee0fe..82f5239 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
@@ -81,4 +81,21 @@ public class RemoteQueueBindImplTest extends ActiveMQTestBase {
       testAddRemoveConsumerWithFilter(i -> null, 1, 0);
    }
 
+   @Test
+   public void testIsHighAcceptPriority() throws Exception {
+      final long id = RandomUtil.randomLong();
+      final SimpleString address = RandomUtil.randomSimpleString();
+      final SimpleString uniqueName = RandomUtil.randomSimpleString();
+      final SimpleString routingName = RandomUtil.randomSimpleString();
+      final Long remoteQueueID = RandomUtil.randomLong();
+      final SimpleString filterString = new SimpleString("A>B");
+      final Queue storeAndForwardQueue = new FakeQueue(null);
+      final SimpleString bridgeName = RandomUtil.randomSimpleString();
+      final int distance = 0;
+      RemoteQueueBindingImpl bindingOff = new RemoteQueueBindingImpl(id, address, uniqueName, routingName, remoteQueueID, filterString, storeAndForwardQueue, bridgeName, distance, MessageLoadBalancingType.OFF);
+      assertFalse(bindingOff.isHighAcceptPriority(null));
+
+      RemoteQueueBindingImpl bindingOffWithRedistribution = new RemoteQueueBindingImpl(id, address, uniqueName, routingName, remoteQueueID, filterString, storeAndForwardQueue, bridgeName, distance, MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION);
+      assertFalse(bindingOffWithRedistribution.isHighAcceptPriority(null));
+   }
 }