You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/12/15 02:10:36 UTC
[2/5] activemq-artemis git commit: ARTEMIS-789 AMQP tests for routing semantics
ARTEMIS-789 AMQP tests for routing semantics
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3af1e5c7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3af1e5c7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3af1e5c7
Branch: refs/heads/master
Commit: 3af1e5c734df3ac4d19b5e173f55a2a99fa33e8d
Parents: c18ee83
Author: jbertram <jb...@apache.org>
Authored: Wed Dec 14 09:30:43 2016 -0600
Committer: jbertram <jb...@apache.org>
Committed: Wed Dec 14 15:12:57 2016 -0600
----------------------------------------------------------------------
.../artemis/tests/util/ActiveMQTestBase.java | 4 +-
.../integration/amqp/AmqpClientTestSupport.java | 9 +++
.../integration/amqp/AmqpSendReceiveTest.java | 73 ++++++++++++++++++--
3 files changed, 78 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3af1e5c7/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 7f01767..e6d68b1 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -418,10 +418,10 @@ public abstract class ActiveMQTestBase extends Assert {
}
protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
- ConfigurationImpl configuration = createBasicConfig(serverID).setJMXManagementEnabled(false).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, generateInVMParams(serverID)));
+ ConfigurationImpl configuration = createBasicConfig(serverID).setJMXManagementEnabled(false).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, generateInVMParams(serverID), "invm"));
if (netty) {
- configuration.addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY));
+ configuration.addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, new HashMap<String, Object>(), "netty", new HashMap<String, Object>()));
}
return configuration;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3af1e5c7/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index 0d5c874..fde38fe 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -19,8 +19,10 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import java.net.URI;
import java.util.LinkedList;
+import java.util.Set;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
@@ -105,6 +107,13 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
serverConfig.addAddressConfiguration(address);
serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")));
serverConfig.setSecurityEnabled(false);
+ Set<TransportConfiguration> acceptors = serverConfig.getAcceptorConfigurations();
+ for (TransportConfiguration tc : acceptors) {
+ if (tc.getName().equals("netty")) {
+ tc.getExtraParams().put("anycastPrefix", "anycast://");
+ tc.getExtraParams().put("multicastPrefix", "multicast://");
+ }
+ }
serverManager.start();
server.start();
return server;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3af1e5c7/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index b817834..e102c77 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -16,11 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
-import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
-import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
-import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
-import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
-
+import javax.jms.JMSException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -30,7 +26,10 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
@@ -48,7 +47,10 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.JMSException;
+import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
+import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
+import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
+import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
/**
* Test basic send and receive scenarios using only AMQP sender and receiver links.
@@ -178,6 +180,65 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
}
@Test(timeout = 60000)
+ public void testAnycastMessageRoutingExclusivity() throws Exception {
+ final String addressA = "addressA";
+ final String queueA = "queueA";
+ final String queueB = "queueB";
+ final String queueC = "queueC";
+
+ ActiveMQServerControl serverControl = server.getActiveMQServerControl();
+ serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
+ serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
+ serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
+ serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
+
+ sendMessages("anycast://" + addressA, 1);
+
+ assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
+ assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount());
+ }
+
+ @Test
+ public void testMulticastMessageRoutingExclusivity() throws Exception {
+ final String addressA = "addressA";
+ final String queueA = "queueA";
+ final String queueB = "queueB";
+ final String queueC = "queueC";
+
+ ActiveMQServerControl serverControl = server.getActiveMQServerControl();
+ serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
+ serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
+ serverControl.createQueue(addressA, queueB, RoutingType.MULTICAST.toString());
+ serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
+
+ sendMessages("multicast://" + addressA, 1);
+
+ assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount());
+ assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
+ }
+
+ @Test
+ public void testAmbiguousMessageRouting() throws Exception {
+ final String addressA = "addressA";
+ final String queueA = "queueA";
+ final String queueB = "queueB";
+ final String queueC = "queueC";
+ final String queueD = "queueD";
+
+ ActiveMQServerControl serverControl = server.getActiveMQServerControl();
+ serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
+ serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
+ serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
+ serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
+ serverControl.createQueue(addressA, queueD, RoutingType.MULTICAST.toString());
+
+ sendMessages(addressA, 1);
+
+ assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
+ assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount());
+ }
+
+ @Test(timeout = 60000)
public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
int MSG_COUNT = 4;
sendMessages(getTestName(), MSG_COUNT);