You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2022/10/13 15:36:54 UTC
[activemq-artemis] branch main updated: ARTEMIS-4035 all consumers of federated queue drop if only one consumer drops
This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 0ab098e456 ARTEMIS-4035 all consumers of federated queue drop if only one consumer drops
new a7bbe3c1fb This closes #4249
0ab098e456 is described below
commit 0ab098e4561a335d3db5bc6b484437918d316b05
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Mon Oct 10 23:25:14 2022 -0500
ARTEMIS-4035 all consumers of federated queue drop if only one consumer drops
---
.../federation/FederationStreamConfiguration.java | 6 +++
.../federation/FederatedQueueConsumerImpl.java | 3 +-
.../integration/federation/FederatedQueueTest.java | 54 ++++++++++++++++++++++
3 files changed, 61 insertions(+), 2 deletions(-)
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/config/federation/FederationStreamConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/config/federation/FederationStreamConfiguration.java
index 0f993b357d..937e993b79 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/config/federation/FederationStreamConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/config/federation/FederationStreamConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.config.federation;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashSet;
+import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -98,4 +99,9 @@ public abstract class FederationStreamConfiguration <T extends FederationStreamC
policyRefs.add(buffer.readString());
}
}
+
+ public T setStaticConnectors(List<String> connectors) {
+ connectionConfiguration.setStaticConnectors(connectors);
+ return (T) this;
+ }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
index 3b2c78a140..cd8093a2cf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
@@ -167,8 +167,7 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
clientConsumer = null;
clientSession = null;
- if (clientSessionFactory != null && (!upstream.getConnection().isSharedConnection() ||
- clientSessionFactory.numSessions() == 0)) {
+ if (clientSessionFactory != null && clientSessionFactory.numSessions() == 0 && !upstream.getConnection().isSharedConnection()) {
clientSessionFactory.close();
clientSessionFactory = null;
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java
index 6acbc775e8..ff501b29be 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java
@@ -25,7 +25,10 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import java.util.Collections;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
@@ -80,6 +83,57 @@ public class FederatedQueueTest extends FederatedTestBase {
testFederatedQueueRemoteConsume(queueName);
}
+ @Test
+ public void testMultipleFederatedQueueRemoteConsumersUpstream() throws Exception {
+ String connector = "server1";
+
+ getServer(0).getAddressSettingsRepository().getMatch("#").setAutoCreateAddresses(true).setAutoCreateQueues(true);
+ getServer(1).getAddressSettingsRepository().getMatch("#").setAutoCreateAddresses(true).setAutoCreateQueues(true);
+
+ getServer(1).createQueue(new QueueConfiguration("Test.Q.1").setRoutingType(RoutingType.ANYCAST));
+ getServer(1).createQueue(new QueueConfiguration("Test.Q.2").setRoutingType(RoutingType.ANYCAST));
+
+ getServer(0).getConfiguration().getFederationConfigurations().add(new FederationConfiguration()
+ .setName("default")
+ .addFederationPolicy(new FederationQueuePolicyConfiguration()
+ .setName("myQueuePolicy")
+ .addInclude(new FederationQueuePolicyConfiguration.Matcher()
+ .setQueueMatch("#")
+ .setAddressMatch("Test.#")))
+ .addUpstreamConfiguration(new FederationUpstreamConfiguration()
+ .setName("server1-upstream")
+ .addPolicyRef("myQueuePolicy")
+ .setStaticConnectors(Collections.singletonList(connector))));
+ getServer(0).getFederationManager().deploy();
+
+ ConnectionFactory cf1 = getCF(0);
+ ConnectionFactory cf2 = getCF(0);
+ ConnectionFactory cf3 = getCF(1);
+ try (Connection consumer1Connection = cf1.createConnection(); Connection consumer2Connection = cf2.createConnection(); Connection producerConnection = cf3.createConnection()) {
+ consumer1Connection.start();
+ Session session1 = consumer1Connection.createSession();
+ Queue queue1 = session1.createQueue("Test.Q.1");
+ MessageConsumer consumer1 = session1.createConsumer(queue1);
+
+ consumer2Connection.start();
+ Session session2 = consumer2Connection.createSession();
+ Queue queue2 = session2.createQueue("Test.Q.2");
+ MessageConsumer consumer2 = session2.createConsumer(queue2);
+
+ Session session3 = producerConnection.createSession();
+ MessageProducer producer = session3.createProducer(queue2);
+ producer.send(session3.createTextMessage("hello"));
+
+ assertNotNull(consumer2.receive(1000));
+
+ consumer1Connection.close();
+
+ producer.send(session3.createTextMessage("hello"));
+
+ assertNotNull(consumer2.receive(1000));
+ }
+ }
+
@Test
public void testFederatedQueueRemoteConsumeUpstreamPriorityAdjustment() throws Exception {
String queueName = getName();