You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/10/30 12:45:13 UTC
[activemq-artemis] branch master updated: ARTEMIS-2937 Implementing
skip create and skip delete on Mirror Source
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new dff2ed3 ARTEMIS-2937 Implementing skip create and skip delete on Mirror Source
dff2ed3 is described below
commit dff2ed36383f1e52304eff2705bf43e01e3c3f10
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Oct 30 08:38:20 2020 -0400
ARTEMIS-2937 Implementing skip create and skip delete on Mirror Source
---
.../amqp/connect/AMQPBrokerConnection.java | 2 +-
.../connect/mirror/AMQPMirrorControllerSource.java | 30 +++++++++----
.../integration/amqp/connect/AMQPReplicaTest.java | 50 ++++++++++++++++++++++
3 files changed, 72 insertions(+), 10 deletions(-)
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index fadf64f..5562ac7 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -348,7 +348,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
throw new IllegalAccessException("Cannot start replica");
}
- AMQPMirrorControllerSource newPartition = new AMQPMirrorControllerSource(snfQueue, server, replicaConfig.isMessageAcknowledgements());
+ AMQPMirrorControllerSource newPartition = new AMQPMirrorControllerSource(snfQueue, server, replicaConfig.isMessageAcknowledgements(), replicaConfig.isQueueCreation(), replicaConfig.isQueueRemoval());
server.scanAddresses(newPartition);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index 645b386..41f0c40 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -67,6 +67,8 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom
final Queue snfQueue;
final ActiveMQServer server;
final boolean acks;
+ final boolean addQueues;
+ final boolean deleteQueues;
boolean started;
@@ -83,10 +85,12 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom
return started;
}
- public AMQPMirrorControllerSource(Queue snfQueue, ActiveMQServer server, boolean acks) {
+ public AMQPMirrorControllerSource(Queue snfQueue, ActiveMQServer server, boolean acks, boolean addQueues, boolean deleteQueues) {
this.snfQueue = snfQueue;
this.server = server;
this.acks = acks;
+ this.addQueues = addQueues;
+ this.deleteQueues = deleteQueues;
}
@Override
@@ -103,26 +107,34 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom
@Override
public void addAddress(AddressInfo addressInfo) throws Exception {
- Message message = createMessage(addressInfo.getName(), null, ADD_ADDRESS, addressInfo.toJSON());
- route(server, message);
+ if (addQueues) {
+ Message message = createMessage(addressInfo.getName(), null, ADD_ADDRESS, addressInfo.toJSON());
+ route(server, message);
+ }
}
@Override
public void deleteAddress(AddressInfo addressInfo) throws Exception {
- Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, addressInfo.toJSON());
- route(server, message);
+ if (deleteQueues) {
+ Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, addressInfo.toJSON());
+ route(server, message);
+ }
}
@Override
public void createQueue(QueueConfiguration queueConfiguration) throws Exception {
- Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, queueConfiguration.toJSON());
- route(server, message);
+ if (addQueues) {
+ Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, queueConfiguration.toJSON());
+ route(server, message);
+ }
}
@Override
public void deleteQueue(SimpleString address, SimpleString queue) throws Exception {
- Message message = createMessage(address, queue, DELETE_QUEUE, queue.toString());
- route(server, message);
+ if (deleteQueues) {
+ Message message = createMessage(address, queue, DELETE_QUEUE, queue.toString());
+ route(server, message);
+ }
}
@Override
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
index eb5b202..0a668ae 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
@@ -114,6 +115,55 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
}
@Test
+ public void testDoNotSendDelete() throws Exception {
+ testDoNotSendStuff(false);
+ }
+
+ @Test
+ public void testDoNotSendCreate() throws Exception {
+ testDoNotSendStuff(true);
+ }
+
+ private void testDoNotSendStuff(boolean sendCreate) throws Exception {
+ boolean ignoreCreate = false;
+ server.start();
+
+ final SimpleString ADDRESS_NAME = SimpleString.toSimpleString("address");
+
+ server_2 = createServer(AMQP_PORT_2, false);
+
+ AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT);
+ AMQPMirrorBrokerConnectionElement mirror = new AMQPMirrorBrokerConnectionElement();
+ if (ignoreCreate) {
+ mirror.setQueueCreation(false);
+ } else {
+ mirror.setQueueCreation(true);
+ mirror.setQueueRemoval(false);
+ }
+ amqpConnection.addElement(mirror);
+ server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+ server_2.start();
+ Wait.assertTrue(server::isActive);
+
+ server_2.addAddressInfo(new AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.ANYCAST));
+ server_2.createQueue(new QueueConfiguration(ADDRESS_NAME).setDurable(true).setAddress(ADDRESS_NAME));
+
+ if (!ignoreCreate) {
+ Wait.assertTrue(() -> server.locateQueue(ADDRESS_NAME) != null);
+ Wait.assertTrue(() -> server.getAddressInfo(ADDRESS_NAME) != null);
+ }
+
+ if (ignoreCreate) {
+ Thread.sleep(500); // things are asynchronous, I need to wait some time to make sure things are transferred over
+ Assert.assertTrue(server.locateQueue(ADDRESS_NAME) == null);
+ Assert.assertTrue(server.getAddressInfo(ADDRESS_NAME) == null);
+ }
+ server_2.stop();
+ server.stop();
+ }
+
+ @Test
public void testReplicaCatchupOnQueueCreatesAndDeletes() throws Exception {
server.start();
server.setIdentity("Server1");