You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/12/15 02:10:36 UTC

[2/5] activemq-artemis git commit: ARTEMIS-789 AMQP tests for routing semantics

ARTEMIS-789 AMQP tests for routing semantics


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3af1e5c7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3af1e5c7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3af1e5c7

Branch: refs/heads/master
Commit: 3af1e5c734df3ac4d19b5e173f55a2a99fa33e8d
Parents: c18ee83
Author: jbertram <jb...@apache.org>
Authored: Wed Dec 14 09:30:43 2016 -0600
Committer: jbertram <jb...@apache.org>
Committed: Wed Dec 14 15:12:57 2016 -0600

----------------------------------------------------------------------
 .../artemis/tests/util/ActiveMQTestBase.java    |  4 +-
 .../integration/amqp/AmqpClientTestSupport.java |  9 +++
 .../integration/amqp/AmqpSendReceiveTest.java   | 73 ++++++++++++++++++--
 3 files changed, 78 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3af1e5c7/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 7f01767..e6d68b1 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -418,10 +418,10 @@ public abstract class ActiveMQTestBase extends Assert {
    }
 
    protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
-      ConfigurationImpl configuration = createBasicConfig(serverID).setJMXManagementEnabled(false).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, generateInVMParams(serverID)));
+      ConfigurationImpl configuration = createBasicConfig(serverID).setJMXManagementEnabled(false).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, generateInVMParams(serverID), "invm"));
 
       if (netty) {
-         configuration.addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY));
+         configuration.addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, new HashMap<String, Object>(), "netty", new HashMap<String, Object>()));
       }
 
       return configuration;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3af1e5c7/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index 0d5c874..fde38fe 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -19,8 +19,10 @@ package org.apache.activemq.artemis.tests.integration.amqp;
 
 import java.net.URI;
 import java.util.LinkedList;
+import java.util.Set;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
 import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
@@ -105,6 +107,13 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
       serverConfig.addAddressConfiguration(address);
       serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")));
       serverConfig.setSecurityEnabled(false);
+      Set<TransportConfiguration> acceptors = serverConfig.getAcceptorConfigurations();
+      for (TransportConfiguration tc : acceptors) {
+         if (tc.getName().equals("netty")) {
+            tc.getExtraParams().put("anycastPrefix", "anycast://");
+            tc.getExtraParams().put("multicastPrefix", "multicast://");
+         }
+      }
       serverManager.start();
       server.start();
       return server;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3af1e5c7/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index b817834..e102c77 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -16,11 +16,7 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
-import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
-import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
-import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
-import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
-
+import javax.jms.JMSException;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
@@ -30,7 +26,10 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
@@ -48,7 +47,10 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jms.JMSException;
+import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
+import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
+import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
+import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
 
 /**
  * Test basic send and receive scenarios using only AMQP sender and receiver links.
@@ -178,6 +180,65 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
    }
 
    @Test(timeout = 60000)
+   public void testAnycastMessageRoutingExclusivity() throws Exception {
+      final String addressA = "addressA";
+      final String queueA = "queueA";
+      final String queueB = "queueB";
+      final String queueC = "queueC";
+
+      ActiveMQServerControl serverControl = server.getActiveMQServerControl();
+      serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
+      serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
+      serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
+      serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
+
+      sendMessages("anycast://" + addressA, 1);
+
+      assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
+      assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount());
+   }
+
+   @Test
+   public void testMulticastMessageRoutingExclusivity() throws Exception {
+      final String addressA = "addressA";
+      final String queueA = "queueA";
+      final String queueB = "queueB";
+      final String queueC = "queueC";
+
+      ActiveMQServerControl serverControl = server.getActiveMQServerControl();
+      serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
+      serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
+      serverControl.createQueue(addressA, queueB, RoutingType.MULTICAST.toString());
+      serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
+
+      sendMessages("multicast://" + addressA, 1);
+
+      assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount());
+      assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
+   }
+
+   @Test
+   public void testAmbiguousMessageRouting() throws Exception {
+      final String addressA = "addressA";
+      final String queueA = "queueA";
+      final String queueB = "queueB";
+      final String queueC = "queueC";
+      final String queueD = "queueD";
+
+      ActiveMQServerControl serverControl = server.getActiveMQServerControl();
+      serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
+      serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
+      serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
+      serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
+      serverControl.createQueue(addressA, queueD, RoutingType.MULTICAST.toString());
+
+      sendMessages(addressA, 1);
+
+      assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
+      assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount());
+   }
+
+   @Test(timeout = 60000)
    public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
       int MSG_COUNT = 4;
       sendMessages(getTestName(), MSG_COUNT);