You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/12/09 19:49:30 UTC
[46/50] [abbrv] activemq-artemis git commit: ARTEMIS-877 Add Consumer
support for AMQP for new addressing schema
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index 1e12d4c..0d5c874 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -22,14 +22,21 @@ import java.util.LinkedList;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
import org.junit.After;
import org.junit.Before;
@@ -39,6 +46,10 @@ import org.junit.Before;
*/
public class AmqpClientTestSupport extends ActiveMQTestBase {
+ protected static Symbol SHARED = Symbol.getSymbol("shared");
+ protected static Symbol GLOBAL = Symbol.getSymbol("global");
+
+
private boolean useSSL;
protected JMSServerManager serverManager;
@@ -86,6 +97,12 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
ActiveMQServer server = createServer(true, true);
serverManager = new JMSServerManagerImpl(server);
Configuration serverConfig = server.getConfiguration();
+ CoreAddressConfiguration address = new CoreAddressConfiguration();
+ address.setName(getTestName()).getRoutingTypes().add(RoutingType.ANYCAST);
+ CoreQueueConfiguration queueConfig = new CoreQueueConfiguration();
+ queueConfig.setName(getTestName()).setAddress(getTestName()).setRoutingType(RoutingType.ANYCAST);
+ address.getQueueConfigurations().add(queueConfig);
+ serverConfig.addAddressConfiguration(address);
serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")));
serverConfig.setSecurityEnabled(false);
serverManager.start();
@@ -179,4 +196,19 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception {
return new AmqpClient(brokerURI, username, password);
}
+
+
+ protected void sendMessages(int numMessages, String address) throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(address);
+ for (int i = 0; i < numMessages; i++) {
+ AmqpMessage message = new AmqpMessage();
+ message.setText("message-" + i);
+ sender.send(message);
+ }
+ sender.close();
+ connection.connect();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
index abc422b..e760d77 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
@@ -24,6 +24,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpFrameValidator;
@@ -54,7 +56,8 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
@Override
public void setUp() throws Exception {
super.setUp();
- server.createQueue(new SimpleString(getTopicName()), new SimpleString(getTopicName()), null, true, false);
+ server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(getTopicName()), RoutingType.MULTICAST));
+ server.createQueue(new SimpleString(getTopicName()), RoutingType.MULTICAST, new SimpleString(getTopicName()), null, true, false);
}
@Test(timeout = 60000)
@@ -371,6 +374,6 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
}
public String getTopicName() {
- return "topic://myTopic";
+ return "myTopic";
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java
index c599f38..4dbe21e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java
@@ -111,8 +111,6 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
sender.close();
- Thread.sleep(200);
-
queueView = getProxyToQueue(remoteTarget.getAddress());
assertNull(queueView);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
index e42a718..1708720 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
@@ -20,7 +20,6 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
-import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
@@ -28,7 +27,6 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
-import org.junit.Before;
import org.junit.Test;
/**
@@ -36,11 +34,6 @@ import org.junit.Test;
*/
public class AmqpTransactionTest extends AmqpClientTestSupport {
- @Before
- public void createQueue() throws Exception {
- server.createQueue(SimpleString.toSimpleString(getTestName()), SimpleString.toSimpleString(getTestName()), null, true, false);
- }
-
@Test(timeout = 30000)
public void testBeginAndCommitTransaction() throws Exception {
AmqpClient client = createAmqpClient();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
new file mode 100644
index 0000000..db2f1b4
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY;
+
+
+public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport {
+
+ SimpleString address = new SimpleString("testAddress");
+ SimpleString queue1 = new SimpleString("queue1");
+ SimpleString queue2 = new SimpleString("queue2");
+
+ @Test(timeout = 60000)
+ public void testConsumeFromSingleQueueOnAddressSameName() throws Exception {
+ server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
+ server.createQueue(address, RoutingType.ANYCAST, address, null, true, false);
+
+ sendMessages(1, address.toString());
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(address.toString());
+ receiver.flow(1);
+ AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount());
+
+ receiver.close();
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testConsumeFromSingleQueueOnAddressSameNameMultipleQueues() throws Exception {
+ server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
+ server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false);
+ server.createQueue(address, RoutingType.ANYCAST, address, null, true, false);
+
+ sendMessages(2, address.toString());
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(address.toString());
+ receiver.flow(1);
+ AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount());
+ assertEquals(0, ((QueueImpl)server.getPostOffice().getBinding(queue1).getBindable()).getConsumerCount());
+ receiver.close();
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testConsumeFromSingleQueueOnAddressDifferentName() throws Exception {
+ server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
+ server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false);
+
+ sendMessages(1, address.toString());
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(address.toString());
+ receiver.flow(1);
+ AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(queue1).getBindable()).getConsumerCount());
+
+ receiver.close();
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testConsumeFromSingleQueueOnAddressDifferentNameMultipleQueues() throws Exception {
+ server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
+ server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false);
+ server.createQueue(address, RoutingType.ANYCAST, queue2, null, true, false);
+
+ sendMessages(1, address.toString());
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(address.toString());
+ receiver.flow(1);
+ AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(queue1).getBindable()).getConsumerCount());
+ assertEquals(0, ((QueueImpl)server.getPostOffice().getBinding(queue2).getBindable()).getConsumerCount());
+ receiver.close();
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testConsumeFromSingleQualifiedQueueOnAddressSameName() throws Exception {
+ server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
+ server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false);
+
+ sendMessages(1, address.toString());
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(address.toString() + "::" + queue1.toString());
+ receiver.flow(1);
+ AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(queue1).getBindable()).getConsumerCount());
+
+ receiver.close();
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testConsumeWhenOnlyMulticast() throws Exception {
+ server.createAddressInfo(new AddressInfo(address, RoutingType.MULTICAST));
+
+ sendMessages(1, address.toString());
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+
+ AmqpSession session = connection.createSession();
+ Source jmsSource = createJmsSource(false);
+ jmsSource.setAddress(address.toString());
+ try {
+ session.createReceiver(jmsSource);
+ fail("should throw exception");
+ } catch (Exception e) {
+ //ignore
+ }
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testConsumeWhenNoAddressCreatedNoAutoCreate() throws Exception {
+ AddressSettings settings = new AddressSettings();
+ settings.setAutoCreateAddresses(false);
+ server.getAddressSettingsRepository().addMatch(address.toString(), settings);
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+ try {
+ session.createReceiver(address.toString());
+ fail("should throw exception");
+ } catch (Exception e) {
+ //ignore
+ }
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testConsumeWhenNoAddressCreatedAutoCreate() throws Exception {
+ AddressSettings settings = new AddressSettings();
+ settings.setAutoCreateAddresses(true);
+ server.getAddressSettingsRepository().addMatch(address.toString(), settings);
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+ AmqpReceiver receiver = session.createReceiver(address.toString());
+ sendMessages(1, address.toString());
+ receiver.flow(1);
+ AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount());
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testConsumeWhenNoAddressHasBothRoutingTypesButDefaultQueueIsMultiCast() throws Exception {
+ AddressInfo addressInfo = new AddressInfo(address);
+ addressInfo.getRoutingTypes().add(RoutingType.ANYCAST);
+ addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
+ server.createAddressInfo(addressInfo);
+ server.createQueue(address, RoutingType.MULTICAST, address, null, true, false);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+ try {
+ session.createReceiver(address.toString());
+ fail("expected exception");
+ } catch (Exception e) {
+ //ignore
+ }
+ connection.close();
+ }
+
+
+ protected Source createJmsSource(boolean topic) {
+
+ Source source = new Source();
+ // Set the capability to indicate the node type being created
+ if (!topic) {
+ source.setCapabilities(QUEUE_CAPABILITY);
+ } else {
+ source.setCapabilities(TOPIC_CAPABILITY);
+ }
+
+ return source;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java
new file mode 100644
index 0000000..6a114d7
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY;
+
+
+public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport {
+
+ SimpleString address = new SimpleString("testAddress");
+ SimpleString queue1 = new SimpleString("queue1");
+ SimpleString queue2 = new SimpleString("queue2");
+
+ @Test(timeout = 60000)
+ public void testConsumeFromSingleQueueOnAddressSameName() throws Exception {
+ server.createAddressInfo(new AddressInfo(address, RoutingType.MULTICAST));
+ server.createQueue(address, RoutingType.MULTICAST, address, null, true, false);
+
+ sendMessages(1, address.toString());
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(address.toString() + "::" + address.toString());
+ receiver.flow(1);
+ AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount());
+
+ receiver.close();
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testConsumeWhenOnlyAnycast() throws Exception {
+ server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
+
+ sendMessages(1, address.toString());
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+
+ AmqpSession session = connection.createSession();
+ Source jmsSource = createJmsSource(true);
+ jmsSource.setAddress(address.toString());
+ try {
+ session.createReceiver(jmsSource);
+ fail("should throw exception");
+ } catch (Exception e) {
+ //ignore
+ }
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testConsumeWhenNoAddressHasBothRoutingTypesButDefaultQueueIsAnyCast() throws Exception {
+ AddressInfo addressInfo = new AddressInfo(address);
+ addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
+ addressInfo.getRoutingTypes().add(RoutingType.ANYCAST);
+ server.createAddressInfo(addressInfo);
+ server.createQueue(address, RoutingType.MULTICAST, address, null, true, false);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+ try {
+ session.createReceiver(address.toString());
+ fail("expected exception");
+ } catch (Exception e) {
+ //ignore
+ }
+ connection.close();
+ }
+
+
+ protected Source createJmsSource(boolean topic) {
+
+ Source source = new Source();
+ // Set the capability to indicate the node type being created
+ if (!topic) {
+ source.setCapabilities(QUEUE_CAPABILITY);
+ } else {
+ source.setCapabilities(TOPIC_CAPABILITY);
+ }
+
+ return source;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java
new file mode 100644
index 0000000..377cf86
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+public class ClientDefinedAnycastConsumerTest extends AmqpClientTestSupport {
+
+ SimpleString address = new SimpleString("testAddress");
+
+ @Test(timeout = 60000)
+ public void testConsumeFromSingleQueueOnAddressSameName() throws Exception {
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(address.toString());
+ sendMessages(1, address.toString());
+ receiver.flow(1);
+ AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount());
+
+ receiver.close();
+ connection.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
new file mode 100644
index 0000000..9b5187f
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
@@ -0,0 +1,327 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY;
+
+public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
+
+ SimpleString address = new SimpleString("testAddress");
+
+ @Test(timeout = 60000)
+ public void test2ConsumersOnSharedVolatileAddress() throws Exception {
+ AddressInfo addressInfo = new AddressInfo(address);
+ addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
+ server.createAddressInfo(addressInfo);
+ AmqpClient client = createAmqpClient();
+
+ AmqpConnection connection = addConnection(client.connect("myClientId"));
+ AmqpSession session = connection.createSession();
+ Source source = createSharedSource(TerminusDurability.NONE);
+ AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
+ AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
+ receiver.flow(1);
+ receiver2.flow(1);
+ sendMessages(2, address.toString());
+ AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount());
+ receiver.close();
+ assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
+ receiver2.close();
+ //check its been deleted
+ assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void test2ConsumersOnSharedVolatileAddressBrokerDefined() throws Exception {
+ AddressInfo addressInfo = new AddressInfo(address);
+ addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
+ server.createAddressInfo(addressInfo);
+ server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("myClientId.mySub:shared-volatile"), null, true, false, -1, false, false);
+ AmqpClient client = createAmqpClient();
+
+ AmqpConnection connection = addConnection(client.connect("myClientId"));
+ AmqpSession session = connection.createSession();
+ Source source = createSharedSource(TerminusDurability.NONE);
+ AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
+ AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|1");
+ receiver.flow(1);
+ receiver2.flow(1);
+ sendMessages(2, address.toString());
+ AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount());
+ receiver.close();
+ assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
+ receiver2.close();
+ //check its **Hasn't** been deleted
+ assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void test2ConsumersOnSharedVolatileAddressNoReceiverClose() throws Exception {
+ AddressInfo addressInfo = new AddressInfo(address);
+ addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
+ server.createAddressInfo(addressInfo);
+ AmqpClient client = createAmqpClient();
+
+ AmqpConnection connection = addConnection(client.connect("myClientId"));
+ AmqpSession session = connection.createSession();
+ Source source = createSharedSource(TerminusDurability.NONE);
+ AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
+ AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
+ receiver.flow(1);
+ receiver2.flow(1);
+ sendMessages(2, address.toString());
+ AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount());
+ assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
+ //check its been deleted
+ connection.close();
+ assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
+ }
+
+ @Test(timeout = 60000)
+ public void test2ConsumersOnSharedVolatileAddressGlobal() throws Exception {
+ AddressInfo addressInfo = new AddressInfo(address);
+ addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
+ server.createAddressInfo(addressInfo);
+ AmqpClient client = createAmqpClient();
+
+ AmqpConnection connection = addConnection(client.connect(false));
+ AmqpSession session = connection.createSession();
+ Source source = createSharedGlobalSource(TerminusDurability.NONE);
+ AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
+ AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
+ receiver.flow(1);
+ receiver2.flow(1);
+ sendMessages(2, address.toString());
+ AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")).getBindable()).getConsumerCount());
+ receiver.close();
+ assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")));
+ receiver2.close();
+ //check its been deleted
+ assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")));
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void test2ConsumersOnSharedDurableAddress() throws Exception {
+ AddressInfo addressInfo = new AddressInfo(address);
+ addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
+ server.createAddressInfo(addressInfo);
+ AmqpClient client = createAmqpClient();
+
+ AmqpConnection connection = addConnection(client.connect("myClientId"));
+ AmqpSession session = connection.createSession();
+ Source source = createSharedSource(TerminusDurability.CONFIGURATION);
+ AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
+ AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
+ receiver.flow(1);
+ receiver2.flow(1);
+ sendMessages(2, address.toString());
+ AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")).getBindable()).getConsumerCount());
+ receiver.close();
+ assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")));
+ receiver2.close();
+ //check its been deleted
+ assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")));
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void test2ConsumersOnSharedDurableAddressReconnect() throws Exception {
+ AddressInfo addressInfo = new AddressInfo(address);
+ addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
+ server.createAddressInfo(addressInfo);
+ AmqpClient client = createAmqpClient();
+
+ AmqpConnection connection = addConnection(client.connect("myClientId"));
+ AmqpSession session = connection.createSession();
+ Source source = createSharedSource(TerminusDurability.CONFIGURATION);
+ AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
+ AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
+ receiver.flow(1);
+ receiver2.flow(1);
+ sendMessages(2, address.toString());
+ AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")).getBindable()).getConsumerCount());
+
+ connection.close();
+
+ connection = addConnection(client.connect("myClientId"));
+ session = connection.createSession();
+
+ assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")));
+ receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
+ receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
+
+ receiver.close();
+ assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")));
+ receiver2.close();
+ //check its been deleted
+ assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")));
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void test2ConsumersOnSharedDurableAddressReconnectwithNull() throws Exception {
+ AddressInfo addressInfo = new AddressInfo(address);
+ addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
+ server.createAddressInfo(addressInfo);
+ AmqpClient client = createAmqpClient();
+
+ AmqpConnection connection = addConnection(client.connect("myClientId"));
+ AmqpSession session = connection.createSession();
+ Source source = createSharedSource(TerminusDurability.CONFIGURATION);
+ AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
+ AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
+ receiver.flow(1);
+ receiver2.flow(1);
+ sendMessages(2, address.toString());
+ AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")).getBindable()).getConsumerCount());
+
+ connection.close();
+
+ connection = addConnection(client.connect("myClientId"));
+ session = connection.createSession();
+
+ assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")));
+ receiver = session.createDurableReceiver(null, "mySub");
+ receiver2 = session.createDurableReceiver(null, "mySub|2");
+
+ receiver.close();
+ assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")));
+ receiver2.close();
+ //check its been deleted
+ assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")));
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void test2ConsumersOnSharedDurableAddressGlobal() throws Exception {
+ AddressInfo addressInfo = new AddressInfo(address);
+ addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
+ server.createAddressInfo(addressInfo);
+ AmqpClient client = createAmqpClient();
+
+ AmqpConnection connection = addConnection(client.connect(false));
+ AmqpSession session = connection.createSession();
+ Source source = createSharedGlobalSource(TerminusDurability.CONFIGURATION);
+ AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
+ AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2");
+ receiver.flow(1);
+ receiver2.flow(1);
+ sendMessages(2, address.toString());
+ AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
+ assertNotNull(amqpMessage);
+ assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")).getBindable()).getConsumerCount());
+ receiver.close();
+ assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")));
+ receiver2.close();
+ //check its been deleted
+ assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")));
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void test2ConsumersOnNonSharedDurableAddress() throws Exception {
+ AddressInfo addressInfo = new AddressInfo(address);
+ addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
+ server.createAddressInfo(addressInfo);
+ AmqpClient client = createAmqpClient();
+
+ AmqpConnection connection = addConnection(client.connect("myClientId"));
+ AmqpSession session = connection.createSession();
+ Source source = createNonSharedSource(TerminusDurability.CONFIGURATION);
+ Source source1 = createSharedSource(TerminusDurability.CONFIGURATION);
+ AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
+ try {
+ session.createMulticastReceiver(source1, "myReceiverID", "mySub|2");
+ fail("Exception expected");
+ } catch (Exception e) {
+ //expected
+ }
+ connection.close();
+ }
+
+ private Source createNonSharedSource(TerminusDurability terminusDurability) {
+ Source source = new Source();
+ source.setAddress(address.toString());
+ source.setCapabilities(TOPIC_CAPABILITY);
+ source.setDurable(terminusDurability);
+ return source;
+ }
+
+ private Source createSharedSource(TerminusDurability terminusDurability) {
+ Source source = new Source();
+ source.setAddress(address.toString());
+ source.setCapabilities(TOPIC_CAPABILITY, SHARED);
+ source.setDurable(terminusDurability);
+ return source;
+ }
+
+ private Source createSharedGlobalSource(TerminusDurability terminusDurability) {
+ Source source = new Source();
+ source.setAddress(address.toString());
+ source.setCapabilities(TOPIC_CAPABILITY, SHARED, GLOBAL);
+ source.setDurable(terminusDurability);
+ return source;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
index 39197fd..3965947 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
@@ -30,6 +30,8 @@ import javax.jms.TopicSubscriber;
import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After;
import org.junit.Assert;
@@ -55,6 +57,8 @@ public class ProtonPubSubTest extends ProtonTestBase {
@Before
public void setUp() throws Exception {
super.setUp();
+ server.createAddressInfo(new AddressInfo(ssPubAddress, RoutingType.MULTICAST));
+ server.createAddressInfo(new AddressInfo(ssprefixedPubAddress, RoutingType.MULTICAST));
server.createQueue(ssPubAddress, ssPubAddress, new SimpleString("foo=bar"), false, true);
server.createQueue(ssprefixedPubAddress, ssprefixedPubAddress, new SimpleString("foo=bar"), false, true);
factory = new JmsConnectionFactory("amqp://localhost:5672");
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
index 5c56224..5e9b368 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
@@ -70,6 +70,8 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFactory;
@@ -151,20 +153,31 @@ public class ProtonTest extends ProtonTestBase {
@Before
public void setUp() throws Exception {
super.setUp();
-
- server.createQueue(new SimpleString(coreAddress), new SimpleString(coreAddress), null, true, false);
- server.createQueue(new SimpleString(coreAddress + "1"), new SimpleString(coreAddress + "1"), null, true, false);
- server.createQueue(new SimpleString(coreAddress + "2"), new SimpleString(coreAddress + "2"), null, true, false);
- server.createQueue(new SimpleString(coreAddress + "3"), new SimpleString(coreAddress + "3"), null, true, false);
- server.createQueue(new SimpleString(coreAddress + "4"), new SimpleString(coreAddress + "4"), null, true, false);
- server.createQueue(new SimpleString(coreAddress + "5"), new SimpleString(coreAddress + "5"), null, true, false);
- server.createQueue(new SimpleString(coreAddress + "6"), new SimpleString(coreAddress + "6"), null, true, false);
- server.createQueue(new SimpleString(coreAddress + "7"), new SimpleString(coreAddress + "7"), null, true, false);
- server.createQueue(new SimpleString(coreAddress + "8"), new SimpleString(coreAddress + "8"), null, true, false);
- server.createQueue(new SimpleString(coreAddress + "9"), new SimpleString(coreAddress + "9"), null, true, false);
- server.createQueue(new SimpleString(coreAddress + "10"), new SimpleString(coreAddress + "10"), null, true, false);
- server.createQueue(new SimpleString("amqp_testtopic"), new SimpleString("amqp_testtopic"), null, true, false);
- server.createQueue(new SimpleString("amqp_testtopic" + "1"), new SimpleString("amqp_testtopic" + "1"), null, true, false);
+ server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress), RoutingType.ANYCAST));
+ server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "1"), RoutingType.ANYCAST));
+ server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "2"), RoutingType.ANYCAST));
+ server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "3"), RoutingType.ANYCAST));
+ server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "4"), RoutingType.ANYCAST));
+ server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "5"), RoutingType.ANYCAST));
+ server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "6"), RoutingType.ANYCAST));
+ server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "7"), RoutingType.ANYCAST));
+ server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "8"), RoutingType.ANYCAST));
+ server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "9"), RoutingType.ANYCAST));
+ server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "10"), RoutingType.ANYCAST));
+ server.createQueue(new SimpleString(coreAddress), RoutingType.ANYCAST, new SimpleString(coreAddress), null, true, false);
+ server.createQueue(new SimpleString(coreAddress + "1"), RoutingType.ANYCAST, new SimpleString(coreAddress + "1"), null, true, false);
+ server.createQueue(new SimpleString(coreAddress + "2"), RoutingType.ANYCAST, new SimpleString(coreAddress + "2"), null, true, false);
+ server.createQueue(new SimpleString(coreAddress + "3"), RoutingType.ANYCAST, new SimpleString(coreAddress + "3"), null, true, false);
+ server.createQueue(new SimpleString(coreAddress + "4"), RoutingType.ANYCAST, new SimpleString(coreAddress + "4"), null, true, false);
+ server.createQueue(new SimpleString(coreAddress + "5"), RoutingType.ANYCAST, new SimpleString(coreAddress + "5"), null, true, false);
+ server.createQueue(new SimpleString(coreAddress + "6"), RoutingType.ANYCAST, new SimpleString(coreAddress + "6"), null, true, false);
+ server.createQueue(new SimpleString(coreAddress + "7"), RoutingType.ANYCAST, new SimpleString(coreAddress + "7"), null, true, false);
+ server.createQueue(new SimpleString(coreAddress + "8"), RoutingType.ANYCAST, new SimpleString(coreAddress + "8"), null, true, false);
+ server.createQueue(new SimpleString(coreAddress + "9"), RoutingType.ANYCAST, new SimpleString(coreAddress + "9"), null, true, false);
+ server.createQueue(new SimpleString(coreAddress + "10"), RoutingType.ANYCAST, new SimpleString(coreAddress + "10"), null, true, false);
+ server.createAddressInfo(new AddressInfo(new SimpleString("amqp_testtopic"), RoutingType.MULTICAST));
+ server.createQueue(new SimpleString("amqp_testtopic"), RoutingType.MULTICAST, new SimpleString("amqp_testtopic"), null, true, false);
+ /* server.createQueue(new SimpleString("amqp_testtopic" + "1"), new SimpleString("amqp_testtopic" + "1"), null, true, false);
server.createQueue(new SimpleString("amqp_testtopic" + "2"), new SimpleString("amqp_testtopic" + "2"), null, true, false);
server.createQueue(new SimpleString("amqp_testtopic" + "3"), new SimpleString("amqp_testtopic" + "3"), null, true, false);
server.createQueue(new SimpleString("amqp_testtopic" + "4"), new SimpleString("amqp_testtopic" + "4"), null, true, false);
@@ -173,7 +186,7 @@ public class ProtonTest extends ProtonTestBase {
server.createQueue(new SimpleString("amqp_testtopic" + "7"), new SimpleString("amqp_testtopic" + "7"), null, true, false);
server.createQueue(new SimpleString("amqp_testtopic" + "8"), new SimpleString("amqp_testtopic" + "8"), null, true, false);
server.createQueue(new SimpleString("amqp_testtopic" + "9"), new SimpleString("amqp_testtopic" + "9"), null, true, false);
- server.createQueue(new SimpleString("amqp_testtopic" + "10"), new SimpleString("amqp_testtopic" + "10"), null, true, false);
+ server.createQueue(new SimpleString("amqp_testtopic" + "10"), new SimpleString("amqp_testtopic" + "10"), null, true, false);*/
connection = createConnection();
@@ -769,6 +782,12 @@ public class ProtonTest extends ProtonTestBase {
@Test
public void testLinkDetatchErrorIsCorrectWhenQueueDoesNotExists() throws Exception {
+ AddressSettings value = new AddressSettings();
+ value.setAutoCreateJmsQueues(false);
+ value.setAutoCreateQueues(false);
+ value.setAutoCreateAddresses(false);
+ value.setAutoCreateJmsTopics(false);
+ server.getAddressSettingsRepository().addMatch("AnAddressThatDoesNotExist", value);
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.connect();
AmqpSession session = amqpConnection.createSession();
@@ -784,6 +803,7 @@ public class ProtonTest extends ProtonTestBase {
assertNotNull(expectedException);
assertTrue(expectedException.getMessage().contains("amqp:not-found"));
assertTrue(expectedException.getMessage().contains("target address does not exist"));
+ amqpConnection.close();
}
@Test
@@ -838,6 +858,7 @@ public class ProtonTest extends ProtonTestBase {
@Test
public void testClientIdIsSetInSubscriptionList() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+ server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString("mytopic"), RoutingType.ANYCAST));
AmqpConnection amqpConnection = client.createConnection();
amqpConnection.setContainerId("testClient");
amqpConnection.setOfferedCapabilities(Arrays.asList(Symbol.getSymbol("topic")));
@@ -866,14 +887,14 @@ public class ProtonTest extends ProtonTestBase {
String queueName = "TestQueueName";
String address = "TestAddress";
-
- server.createQueue(new SimpleString(address), new SimpleString(queueName), null, true, false);
+ server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(address), RoutingType.ANYCAST));
+ server.createQueue(new SimpleString(address), RoutingType.ANYCAST, new SimpleString(queueName), null, true, false);
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.connect();
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(address);
- AmqpReceiver receiver = session.createReceiver(queueName);
+ AmqpReceiver receiver = session.createReceiver(address);
receiver.flow(1);
AmqpMessage message = new AmqpMessage();
@@ -882,6 +903,7 @@ public class ProtonTest extends ProtonTestBase {
AmqpMessage receivedMessage = receiver.receive(5000, TimeUnit.MILLISECONDS);
assertNotNull(receivedMessage);
+ amqpConnection.close();
}
@Test
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
index f19b0a4..f424ea2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
@@ -25,7 +25,10 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Random;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After;
@@ -42,6 +45,7 @@ public class SendingAndReceivingTest extends ActiveMQTestBase {
public void setUp() throws Exception {
super.setUp();
server = createServer(true, true);
+ server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString("exampleQueue"), RoutingType.ANYCAST));
server.start();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
index b75e019..829410d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
@@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
@@ -185,12 +186,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ActiveMQServerControl serverControl = createManagementControl();
- checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
-
- serverControl.createQueue(address.toString(), name.toString());
+ checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+ serverControl.createAddress(address.toString(), "ANYCAST");
+ serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, true, -1, false, false);
- checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
- QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer);
+ checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+ QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer);
Assert.assertEquals(address.toString(), queueControl.getAddress());
Assert.assertEquals(name.toString(), queueControl.getName());
Assert.assertNull(queueControl.getFilter());
@@ -211,12 +212,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ActiveMQServerControl serverControl = createManagementControl();
- checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
-
- serverControl.createQueue(address.toString(), name.toString(), filter, durable);
+ checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+ serverControl.createAddress(address.toString(), "ANYCAST");
+ serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), filter, durable, -1, false, false);
- checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
- QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer);
+ checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+ QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer);
Assert.assertEquals(address.toString(), queueControl.getAddress());
Assert.assertEquals(name.toString(), queueControl.getName());
Assert.assertEquals(filter, queueControl.getFilter());
@@ -236,12 +237,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ActiveMQServerControl serverControl = createManagementControl();
- checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
-
- serverControl.createQueue(address.toString(), name.toString(), durable);
+ checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+ serverControl.createAddress(address.toString(), "ANYCAST");
+ serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, durable, -1, false, false);
- checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
- QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer);
+ checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+ QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer);
Assert.assertEquals(address.toString(), queueControl.getAddress());
Assert.assertEquals(name.toString(), queueControl.getName());
Assert.assertNull(queueControl.getFilter());
@@ -264,12 +265,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ActiveMQServerControl serverControl = createManagementControl();
- checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
-
+ checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+ serverControl.createAddress(address.toString(), "ANYCAST");
serverControl.createQueue(address.toString(), RoutingType.ANYCAST.toString(), name.toString(), null, durable, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
- QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer);
+ QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer);
Assert.assertEquals(address.toString(), queueControl.getAddress());
Assert.assertEquals(name.toString(), queueControl.getName());
Assert.assertNull(queueControl.getFilter());
@@ -297,8 +298,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ActiveMQServerControl serverControl = createManagementControl();
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
-
- serverControl.createQueue(address.toString(), name.toString(), durable);
+ serverControl.createAddress(address.toString(), "ANYCAST");
+ serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, durable, -1, false, false);
ServerLocator receiveLocator = createInVMNonHALocator();
ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator);
@@ -307,7 +308,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
Assert.assertFalse(consumer.isClosed());
- checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
+ checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
serverControl.destroyQueue(name.toString(), true);
Wait.waitFor(new Wait.Condition() {
@Override
@@ -329,12 +330,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ActiveMQServerControl serverControl = createManagementControl();
- checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
-
- serverControl.createQueue(address.toString(), name.toString(), filter, durable);
+ checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+ serverControl.createAddress(address.toString(), "ANYCAST");
+ serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), filter, durable, -1, false, false);
- checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
- QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer);
+ checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+ QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer);
Assert.assertEquals(address.toString(), queueControl.getAddress());
Assert.assertEquals(name.toString(), queueControl.getName());
Assert.assertNull(queueControl.getFilter());
@@ -355,12 +356,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ActiveMQServerControl serverControl = createManagementControl();
- checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
+ checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+ serverControl.createAddress(address.toString(), "ANYCAST");
+ serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), filter, durable, -1, false, false);
- serverControl.createQueue(address.toString(), name.toString(), filter, durable);
-
- checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
- QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer);
+ checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+ QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer);
Assert.assertEquals(address.toString(), queueControl.getAddress());
Assert.assertEquals(name.toString(), queueControl.getName());
Assert.assertNull(queueControl.getFilter());
@@ -383,8 +384,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
// management operations
Assert.assertFalse(ActiveMQServerControlTest.contains(name.toString(), serverControl.getQueueNames()));
-
- serverControl.createQueue(address.toString(), name.toString());
+ serverControl.createAddress(address.toString(), "ANYCAST");
+ serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, true, -1, false, false);
Assert.assertTrue(ActiveMQServerControlTest.contains(name.toString(), serverControl.getQueueNames()));
serverControl.destroyQueue(name.toString());
@@ -402,8 +403,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
// management operations
Assert.assertFalse(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames()));
-
- serverControl.createQueue(address.toString(), name.toString());
+ serverControl.createAddress(address.toString(), "ANYCAST");
+ serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, true, -1, false, false);
Assert.assertTrue(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames()));
serverControl.destroyQueue(name.toString());
@@ -1212,7 +1213,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory factory = createSessionFactory(locator);
ClientSession session = addClientSession(factory.createSession());
- server.createQueue(queueName, queueName, null, false, false);
+ server.createAddressInfo(new AddressInfo(queueName, RoutingType.ANYCAST));
+ server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false);
addClientConsumer(session.createConsumer(queueName));
Thread.sleep(100); // We check the timestamp for the creation time. We need to make sure it's different
addClientConsumer(session.createConsumer(queueName, SimpleString.toSimpleString(filter), true));
@@ -1269,8 +1271,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ServerLocator locator2 = createInVMNonHALocator();
ClientSessionFactory factory2 = createSessionFactory(locator2);
ClientSession session2 = addClientSession(factory2.createSession());
-
- server.createQueue(queueName, queueName, null, false, false);
+ serverControl.createAddress(queueName.toString(), "ANYCAST");
+ server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false);
addClientConsumer(session.createConsumer(queueName));
Thread.sleep(200);
@@ -1335,7 +1337,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
@Test
public void testListSessionsAsJSON() throws Exception {
SimpleString queueName = new SimpleString(UUID.randomUUID().toString());
- server.createQueue(queueName, queueName, null, false, false);
+ server.createAddressInfo(new AddressInfo(queueName, RoutingType.ANYCAST));
+ server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false);
ActiveMQServerControl serverControl = createManagementControl();
ServerLocator locator = createInVMNonHALocator();
@@ -1400,8 +1403,10 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
this.conf.clearConnectorConfigurations().addConnectorConfiguration("server2-connector", new TransportConfiguration(INVM_CONNECTOR_FACTORY, params));
server2.start();
- server.createQueue(address, address, null, true, false);
- server2.createQueue(address, address, null, true, false);
+ server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
+ server.createQueue(address, RoutingType.ANYCAST, address, null, true, false, -1, false, false);
+ server2.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
+ server2.createQueue(address, RoutingType.ANYCAST, address, null, true, false, -1, false, false);
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory csf = createSessionFactory(locator);
ClientSession session = csf.createSession();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
index 280fdc4..2831f79 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
@@ -127,6 +127,21 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
}
@Override
+ public void createQueue(String address, String name, String routingType) throws Exception {
+ proxy.invokeOperation("createQueue", address, name, routingType);
+ }
+
+ @Override
+ public void createQueue(String address, String name, boolean durable, String routingType) throws Exception {
+ proxy.invokeOperation("createQueue", address, name, durable, routingType);
+ }
+
+ @Override
+ public void createQueue(String address,String name, String filter, boolean durable, String routingType) throws Exception {
+ proxy.invokeOperation("createQueue", address, name, filter, durable, routingType);
+ }
+
+ @Override
public void createQueue(final String address, final String name, final boolean durable) throws Exception {
proxy.invokeOperation("createQueue", address, name, durable);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java
index 6bc8f3d..11785e4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl;
import org.apache.activemq.artemis.api.core.management.DivertControl;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.core.server.RoutingType;
public class ManagementControlHelper {
@@ -73,6 +74,13 @@ public class ManagementControlHelper {
return (QueueControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, ActiveMQDefaultConfiguration.getDefaultRoutingType()), QueueControl.class, mbeanServer);
}
+ public static QueueControl createQueueControl(final SimpleString address,
+ final SimpleString name,
+ final RoutingType routingType,
+ final MBeanServer mbeanServer) throws Exception {
+ return (QueueControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, routingType), QueueControl.class, mbeanServer);
+ }
+
public static AddressControl createAddressControl(final SimpleString address,
final MBeanServer mbeanServer) throws Exception {
return (AddressControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getAddressObjectName(address), AddressControl.class, mbeanServer);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
index e6026c4..63cf579 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
@@ -33,7 +33,6 @@ import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.RoutingType;
-import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.command.ActiveMQDestination;
import org.junit.After;
import org.junit.Before;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index f2c844e..d272c02 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
@@ -51,6 +52,17 @@ public class FakePostOffice implements PostOffice {
}
@Override
+ public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) {
+
+ return null;
+ }
+
+ @Override
+ public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) {
+ return null;
+ }
+
+ @Override
public void start() throws Exception {
}