You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2022/10/13 15:36:54 UTC

[activemq-artemis] branch main updated: ARTEMIS-4035 all consumers of federated queue drop if only one consumer drops

This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 0ab098e456 ARTEMIS-4035 all consumers of federated queue drop if only one consumer drops
     new a7bbe3c1fb This closes #4249
0ab098e456 is described below

commit 0ab098e4561a335d3db5bc6b484437918d316b05
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Mon Oct 10 23:25:14 2022 -0500

    ARTEMIS-4035 all consumers of federated queue drop if only one consumer drops
---
 .../federation/FederationStreamConfiguration.java  |  6 +++
 .../federation/FederatedQueueConsumerImpl.java     |  3 +-
 .../integration/federation/FederatedQueueTest.java | 54 ++++++++++++++++++++++
 3 files changed, 61 insertions(+), 2 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/config/federation/FederationStreamConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/config/federation/FederationStreamConfiguration.java
index 0f993b357d..937e993b79 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/config/federation/FederationStreamConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/config/federation/FederationStreamConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.config.federation;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 
@@ -98,4 +99,9 @@ public abstract class FederationStreamConfiguration <T extends FederationStreamC
          policyRefs.add(buffer.readString());
       }
    }
+
+   public T setStaticConnectors(List<String> connectors) {
+      connectionConfiguration.setStaticConnectors(connectors);
+      return (T) this;
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
index 3b2c78a140..cd8093a2cf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
@@ -167,8 +167,7 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
       clientConsumer = null;
       clientSession = null;
 
-      if (clientSessionFactory != null && (!upstream.getConnection().isSharedConnection() ||
-          clientSessionFactory.numSessions() == 0)) {
+      if (clientSessionFactory != null && clientSessionFactory.numSessions() == 0 && !upstream.getConnection().isSharedConnection()) {
          clientSessionFactory.close();
          clientSessionFactory = null;
       }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java
index 6acbc775e8..ff501b29be 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java
@@ -25,7 +25,10 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import java.util.Collections;
 
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.FederationConfiguration;
 import org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
@@ -80,6 +83,57 @@ public class FederatedQueueTest extends FederatedTestBase {
       testFederatedQueueRemoteConsume(queueName);
    }
 
+   @Test
+   public void testMultipleFederatedQueueRemoteConsumersUpstream() throws Exception {
+      String connector = "server1";
+
+      getServer(0).getAddressSettingsRepository().getMatch("#").setAutoCreateAddresses(true).setAutoCreateQueues(true);
+      getServer(1).getAddressSettingsRepository().getMatch("#").setAutoCreateAddresses(true).setAutoCreateQueues(true);
+
+      getServer(1).createQueue(new QueueConfiguration("Test.Q.1").setRoutingType(RoutingType.ANYCAST));
+      getServer(1).createQueue(new QueueConfiguration("Test.Q.2").setRoutingType(RoutingType.ANYCAST));
+
+      getServer(0).getConfiguration().getFederationConfigurations().add(new FederationConfiguration()
+                                                                           .setName("default")
+                                                                           .addFederationPolicy(new FederationQueuePolicyConfiguration()
+                                                                                                   .setName("myQueuePolicy")
+                                                                                                   .addInclude(new FederationQueuePolicyConfiguration.Matcher()
+                                                                                                                  .setQueueMatch("#")
+                                                                                                                  .setAddressMatch("Test.#")))
+                                                                           .addUpstreamConfiguration(new FederationUpstreamConfiguration()
+                                                                                                        .setName("server1-upstream")
+                                                                                                        .addPolicyRef("myQueuePolicy")
+                                                                                                        .setStaticConnectors(Collections.singletonList(connector))));
+      getServer(0).getFederationManager().deploy();
+
+      ConnectionFactory cf1 = getCF(0);
+      ConnectionFactory cf2 = getCF(0);
+      ConnectionFactory cf3 = getCF(1);
+      try (Connection consumer1Connection = cf1.createConnection(); Connection consumer2Connection = cf2.createConnection(); Connection producerConnection = cf3.createConnection()) {
+         consumer1Connection.start();
+         Session session1 = consumer1Connection.createSession();
+         Queue queue1 = session1.createQueue("Test.Q.1");
+         MessageConsumer consumer1 = session1.createConsumer(queue1);
+
+         consumer2Connection.start();
+         Session session2 = consumer2Connection.createSession();
+         Queue queue2 = session2.createQueue("Test.Q.2");
+         MessageConsumer consumer2 = session2.createConsumer(queue2);
+
+         Session session3 = producerConnection.createSession();
+         MessageProducer producer = session3.createProducer(queue2);
+         producer.send(session3.createTextMessage("hello"));
+
+         assertNotNull(consumer2.receive(1000));
+
+         consumer1Connection.close();
+
+         producer.send(session3.createTextMessage("hello"));
+
+         assertNotNull(consumer2.receive(1000));
+      }
+   }
+
    @Test
    public void testFederatedQueueRemoteConsumeUpstreamPriorityAdjustment() throws Exception {
       String queueName = getName();