You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/10/30 12:45:13 UTC

[activemq-artemis] branch master updated: ARTEMIS-2937 Implementing skip create and skip delete on Mirror Source

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new dff2ed3  ARTEMIS-2937 Implementing skip create and skip delete on Mirror Source
dff2ed3 is described below

commit dff2ed36383f1e52304eff2705bf43e01e3c3f10
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Oct 30 08:38:20 2020 -0400

    ARTEMIS-2937 Implementing skip create and skip delete on Mirror Source
---
 .../amqp/connect/AMQPBrokerConnection.java         |  2 +-
 .../connect/mirror/AMQPMirrorControllerSource.java | 30 +++++++++----
 .../integration/amqp/connect/AMQPReplicaTest.java  | 50 ++++++++++++++++++++++
 3 files changed, 72 insertions(+), 10 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index fadf64f..5562ac7 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -348,7 +348,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
          throw new IllegalAccessException("Cannot start replica");
       }
 
-      AMQPMirrorControllerSource newPartition = new AMQPMirrorControllerSource(snfQueue, server, replicaConfig.isMessageAcknowledgements());
+      AMQPMirrorControllerSource newPartition = new AMQPMirrorControllerSource(snfQueue, server, replicaConfig.isMessageAcknowledgements(), replicaConfig.isQueueCreation(), replicaConfig.isQueueRemoval());
 
       server.scanAddresses(newPartition);
 
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index 645b386..41f0c40 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -67,6 +67,8 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom
    final Queue snfQueue;
    final ActiveMQServer server;
    final boolean acks;
+   final boolean addQueues;
+   final boolean deleteQueues;
 
    boolean started;
 
@@ -83,10 +85,12 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom
       return started;
    }
 
-   public AMQPMirrorControllerSource(Queue snfQueue, ActiveMQServer server, boolean acks) {
+   public AMQPMirrorControllerSource(Queue snfQueue, ActiveMQServer server, boolean acks, boolean addQueues, boolean deleteQueues) {
       this.snfQueue = snfQueue;
       this.server = server;
       this.acks = acks;
+      this.addQueues = addQueues;
+      this.deleteQueues = deleteQueues;
    }
 
    @Override
@@ -103,26 +107,34 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom
 
    @Override
    public void addAddress(AddressInfo addressInfo) throws Exception {
-      Message message = createMessage(addressInfo.getName(), null, ADD_ADDRESS, addressInfo.toJSON());
-      route(server, message);
+      if (addQueues) {
+         Message message = createMessage(addressInfo.getName(), null, ADD_ADDRESS, addressInfo.toJSON());
+         route(server, message);
+      }
    }
 
    @Override
    public void deleteAddress(AddressInfo addressInfo) throws Exception {
-      Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, addressInfo.toJSON());
-      route(server, message);
+      if (deleteQueues) {
+         Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, addressInfo.toJSON());
+         route(server, message);
+      }
    }
 
    @Override
    public void createQueue(QueueConfiguration queueConfiguration) throws Exception {
-      Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, queueConfiguration.toJSON());
-      route(server, message);
+      if (addQueues) {
+         Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, queueConfiguration.toJSON());
+         route(server, message);
+      }
    }
 
    @Override
    public void deleteQueue(SimpleString address, SimpleString queue) throws Exception {
-      Message message = createMessage(address, queue, DELETE_QUEUE, queue.toString());
-      route(server, message);
+      if (deleteQueues) {
+         Message message = createMessage(address, queue, DELETE_QUEUE, queue.toString());
+         route(server, message);
+      }
    }
 
    @Override
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
index eb5b202..0a668ae 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
 import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
 import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
@@ -114,6 +115,55 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
    }
 
    @Test
+   public void testDoNotSendDelete() throws Exception {
+      testDoNotSendStuff(false);
+   }
+
+   @Test
+   public void testDoNotSendCreate() throws Exception {
+      testDoNotSendStuff(true);
+   }
+
+   // Shared scenario. ignoreCreate == true: queue creation is not mirrored; false: creation is mirrored, removal is not.
+   private void testDoNotSendStuff(boolean ignoreCreate) throws Exception {
+      server.start();
+
+      final SimpleString ADDRESS_NAME = SimpleString.toSimpleString("address");
+
+      server_2 = createServer(AMQP_PORT_2, false);
+
+      AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT);
+      AMQPMirrorBrokerConnectionElement mirror = new AMQPMirrorBrokerConnectionElement();
+      if (ignoreCreate) {
+         mirror.setQueueCreation(false);
+      } else {
+         mirror.setQueueCreation(true);
+         mirror.setQueueRemoval(false);
+      }
+      amqpConnection.addElement(mirror);
+      server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+      server_2.start();
+      Wait.assertTrue(server::isActive);
+
+      server_2.addAddressInfo(new AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.ANYCAST));
+      server_2.createQueue(new QueueConfiguration(ADDRESS_NAME).setDurable(true).setAddress(ADDRESS_NAME));
+
+      if (!ignoreCreate) {
+         Wait.assertTrue(() -> server.locateQueue(ADDRESS_NAME) != null);
+         Wait.assertTrue(() -> server.getAddressInfo(ADDRESS_NAME) != null);
+      }
+
+      if (ignoreCreate) {
+         Thread.sleep(500); // mirroring is asynchronous: give a create event time to arrive before asserting that none did
+         Assert.assertNull(server.locateQueue(ADDRESS_NAME));
+         Assert.assertNull(server.getAddressInfo(ADDRESS_NAME));
+      }
+      server_2.stop();
+      server.stop();
+   }
+
+   @Test
    public void testReplicaCatchupOnQueueCreatesAndDeletes() throws Exception {
       server.start();
       server.setIdentity("Server1");