You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/11/01 10:21:59 UTC
[30/34] activemq-artemis git commit: Added ANYCAST routing to local queues
Added ANYCAST routing to local queues
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4de48304
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4de48304
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4de48304
Branch: refs/heads/ARTEMIS-780
Commit: 4de48304634ac031ee3731fbfdbbc966be31337d
Parents: 853c5a4
Author: Martyn Taylor <mt...@redhat.com>
Authored: Mon Oct 24 14:27:00 2016 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Tue Nov 1 10:20:52 2016 +0000
----------------------------------------------------------------------
.../artemis/core/postoffice/AddressManager.java | 2 +
.../artemis/core/postoffice/PostOffice.java | 2 +
.../core/postoffice/impl/BindingsImpl.java | 1 +
.../core/postoffice/impl/LocalQueueBinding.java | 9 +-
.../core/postoffice/impl/PostOfficeImpl.java | 5 +
.../postoffice/impl/SimpleAddressManager.java | 15 ++
.../artemis/core/server/ActiveMQServer.java | 2 +-
.../core/server/impl/ActiveMQServerImpl.java | 12 +-
.../artemis/core/server/impl/AddressInfo.java | 12 +-
.../server/impl/PostOfficeJournalLoader.java | 3 +-
.../core/server/impl/QueueFactoryImpl.java | 8 +
.../core/config/impl/FileConfigurationTest.java | 4 +-
.../integration/addressing/AddressingTest.java | 240 ++++++++++++++++++-
.../integration/client/HangConsumerTest.java | 2 +-
.../jms/client/TopicCleanupTest.java | 2 +-
.../core/server/impl/fakes/FakePostOffice.java | 5 +
16 files changed, 300 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4de48304/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
index 5519822..1cf1a07 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java
@@ -54,6 +54,8 @@ public interface AddressManager {
AddressInfo addAddressInfo(AddressInfo addressInfo);
+ AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo);
+
AddressInfo removeAddressInfo(SimpleString address);
AddressInfo getAddressInfo(SimpleString address);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4de48304/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index f719966..7902352 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -45,6 +45,8 @@ public interface PostOffice extends ActiveMQComponent {
AddressInfo addAddressInfo(AddressInfo addressInfo);
+ AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo);
+
AddressInfo removeAddressInfo(SimpleString address);
AddressInfo getAddressInfo(SimpleString address);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4de48304/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index e5df737..6be0311 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -262,6 +262,7 @@ public final class BindingsImpl implements Bindings {
boolean routed = false;
for (Binding binding : exclusiveBindings) {
+
if (binding.getFilter() == null || binding.getFilter().match(message)) {
binding.getBindable().route(message, context);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4de48304/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
index 2a6d9c5..2921388 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
@@ -24,10 +24,11 @@ import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
public class LocalQueueBinding implements QueueBinding {
- private final SimpleString address;
+ private final AddressInfo address;
private final Queue queue;
@@ -37,7 +38,7 @@ public class LocalQueueBinding implements QueueBinding {
private final SimpleString clusterName;
- public LocalQueueBinding(final SimpleString address, final Queue queue, final SimpleString nodeID) {
+ public LocalQueueBinding(final AddressInfo address, final Queue queue, final SimpleString nodeID) {
this.address = address;
this.queue = queue;
@@ -61,7 +62,7 @@ public class LocalQueueBinding implements QueueBinding {
@Override
public SimpleString getAddress() {
- return address;
+ return address.getName();
}
@Override
@@ -76,7 +77,7 @@ public class LocalQueueBinding implements QueueBinding {
@Override
public SimpleString getRoutingName() {
- return name;
+ return (address.getRoutingType() == AddressInfo.RoutingType.MULTICAST) ? name : address.getName();
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4de48304/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 9b7ed0c..6c654bf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -425,6 +425,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
@Override
+ public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
+ return addressManager.addOrUpdateAddressInfo(addressInfo);
+ }
+
+ @Override
public AddressInfo removeAddressInfo(SimpleString address) {
return addressManager.removeAddressInfo(address);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4de48304/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
index 2994f9e..969a1a9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
@@ -188,6 +188,21 @@ public class SimpleAddressManager implements AddressManager {
}
@Override
+ public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
+ AddressInfo from = addAddressInfo(addressInfo);
+ return (from == null) ? addressInfo : updateAddressInfo(from, addressInfo);
+ }
+
+ private AddressInfo updateAddressInfo(AddressInfo from, AddressInfo to) {
+ synchronized (from) {
+ from.setRoutingType(to.getRoutingType());
+ from.setDefaultMaxConsumers(to.getDefaultMaxConsumers());
+ from.setDefaultDeleteOnNoConsumers(to.isDefaultDeleteOnNoConsumers());
+ return from;
+ }
+ }
+
+ @Override
public AddressInfo removeAddressInfo(SimpleString address) {
return addressInfoMap.remove(address);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4de48304/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index fb5aee4..2a3a0b4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -413,7 +413,7 @@ public interface ActiveMQServer extends ActiveMQComponent {
void removeClientConnection(String clientId);
- AddressInfo addAddressInfo(AddressInfo addressInfo);
+ AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo);
AddressInfo removeAddressInfo(SimpleString address);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4de48304/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 208a317..2e09083 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -2089,7 +2089,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
info.setDefaultDeleteOnNoConsumers(config.getDefaultDeleteOnNoConsumers());
info.setDefaultMaxConsumers(config.getDefaultMaxConsumers());
- addAddressInfo(info);
+ createOrUpdateAddressInfo(info);
deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations());
}
}
@@ -2193,8 +2193,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
- public AddressInfo addAddressInfo(AddressInfo addressInfo) {
- return postOffice.addAddressInfo(addressInfo);
+ public AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) {
+ return postOffice.addOrUpdateAddressInfo(addressInfo);
}
@Override
@@ -2204,7 +2204,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public AddressInfo getAddressInfo(SimpleString address) {
- return postOffice.removeAddressInfo(address);
+ return postOffice.getAddressInfo(address);
}
private Queue createQueue(final SimpleString addressName,
@@ -2240,15 +2240,13 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).build();
final Queue queue = queueFactory.createQueueWith(queueConfig);
- addAddressInfo(new AddressInfo(queue.getAddress()));
-
if (transientQueue) {
queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName()));
} else if (queue.isAutoCreated()) {
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this.getJMSQueueDeleter(), queue.getName()));
}
- final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
+ final QueueBinding localQueueBinding = new LocalQueueBinding(getAddressInfo(queue.getAddress()), queue, nodeManager.getNodeId());
if (queue.isDurable()) {
storageManager.addQueueBinding(txID, localQueueBinding);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4de48304/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
index 4c6ec1f..1449107 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
@@ -22,7 +22,7 @@ public class AddressInfo {
private final SimpleString name;
- private RoutingType routingType = RoutingType.Multicast;
+ private RoutingType routingType = RoutingType.MULTICAST;
private boolean defaultDeleteOnNoConsumers;
@@ -61,13 +61,13 @@ public class AddressInfo {
}
public enum RoutingType {
- Multicast, Anycast;
+ MULTICAST, ANYCAST;
public byte getType() {
switch (this) {
- case Multicast:
+ case MULTICAST:
return 0;
- case Anycast:
+ case ANYCAST:
return 1;
default:
return -1;
@@ -77,9 +77,9 @@ public class AddressInfo {
public static RoutingType getType(byte type) {
switch (type) {
case 0:
- return Multicast;
+ return MULTICAST;
case 1:
- return Anycast;
+ return ANYCAST;
default:
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4de48304/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index 9a8ae74..71c5b2b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -155,7 +155,8 @@ public class PostOfficeJournalLoader implements JournalLoader {
}
}
- final Binding binding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
+ final Binding binding = new LocalQueueBinding(postOffice.getAddressInfo(queue.getAddress()), queue, nodeManager.getNodeId());
+
queues.put(queue.getID(), queue);
postOffice.addBinding(binding);
managementService.registerAddress(queue.getAddress());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4de48304/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
index 5686c7b..3678553 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
@@ -68,6 +68,10 @@ public class QueueFactoryImpl implements QueueFactory {
@Override
public Queue createQueueWith(final QueueConfig config) {
+
+ // Add default address info if one doesn't exist
+ postOffice.addAddressInfo(new AddressInfo(config.address()));
+
final AddressSettings addressSettings = addressSettingsRepository.getMatch(config.address().toString());
final Queue queue;
if (addressSettings.isLastValueQueue()) {
@@ -89,6 +93,10 @@ public class QueueFactoryImpl implements QueueFactory {
final boolean durable,
final boolean temporary,
final boolean autoCreated) {
+
+ // Add default address info if one doesn't exist
+ postOffice.addAddressInfo(new AddressInfo(address));
+
AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
Queue queue;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4de48304/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index f7a0175..46f3958 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -367,7 +367,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
// Addr 1
CoreAddressConfiguration addressConfiguration = conf.getAddressConfigurations().get(0);
assertEquals("addr1", addressConfiguration.getName());
- assertEquals(AddressInfo.RoutingType.Anycast, addressConfiguration.getRoutingType());
+ assertEquals(AddressInfo.RoutingType.ANYCAST, addressConfiguration.getRoutingType());
assertEquals(2, addressConfiguration.getQueueConfigurations().size());
// Addr 1 Queue 1
@@ -393,7 +393,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
// Addr 2
addressConfiguration = conf.getAddressConfigurations().get(1);
assertEquals("addr2", addressConfiguration.getName());
- assertEquals(AddressInfo.RoutingType.Multicast, addressConfiguration.getRoutingType());
+ assertEquals(AddressInfo.RoutingType.MULTICAST, addressConfiguration.getRoutingType());
assertEquals(2, addressConfiguration.getQueueConfigurations().size());
// Addr 2 Queue 1
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4de48304/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
index 43d6071..2e0fda4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
@@ -16,6 +16,244 @@
*/
package org.apache.activemq.artemis.tests.integration.addressing;
-public class AddressingTest {
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AddressingTest extends ActiveMQTestBase {
+
+ private ActiveMQServer server;
+
+ private ClientSessionFactory sessionFactory;
+
+ @Before
+ public void setup() throws Exception {
+ server = createServer(true);
+ server.start();
+
+ server.waitForActivation(10, TimeUnit.SECONDS);
+
+ ServerLocator sl = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+ sessionFactory = sl.createSessionFactory();
+
+ addSessionFactory(sessionFactory);
+ }
+
+ @Test
+ public void testMulticastRouting() throws Exception {
+
+ SimpleString sendAddress = new SimpleString("test.address");
+
+ List<String> testAddresses = Arrays.asList("test.address", "test.#", "test.*");
+
+ for (String consumeAddress : testAddresses) {
+
+ // For each address, create 2 Queues with the same address, assert both queues receive message
+
+ AddressInfo addressInfo = new AddressInfo(new SimpleString(consumeAddress));
+ addressInfo.setRoutingType(AddressInfo.RoutingType.MULTICAST);
+
+ server.createOrUpdateAddressInfo(addressInfo);
+ Queue q1 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".1"), null, true, false);
+ Queue q2 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".2"), null, true, false);
+
+ ClientSession session = sessionFactory.createSession();
+ session.start();
+
+ ClientConsumer consumer1 = session.createConsumer(q1.getName());
+ ClientConsumer consumer2 = session.createConsumer(q2.getName());
+
+ ClientProducer producer = session.createProducer(sendAddress);
+ ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true);
+ m.getBodyBuffer().writeString("TestMessage");
+
+ producer.send(m);
+
+ assertNotNull(consumer1.receive(2000));
+ assertNotNull(consumer2.receive(2000));
+
+ q1.deleteQueue();
+ q2.deleteQueue();
+
+ System.out.println(consumeAddress);
+ }
+ }
+
+ @Test
+ public void testAnycastRouting() throws Exception {
+
+ SimpleString sendAddress = new SimpleString("test.address");
+
+ List<String> testAddresses = Arrays.asList("test.address", "test.#", "test.*");
+
+ for (String consumeAddress : testAddresses) {
+
+ // For each address, create 2 Queues with the same address, assert exactly one queue receives the message
+
+ AddressInfo addressInfo = new AddressInfo(new SimpleString(consumeAddress));
+ addressInfo.setRoutingType(AddressInfo.RoutingType.ANYCAST);
+
+ server.createOrUpdateAddressInfo(addressInfo);
+ Queue q1 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".1"), null, true, false);
+ Queue q2 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".2"), null, true, false);
+
+ ClientSession session = sessionFactory.createSession();
+ session.start();
+
+ ClientConsumer consumer1 = session.createConsumer(q1.getName());
+ ClientConsumer consumer2 = session.createConsumer(q2.getName());
+
+ ClientProducer producer = session.createProducer(sendAddress);
+ ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true);
+
+ m.getBodyBuffer().writeString("TestMessage");
+
+ producer.send(m);
+
+ int count = 0;
+ count = (consumer1.receive(1000) == null) ? count : count + 1;
+ count = (consumer2.receive(1000) == null) ? count : count + 1;
+ assertEquals(1, count);
+
+ q1.deleteQueue();
+ q2.deleteQueue();
+
+ System.out.println(consumeAddress);
+ }
+ }
+
+ @Test
+ public void testAnycastRoutingRoundRobin() throws Exception {
+
+ SimpleString address = new SimpleString("test.address");
+ AddressInfo addressInfo = new AddressInfo(address);
+ addressInfo.setRoutingType(AddressInfo.RoutingType.ANYCAST);
+
+ server.createOrUpdateAddressInfo(addressInfo);
+ Queue q1 = server.createQueue(address, address.concat(".1"), null, true, false);
+ Queue q2 = server.createQueue(address, address.concat(".2"), null, true, false);
+ Queue q3 = server.createQueue(address, address.concat(".3"), null, true, false);
+
+ ClientSession session = sessionFactory.createSession();
+ session.start();
+
+ ClientProducer producer = session.createProducer(address);
+
+ ClientConsumer consumer1 = session.createConsumer(q1.getName());
+ ClientConsumer consumer2 = session.createConsumer(q2.getName());
+ ClientConsumer consumer3 = session.createConsumer(q3.getName());
+ List<ClientConsumer> consumers = new ArrayList<>(Arrays.asList(new ClientConsumer[] {consumer1, consumer2, consumer3}));
+
+ List<String> messages = new ArrayList<>();
+ messages.add("Message1");
+ messages.add("Message2");
+ messages.add("Message3");
+
+ ClientMessage clientMessage;
+ for (String message : messages) {
+ clientMessage = session.createMessage(true);
+ clientMessage.getBodyBuffer().writeString(message);
+ producer.send(clientMessage);
+ }
+
+ String m;
+ for (ClientConsumer consumer : consumers) {
+ clientMessage = consumer.receive(1000);
+ m = clientMessage.getBodyBuffer().readString();
+ messages.remove(m);
+ }
+
+ assertTrue(messages.isEmpty());
+
+ // Check we don't receive more messages
+ int count = 0;
+ for (ClientConsumer consumer : consumers) {
+ count = (consumer.receive(1000) == null) ? count : count + 1;
+ }
+ assertEquals(0, count);
+ }
+
+
+
+ @Test
+ public void testMulticastRoutingBackwardsCompat() throws Exception {
+
+ SimpleString sendAddress = new SimpleString("test.address");
+
+ List<String> testAddresses = Arrays.asList("test.address", "test.#", "test.*");
+
+ for (String consumeAddress : testAddresses) {
+
+ // For each address, create 2 Queues with the same address, assert both queues receive message
+ Queue q1 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".1"), null, true, false);
+ Queue q2 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".2"), null, true, false);
+
+ ClientSession session = sessionFactory.createSession();
+ session.start();
+
+ ClientConsumer consumer1 = session.createConsumer(q1.getName());
+ ClientConsumer consumer2 = session.createConsumer(q2.getName());
+
+ ClientProducer producer = session.createProducer(sendAddress);
+ ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true);
+ m.getBodyBuffer().writeString("TestMessage");
+
+ producer.send(m);
+
+ assertNotNull(consumer1.receive(2000));
+ assertNotNull(consumer2.receive(2000));
+
+ q1.deleteQueue();
+ q2.deleteQueue();
+
+ System.out.println(consumeAddress);
+ }
+ }
+
+ @Test
+ public void testDeleteQueueOnNoConsumersTrue() {
+ fail("Not Implemented");
+ }
+
+ @Test
+ public void testDeleteQueueOnNoConsumersFalse() {
+ fail("Not Implemented");
+ }
+
+ @Test
+ public void testLimitOnMaxConsumers() {
+ fail("Not Implemented");
+ }
+
+ @Test
+ public void testUnlimitedMaxConsumers() {
+ fail("Not Implemented");
+ }
+
+ @Test
+ public void testDefaultMaxConsumersFromAddress() {
+ fail("Not Implemented");
+ }
+
+ @Test
+ public void testDefaultDeleteOnNoConsumersFromAddress() {
+ fail("Not Implemented");
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4de48304/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 83d28a1..2fd5915 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -353,7 +353,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
long txID = server.getStorageManager().generateID();
// Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally
- LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false, false, null, null, null, null, null), server.getNodeID());
+ LocalQueueBinding newBinding = new LocalQueueBinding(server.getAddressInfo(QUEUE), new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false, false, null, null, null, null, null), server.getNodeID());
server.getStorageManager().addQueueBinding(txID, newBinding);
server.getStorageManager().commitBindings(txID);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4de48304/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
index 280596a..ec279ee 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
@@ -83,7 +83,7 @@ public class TopicCleanupTest extends JMSTestBase {
final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("jms.topic.topic"), SimpleString.toSimpleString("jms.topic.topic"), FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), null, true, false, false, server.getScheduledPool(), server.getPostOffice(), storage, server.getAddressSettingsRepository(), server.getExecutorFactory().getExecutor());
- LocalQueueBinding binding = new LocalQueueBinding(queue.getAddress(), queue, server.getNodeID());
+ LocalQueueBinding binding = new LocalQueueBinding(server.getAddressInfo(queue.getAddress()), queue, server.getNodeID());
storage.addQueueBinding(txid, binding);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4de48304/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index 9424fc3..512f0f2 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -65,6 +65,11 @@ public class FakePostOffice implements PostOffice {
return null;
}
+ @Override
+ public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
+ return null;
+ }
+
@Override
public AddressInfo removeAddressInfo(SimpleString address) {